From f10d7f96de61be141d613c323599c87a81ef15d0 Mon Sep 17 00:00:00 2001 From: Jackson Egan Date: Mon, 21 Mar 2022 23:04:44 -0400 Subject: [PATCH 1/4] Communicate Errors and Add RedemptionStatus Append unmarshaling errors to the result set Instead of using hard coded constants, create an enum to contain various redemption statuses. This change does not remap the existing values and so does not require an ads-serve change. --- kafka/signed_token_redeem_handler.go | 207 ++++++++++++++++++++------- 1 file changed, 153 insertions(+), 54 deletions(-) diff --git a/kafka/signed_token_redeem_handler.go b/kafka/signed_token_redeem_handler.go index a3e32049..5885ab6d 100644 --- a/kafka/signed_token_redeem_handler.go +++ b/kafka/signed_token_redeem_handler.go @@ -15,6 +15,37 @@ import ( "github.com/segmentio/kafka-go" ) +/* + RedemptionStatus is an enum that represents the redemption status of a given token. + These values are part of a contract with downstream consumers and the value assignment + for a given variant must never change without a change to ads-serve to accomodate. +*/ +type RedemptionStatus int64 + +const ( + Verified RedemptionStatus = iota + Duplicate + Unverified + Error + Unknown +) + +func (r RedemptionStatus) String() string { + switch r { + case Verified: + return "verified" + case Duplicate: + return "duplicate" + case Unverified: + return "unverified" + case Error: + return "error" + case Unknown: + return "unknown" + } + return "undefined" +} + /* BlindedTokenRedeemHandler emits payment tokens that correspond to the signed confirmation tokens provided. @@ -25,19 +56,19 @@ func SignedTokenRedeemHandler( server *cbpServer.Server, logger *zerolog.Logger, ) error { - const ( - OK = 0 - DUPLICATE_REDEMPTION = 1 - UNVERIFIED = 2 - ERROR = 3 - ) tokenRedeemRequestSet, err := avroSchema.DeserializeRedeemRequestSet(bytes.NewReader(data)) if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed Avro deserialization: %e", tokenRedeemRequestSet.Request_id, err)) + return errors.New(fmt.Sprintf( + "Request %s: Failed Avro deserialization: %e", + tokenRedeemRequestSet.Request_id, err, + )) } defer func() { if recover() != nil { - err = errors.New(fmt.Sprintf("Request %s: Redeem attempt panicked", tokenRedeemRequestSet.Request_id)) + err = errors.New(fmt.Sprintf( + "Request %s: Redeem attempt panicked", + tokenRedeemRequestSet.Request_id, + )) } }() var redeemedTokenResults []avroSchema.RedeemResult @@ -48,31 +79,40 @@ func SignedTokenRedeemHandler( } issuers, err := server.FetchAllIssuers() if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed to fetch all issuers", tokenRedeemRequestSet.Request_id)) + return errors.New(fmt.Sprintf( + "Request %s: Failed to fetch all issuers", + tokenRedeemRequestSet.Request_id, + )) } for _, request := range tokenRedeemRequestSet.Data { var ( - verified = false - verifiedIssuer = &cbpServer.Issuer{} - verifiedCohort int32 = 0 + redemptionStatus RedemptionStatus = Unknown + verifiedIssuer = &cbpServer.Issuer{} + verifiedCohort int32 = 0 ) if request.Public_key == "" { - logger.Error().Msg(fmt.Sprintf("Request %s: Missing public key", tokenRedeemRequestSet.Request_id)) + logger.Error().Msg(fmt.Sprintf( + "Request %s: Missing public key", + tokenRedeemRequestSet.Request_id, + )) redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: 0, - Status: ERROR, + Status: Error, Associated_data: request.Associated_data, }) continue } if request.Token_preimage == "" || request.Signature == "" || request.Binding == "" { - logger.Error().Msg(fmt.Sprintf("Request %s: Empty request", tokenRedeemRequestSet.Request_id)) + logger.Error().Msg(fmt.Sprintf( + "Request %s: Empty request", + tokenRedeemRequestSet.Request_id, + )) redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: 0, - Status: ERROR, + Status: Error, Associated_data: request.Associated_data, }) continue @@ -81,57 +121,101 @@ func SignedTokenRedeemHandler( tokenPreimage := crypto.TokenPreimage{} err = tokenPreimage.UnmarshalText([]byte(request.Token_preimage)) if err != nil { - return errors.New(fmt.Sprintf("Request %s: Could not unmarshal text into preimage: %e", tokenRedeemRequestSet.Request_id, err)) + message := fmt.Sprintf( + "Request %s: Could not unmarshal text into preimage", + tokenRedeemRequestSet.Request_id, + ) + logger.Error().Err(err).Msg(message) + redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ + Issuer_name: "", + Issuer_cohort: -1, + Status: Unverified, + Associated_data: request.Associated_data, + }) + continue } verificationSignature := crypto.VerificationSignature{} err = verificationSignature.UnmarshalText([]byte(request.Signature)) if err != nil { - return errors.New(fmt.Sprintf("Request %s: Could not unmarshal text into verification signature: %e", tokenRedeemRequestSet.Request_id, err)) + message := fmt.Sprintf("Request %s: Could not unmarshal text into verification signature", tokenRedeemRequestSet.Request_id) + logger.Error().Err(err).Msg(message) + redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ + Issuer_name: "", + Issuer_cohort: -1, + Status: Unverified, + Associated_data: request.Associated_data, + }) + continue } - for _, issuer := range *issuers { - if !issuer.ExpiresAt.IsZero() && issuer.ExpiresAt.Before(time.Now()) { - continue - } - // Only attempt token verification with the issuer that was provided. - issuerPublicKey := issuer.SigningKey.PublicKey() - marshaledPublicKey, err := issuerPublicKey.MarshalText() - if err != nil { - return errors.New(fmt.Sprintf("Request %s: Could not unmarshal issuer public key into text: %e", tokenRedeemRequestSet.Request_id, err)) - } - logger.Trace().Msg(fmt.Sprintf("Request %s: Issuer: %s, Request: %s", tokenRedeemRequestSet.Request_id, string(marshaledPublicKey), request.Public_key)) - if string(marshaledPublicKey) == request.Public_key { - if err := btd.VerifyTokenRedemption( - &tokenPreimage, - &verificationSignature, - string(request.Binding), - []*crypto.SigningKey{issuer.SigningKey}, - ); err != nil { - verified = false - } else { - verified = true - verifiedIssuer = &issuer - verifiedCohort = int32(issuer.IssuerCohort) - break + if redemptionStatus == Unknown { + for _, issuer := range *issuers { + if !issuer.ExpiresAt.IsZero() && issuer.ExpiresAt.Before(time.Now()) { + continue + } + // Only attempt token verification with the issuer that was provided. + issuerPublicKey := issuer.SigningKey.PublicKey() + marshaledPublicKey, err := issuerPublicKey.MarshalText() + if err != nil { + return errors.New(fmt.Sprintf( + "Request %s: Could not unmarshal issuer public key into text: %e", + tokenRedeemRequestSet.Request_id, + err, + )) + } + logger.Trace().Msg(fmt.Sprintf( + "Request %s: Issuer: %s, Request: %s", + tokenRedeemRequestSet.Request_id, + string(marshaledPublicKey), + request.Public_key, + )) + if string(marshaledPublicKey) == request.Public_key { + if err := btd.VerifyTokenRedemption( + &tokenPreimage, + &verificationSignature, + string(request.Binding), + []*crypto.SigningKey{issuer.SigningKey}, + ); err != nil { + redemptionStatus = Unverified + } else { + redemptionStatus = Verified + verifiedIssuer = &issuer + verifiedCohort = int32(issuer.IssuerCohort) + break + } } } } - if !verified { - logger.Error().Msg(fmt.Sprintf("Request %s: Could not verify that the token redemption is valid", tokenRedeemRequestSet.Request_id)) + if !redemptionStatus == Verified { + logger.Error().Msg(fmt.Sprintf( + "Request %s: Could not verify that the token redemption is valid", + tokenRedeemRequestSet.Request_id, + )) redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: 0, - Status: UNVERIFIED, + Status: Unverified, Associated_data: request.Associated_data, }) continue } else { - logger.Trace().Msg(fmt.Sprintf("Request %s: Validated", tokenRedeemRequestSet.Request_id)) + logger.Trace().Msg(fmt.Sprintf( + "Request %s: Validated", + tokenRedeemRequestSet.Request_id, + )) } if err := server.RedeemToken(verifiedIssuer, &tokenPreimage, string(request.Binding)); err != nil { - logger.Error().Err(err).Msg(fmt.Sprintf("Request %s: Token redemption failed: %e", tokenRedeemRequestSet.Request_id, err)) + logger.Error().Err(err).Msg(fmt.Sprintf( + "Request %s: Token redemption failed: %e", + tokenRedeemRequestSet.Request_id, + err, + )) if strings.Contains(err.Error(), "Duplicate") { - logger.Error().Msg(fmt.Sprintf("Request %s: Duplicate redemption: %e", tokenRedeemRequestSet.Request_id, err)) + logger.Error().Msg(fmt.Sprintf( + "Request %s: Duplicate redemption: %e", + tokenRedeemRequestSet.Request_id, + err, + )) redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: 0, @@ -139,21 +223,27 @@ func SignedTokenRedeemHandler( Associated_data: request.Associated_data, }) } - logger.Error().Msg(fmt.Sprintf("Request %s: Could not mark token redemption", tokenRedeemRequestSet.Request_id)) + logger.Error().Msg(fmt.Sprintf( + "Request %s: Could not mark token redemption", + tokenRedeemRequestSet.Request_id, + )) redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: 0, - Status: ERROR, + Status: Error, Associated_data: request.Associated_data, }) continue } - logger.Trace().Msg(fmt.Sprintf("Request %s: Redeemed", tokenRedeemRequestSet.Request_id)) + logger.Trace().Msg(fmt.Sprintf( + "Request %s: Redeemed", + tokenRedeemRequestSet.Request_id, + )) issuerName := verifiedIssuer.IssuerType redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ Issuer_name: issuerName, Issuer_cohort: verifiedCohort, - Status: OK, + Status: redemptionStatus, Associated_data: request.Associated_data, }) } @@ -164,12 +254,21 @@ func SignedTokenRedeemHandler( var resultSetBuffer bytes.Buffer err = resultSet.Serialize(&resultSetBuffer) if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed to serialize ResultSet: %e", tokenRedeemRequestSet.Request_id, err)) + return errors.New(fmt.Sprintf( + "Request %s: Failed to serialize ResultSet: %e", + tokenRedeemRequestSet.Request_id, + err, + )) } err = Emit(producer, resultSetBuffer.Bytes(), logger) if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed to emit results to topic %s: %e", tokenRedeemRequestSet.Request_id, producer.Topic, err)) + return errors.New(fmt.Sprintf( + "Request %s: Failed to emit results to topic %s: %e", + tokenRedeemRequestSet.Request_id, + producer.Topic, + err, + )) } return nil } From 84f2eb20af3120d7120195096b1f482586ae09c2 Mon Sep 17 00:00:00 2001 From: Jackson Egan Date: Mon, 21 Mar 2022 23:37:00 -0400 Subject: [PATCH 2/4] Switch Enum Rather than using a new enum, use the enum generated by gogen-avro for status management. Also make log lines shorter by splitting across multiple lines. --- kafka/signed_token_redeem_handler.go | 116 ++++++++++++++------------- 1 file changed, 62 insertions(+), 54 deletions(-) diff --git a/kafka/signed_token_redeem_handler.go b/kafka/signed_token_redeem_handler.go index 91c1b110..3930ddbb 100644 --- a/kafka/signed_token_redeem_handler.go +++ b/kafka/signed_token_redeem_handler.go @@ -14,37 +14,6 @@ import ( "github.com/segmentio/kafka-go" ) -/* - RedemptionStatus is an enum that represents the redemption status of a given token. - These values are part of a contract with downstream consumers and the value assignment - for a given variant must never change without a change to ads-serve to accomodate. -*/ -type RedemptionStatus int64 - -const ( - Verified RedemptionStatus = iota - Duplicate - Unverified - Error - Unknown -) - -func (r RedemptionStatus) String() string { - switch r { - case Verified: - return "verified" - case Duplicate: - return "duplicate" - case Unverified: - return "unverified" - case Error: - return "error" - case Unknown: - return "unknown" - } - return "undefined" -} - /* BlindedTokenRedeemHandler emits payment tokens that correspond to the signed confirmation tokens provided. If it encounters an error, it returns a utils.ProcessingError that indicates @@ -63,7 +32,10 @@ func SignedTokenRedeemHandler( // Deserialize request into usable struct tokenRedeemRequestSet, err := avroSchema.DeserializeRedeemRequestSet(bytes.NewReader(data)) if err != nil { - message := fmt.Sprintf("Request %s: Failed Avro deserialization", tokenRedeemRequestSet.Request_id) + message := fmt.Sprintf( + "Request %s: Failed Avro deserialization", + tokenRedeemRequestSet.Request_id, + ) return utils.ProcessingErrorFromErrorWithMessage(err, message, msg, logger) } var redeemedTokenResults []avroSchema.RedeemResult @@ -77,7 +49,10 @@ func SignedTokenRedeemHandler( } issuers, err := server.FetchAllIssuers() if err != nil { - message := fmt.Sprintf("Request %s: Failed to fetch all issuers", tokenRedeemRequestSet.Request_id) + message := fmt.Sprintf( + "Request %s: Failed to fetch all issuers", + tokenRedeemRequestSet.Request_id, + ) return utils.ProcessingErrorFromErrorWithMessage(err, message, msg, logger) } @@ -85,9 +60,9 @@ func SignedTokenRedeemHandler( // in the future if needed) for _, request := range tokenRedeemRequestSet.Data { var ( - redemptionStatus RedemptionStatus = Unknown - verifiedIssuer = &cbpServer.Issuer{} - verifiedCohort int32 = 0 + redemptionStatus = avroSchema.RedeemResultStatusOk + verifiedIssuer = &cbpServer.Issuer{} + verifiedCohort int32 = 0 ) if request.Public_key == "" { logger.Error().Msg(fmt.Sprintf( @@ -97,7 +72,7 @@ func SignedTokenRedeemHandler( redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: 0, - Status: Error, + Status: avroSchema.RedeemResultStatusError, Associated_data: request.Associated_data, }) continue @@ -112,7 +87,7 @@ func SignedTokenRedeemHandler( redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: 0, - Status: Error, + Status: avroSchema.RedeemResultStatusError, Associated_data: request.Associated_data, }) continue @@ -130,21 +105,25 @@ func SignedTokenRedeemHandler( redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: -1, - Status: Unverified, + Status: avroSchema.RedeemResultStatusUnverified, Associated_data: request.Associated_data, }) continue } + verificationSignature := crypto.VerificationSignature{} err = verificationSignature.UnmarshalText([]byte(request.Signature)) // Unmarshaling failure is a data issue and is probably permanent. if err != nil { - message := fmt.Sprintf("Request %s: Could not unmarshal text into verification signature", tokenRedeemRequestSet.Request_id) + message := fmt.Sprintf( + "Request %s: Could not unmarshal text into verification signature", + tokenRedeemRequestSet.Request_id, + ) logger.Error().Err(err).Msg(message) redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: -1, - Status: Unverified, + Status: avroSchema.RedeemResultStatusUnverified, Associated_data: request.Associated_data, }) continue @@ -159,9 +138,20 @@ func SignedTokenRedeemHandler( marshaledPublicKey, err := issuerPublicKey.MarshalText() // Unmarshaling failure is a data issue and is probably permanent. if err != nil { - message := fmt.Sprintf("Request %s: Could not unmarshal issuer public key into text", tokenRedeemRequestSet.Request_id) - return utils.ProcessingErrorFromErrorWithMessage(err, message, msg, logger) + message := fmt.Sprintf( + "Request %s: Could not unmarshal issuer public key into text", + tokenRedeemRequestSet.Request_id, + ) + logger.Error().Err(err).Msg(message) + redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ + Issuer_name: "", + Issuer_cohort: -1, + Status: avroSchema.RedeemResultStatusUnverified, + Associated_data: request.Associated_data, + }) + continue } + if string(marshaledPublicKey) == request.Public_key { if err := btd.VerifyTokenRedemption( &tokenPreimage, @@ -169,9 +159,9 @@ func SignedTokenRedeemHandler( string(request.Binding), []*crypto.SigningKey{issuer.SigningKey}, ); err != nil { - redemptionStatus = Unverified + redemptionStatus = avroSchema.RedeemResultStatusUnverified } else { - redemptionStatus = Verified + redemptionStatus = avroSchema.RedeemResultStatusOk verifiedIssuer = &issuer verifiedCohort = int32(issuer.IssuerCohort) break @@ -179,7 +169,7 @@ func SignedTokenRedeemHandler( } } - if !redemptionStatus == Verified { + if !redemptionStatus == avroSchema.RedeemResultStatusOk { logger.Error().Msg(fmt.Sprintf( "Request %s: Could not verify that the token redemption is valid", tokenRedeemRequestSet.Request_id, @@ -187,40 +177,57 @@ func SignedTokenRedeemHandler( redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: 0, - Status: Unverified, + Status: avroSchema.RedeemResultStatusUnverified, Associated_data: request.Associated_data, }) continue } else { - logger.Info().Msg(fmt.Sprintf("Request %s: Validated", tokenRedeemRequestSet.Request_id)) + logger.Info().Msg(fmt.Sprintf( + "Request %s: Validated", + tokenRedeemRequestSet.Request_id, + )) } + redemption, equivalence, err := server.CheckRedeemedTokenEquivalence(verifiedIssuer, &tokenPreimage, string(request.Binding), msg.Offset) if err != nil { - message := fmt.Sprintf("Request %s: Failed to check redemption equivalence", tokenRedeemRequestSet.Request_id) + message := fmt.Sprintf( + "Request %s: Failed to check redemption equivalence", + tokenRedeemRequestSet.Request_id, + ) return utils.ProcessingErrorFromErrorWithMessage(err, message, msg, logger) } + // If the discovered equivalence is not one of the tolerableEquivalence // options this redemption is considered a duplicate. if !containsEquivalnce(tolerableEquivalence, equivalence) { - logger.Error().Msg(fmt.Sprintf("Request %s: Duplicate redemption: %e", tokenRedeemRequestSet.Request_id, err)) + logger.Error().Msg(fmt.Sprintf( + "Request %s: Duplicate redemption: %e", + tokenRedeemRequestSet.Request_id, + err, + )) redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: 0, - Status: Duplicate, + Status: avroSchema.RedeemResultStatusDuplicate_redemption, Associated_data: request.Associated_data, }) continue } + if err := server.PersistRedemption(*redemption); err != nil { - logger.Error().Err(err).Msg(fmt.Sprintf("Request %s: Token redemption failed", tokenRedeemRequestSet.Request_id)) + logger.Error().Err(err).Msg(fmt.Sprintf( + "Request %s: Token redemption failed", + tokenRedeemRequestSet.Request_id, + )) redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: 0, - Status: Error, + Status: avroSchema.RedeemResultStatusError, Associated_data: request.Associated_data, }) continue } + logger.Info().Msg(fmt.Sprintf( "Request %s: Redeemed", tokenRedeemRequestSet.Request_id, @@ -229,10 +236,11 @@ func SignedTokenRedeemHandler( redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ Issuer_name: issuerName, Issuer_cohort: verifiedCohort, - Status: redemptionStatus, + Status: avroSchema.RedeemResultStatusOk, Associated_data: request.Associated_data, }) } + resultSet := avroSchema.RedeemResultSet{ Request_id: tokenRedeemRequestSet.Request_id, Data: redeemedTokenResults, From 582c9e5fd06585efdf17110593b804e45492de8c Mon Sep 17 00:00:00 2001 From: Jackson Egan Date: Mon, 21 Mar 2022 23:40:23 -0400 Subject: [PATCH 3/4] Switch default redemptionStatus --- kafka/signed_token_redeem_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/signed_token_redeem_handler.go b/kafka/signed_token_redeem_handler.go index 3930ddbb..98f119d5 100644 --- a/kafka/signed_token_redeem_handler.go +++ b/kafka/signed_token_redeem_handler.go @@ -60,7 +60,7 @@ func SignedTokenRedeemHandler( // in the future if needed) for _, request := range tokenRedeemRequestSet.Data { var ( - redemptionStatus = avroSchema.RedeemResultStatusOk + redemptionStatus = avroSchema.RedeemResultStatusUnverified verifiedIssuer = &cbpServer.Issuer{} verifiedCohort int32 = 0 ) From d771c1584d0cdc1e083fee48c7a3701921ff4e57 Mon Sep 17 00:00:00 2001 From: Jackson Egan Date: Thu, 24 Mar 2022 00:37:46 -0400 Subject: [PATCH 4/4] Fix if condition --- kafka/signed_token_redeem_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/signed_token_redeem_handler.go b/kafka/signed_token_redeem_handler.go index 98f119d5..5ebaf9ab 100644 --- a/kafka/signed_token_redeem_handler.go +++ b/kafka/signed_token_redeem_handler.go @@ -169,7 +169,7 @@ func SignedTokenRedeemHandler( } } - if !redemptionStatus == avroSchema.RedeemResultStatusOk { + if redemptionStatus != avroSchema.RedeemResultStatusOk { logger.Error().Msg(fmt.Sprintf( "Request %s: Could not verify that the token redemption is valid", tokenRedeemRequestSet.Request_id,