diff --git a/sda/cmd/syncapi/syncapi.go b/sda/cmd/syncapi/syncapi.go index 27b8a6858..7d5355a10 100644 --- a/sda/cmd/syncapi/syncapi.go +++ b/sda/cmd/syncapi/syncapi.go @@ -13,7 +13,6 @@ import ( "syscall" "time" - "github.com/google/uuid" "github.com/gorilla/mux" "github.com/neicnordic/sensitive-data-archive/internal/broker" "github.com/neicnordic/sensitive-data-archive/internal/config" @@ -25,18 +24,6 @@ import ( var Conf *config.Config var err error -type syncDataset struct { - DatasetID string `json:"dataset_id"` - DatasetFiles []datasetFiles `json:"dataset_files"` - User string `json:"user"` -} - -type datasetFiles struct { - FilePath string `json:"filepath"` - FileID string `json:"file_id"` - ShaSum string `json:"sha256"` -} - func main() { Conf, err = config.NewConfig("sync-api") if err != nil { @@ -138,74 +125,28 @@ func dataset(w http.ResponseWriter, r *http.Request) { } defer r.Body.Close() - if err := schema.ValidateJSON(fmt.Sprintf("%s/../bigpicture/file-sync.json", Conf.Broker.SchemasPath), b); err != nil { - respondWithError(w, http.StatusBadRequest, fmt.Sprintf("eror on JSON validation: %s", err.Error())) - - return + var d struct { + Type string `json:"type"` + DatasetID string `json:"dataset_id"` + AccessionIDs []string `json:"accession_ids,omitempty"` } - - if err := parseDatasetMessage(b); err != nil { - w.WriteHeader(http.StatusInternalServerError) + _ = json.Unmarshal(b, &d) + + var action string + switch d.Type { + case "mapping": + action = "mapping" + case "release": + action = "release" } - w.WriteHeader(http.StatusOK) -} - -// parseDatasetMessage parses the JSON blob and sends the relevant messages -func parseDatasetMessage(msg []byte) error { - log.Debugf("incoming blob %s", msg) - blob := syncDataset{} - _ = json.Unmarshal(msg, &blob) - - var accessionIDs []string - for _, files := range blob.DatasetFiles { - ingest := schema.IngestionTrigger{ - Type: "ingest", - User: blob.User, - FilePath: files.FilePath, - } - ingestMsg, err := json.Marshal(ingest) - if err != nil { - return fmt.Errorf("failed to marshal json messge: Reason %v", err) - } - corrID := uuid.New().String() - if err := Conf.API.MQ.SendMessage(corrID, Conf.Broker.Exchange, Conf.SyncAPI.IngestRouting, ingestMsg); err != nil { - return fmt.Errorf("failed to send ingest messge: Reason %v", err) - } - - accessionIDs = append(accessionIDs, files.FileID) - finalize := schema.IngestionAccession{ - Type: "accession", - User: blob.User, - FilePath: files.FilePath, - AccessionID: files.FileID, - DecryptedChecksums: []schema.Checksums{{Type: "sha256", Value: files.ShaSum}}, - } - finalizeMsg, err := json.Marshal(finalize) - if err != nil { - return fmt.Errorf("failed to marshal json messge: Reason %v", err) - } - - if err := Conf.API.MQ.SendMessage(corrID, Conf.Broker.Exchange, Conf.SyncAPI.AccessionRouting, finalizeMsg); err != nil { - return fmt.Errorf("failed to send mapping messge: Reason %v", err) - } - } - - mappings := schema.DatasetMapping{ - Type: "mapping", - DatasetID: blob.DatasetID, - AccessionIDs: accessionIDs, - } - mappingMsg, err := json.Marshal(mappings) - if err != nil { - return fmt.Errorf("failed to marshal json messge: Reason %v", err) - } + if err := schema.ValidateJSON(fmt.Sprintf("%s/dataset-%s.json", Conf.Broker.SchemasPath, action), b); err != nil { + respondWithError(w, http.StatusBadRequest, fmt.Sprintf("eror on JSON validation: %s", err.Error())) - if err := Conf.API.MQ.SendMessage(fmt.Sprintf("%v", time.Now().Unix()), Conf.Broker.Exchange, Conf.SyncAPI.MappingRouting, mappingMsg); err != nil { - return fmt.Errorf("failed to send mapping messge: Reason %v", err) + return } - return nil + w.WriteHeader(http.StatusOK) } func respondWithError(w http.ResponseWriter, code int, message string) { diff --git a/sda/cmd/syncapi/syncapi_test.go b/sda/cmd/syncapi/syncapi_test.go index 8c26b1992..c358acb2c 100644 --- a/sda/cmd/syncapi/syncapi_test.go +++ b/sda/cmd/syncapi/syncapi_test.go @@ -206,14 +206,14 @@ func (suite *SyncAPITest) TestDatasetRoute() { Conf.API.MQ, err = broker.NewMQ(Conf.Broker) assert.NoError(suite.T(), err) - Conf.Broker.SchemasPath = "../../schemas/isolated/" + Conf.Broker.SchemasPath = "../../schemas/bigpicture/" r := mux.NewRouter() r.HandleFunc("/dataset", dataset) ts := httptest.NewServer(r) defer ts.Close() - goodJSON := []byte(`{"user": "test.user@example.com", "dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e6", "dataset_files": [{"filepath": "inbox/user/file-1.c4gh","file_id": "5fe7b660-afea-4c3a-88a9-3daabf055ebb", "sha256": "82E4e60e7beb3db2e06A00a079788F7d71f75b61a4b75f28c4c942703dabb6d6"}, {"filepath": "inbox/user/file2.c4gh","file_id": "ed6af454-d910-49e3-8cda-488a6f246e76", "sha256": "c967d96e56dec0f0cfee8f661846238b7f15771796ee1c345cae73cd812acc2b"}]}`) + goodJSON := []byte(`{"type": "mapping", "dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e6", "accession_ids": ["5fe7b660-afea-4c3a-88a9-3daabf055ebb", "ed6af454-d910-49e3-8cda-488a6f246e76"]}`) good, err := http.Post(ts.URL+"/dataset", "application/json", bytes.NewBuffer(goodJSON)) assert.NoError(suite.T(), err) assert.Equal(suite.T(), http.StatusOK, good.StatusCode) @@ -224,6 +224,12 @@ func (suite *SyncAPITest) TestDatasetRoute() { assert.NoError(suite.T(), err) assert.Equal(suite.T(), http.StatusBadRequest, bad.StatusCode) defer bad.Body.Close() + + goodJSON2 := []byte(`{"type": "release", "dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e6"}`) + good2, err := http.Post(ts.URL+"/dataset", "application/json", bytes.NewBuffer(goodJSON2)) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), http.StatusOK, good2.StatusCode) + defer good2.Body.Close() } func (suite *SyncAPITest) TestMetadataRoute() {