From 5bc6388346ecc491a609bc42e42a42faa17a4e76 Mon Sep 17 00:00:00 2001 From: eV <8796196+evq@users.noreply.github.com> Date: Fri, 13 Oct 2023 20:49:56 +0000 Subject: [PATCH] cli fixes (#2138) --- tools/payments/client.go | 9 +++-- tools/payments/cmd/prepare/main.go | 53 ++++++++++++++++-------------- tools/payments/report.go | 7 +++- 3 files changed, 38 insertions(+), 31 deletions(-) diff --git a/tools/payments/client.go b/tools/payments/client.go index 38aa534d5..79d94363a 100644 --- a/tools/payments/client.go +++ b/tools/payments/client.go @@ -46,7 +46,7 @@ type SettlementClient interface { ConfigureWorker(context.Context, string, *payments.WorkerConfig) error PrepareTransactions(context.Context, httpsignature.ParameterizedSignator, ...payments.PrepareRequest) error SubmitTransactions(context.Context, httpsignature.ParameterizedSignator, ...payments.SubmitRequest) error - WaitForResponses(ctx context.Context, payoutID string, numTransactions int) error + WaitForResponses(ctx context.Context, payoutID string, numTransactions int, cg string) error GetStatus(ctx context.Context, payoutID string) (*PayoutReportStatus, error) } @@ -253,15 +253,14 @@ func (rc *redisClient) HandlePrepareResponse(ctx context.Context, stream, id str return nil } -func (rc *redisClient) WaitForResponses(ctx context.Context, payoutID string, numTransactions int) error { +func (rc *redisClient) WaitForResponses(ctx context.Context, payoutID string, numTransactions int, cg string) error { logger, err := appctx.GetLogger(ctx) if err != nil { return err } stream := payments.PreparePrefix + payoutID + payments.ResponseSuffix - // FIXME use public key as consumer group - consumerGroup := stream + "-cli" + consumerGroup := stream + "-" + cg consumerID := "0" consumerCtx, cancelFunc := context.WithCancel(ctx) @@ -281,10 +280,10 @@ wait: if err != nil { logger.Error().Err(err).Msg("failed to get unacknowledged count") } + logger.Info().Int64("lag", lag).Int64("pending", pending).Msg("waiting for responses to be processed") if lag+pending == 0 { break wait } - logger.Info().Int64("lag", lag).Int64("pending", pending).Msg("waiting for responses to be processed") } time.Sleep(10 * time.Second) diff --git a/tools/payments/cmd/prepare/main.go b/tools/payments/cmd/prepare/main.go index 20a420d26..69046d33b 100644 --- a/tools/payments/cmd/prepare/main.go +++ b/tools/payments/cmd/prepare/main.go @@ -69,6 +69,10 @@ func main() { "p", "", "payout id") + cg := flag.String( + "cg", "cli", + "consumer group suffix") + resubmit := flag.Bool( "resubmit", false, "resubmit to prepare stream") @@ -126,37 +130,36 @@ func main() { } totalTransactions += len(report) - if !firstRun && !*resubmit { - return - } + if firstRun || *resubmit { + priv, err := paymentscli.GetOperatorPrivateKey(*key) + if err != nil { + log.Fatalf("failed to parse operator key file: %v\n", err) + } - priv, err := paymentscli.GetOperatorPrivateKey(*key) - if err != nil { - log.Fatalf("failed to parse operator key file: %v\n", err) - } + if err := report.Prepare(ctx, priv, client); err != nil { + log.Fatalf("failed to read report from stdin: %v\n", err) + } - if err := report.Prepare(ctx, priv, client); err != nil { - log.Fatalf("failed to read report from stdin: %v\n", err) - } + wc := &payments.WorkerConfig{ + PayoutID: *payoutID, + ConsumerGroup: payments.PreparePrefix + *payoutID + "-cg", + Stream: payments.PreparePrefix + *payoutID, + Count: len(report), + } - wc := &payments.WorkerConfig{ - PayoutID: *payoutID, - ConsumerGroup: payments.PreparePrefix + *payoutID + "-cg", - Stream: payments.PreparePrefix + *payoutID, - Count: len(report), - } + err = client.ConfigureWorker(ctx, payments.PrepareConfigStream, wc) + if err != nil { + log.Fatalf("failed to write to prepare config stream: %v\n", err) + } + if *verbose { + log.Printf("prepare transactions loaded for %+v\n", payoutID) + } - err = client.ConfigureWorker(ctx, payments.PrepareConfigStream, wc) - if err != nil { - log.Fatalf("failed to write to prepare config stream: %v\n", err) - } - if *verbose { - log.Printf("prepare transactions loaded for %+v\n", payoutID) + os.Create(responseFile) } - os.Create(responseFile) - - err = client.WaitForResponses(ctx, *payoutID, totalTransactions) + // FIXME default to public key as consumer group? + err = client.WaitForResponses(ctx, *payoutID, totalTransactions, *cg) if err != nil { log.Fatalf("failed to wait for prepare responses: %v\n", err) } diff --git a/tools/payments/report.go b/tools/payments/report.go index fdcab2995..d476a892f 100644 --- a/tools/payments/report.go +++ b/tools/payments/report.go @@ -112,16 +112,21 @@ func ReadReport(report any, reader io.Reader) error { // ReadReportFromResponses reads a report from the reader func ReadReportFromResponses(report *AttestedReport, reader io.Reader) error { scanner := bufio.NewScanner(reader) + tmp := make(map[string]payments.PrepareResponse) for scanner.Scan() { var resp payments.PrepareResponse if err := json.Unmarshal(scanner.Bytes(), &resp); err != nil { return err } - *report = append(*report, resp) + // dedupe by idempotency key + tmp[resp.IdempotencyKey().String()] = resp } if err := scanner.Err(); err != nil { return err } + for _, resp := range tmp { + *report = append(*report, resp) + } return nil }