Add batch upload capability to bcda_fetch
PiperOrigin-RevId: 463180777
suyashkumar authored and copybara-github committed Jul 25, 2022
1 parent 03a40c6 commit 5cb168e
Showing 3 changed files with 158 additions and 8 deletions.
4 changes: 4 additions & 0 deletions cmd/bcda_fetch/bcda_fetch.go
@@ -45,6 +45,8 @@ var (
fhirStoreGCPDatasetID = flag.String("fhir_store_gcp_dataset_id", "", "The dataset ID for the FHIR Store.")
fhirStoreID = flag.String("fhir_store_id", "", "The FHIR Store ID.")
fhirStoreUploadErrorFileDir = flag.String("fhir_store_upload_error_file_dir", "", "An optional path to a directory where an upload errors file should be written. This file will contain the FHIR NDJSON and error information of FHIR resources that fail to upload to FHIR store.")
fhirStoreEnableBatchUpload = flag.Bool("fhir_store_enable_batch_upload", false, "If true, uploads FHIR resources to FHIR Store in batch bundles.")
fhirStoreBatchUploadSize = flag.Int("fhir_store_batch_upload_size", 0, "If set, this is the batch size used to upload FHIR batch bundles to FHIR store. If this flag is not set and fhir_store_enable_batch_upload is true, a default batch size is used.")
serverURL = flag.String("bcda_server_url", "https://sandbox.bcda.cms.gov", "The BCDA server to communicate with. By default this is https://sandbox.bcda.cms.gov")
since = flag.String("since", "", "The optional timestamp after which data should be fetched for. If not specified, fetches all available data. This should be a FHIR instant in the form of YYYY-MM-DDThh:mm:ss.sss+zz:zz.")
sinceFile = flag.String("since_file", "", "Optional. If specified, the fetch program will read the latest since timestamp in this file to use when fetching data from BCDA. DO NOT run simultaneous fetch programs with the same since file. Once the fetch is completed successfully, fetch will write the BCDA transaction timestamp for this fetch operation to the end of the file specified here, to be used in the subsequent run (to only fetch new data since the last successful run). The first time fetch is run with this flag set, it will fetch all data.")
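
For reference, a run with batch upload enabled might look like the following sketch. The flag names come from this file; the binary name and all values are illustrative:

  ./bcda_fetch \
    --client_id=<id> \
    --client_secret=<secret> \
    --output_prefix=/tmp/bcda_out \
    --rectify \
    --enable_fhir_store \
    --fhir_store_gcp_project=<project> \
    --fhir_store_gcp_location=<location> \
    --fhir_store_gcp_dataset_id=<dataset> \
    --fhir_store_id=<store> \
    --fhir_store_enable_batch_upload \
    --fhir_store_batch_upload_size=25

If --fhir_store_batch_upload_size is left unset, a default batch size is used per the flag description above.
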
@@ -270,6 +272,8 @@ func rectifyAndWrite(r io.Reader, filePrefix string, cfg mainWrapperConfig) {
MaxWorkers: *maxFHIRStoreUploadWorkers,
ErrorCounter: fhirStoreUploadErrorCounter,
ErrorFileOutputPath: *fhirStoreUploadErrorFileDir,
BatchUpload: *fhirStoreEnableBatchUpload,
BatchSize: *fhirStoreBatchUploadSize,
})

if err != nil {
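
For context on what BatchUpload enables: instead of one HTTP request per resource, resources are grouped into FHIR bundles of type "batch" and executed against the store's .../fhir endpoint in a single request. The actual uploader lives in the fhirstore package and is not part of this diff; the following is a minimal, self-contained sketch of the idea, in which the function names, the PUT-upsert request style, and the error handling are all illustrative assumptions:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// bundleEntry and bundle model just enough of a FHIR R4 Bundle to
// assemble a batch upload.
type bundleEntry struct {
	Resource json.RawMessage   `json:"resource"`
	Request  map[string]string `json:"request"`
}

type bundle struct {
	ResourceType string        `json:"resourceType"`
	Type         string        `json:"type"`
	Entry        []bundleEntry `json:"entry"`
}

// uploadBatch (hypothetical) executes one batch bundle against a FHIR
// store base URL such as
// https://healthcare.googleapis.com/v1/projects/P/locations/L/datasets/D/fhirStores/S/fhir.
func uploadBatch(fhirBase string, resources [][]byte) error {
	b := bundle{ResourceType: "Bundle", Type: "batch"}
	for _, r := range resources {
		var meta struct {
			ResourceType string `json:"resourceType"`
			ID           string `json:"id"`
		}
		if err := json.Unmarshal(r, &meta); err != nil {
			return err
		}
		b.Entry = append(b.Entry, bundleEntry{
			Resource: json.RawMessage(r),
			// PUT to <type>/<id> upserts each resource idempotently.
			Request: map[string]string{
				"method": "PUT",
				"url":    fmt.Sprintf("%s/%s", meta.ResourceType, meta.ID),
			},
		})
	}
	body, err := json.Marshal(b)
	if err != nil {
		return err
	}
	resp, err := http.Post(fhirBase, "application/fhir+json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("batch upload failed: %v", resp.Status)
	}
	return nil
}

Issuing one request per BatchSize resources, rather than per resource, cuts round trips and quota pressure on large exports.
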
154 changes: 150 additions & 4 deletions cmd/bcda_fetch/bcda_fetch_test.go
@@ -53,9 +53,10 @@ func TestMainWrapper(t *testing.T) {
sinceFileExpectedContent []byte
// fhirStoreFailures causes the test fhir store server to return errors if
// set to true.
-	fhirStoreFailures    bool
-	noFailOnUploadErrors bool
-	bcdaJobID            string
+	fhirStoreFailures          bool
+	noFailOnUploadErrors       bool
+	bcdaJobID                  string
+	fhirStoreEnableBatchUpload bool
// unsetOutputPrefix sets the outputPrefix to empty string if true.
unsetOutputPrefix bool
// disableFHIRStoreUploadChecks will disable the portion of the test that
@@ -350,6 +351,21 @@ func TestMainWrapper(t *testing.T) {
bcdaJobID: "999",
unsetOutputPrefix: true,
},
// Batch upload test cases
{
name: "BatchUploadWithBCDAV1",
enableFHIRStore: true,
fhirStoreEnableBatchUpload: true,
apiVersion: bcda.V1,
rectify: true,
},
{
name: "BatchUploadWithBCDAV2",
enableFHIRStore: true,
fhirStoreEnableBatchUpload: true,
apiVersion: bcda.V2,
rectify: true,
},
// TODO(b/226375559): see if we can generate some of the test cases above
// instead of having to spell them out explicitly.
// TODO(b/213365276): test that bcda V1 with rectify = true results in an
@@ -472,6 +488,10 @@ func TestMainWrapper(t *testing.T) {
flag.Set("fhir_store_id", gcpFHIRStoreID)
flag.Set("bcda_job_id", tc.bcdaJobID)

if tc.fhirStoreEnableBatchUpload {
flag.Set("fhir_store_enable_batch_upload", "true")
}

if tc.enableFHIRStore {
flag.Set("enable_fhir_store", "true")
}
@@ -544,9 +564,16 @@ func TestMainWrapper(t *testing.T) {
if tc.fhirStoreFailures {
fhirStoreEndpoint = serverAlwaysFails(t)
} else {
-	if !tc.disableFHIRStoreUploadChecks {
+	if !tc.disableFHIRStoreUploadChecks && !tc.fhirStoreEnableBatchUpload {
fhirStoreEndpoint = testhelpers.FHIRStoreServer(t, fhirStoreTests, gcpProject, gcpLocation, gcpDatasetID, gcpFHIRStoreID)
}
if !tc.disableFHIRStoreUploadChecks && tc.fhirStoreEnableBatchUpload {
expectedResources := [][]byte{file1Data, file2DataRectified, file3Data}
// Note that differing batch upload sizes are tested more
// extensively in the TestMainWrapper_BatchUploadSize test below.
expectedBatchSize := 1 // One uploader runs per data file, and each data file contains only one resource.
fhirStoreEndpoint = testhelpers.FHIRStoreServerBatch(t, expectedResources, expectedBatchSize, gcpProject, gcpLocation, gcpDatasetID, gcpFHIRStoreID)
}
}
}

@@ -966,6 +993,125 @@ func TestMainWrapper_GetDataRetry(t *testing.T) {
}
}

func TestMainWrapper_BatchUploadSize(t *testing.T) {
// This test checks setting different batch upload sizes in bcda_fetch
// more comprehensively than the cases in TestMainWrapper above.
cases := []struct {
name string
apiVersion bcda.Version
batchUploadSize int
}{
{
name: "BCDAV1WithBatchSize2",
apiVersion: bcda.V1,
batchUploadSize: 2,
},
{
name: "BCDAV2WithBatchSize2",
apiVersion: bcda.V2,
batchUploadSize: 2,
},
{
name: "BCDAV1WithBatchSize3",
apiVersion: bcda.V1,
batchUploadSize: 3,
},
{
name: "BCDAV2WithBatchSize3",
apiVersion: bcda.V2,
batchUploadSize: 3,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// File 1 contains 3 Patient resources.
patient1 := `{"resourceType":"Patient","id":"PatientID1"}`
patient2 := `{"resourceType":"Patient","id":"PatientID2"}`
patient3 := `{"resourceType":"Patient","id":"PatientID3"}`
file1Data := []byte(patient1 + "\n" + patient2 + "\n" + patient3)
exportEndpoint := "/api/v1/Group/all/$export"
jobsEndpoint := "/api/v1/jobs/1234"
if tc.apiVersion == bcda.V2 {
exportEndpoint = "/api/v2/Group/all/$export"
jobsEndpoint = "/api/v2/jobs/1234"
}
serverTransactionTime := "2020-12-09T11:00:00.123+00:00"

// Setup BCDA test servers:

// A separate resource server is needed during testing so that the
// jobsEndpoint response served by bcdaServer can include a URL that
// points at bcdaResourceServer.
bcdaResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write(file1Data)
}))
defer bcdaResourceServer.Close()

bcdaServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
switch req.URL.Path {
case "/auth/token":
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"access_token": "token"}`))
case exportEndpoint:
// Check that the _since query parameter is empty.
if got := len(req.URL.Query()["_since"]); got != 0 {
t.Errorf("got unexpected _since URL param length. got: %v, want: %v", got, 0)
w.WriteHeader(http.StatusBadRequest)
return
}
w.Header()["Content-Location"] = []string{"some/info/1234"}
w.WriteHeader(http.StatusAccepted)
case jobsEndpoint:
w.WriteHeader(http.StatusOK)
w.Write([]byte(fmt.Sprintf("{\"output\": [{\"type\": \"Patient\", \"url\": \"%s/data/10.ndjson\"}], \"transactionTime\": \"%s\"}", bcdaResourceServer.URL, serverTransactionTime)))
default:
w.WriteHeader(http.StatusBadRequest)
}
}))
defer bcdaServer.Close()

// Set minimal flags for this test case:
outputPrefix := t.TempDir()
defer SaveFlags().Restore()
flag.Set("client_id", "id")
flag.Set("client_secret", "secret")
flag.Set("output_prefix", outputPrefix)
flag.Set("bcda_server_url", bcdaServer.URL)
flag.Set("rectify", "true")

flag.Set("max_fhir_store_upload_workers", "1")
flag.Set("fhir_store_enable_batch_upload", "true")
flag.Set("fhir_store_batch_upload_size", fmt.Sprintf("%d", tc.batchUploadSize))

gcpProject := "project"
gcpLocation := "location"
gcpDatasetID := "dataset"
gcpFHIRStoreID := "fhirID"
flag.Set("fhir_store_gcp_project", gcpProject)
flag.Set("fhir_store_gcp_location", gcpLocation)
flag.Set("fhir_store_gcp_dataset_id", gcpDatasetID)
flag.Set("fhir_store_id", gcpFHIRStoreID)
flag.Set("enable_fhir_store", "true")

if tc.apiVersion == bcda.V2 {
flag.Set("use_v2", "true")
}

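// With three Patient resources, batch_upload_size=2 should arrive as one
// bundle of two plus a trailing partial bundle of one, while size 3 fits
// in a single bundle (this assumes FHIRStoreServerBatch tolerates a final
// partial batch).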
expectedResources := [][]byte{[]byte(patient1), []byte(patient2), []byte(patient3)}
fhirStoreEndpoint := testhelpers.FHIRStoreServerBatch(t, expectedResources, tc.batchUploadSize, gcpProject, gcpLocation, gcpDatasetID, gcpFHIRStoreID)

// Run mainWrapper:
cfg := mainWrapperConfig{fhirStoreEndpoint: fhirStoreEndpoint}
if err := mainWrapper(cfg); err != nil {
t.Errorf("mainWrapper(%v) error: %v", cfg, err)
}

})
}
}

// serverAlwaysFails returns a server that always fails with a 500 error code.
func serverAlwaysFails(t *testing.T) string {
t.Helper()
Expand Down
8 changes: 4 additions & 4 deletions internal/testhelpers/fhirstore.go
@@ -119,9 +119,9 @@ func FHIRStoreServerBatch(t *testing.T, expectedFHIRResources [][]byte, expected
}
for _, gotEntry := range gotBundle.Entry {
gotResource := []byte(gotEntry.Resource)
-	expectedResourceIdx, ok := getIndexOf(gotResource, expectedFHIRResources)
+	expectedResourceIdx, ok := getIndexOf(t, gotResource, expectedFHIRResources)
	if !ok {
-		t.Errorf("server received unexpected FHIR resource")
+		t.Errorf("server received unexpected FHIR resource: %s", gotResource)
}

// Update the corresponding index in expectedResourceWasUploaded slice.
@@ -199,9 +199,9 @@ func validateURLAndMatchResource(callURL string, expectedResources []FHIRStoreTe
return nil, 0
}

-func getIndexOf(fhirResource []byte, fhirResources [][]byte) (int, bool) {
+func getIndexOf(t *testing.T, fhirResource []byte, fhirResources [][]byte) (int, bool) {
	for idx, r := range fhirResources {
-		if cmp.Equal(fhirResource, r) {
+		if cmp.Equal(NormalizeJSON(t, fhirResource), NormalizeJSON(t, r)) {
return idx, true
}
}
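
Since getIndexOf now compares normalized JSON, byte-level differences in key order or whitespace no longer cause false mismatches. NormalizeJSON itself is not shown in this diff; a minimal version consistent with this call site might look like the sketch below (an assumption, not the actual testhelpers code; it relies on encoding/json emitting map keys in sorted order and assumes the encoding/json and testing imports):

// NormalizeJSON round-trips raw JSON through encoding/json so that
// semantically equal documents compare byte-equal. Sketch only.
func NormalizeJSON(t *testing.T, in []byte) []byte {
	t.Helper()
	var v interface{}
	if err := json.Unmarshal(in, &v); err != nil {
		t.Fatalf("NormalizeJSON: invalid JSON %q: %v", in, err)
	}
	out, err := json.Marshal(v)
	if err != nil {
		t.Fatalf("NormalizeJSON: re-marshal failed: %v", err)
	}
	return out
}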
