Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/fix unmarshall error #110

Open
wants to merge 7 commits into
base: ads-stage
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 106 additions & 37 deletions kafka/signed_token_redeem_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,21 @@ import (
permanent and the attempt should be abandoned.
*/
func SignedTokenRedeemHandler(

msg kafka.Message,
producer *kafka.Writer,
tolerableEquivalence []cbpServer.Equivalence,
server *cbpServer.Server,
results chan *utils.ProcessingError,
logger *zerolog.Logger,
) *utils.ProcessingError {
const (
OK = 0
DUPLICATE_REDEMPTION = 1
UNVERIFIED = 2
ERROR = 3
)
data := msg.Value
// Deserialize request into usable struct
tokenRedeemRequestSet, err := avroSchema.DeserializeRedeemRequestSet(bytes.NewReader(data))
if err != nil {
message := fmt.Sprintf("Request %s: Failed Avro deserialization", tokenRedeemRequestSet.Request_id)
message := fmt.Sprintf(
"Request %s: Failed Avro deserialization",
tokenRedeemRequestSet.Request_id,
)
return utils.ProcessingErrorFromErrorWithMessage(err, message, msg, logger)
}
var redeemedTokenResults []avroSchema.RedeemResult
Expand All @@ -53,36 +49,45 @@ func SignedTokenRedeemHandler(
}
issuers, err := server.FetchAllIssuers()
if err != nil {
message := fmt.Sprintf("Request %s: Failed to fetch all issuers", tokenRedeemRequestSet.Request_id)
message := fmt.Sprintf(
"Request %s: Failed to fetch all issuers",
tokenRedeemRequestSet.Request_id,
)
return utils.ProcessingErrorFromErrorWithMessage(err, message, msg, logger)
}

// Iterate over requests (only one at this point but the schema can support more
// in the future if needed)
for _, request := range tokenRedeemRequestSet.Data {
var (
verified = false
verifiedIssuer = &cbpServer.Issuer{}
verifiedCohort int32 = 0
redemptionStatus = avroSchema.RedeemResultStatusUnverified
verifiedIssuer = &cbpServer.Issuer{}
verifiedCohort int32 = 0
)
if request.Public_key == "" {
logger.Error().Msg(fmt.Sprintf("Request %s: Missing public key", tokenRedeemRequestSet.Request_id))
logger.Error().Msg(fmt.Sprintf(
"Request %s: Missing public key",
tokenRedeemRequestSet.Request_id,
))
redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{
Issuer_name: "",
Issuer_cohort: 0,
Status: ERROR,
Status: avroSchema.RedeemResultStatusError,
Associated_data: request.Associated_data,
})
continue
}

// preimage, signature, and binding are all required to proceed
if request.Token_preimage == "" || request.Signature == "" || request.Binding == "" {
logger.Error().Msg(fmt.Sprintf("Request %s: Empty request", tokenRedeemRequestSet.Request_id))
logger.Error().Msg(fmt.Sprintf(
"Request %s: Empty request",
tokenRedeemRequestSet.Request_id,
))
redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{
Issuer_name: "",
Issuer_cohort: 0,
Status: ERROR,
Status: avroSchema.RedeemResultStatusError,
Associated_data: request.Associated_data,
})
continue
Expand All @@ -92,16 +97,38 @@ func SignedTokenRedeemHandler(
err = tokenPreimage.UnmarshalText([]byte(request.Token_preimage))
// Unmarshaling failure is a data issue and is probably permanent.
if err != nil {
message := fmt.Sprintf("Request %s: Could not unmarshal text into preimage", tokenRedeemRequestSet.Request_id)
return utils.ProcessingErrorFromErrorWithMessage(err, message, msg, logger)
message := fmt.Sprintf(
"Request %s: Could not unmarshal text into preimage",
tokenRedeemRequestSet.Request_id,
)
logger.Error().Err(err).Msg(message)
redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{
Issuer_name: "",
Issuer_cohort: -1,
Status: avroSchema.RedeemResultStatusUnverified,
Associated_data: request.Associated_data,
})
continue
}

verificationSignature := crypto.VerificationSignature{}
err = verificationSignature.UnmarshalText([]byte(request.Signature))
// Unmarshaling failure is a data issue and is probably permanent.
if err != nil {
message := fmt.Sprintf("Request %s: Could not unmarshal text into verification signature", tokenRedeemRequestSet.Request_id)
return utils.ProcessingErrorFromErrorWithMessage(err, message, msg, logger)
message := fmt.Sprintf(
"Request %s: Could not unmarshal text into verification signature",
tokenRedeemRequestSet.Request_id,
)
logger.Error().Err(err).Msg(message)
redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{
Issuer_name: "",
Issuer_cohort: -1,
Status: avroSchema.RedeemResultStatusUnverified,
Associated_data: request.Associated_data,
})
continue
}

for _, issuer := range *issuers {
if !issuer.ExpiresAt.IsZero() && issuer.ExpiresAt.Before(time.Now()) {
continue
Expand All @@ -111,88 +138,130 @@ func SignedTokenRedeemHandler(
marshaledPublicKey, err := issuerPublicKey.MarshalText()
// Unmarshaling failure is a data issue and is probably permanent.
if err != nil {
message := fmt.Sprintf("Request %s: Could not unmarshal issuer public key into text", tokenRedeemRequestSet.Request_id)
return utils.ProcessingErrorFromErrorWithMessage(err, message, msg, logger)
message := fmt.Sprintf(
"Request %s: Could not unmarshal issuer public key into text",
tokenRedeemRequestSet.Request_id,
)
logger.Error().Err(err).Msg(message)
redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{
Issuer_name: "",
Issuer_cohort: -1,
Status: avroSchema.RedeemResultStatusUnverified,
Associated_data: request.Associated_data,
})
continue
}

if string(marshaledPublicKey) == request.Public_key {
if err := btd.VerifyTokenRedemption(
&tokenPreimage,
&verificationSignature,
string(request.Binding),
[]*crypto.SigningKey{issuer.SigningKey},
); err != nil {
verified = false
redemptionStatus = avroSchema.RedeemResultStatusUnverified
} else {
verified = true
redemptionStatus = avroSchema.RedeemResultStatusOk
verifiedIssuer = &issuer
verifiedCohort = int32(issuer.IssuerCohort)
break
}
}
}

if !verified {
logger.Error().Msg(fmt.Sprintf("Request %s: Could not verify that the token redemption is valid", tokenRedeemRequestSet.Request_id))
if redemptionStatus != avroSchema.RedeemResultStatusOk {
logger.Error().Msg(fmt.Sprintf(
"Request %s: Could not verify that the token redemption is valid",
tokenRedeemRequestSet.Request_id,
))
redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{
Issuer_name: "",
Issuer_cohort: 0,
Status: UNVERIFIED,
Status: avroSchema.RedeemResultStatusUnverified,
Associated_data: request.Associated_data,
})
continue
} else {
logger.Info().Msg(fmt.Sprintf("Request %s: Validated", tokenRedeemRequestSet.Request_id))
logger.Info().Msg(fmt.Sprintf(
"Request %s: Validated",
tokenRedeemRequestSet.Request_id,
))
}

redemption, equivalence, err := server.CheckRedeemedTokenEquivalence(verifiedIssuer, &tokenPreimage, string(request.Binding), msg.Offset)
if err != nil {
message := fmt.Sprintf("Request %s: Failed to check redemption equivalence", tokenRedeemRequestSet.Request_id)
message := fmt.Sprintf(
"Request %s: Failed to check redemption equivalence",
tokenRedeemRequestSet.Request_id,
)
return utils.ProcessingErrorFromErrorWithMessage(err, message, msg, logger)
}

// If the discovered equivalence is not one of the tolerableEquivalence
// options this redemption is considered a duplicate.
if !containsEquivalnce(tolerableEquivalence, equivalence) {
logger.Error().Msg(fmt.Sprintf("Request %s: Duplicate redemption: %e", tokenRedeemRequestSet.Request_id, err))
logger.Error().Msg(fmt.Sprintf(
"Request %s: Duplicate redemption: %e",
tokenRedeemRequestSet.Request_id,
err,
))
redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{
Issuer_name: "",
Issuer_cohort: 0,
Status: DUPLICATE_REDEMPTION,
Status: avroSchema.RedeemResultStatusDuplicate_redemption,
Associated_data: request.Associated_data,
})
continue
}

if err := server.PersistRedemption(*redemption); err != nil {
logger.Error().Err(err).Msg(fmt.Sprintf("Request %s: Token redemption failed", tokenRedeemRequestSet.Request_id))
logger.Error().Err(err).Msg(fmt.Sprintf(
"Request %s: Token redemption failed",
tokenRedeemRequestSet.Request_id,
))
redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{
Issuer_name: "",
Issuer_cohort: 0,
Status: ERROR,
Status: avroSchema.RedeemResultStatusError,
Associated_data: request.Associated_data,
})
continue
}
logger.Info().Msg(fmt.Sprintf("Request %s: Redeemed", tokenRedeemRequestSet.Request_id))

