From 67db896b6fe628184a3124f4ffa00e9968072f4b Mon Sep 17 00:00:00 2001 From: Ruslan Rotaru Date: Mon, 24 Jun 2024 17:25:47 +0100 Subject: [PATCH] added cooldown a d batches features updated README --- README.md | 8 +++++++ internal/sync/main.go | 51 ++++++++++++++++++++++++++++++++--------- internal/utils/utils.go | 6 ++++- 3 files changed, 53 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index c8e2691..495160a 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,14 @@ ipfs-mgm sync --help ipfs-mgm sync -s -d ``` +#### Transfer all files using batches and cooldown + +```bash +ipfs-mgm sync -s -d -f cids -c 5 -b 100 +``` +*INFO: `-b 100` will transfer the files in batches of 100 in paralel and wait 5 seconds (`-c 5`) in between the batches to avoid overloading the IPFS nodes* + + #### Transfer only specific files from one IPFS node to another: ```bash diff --git a/internal/sync/main.go b/internal/sync/main.go index 44fca33..9cf9701 100644 --- a/internal/sync/main.go +++ b/internal/sync/main.go @@ -23,14 +23,14 @@ var SyncCmd = &cobra.Command{ }, } -var workerItemCount int = 50 - func init() { SyncCmd.Flags().StringP("source", "s", "", "IPFS source endpoint") SyncCmd.MarkFlagRequired("source") SyncCmd.Flags().StringP("destination", "d", "", "IPFS destination endpoint") SyncCmd.MarkFlagRequired("destination") SyncCmd.Flags().StringP("from-file", "f", "", "Sync CID's from file") + SyncCmd.Flags().IntP("batch", "b", 100, "Batch files to sync in paralel") + SyncCmd.Flags().IntP("cooldown", "c", 0, "Cooldown in seconds between the batches, by default not used. Only used with baches options, ignored otherwise") } func Sync(cmd *cobra.Command) { @@ -40,7 +40,13 @@ func Sync(cmd *cobra.Command) { var cids []utils.IPFSCIDResponse - // Get all command flags + // check if syncing only the CIDS specified in the file + fromFile, err := cmd.Flags().GetString("from-file") + if err != nil { + fmt.Println(err) + } + + // get source to sync from src, err := cmd.Flags().GetString("source") if err != nil { log.Println(err) @@ -51,21 +57,36 @@ func Sync(cmd *cobra.Command) { log.Println(err) } - fromFile, err := cmd.Flags().GetString("from-file") + cooldown, err := cmd.Flags().GetInt("cooldown") + if err != nil { + log.Println(err) + } + + if cooldown < 0 { + fmt.Printf("The specified cooldown is not valid, it must be greater or equal to 0. Specified %d", cooldown) + os.Exit(1) + } + + batch, err := cmd.Flags().GetInt("batch") if err != nil { fmt.Println(err) } + if batch <= 0 { + fmt.Printf("The specified batch is not valid, it must be greater than 0. Specified %d", batch) + cooldown = 0 + os.Exit(1) + } // Will use the file only if specified if len(fromFile) > 0 { - log.Printf("Syncing from %s to %s using the file <%s> as input\n", src, dst, fromFile) + log.Printf("Syncing from <%s> to <%s> using as input the file <%s>\n", src, dst, fromFile) c, err := utils.ReadCIDFromFile(fromFile) if err != nil { fmt.Println(err) os.Exit(1) } - // Create our structure with the CIDS's + // Create our structure with the CID's cids, err = utils.SliceToCIDSStruct(c) if err != nil { fmt.Println(err) @@ -97,29 +118,37 @@ func Sync(cmd *cobra.Command) { } counter := 1 + length := len(cids) + log.Printf("There are %d CIDs to be synced", length) // Adjust for the number of CID's - if length < workerItemCount { - workerItemCount = length + if batch > length { + batch = length + log.Printf("Using %d batch calls as there are %d CIDs to sync\n", batch, length) + } else { + log.Printf("Using %d batch calls\n", batch) } for i := 0; i < length; { // Create a channel with buffer of workerItemCount size - workChan := make(chan utils.HTTPResult, workerItemCount) + workChan := make(chan utils.HTTPResult, batch) var wg sync.WaitGroup - for j := 0; j < workerItemCount; j++ { + for j := 0; j < batch; j++ { wg.Add(1) go func(c int, cidID string) { defer wg.Done() AsyncCall(src, dst, cidID, &c, length, &failed, &synced) - }(counter, cids[i].Cid) counter += 1 i++ } + if cooldown > 0 { + time.Sleep(time.Duration(cooldown) * time.Second) + } + close(workChan) wg.Wait() } diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 90f1706..6126e89 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -120,6 +120,7 @@ func PostCID(dst string, payload []byte, fPath string) (*http.Response, error) { // Set custom User-Agent for cloudflare WAF policies req.Header.Set("User-Agent", "graphprotocol/ipfs-mgm") req.Header.Set("Content-Type", writer.FormDataContentType()) + // req.Header.Set("Content-Type", "multipart/form-data") // Set Directory Headers if len(fPath) != 0 { @@ -232,7 +233,10 @@ func ReadCIDFromFile(f string) ([]string, error) { var s []string scanner := bufio.NewScanner(file) for scanner.Scan() { - s = append(s, scanner.Text()) + txt := scanner.Text() + if len(txt) > 0 { + s = append(s, scanner.Text()) + } } return s, nil