
Commit

Introduced thawing objects
andrewpatto committed Nov 30, 2023
1 parent a7f4af0 commit 35af4c2
Showing 34 changed files with 1,900 additions and 207 deletions.
2 changes: 1 addition & 1 deletion dev/test.ts
@@ -51,7 +51,7 @@ async function makeTestObject(

async function createTestData() {
const sourceObjects = {
- [`${testFolderSrc}/1.bin`]: StorageClass.STANDARD,
+ [`${testFolderSrc}/1.bin`]: StorageClass.DEEP_ARCHIVE,
[`${testFolderSrc}/2.bin`]: StorageClass.STANDARD,
[`${testFolderSrc}/3.bin`]: StorageClass.GLACIER,
};
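The only change to the test data is that `1.bin` now starts life in DEEP_ARCHIVE, so (together with the existing GLACIER object) the new thawing logic has something to restore before it can be copied. Purely as an illustrative sketch, not code from this commit, a check for whether an object still needs thawing could look roughly like this with the AWS SDK for Go v2 (bucket and key names are made up):

// Illustrative sketch only (not part of this commit): decide whether an S3 object
// still needs "thawing" before it can be copied. Bucket/key are hypothetical.
package main

import (
    "context"
    "fmt"
    "log"
    "strings"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/s3/types"
)

func main() {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        log.Fatalf("Unable to load AWS config, %v", err)
    }

    client := s3.NewFromConfig(cfg)

    head, err := client.HeadObject(context.TODO(), &s3.HeadObjectInput{
        Bucket: aws.String("example-test-bucket"),
        Key:    aws.String("example-src-folder/1.bin"),
    })
    if err != nil {
        log.Fatalf("HeadObject failed, %v", err)
    }

    // objects in these storage classes cannot be GET until a restore ("thaw") completes
    archived := head.StorageClass == types.StorageClassGlacier ||
        head.StorageClass == types.StorageClassDeepArchive

    // once a restore has been requested, S3 reports its progress via the x-amz-restore header
    restored := head.Restore != nil && strings.Contains(*head.Restore, `ongoing-request="false"`)

    fmt.Printf("storage class=%q archived=%v restored=%v needs thaw=%v\n",
        head.StorageClass, archived, restored, archived && !restored)
}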

This file was deleted.

This file was deleted.

This file was deleted.

@@ -11,15 +11,24 @@ import (
"os"
"os/exec"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
)

// NOTE: we use a prefix of RB (rclone-batch) just so we don't accidentally clash with a real
// env variable that has meaning to rclone (for example)

const rcloneBinaryEnvName = "RB_RCLONE_BINARY"
const destinationEnvName = "RB_DESTINATION"

// our parent ECS task (when a SPOT instance) can be sent a TERM signal - we then have a hard
// limit of 120 seconds before the process is hard killed
// this value here is the seconds to wait after receiving the TERM in the hope that our
// jobs might finish
const postTermCleanupSeconds = 90

/**
* A ternaryish operator
*/
@@ -72,21 +81,23 @@ func main() {
// a task token that ECS/Steps can pass us so we can return data
taskToken, taskTokenOk := os.LookupEnv("RB_TASK_TOKEN")

- // now that we know whether we want to use the task token - we will definitely need AWS config to work
- // - so no need starting copying if we will fail at the end
- cfg, cfgErr := config.LoadDefaultConfig(context.TODO())
+ // now that we know whether we want to use the task token - we will definitely need AWS config to work
+ // - so no need starting copying if we will fail at the end
+ cfg, err := config.LoadDefaultConfig(context.TODO())

- if taskTokenOk {
- if cfgErr != nil {
- log.Fatalf("Unable to load AWS config, %v", cfgErr)
- }
- }
+ if taskTokenOk {
+ if err != nil {
+ log.Fatalf("Unable to load AWS config, %v", err)
+ }
+ }

// special environment variables that we can use for some debug/testing
debugBandwidth, debugBandwidthOk := os.LookupEnv("RB_DEBUG_BANDWIDTH")
debugSignalWait, debugSignalWaitOk := os.LookupEnv("RB_DEBUG_SIGNAL_WAIT")

// we end up with a result array entry for each object we have been asked to copy
results := make([]any, len(os.Args)-1)
var resultErrorCount int64 = 0

signalChannel := make(chan os.Signal)

@@ -123,52 +134,136 @@ func main() {
sig := <-signalChannel
switch sig {
case syscall.SIGTERM:
- // terminate the currently running rclone
- cmd.Process.Signal(syscall.SIGTERM)
// indicate we don't want future rclones to run
interrupted = true

// we do however have a 120 second (hard) window in which we might want
// to let the current rclone finish
// so lets sleep for a bit before we self-terminate
// (we have a little debug settable value here to make our tests run quicker)
if debugSignalWaitOk {
i, err := strconv.Atoi(debugSignalWait)
if err == nil {
time.Sleep(time.Duration(i) * time.Second)
}
} else {
time.Sleep(postTermCleanupSeconds * time.Second)
}

// terminate the currently running rclone
// NOTE we ignore the error here - if the process has already gone away then the
// signal possibly fails (by which point we should be exiting the process anyhow)
cmd.Process.Signal(syscall.SIGTERM)
}
}()

- err := cmd.Run()
+ runErr := cmd.Run()

- if err != nil {
- log.Printf("rclone Run() failed with %v", err)
- results[which] = map[string]any{
- "lastError": "Interrupted by SIGTERM",
- "systemError": fmt.Sprintf("%v", err),
- "source": source}
+ if runErr != nil {
+ log.Printf("rclone Run() failed with %v", runErr)
+ } else {
+ log.Printf("rclone Run() succeeded")
+ }

foundStats := false

// no matter what the exit code/status of the run is - we are going to (safely!) trawl
// through the stderr
// each line of stderr output is stats in JSON format or possibly other random messages
stderrStringLines := strings.Split(strings.TrimSuffix(stderrStringBuilder.String(), "\n"), "\n")

- // attempt to process each line of log output to stderr as JSON (if not then ignore)
+ // attempt to process each line of log output to stderr as JSON (if not then log it ourselves)
for _, line := range stderrStringLines {
var logLineJson map[string]any

- logLineJsonErr := json.Unmarshal([]byte(line), &logLineJson)
+ decoder := json.NewDecoder(strings.NewReader(line))
+ decoder.UseNumber()
+ decoderErr := decoder.Decode(&logLineJson)

- if logLineJsonErr == nil {
+ if decoderErr == nil {
statsValue, statsOk := logLineJson["stats"].(map[string]any)

if statsOk {
- // insert information about the file we were copying
- statsValue["source"] = source
- results[which] = statsValue
// an rclone stats block will definitely have a "errors" count
// so we test for this and then use it
errorsValue, errorsOk := statsValue["errors"].(json.Number)

if errorsOk {
errorsIntValue, errorsIntOk := errorsValue.Int64()

if errorsIntOk == nil {
resultErrorCount += errorsIntValue

// insert information about the file we were copying into the rclone stats block
statsValue["source"] = source

// record the stats block
results[which] = statsValue

foundStats = true
}
}
}
} else {
// we couldn't parse the line as JSON so it is probably a stderr msg from rclone
log.Printf("rclone stderr -> %s", line)
}
}

// if no valid stats block was output by rclone we need to make our own "compatible" one
if !foundStats {
// if we get a well structured runtime error result we can work out some
// specific error messages

// keep in mind we *only* get here if rclone itself didn't provide JSON stats
// (which is itself a bug - as rclone does provide stats on every copy)
if runErr != nil {
if runExitErr, runExitOk := runErr.(*exec.ExitError); runExitOk {
// https://rclone.org/docs/#list-of-exit-codes
switch runExitErr.ExitCode() {
case 143:
results[which] = map[string]any{
"errors": 1,
"lastError": "Interrupted by SIGTERM",
"source": source}
resultErrorCount++
default:
results[which] = map[string]any{
"errors": 1,
"lastError": fmt.Sprintf("Exit of rclone with code %v but no JSON statistics block generated", runExitErr.ExitCode()),
"systemError": fmt.Sprintf("%#v", runExitErr),
"source": source}
resultErrorCount++
}
}
}
}

} else {
// if we have previously received a SIGTERM - then we just need to skip the rest of the objects we have been asked to copy
// create a fake "compatible" stats block
results[which] = map[string]any{
"lastError": "Skipped due to SIGTERM received",
"errors": 1,
"lastError": "Skipped due to previous SIGTERM received",
"source": source}
resultErrorCount++
}

// if we have fallen through all the way to here without any details then we put in
// something generic - but we want to make sure every copy operation has a "result" block
if results[which] == nil {
results[which] = map[string]any{
"errors": 1,
"lastError": "Exit of rclone but no JSON statistics block generated or reason detected",
"source": source}
resultErrorCount++
}
}

// we have now attempted to copy every file and generated a stats dictionary in results[]

// we need to report this back as JSON though
resultsJson, err := json.MarshalIndent(results, "", " ")

if err != nil {
@@ -186,12 +281,24 @@ func main() {
// The JSON output of the task. Length constraints apply to the payload size, and are expressed as bytes in UTF-8 encoding.
// Type: String
// Length Constraints: Maximum length of 262144.
- sfnSvc.SendTaskSuccess(context.TODO(), &sfn.SendTaskSuccessInput{
- Output: aws.String(resultsString),
- TaskToken: aws.String(taskToken),
- })

// if we got any errors - we want to signal that up to the steps
//if resultErrorCount > 0 {
// sfnSvc.SendTaskFailure(context.TODO(), &sfn.SendTaskFailureInput{
// Output: aws.String(resultsString),
// TaskToken: aws.String(taskToken),
// })
//} else {
sfnSvc.SendTaskSuccess(context.TODO(), &sfn.SendTaskSuccessInput{
Output: aws.String(resultsString),
TaskToken: aws.String(taskToken),
})
//}

} else {
// if no task token was given then we just print the results
fmt.Println(resultsString)
}

os.Exit(int(resultErrorCount))
}
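Each result block above ends up in the JSON array that is either handed back to Steps via SendTaskSuccess or printed to stdout when no RB_TASK_TOKEN is set. As a rough sketch only (the struct and sample payload below are assumptions; a real rclone stats block carries many more fields than the keys set in this file), a caller could decode the per-object results like this:

// Illustrative sketch only: decode the per-object result blocks emitted by rclone-batch.
// Only the keys set in the code above are modelled here.
package main

import (
    "encoding/json"
    "fmt"
    "log"
)

type copyResult struct {
    Source      string `json:"source"`
    Errors      int64  `json:"errors"`
    LastError   string `json:"lastError,omitempty"`
    SystemError string `json:"systemError,omitempty"`
}

func main() {
    // made-up sample: one successful copy and one object skipped after a SIGTERM
    raw := []byte(`[
      {"source": "src/1.bin", "errors": 0},
      {"source": "src/2.bin", "errors": 1, "lastError": "Skipped due to previous SIGTERM received"}
    ]`)

    var results []copyResult
    if err := json.Unmarshal(raw, &results); err != nil {
        log.Fatalf("Unable to parse results, %v", err)
    }

    for _, r := range results {
        if r.Errors > 0 {
            fmt.Printf("%s failed: %s\n", r.Source, r.LastError)
        } else {
            fmt.Printf("%s copied ok\n", r.Source)
        }
    }
}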
