Skip to content

Commit

Permalink
cli fixes (#2138)
Browse files Browse the repository at this point in the history
  • Loading branch information
evq authored Oct 13, 2023
1 parent b1b1257 commit 5bc6388
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 31 deletions.
9 changes: 4 additions & 5 deletions tools/payments/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type SettlementClient interface {
ConfigureWorker(context.Context, string, *payments.WorkerConfig) error
PrepareTransactions(context.Context, httpsignature.ParameterizedSignator, ...payments.PrepareRequest) error
SubmitTransactions(context.Context, httpsignature.ParameterizedSignator, ...payments.SubmitRequest) error
WaitForResponses(ctx context.Context, payoutID string, numTransactions int) error
WaitForResponses(ctx context.Context, payoutID string, numTransactions int, cg string) error
GetStatus(ctx context.Context, payoutID string) (*PayoutReportStatus, error)
}

Expand Down Expand Up @@ -253,15 +253,14 @@ func (rc *redisClient) HandlePrepareResponse(ctx context.Context, stream, id str
return nil
}

func (rc *redisClient) WaitForResponses(ctx context.Context, payoutID string, numTransactions int) error {
func (rc *redisClient) WaitForResponses(ctx context.Context, payoutID string, numTransactions int, cg string) error {
logger, err := appctx.GetLogger(ctx)
if err != nil {
return err
}

stream := payments.PreparePrefix + payoutID + payments.ResponseSuffix
// FIXME use public key as consumer group
consumerGroup := stream + "-cli"
consumerGroup := stream + "-" + cg
consumerID := "0"

consumerCtx, cancelFunc := context.WithCancel(ctx)
Expand All @@ -281,10 +280,10 @@ wait:
if err != nil {
logger.Error().Err(err).Msg("failed to get unacknowledged count")
}
logger.Info().Int64("lag", lag).Int64("pending", pending).Msg("waiting for responses to be processed")
if lag+pending == 0 {
break wait
}
logger.Info().Int64("lag", lag).Int64("pending", pending).Msg("waiting for responses to be processed")

}
time.Sleep(10 * time.Second)
Expand Down
53 changes: 28 additions & 25 deletions tools/payments/cmd/prepare/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func main() {
"p", "",
"payout id")

cg := flag.String(
"cg", "cli",
"consumer group suffix")

resubmit := flag.Bool(
"resubmit", false,
"resubmit to prepare stream")
Expand Down Expand Up @@ -126,37 +130,36 @@ func main() {
}

totalTransactions += len(report)
if !firstRun && !*resubmit {
return
}
if firstRun || *resubmit {
priv, err := paymentscli.GetOperatorPrivateKey(*key)
if err != nil {
log.Fatalf("failed to parse operator key file: %v\n", err)
}

priv, err := paymentscli.GetOperatorPrivateKey(*key)
if err != nil {
log.Fatalf("failed to parse operator key file: %v\n", err)
}
if err := report.Prepare(ctx, priv, client); err != nil {
log.Fatalf("failed to read report from stdin: %v\n", err)
}

if err := report.Prepare(ctx, priv, client); err != nil {
log.Fatalf("failed to read report from stdin: %v\n", err)
}
wc := &payments.WorkerConfig{
PayoutID: *payoutID,
ConsumerGroup: payments.PreparePrefix + *payoutID + "-cg",
Stream: payments.PreparePrefix + *payoutID,
Count: len(report),
}

wc := &payments.WorkerConfig{
PayoutID: *payoutID,
ConsumerGroup: payments.PreparePrefix + *payoutID + "-cg",
Stream: payments.PreparePrefix + *payoutID,
Count: len(report),
}
err = client.ConfigureWorker(ctx, payments.PrepareConfigStream, wc)
if err != nil {
log.Fatalf("failed to write to prepare config stream: %v\n", err)
}
if *verbose {
log.Printf("prepare transactions loaded for %+v\n", payoutID)
}

err = client.ConfigureWorker(ctx, payments.PrepareConfigStream, wc)
if err != nil {
log.Fatalf("failed to write to prepare config stream: %v\n", err)
}
if *verbose {
log.Printf("prepare transactions loaded for %+v\n", payoutID)
os.Create(responseFile)
}

os.Create(responseFile)

err = client.WaitForResponses(ctx, *payoutID, totalTransactions)
// FIXME default to public key as consumer group?
err = client.WaitForResponses(ctx, *payoutID, totalTransactions, *cg)
if err != nil {
log.Fatalf("failed to wait for prepare responses: %v\n", err)
}
Expand Down
7 changes: 6 additions & 1 deletion tools/payments/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,21 @@ func ReadReport(report any, reader io.Reader) error {
// ReadReportFromResponses reads a report from the reader
func ReadReportFromResponses(report *AttestedReport, reader io.Reader) error {
scanner := bufio.NewScanner(reader)
tmp := make(map[string]payments.PrepareResponse)
for scanner.Scan() {
var resp payments.PrepareResponse
if err := json.Unmarshal(scanner.Bytes(), &resp); err != nil {
return err
}
*report = append(*report, resp)
// dedupe by idempotency key
tmp[resp.IdempotencyKey().String()] = resp
}
if err := scanner.Err(); err != nil {
return err
}
for _, resp := range tmp {
*report = append(*report, resp)
}
return nil
}

Expand Down

0 comments on commit 5bc6388

Please sign in to comment.