
Commit

Added sending of updates for importer as lambda runs. Generic framework which should also work with quants. Needs new data-formats
Peter Nemere committed Dec 4, 2023
1 parent a00c2ce commit 0533b06
Showing 16 changed files with 1,242 additions and 791 deletions.
12 changes: 12 additions & 0 deletions api/config/config.go
@@ -98,6 +98,9 @@ type APIConfig struct {
// Local file caching (from S3 to where API is running)
MaxFileCacheAgeSec uint
MaxFileCacheSizeBytes uint

ImportJobMaxTimeSec uint32
PIQUANTJobMaxTimeSec uint32
}

func homeDir() string {
@@ -189,6 +192,15 @@ func Init() (APIConfig, error) {
if nodeCountOverride != nil && *nodeCountOverride > 0 {
cfg.NodeCountOverride = int32(*nodeCountOverride)
}

if cfg.ImportJobMaxTimeSec <= 0 {
cfg.ImportJobMaxTimeSec = uint32(10 * 60)
}

if cfg.PIQUANTJobMaxTimeSec <= 0 {
cfg.PIQUANTJobMaxTimeSec = uint32(15 * 60)
}

cfg.KubeConfig = *kubeconfig

return cfg, nil
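The two new settings bound how long an import or PIQUANT job is allowed to run, falling back to 10 and 15 minutes respectively when the config leaves them at zero. As a rough usage sketch (not part of this diff), the import limit could be turned into a context deadline; everything below, including the stand-in job callback, is hypothetical:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// Hypothetical sketch only: bound a job by the configured maximum run time.
func runWithTimeout(maxTimeSec uint32, job func(context.Context) error) error {
    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(maxTimeSec)*time.Second)
    defer cancel()
    return job(ctx)
}

func main() {
    // Pretend cfg.ImportJobMaxTimeSec came back as the 10 minute default
    err := runWithTimeout(600, func(ctx context.Context) error {
        select {
        case <-time.After(2 * time.Second): // stand-in for the real import work
            return nil
        case <-ctx.Done():
            return errors.New("import job exceeded ImportJobMaxTimeSec")
        }
    })
    fmt.Println("import result:", err)
}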
10 changes: 5 additions & 5 deletions api/dataimport/for-trigger_test.go
@@ -327,7 +327,7 @@ func Example_ImportForTrigger_OCS_DatasetEdit() {

trigger := `{
"datasetID": "048300551",
"logID": "dataimport-unittest123"
"jobID": "dataimport-unittest123"
}`

result, err := ImportForTrigger([]byte(trigger), envName, configBucket, datasetBucket, manualBucket, db, log, remoteFS)
@@ -392,7 +392,7 @@ func Example_ImportForTrigger_Manual_JPL() {

trigger := `{
"datasetID": "test1234",
"logID": "dataimport-unittest123"
"jobID": "dataimport-unittest123"
}`

result, err := ImportForTrigger([]byte(trigger), envName, configBucket, datasetBucket, manualBucket, db, log, remoteFS)
@@ -424,7 +424,7 @@ func Example_ImportForTrigger_Manual_SBU() {

trigger := `{
"datasetID": "test1234sbu",
"logID": "dataimport-unittest123sbu"
"jobID": "dataimport-unittest123sbu"
}`

result, err := ImportForTrigger([]byte(trigger), envName, configBucket, datasetBucket, manualBucket, db, log, remoteFS)
@@ -456,7 +456,7 @@ func Test_ImportForTrigger_Manual_SBU_NoAutoShare(t *testing.T) {

trigger := `{
"datasetID": "test1234sbu",
"logID": "dataimport-unittest123sbu"
"jobID": "dataimport-unittest123sbu"
}`

_, err := ImportForTrigger([]byte(trigger), envName, configBucket, datasetBucket, manualBucket, db, log, remoteFS)
@@ -473,7 +473,7 @@ func Example_ImportForTrigger_Manual_EM() {

trigger := `{
"datasetID": "048300551",
"logID": "dataimport-unittest048300551"
"jobID": "dataimport-unittest048300551"
}`

result, err := ImportForTrigger([]byte(trigger), envName, configBucket, datasetBucket, manualBucket, db, log, remoteFS)
38 changes: 14 additions & 24 deletions api/dataimport/reprocess.go
@@ -30,23 +30,22 @@ import (
"github.com/pixlise/core/v3/api/dataimport/internal/datasetArchive"
"github.com/pixlise/core/v3/core/awsutil"
"github.com/pixlise/core/v3/core/errorwithstatus"
"github.com/pixlise/core/v3/core/idgen"
"github.com/pixlise/core/v3/core/utils"
)

// One of the 2 SNS messages we accept. The other is an AWS S3 event message
type datasetReprocessSNSRequest struct {
DatasetID string `json:"datasetID"`
LogID string `json:"logID"`
JobID string `json:"jobID"`
}

// Decoding trigger message
// Returns: sourceBucket (optional), sourceFilePath (optional), datasetID, logID
func decodeImportTrigger(triggerMessageBody []byte) (string, string, string, string, error) {
datasetID := ""

// Log ID to use - this forms part of the log stream in cloudwatch
logID := ""
// job ID to use - we save DB updates about our status using this id
jobID := ""

// But if we're being triggered due to new data arriving, these will be filled out
sourceFilePath := ""
@@ -68,10 +67,10 @@ func decodeImportTrigger(triggerMessageBody []byte) (string, string, string, str
return "", "", "", "", fmt.Errorf("Failed to find dataset ID in reprocess trigger")
}

if len(triggerSNS.LogID) > 0 {
logID = triggerSNS.LogID
if len(triggerSNS.JobID) > 0 {
jobID = triggerSNS.JobID
} else {
return "", "", "", "", fmt.Errorf("Failed to find log ID in reprocess trigger")
return "", "", "", "", fmt.Errorf("Failed to find job ID in reprocess trigger")
}
} else {
// Maybe it's a packaged S3 object inside an SNS message
@@ -97,28 +96,22 @@ func decodeImportTrigger(triggerMessageBody []byte) (string, string, string, str
}

// So this is basically a new dataset download, generate a fresh log ID
logID = fmt.Sprintf("auto-import-%v (%v)", time.Now().Format("02-Jan-2006 15-04-05"), utils.RandStringBytesMaskImpr(8))
jobID = fmt.Sprintf("auto-import-%v (%v)", time.Now().Format("02-Jan-2006 15-04-05"), utils.RandStringBytesMaskImpr(8))
}

return sourceBucket, sourceFilePath, datasetID, logID, nil
return sourceBucket, sourceFilePath, datasetID, jobID, nil
}

// Firing a trigger message. Anything calling this is triggering a dataset reimport via a lambda function
func TriggerDatasetReprocessViaSNS(snsSvc awsutil.SNSInterface, idGen idgen.IDGenerator, datasetID string, snsTopic string) (*sns.PublishOutput, string, error) {
// Generate a new log ID that this reprocess job will write to
// which we also return to the caller, so they can track what happens
// with this async task

reprocessId := fmt.Sprintf("dataimport-%s", idGen.GenObjectID())

func TriggerDatasetReprocessViaSNS(snsSvc awsutil.SNSInterface, jobId string, scanId string, snsTopic string) (*sns.PublishOutput, error) {
snsReq := datasetReprocessSNSRequest{
DatasetID: datasetID,
LogID: reprocessId,
DatasetID: scanId,
JobID: jobId,
}

snsReqJSON, err := json.Marshal(snsReq)
if err != nil {
return nil, "", errorwithstatus.MakeStatusError(http.StatusInternalServerError, fmt.Errorf("Failed to trigger dataset reprocess: %v", err))
return nil, errorwithstatus.MakeStatusError(http.StatusInternalServerError, fmt.Errorf("Failed to trigger dataset reprocess: %v", err))
}

result, err := snsSvc.Publish(&sns.PublishInput{
@@ -127,11 +120,8 @@ func TriggerDatasetReprocessViaSNS(snsSvc awsutil.SNSInterface, jobId string, s
})

if err != nil {
return nil, "", errorwithstatus.MakeStatusError(http.StatusInternalServerError, fmt.Errorf("Failed to publish SNS topic for dataset regeneration: %v", err))
return nil, errorwithstatus.MakeStatusError(http.StatusInternalServerError, fmt.Errorf("Failed to publish SNS topic for dataset regeneration: %v", err))
}

// The actual log stream name that gets generated is prefixed with the dataset ID
// so return that one here, any users of our function should only need to be given
// the stream name, log group is more fixed.
return result, datasetID + "-" + reprocessId, nil
return result, nil
}
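With the ID generation moved out of TriggerDatasetReprocessViaSNS, callers now mint the job ID themselves and no longer get a log-stream name back. A hypothetical caller sketch (not in this commit), reusing the idgen helper the old code relied on:

package dataimport

import (
    "fmt"

    "github.com/pixlise/core/v3/core/awsutil"
    "github.com/pixlise/core/v3/core/idgen"
)

// Hypothetical sketch: generate the job ID up front, fire the reimport trigger,
// and return the ID so the caller can follow status updates written under it.
func requestReprocess(snsSvc awsutil.SNSInterface, idGen idgen.IDGenerator, scanId string, snsTopic string) (string, error) {
    jobId := fmt.Sprintf("dataimport-%s", idGen.GenObjectID())

    if _, err := TriggerDatasetReprocessViaSNS(snsSvc, jobId, scanId, snsTopic); err != nil {
        return "", err
    }

    return jobId, nil
}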
48 changes: 24 additions & 24 deletions api/dataimport/reprocess_test.go
@@ -25,17 +25,17 @@ import (
func Example_decodeImportTrigger_Manual() {
trigger := `{
"datasetID": "189137412",
"logID": "dataimport-zmzddoytch2krd7n"
"jobID": "dataimport-zmzddoytch2krd7n"
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
sourceBucket, sourceFilePath, datasetID, jobId, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nJob: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, jobId, err)

// Output:
// Source Bucket: ""
// Source file: ""
// Dataset: "189137412"
// Log: "dataimport-zmzddoytch2krd7n"
// Job: "dataimport-zmzddoytch2krd7n"
// Err: "<nil>"
}

@@ -134,16 +134,16 @@ func Example_decodeImportTrigger_OCS2() {
]
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
sourceBucket, sourceFilePath, datasetID, jobID, err := decodeImportTrigger([]byte(trigger))

// NOTE: we're only checking the length of the log ID because it's a timestamp+random chars. Other code has this stubbed out but here it's probably sufficient
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog Str Len: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, len(logID), err)
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nJob Str Len: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, len(jobID), err)

// Output:
// Source Bucket: "prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
// Source file: "197329413-25-09-2022-14-33-39.zip"
// Dataset: "197329413"
// Log Str Len: "43"
// Job Str Len: "43"
// Err: "<nil>"
}

@@ -152,31 +152,31 @@ func Example_decodeImportTrigger_ManualBadMsg() {
"weird": "message"
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
sourceBucket, sourceFilePath, datasetID, jobID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nJob: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, jobID, err)

// Output:
// Source Bucket: ""
// Source file: ""
// Dataset: ""
// Log: ""
// Job: ""
// Err: "Unexpected or no message type embedded in triggering SNS message"
}

func Example_decodeImportTrigger_ManualBadDatasetID() {
trigger := `{
"datasetID": "",
"logID": "dataimport-zmzddoytch2krd7n"
"jobID": "dataimport-zmzddoytch2krd7n"
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
sourceBucket, sourceFilePath, datasetID, jobID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nJob: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, jobID, err)

// Output:
// Source Bucket: ""
// Source file: ""
// Dataset: ""
// Log: ""
// Job: ""
// Err: "Failed to find dataset ID in reprocess trigger"
}

@@ -185,29 +185,29 @@ func Example_decodeImportTrigger_ManualBadLogID() {
"datasetID": "qwerty"
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
sourceBucket, sourceFilePath, datasetID, jobID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nJob: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, jobID, err)

// Output:
// Source Bucket: ""
// Source file: ""
// Dataset: ""
// Log: ""
// Err: "Failed to find log ID in reprocess trigger"
// Job: ""
// Err: "Failed to find job ID in reprocess trigger"
}

func Example_decodeImportTrigger_OCS_Error() {
trigger := `{
"Records": []
}`
sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
sourceBucket, sourceFilePath, datasetID, jobID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nJob: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, jobID, err)

// Output:
// Source Bucket: ""
// Source file: ""
// Dataset: ""
// Log: ""
// Job: ""
// Err: "Unexpected or no message type embedded in triggering SNS message"
}

@@ -234,13 +234,13 @@ func Example_decodeImportTrigger_OCS_BadEventType() {
]
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
sourceBucket, sourceFilePath, datasetID, jobID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nJob: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, jobID, err)

// Output:
// Source Bucket: ""
// Source file: ""
// Dataset: ""
// Log: ""
// Job: ""
// Err: "Failed to decode dataset import trigger: Failed to decode sqs body to an S3 event: unexpected end of JSON input"
}
1 change: 1 addition & 0 deletions api/dbCollections/collections.go
@@ -10,6 +10,7 @@ const ExpressionGroupsName = "expressionGroups"
const ExpressionsName = "expressions"
const ImagesName = "images"
const ImageBeamLocationsName = "imageBeamLocations"
const JobStatusName = "jobStatuses"
const ModulesName = "modules"
const ModuleVersionsName = "moduleVersions"
const NotificationsName = "notifications"
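The new jobStatuses collection is presumably where the importer's in-flight status updates land. A rough sketch of an upsert into it follows; every field name here is a placeholder, since the commit message notes the real data-formats are still to be defined:

package jobstatus

import (
    "context"
    "time"

    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

// Hypothetical sketch only: record a job's latest status in jobStatuses
// (dbCollections.JobStatusName), keyed by job ID. Field names are invented.
func writeJobStatus(ctx context.Context, db *mongo.Database, jobId string, status string, msg string) error {
    _, err := db.Collection("jobStatuses").UpdateOne(ctx,
        bson.M{"_id": jobId},
        bson.M{"$set": bson.M{
            "status":            status,
            "message":           msg,
            "lastUpdateUnixSec": time.Now().Unix(),
        }},
        options.Update().SetUpsert(true),
    )
    return err
}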