Skip to content

Commit

Permalink
[Ingest] fileID => correlationID after file has been registered
Browse files Browse the repository at this point in the history
This is specifically for files not uploaded with the s3inbox
  • Loading branch information
jbygdell committed Aug 8, 2024
1 parent e3cf8e3 commit 159cb43
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions sda/cmd/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func main() {
continue
}

if err = db.UpdateFileEventLog(fileID, "submitted", delivered.CorrelationId, message.User, "{}", string(delivered.Body)); err != nil {
if err = db.UpdateFileEventLog(fileID, "submitted", fileID, message.User, "{}", string(delivered.Body)); err != nil {
log.Errorf("failed to set ingestion status for file from message: %v", delivered.CorrelationId)
}

Expand Down Expand Up @@ -366,7 +366,7 @@ func main() {
header, err := tryDecrypt(key, readBuffer)
if err != nil {
log.Errorf("Trying to decrypt start of file failed, reason: (%s)", err.Error())
if err := db.UpdateFileEventLog(fileID, "error", delivered.CorrelationId, "ingest", fmt.Sprintf("{\"error\" : \"%s\"}", err.Error()), string(delivered.Body)); err != nil {
if err := db.UpdateFileEventLog(fileID, "error", fileID, "ingest", fmt.Sprintf("{\"error\" : \"%s\"}", err.Error()), string(delivered.Body)); err != nil {
log.Errorf("failed to set ingestion status for file from message: %v", delivered.CorrelationId)
}

Expand All @@ -381,7 +381,7 @@ func main() {
OriginalMessage: message,
}
body, _ := json.Marshal(fileError)
if err := mq.SendMessage(delivered.CorrelationId, conf.Broker.Exchange, "error", body); err != nil {
if err := mq.SendMessage(fileID, conf.Broker.Exchange, "error", body); err != nil {
log.Errorf("failed to publish message, reason: (%s)", err.Error())
}

Expand Down Expand Up @@ -460,28 +460,28 @@ func main() {
log.Debugf("Wrote archived file (corr-id: %s, user: %s, filepath: %s, archivepath: %s, archivedsize: %d)",
delivered.CorrelationId, message.User, message.FilePath, fileID, fileInfo.Size)

status, err = db.GetFileStatus(delivered.CorrelationId)
status, err = db.GetFileStatus(fileID)
if err != nil {
log.Errorf("failed to get file status, reason: (%s)", err.Error())
if err = delivered.Nack(false, true); err != nil {
log.Errorf("Failed to Nack message, reason: (%s)", err.Error())
}
}
if status == "disabled" {
log.Infof("file with correlation ID: %s is disabled, stopping ingestion", delivered.CorrelationId)
log.Infof("file with correlation ID: %s is disabled, stopping ingestion", fileID)
if err := delivered.Ack(false); err != nil {
log.Errorf("Failed acking canceled work, reason: (%s)", err.Error())
}

continue
}

if err := db.SetArchived(fileInfo, fileID, delivered.CorrelationId); err != nil {
if err := db.SetArchived(fileInfo, fileID, fileID); err != nil {
log.Errorf("SetArchived failed, reason: (%s)", err.Error())
}

log.Debugf("File marked as archived (corr-id: %s, user: %s, filepath: %s, archivepath: %s)",
delivered.CorrelationId, message.User, message.FilePath, fileID)
fileID, message.User, message.FilePath, fileID)

// Send message to archived
msg := schema.IngestionVerification{
Expand All @@ -502,7 +502,7 @@ func main() {
continue
}

if err := mq.SendMessage(delivered.CorrelationId, conf.Broker.Exchange, conf.Broker.RoutingKey, archivedMsg); err != nil {
if err := mq.SendMessage(fileID, conf.Broker.Exchange, conf.Broker.RoutingKey, archivedMsg); err != nil {
// TODO fix resend mechanism
log.Errorf("failed to publish message, reason: (%s)", err.Error())

Expand Down

0 comments on commit 159cb43

Please sign in to comment.