logger.Info().Msg(fmt.Sprintf(
"Request %s: Redeemed",
tokenRedeemRequestSet.Request_id,
))
issuerName := verifiedIssuer.IssuerType
redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{
Issuer_name: issuerName,
Issuer_cohort: verifiedCohort,
Status: OK,
Status: avroSchema.RedeemResultStatusOk,
Associated_data: request.Associated_data,
})
}

resultSet := avroSchema.RedeemResultSet{
Request_id: tokenRedeemRequestSet.Request_id,
Data: redeemedTokenResults,
}
var resultSetBuffer bytes.Buffer
err = resultSet.Serialize(&resultSetBuffer)
if err != nil {
message := fmt.Sprintf("Request %s: Failed to serialize ResultSet", tokenRedeemRequestSet.Request_id)
message := fmt.Sprintf(
"Request %s: Failed to serialize ResultSet",
tokenRedeemRequestSet.Request_id,
)
return utils.ProcessingErrorFromErrorWithMessage(err, message, msg, logger)
}

err = Emit(producer, resultSetBuffer.Bytes(), logger)
if err != nil {
message := fmt.Sprintf("Request %s: Failed to emit results to topic %s", tokenRedeemRequestSet.Request_id, producer.Topic)
message := fmt.Sprintf(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should increase Prometheus counters on these unexpected cases and wire alert manager to notify us.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right before all return utils.ProcessingErrorFromErrorWithMessage(err, message, msg, logger) instructions

"Request %s: Failed to emit results to topic %s",
tokenRedeemRequestSet.Request_id,
producer.Topic,
)
return utils.ProcessingErrorFromErrorWithMessage(err, message, msg, logger)
}
return nil
Expand Down