From b91deb2838fb128f2e7bb297d94625a0570692c0 Mon Sep 17 00:00:00 2001 From: Ruslan Rotaru Date: Wed, 27 Sep 2023 09:31:58 +0100 Subject: [PATCH] added async calls using worker queue --- internal/sync/main.go | 121 ++++++++++++++++++++++++---------------- internal/utils/utils.go | 2 +- 2 files changed, 75 insertions(+), 48 deletions(-) diff --git a/internal/sync/main.go b/internal/sync/main.go index 545146d..51cbd01 100644 --- a/internal/sync/main.go +++ b/internal/sync/main.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "os" + "sync" "time" "github.com/graphprotocol/ipfs-mgm/internal/utils" @@ -21,6 +22,8 @@ var SyncCmd = &cobra.Command{ }, } +var workerItemCount int = 50 + func init() { SyncCmd.Flags().StringP("source", "s", "", "IPFS source endpoint") SyncCmd.MarkFlagRequired("source") @@ -95,64 +98,88 @@ func Sync(cmd *cobra.Command) { counter := 1 length := len(cids) - for _, k := range cids { - // Get IPFS CID from source - srcCID := srcGet + k.Cid - log.Printf("%d/%d: Syncing the CID: %s\n",counter, length, k.Cid) - // Get CID from source - resG, err := utils.GetIPFS(srcCID, nil) - if err != nil { - log.Printf("%d/%d: %s",counter, length, err) - failed += 1 - counter += 1 - continue - } - defer resG.Body.Close() + // Adjust for the number of CID's + if length < workerItemCount { + workerItemCount = length + } - cidV := utils.GetCIDVersion(k.Cid) - // Create the API URL fo the POST on destination - apiADD := fmt.Sprintf("%s%s?cid-version=%s", dst, utils.IPFS_PIN_ENDPOINT, cidV) + for i := 0; i < length; { + // Create a channel with buffer of workerItemCount size + workChan := make(chan utils.HTTPResult, workerItemCount) + var wg sync.WaitGroup - newBody, err := utils.GetHTTPBody(resG) - if err != nil { - log.Printf("%d/%d: %s",counter, length, err) - } + for j := 0; j < workerItemCount; j++ { + wg.Add(1) + go func(c int, cidID string) { + defer wg.Done() + AsyncPostIPFS(srcGet, dst, cidID, &c, length, &failed, &synced) - // Sync IPFS CID into destination - // TODO: 
implement retry backoff with pester - var m utils.IPFSResponse - resP, err := utils.PostIPFS(apiADD, newBody) - if err != nil { - log.Printf("%d/%d: %s", counter, length, err) - failed += 1 - } else { - defer resP.Body.Close() + }(counter, cids[i].Cid) + counter += 1 - // Generic function to parse the response and create a struct - err := utils.UnmarshalToStruct[utils.IPFSResponse](resP.Body, &m) - if err != nil { - log.Printf("%d/%d: %s", counter, length, err) - } + i++ } - // Check if the IPFS Hash is the same as the source one - // If not the syncing didn't work - ok, err := utils.TestIPFSHash(k.Cid, m.Hash) - if err != nil { - log.Printf("%d/%d: %s",counter, length, err) - failed += 1 - } else { - // Print success message - log.Printf("%d/%d: %s",counter, length, ok) - synced += 1 - } - counter += 1 + close(workChan) + wg.Wait() } - // Print Final statistics log.Printf("Total number of objects: %d; Synced: %d; Failed: %d\n", len(cids), synced, failed) log.Printf("Total time: %s\n", time.Since(timeStart)) } +func AsyncPostIPFS(src string, dst string, cidID string, counter *int, length int, failed *int, synced *int) { + // Get IPFS CID from source + srcCID := src + cidID + log.Printf("%d/%d: Syncing the CID: %s\n", *counter, length, cidID) + + // Get CID from source + resG, err := utils.GetIPFS(srcCID, nil) + if err != nil { + log.Printf("%d/%d: %s; CID: %s", *counter, length, err, cidID) + *failed += 1 + *counter += 1 + return + } + defer resG.Body.Close() + + cidV := utils.GetCIDVersion(cidID) + // Create the API URL fo the POST on destination + apiADD := fmt.Sprintf("%s%s?cid-version=%s", dst, utils.IPFS_PIN_ENDPOINT, cidV) + + newBody, err := utils.GetHTTPBody(resG) + if err != nil { + log.Printf("%d/%d: %s", *counter, length, err) + } + + // Sync IPFS CID into destination + // TODO: implement retry backoff with pester + var m utils.IPFSResponse + resP, err := utils.PostIPFS(apiADD, newBody) + if err != nil { + log.Printf("%d/%d: %s", *counter, length, err) + 
*failed += 1 + } else { + defer resP.Body.Close() + + // Generic function to parse the response and create a struct + err := utils.UnmarshalToStruct[utils.IPFSResponse](resP.Body, &m) + if err != nil { + log.Printf("%d/%d: %s", *counter, length, err) + } + } + + // Check if the IPFS Hash is the same as the source one + // If not the syncing didn't work + ok, err := utils.TestIPFSHash(cidID, m.Hash) + if err != nil { + log.Printf("%d/%d: %s", *counter, length, err) + *failed += 1 + } else { + // Print success message + log.Printf("%d/%d: %s", *counter, length, ok) + *synced += 1 + } +} diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 4cf04e1..fe4c60f 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -142,7 +142,7 @@ func TestIPFSHash(s string, d string) (string, error) { return "", fmt.Errorf("The source Hash %s is different from the destination hash %s", s, d) } - return fmt.Sprint("Successfully synced to destination IPFS"), nil + return fmt.Sprintf("Successfully synced to destination IPFS, CID: %s", s), nil } func SliceToCIDSStruct(s []string) ([]IPFSCIDResponse, error) {