Skip to content

Commit

Permalink
[sync-api] rewotk handling of dataset messages
Browse files Browse the repository at this point in the history
  • Loading branch information
jbygdell committed Aug 1, 2024
1 parent 33fa35e commit edc3b62
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 77 deletions.
91 changes: 16 additions & 75 deletions sda/cmd/syncapi/syncapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"syscall"
"time"

"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/neicnordic/sensitive-data-archive/internal/broker"
"github.com/neicnordic/sensitive-data-archive/internal/config"
Expand All @@ -25,18 +24,6 @@ import (
var Conf *config.Config
var err error

type syncDataset struct {
DatasetID string `json:"dataset_id"`
DatasetFiles []datasetFiles `json:"dataset_files"`
User string `json:"user"`
}

type datasetFiles struct {
FilePath string `json:"filepath"`
FileID string `json:"file_id"`
ShaSum string `json:"sha256"`
}

func main() {
Conf, err = config.NewConfig("sync-api")
if err != nil {
Expand Down Expand Up @@ -138,74 +125,28 @@ func dataset(w http.ResponseWriter, r *http.Request) {
}
defer r.Body.Close()

if err := schema.ValidateJSON(fmt.Sprintf("%s/../bigpicture/file-sync.json", Conf.Broker.SchemasPath), b); err != nil {
respondWithError(w, http.StatusBadRequest, fmt.Sprintf("eror on JSON validation: %s", err.Error()))

return
var d struct {
Type string `json:"type"`
DatasetID string `json:"dataset_id"`
AccessionIDs []string `json:"accession_ids,omitempty"`
}

if err := parseDatasetMessage(b); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_ = json.Unmarshal(b, &d)

var action string
switch d.Type {
case "mapping":
action = "mapping"
case "release":
action = "release"
}

w.WriteHeader(http.StatusOK)
}

// parseDatasetMessage parses the JSON blob and sends the relevant messages
func parseDatasetMessage(msg []byte) error {
log.Debugf("incoming blob %s", msg)
blob := syncDataset{}
_ = json.Unmarshal(msg, &blob)

var accessionIDs []string
for _, files := range blob.DatasetFiles {
ingest := schema.IngestionTrigger{
Type: "ingest",
User: blob.User,
FilePath: files.FilePath,
}
ingestMsg, err := json.Marshal(ingest)
if err != nil {
return fmt.Errorf("failed to marshal json messge: Reason %v", err)
}
corrID := uuid.New().String()
if err := Conf.API.MQ.SendMessage(corrID, Conf.Broker.Exchange, Conf.SyncAPI.IngestRouting, ingestMsg); err != nil {
return fmt.Errorf("failed to send ingest messge: Reason %v", err)
}

accessionIDs = append(accessionIDs, files.FileID)
finalize := schema.IngestionAccession{
Type: "accession",
User: blob.User,
FilePath: files.FilePath,
AccessionID: files.FileID,
DecryptedChecksums: []schema.Checksums{{Type: "sha256", Value: files.ShaSum}},
}
finalizeMsg, err := json.Marshal(finalize)
if err != nil {
return fmt.Errorf("failed to marshal json messge: Reason %v", err)
}

if err := Conf.API.MQ.SendMessage(corrID, Conf.Broker.Exchange, Conf.SyncAPI.AccessionRouting, finalizeMsg); err != nil {
return fmt.Errorf("failed to send mapping messge: Reason %v", err)
}
}

mappings := schema.DatasetMapping{
Type: "mapping",
DatasetID: blob.DatasetID,
AccessionIDs: accessionIDs,
}
mappingMsg, err := json.Marshal(mappings)
if err != nil {
return fmt.Errorf("failed to marshal json messge: Reason %v", err)
}
if err := schema.ValidateJSON(fmt.Sprintf("%s/dataset-%s.json", Conf.Broker.SchemasPath, action), b); err != nil {
respondWithError(w, http.StatusBadRequest, fmt.Sprintf("eror on JSON validation: %s", err.Error()))

if err := Conf.API.MQ.SendMessage(fmt.Sprintf("%v", time.Now().Unix()), Conf.Broker.Exchange, Conf.SyncAPI.MappingRouting, mappingMsg); err != nil {
return fmt.Errorf("failed to send mapping messge: Reason %v", err)
return
}

return nil
w.WriteHeader(http.StatusOK)
}

func respondWithError(w http.ResponseWriter, code int, message string) {
Expand Down
10 changes: 8 additions & 2 deletions sda/cmd/syncapi/syncapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,14 @@ func (suite *SyncAPITest) TestDatasetRoute() {
Conf.API.MQ, err = broker.NewMQ(Conf.Broker)
assert.NoError(suite.T(), err)

Conf.Broker.SchemasPath = "../../schemas/isolated/"
Conf.Broker.SchemasPath = "../../schemas/bigpicture/"

r := mux.NewRouter()
r.HandleFunc("/dataset", dataset)
ts := httptest.NewServer(r)
defer ts.Close()

goodJSON := []byte(`{"user": "[email protected]", "dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e6", "dataset_files": [{"filepath": "inbox/user/file-1.c4gh","file_id": "5fe7b660-afea-4c3a-88a9-3daabf055ebb", "sha256": "82E4e60e7beb3db2e06A00a079788F7d71f75b61a4b75f28c4c942703dabb6d6"}, {"filepath": "inbox/user/file2.c4gh","file_id": "ed6af454-d910-49e3-8cda-488a6f246e76", "sha256": "c967d96e56dec0f0cfee8f661846238b7f15771796ee1c345cae73cd812acc2b"}]}`)
goodJSON := []byte(`{"type": "mapping", "dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e6", "accession_ids": ["5fe7b660-afea-4c3a-88a9-3daabf055ebb", "ed6af454-d910-49e3-8cda-488a6f246e76"]}`)
good, err := http.Post(ts.URL+"/dataset", "application/json", bytes.NewBuffer(goodJSON))
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), http.StatusOK, good.StatusCode)
Expand All @@ -224,6 +224,12 @@ func (suite *SyncAPITest) TestDatasetRoute() {
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), http.StatusBadRequest, bad.StatusCode)
defer bad.Body.Close()

goodJSON2 := []byte(`{"type": "release", "dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e6"}`)
good2, err := http.Post(ts.URL+"/dataset", "application/json", bytes.NewBuffer(goodJSON2))
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), http.StatusOK, good2.StatusCode)
defer good2.Body.Close()
}

func (suite *SyncAPITest) TestMetadataRoute() {
Expand Down

0 comments on commit edc3b62

Please sign in to comment.