From e1d3468166e024ffb410b25b27d239210027c8b3 Mon Sep 17 00:00:00 2001 From: rachid Date: Tue, 31 Oct 2023 16:16:02 +0100 Subject: [PATCH] chore: use timeout for recursive lookup --- rpc/app_querier.go | 94 ++++++++++++++++++++++++++++------------------ 1 file changed, 58 insertions(+), 36 deletions(-) diff --git a/rpc/app_querier.go b/rpc/app_querier.go index ac556d06..86047244 100644 --- a/rpc/app_querier.go +++ b/rpc/app_querier.go @@ -3,6 +3,7 @@ package rpc import ( "context" "strconv" + "time" "github.com/celestiaorg/celestia-app/pkg/appconsts" cosmosgrpc "github.com/cosmos/cosmos-sdk/types/grpc" @@ -101,22 +102,29 @@ func (aq *AppQuerier) QueryRecursiveHistoricalAttestationByNonce(ctx context.Con queryClient := celestiatypes.NewQueryClient(aq.clientConn) currentHeight := height + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() for currentHeight >= 1 { - var header metadata.MD - atResp, err := queryClient.AttestationRequestByNonce( - metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(currentHeight, 10)), // Add metadata to request - &celestiatypes.QueryAttestationRequestByNonceRequest{Nonce: nonce}, - grpc.Header(&header), // Retrieve header from response - ) - if err == nil { - unmarshalledAttestation, err := aq.unmarshallAttestation(atResp.Attestation) - if err != nil { - return nil, err + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + var header metadata.MD + atResp, err := queryClient.AttestationRequestByNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(currentHeight, 10)), // Add metadata to request + &celestiatypes.QueryAttestationRequestByNonceRequest{Nonce: nonce}, + grpc.Header(&header), // Retrieve header from response + ) + if err == nil { + unmarshalledAttestation, err := aq.unmarshallAttestation(atResp.Attestation) + if err != nil { + return nil, err + } + return unmarshalledAttestation, nil } - return unmarshalledAttestation, nil + aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) + currentHeight -= uint64(BlocksIn20DaysPeriod) } - aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) - currentHeight -= uint64(BlocksIn20DaysPeriod) } return nil, ErrNotFound } @@ -249,23 +257,30 @@ func (aq *AppQuerier) QueryLatestValset(ctx context.Context) (*celestiatypes.Val // QueryRecursiveLatestValset query the latest recorded valset in the state machine in history. func (aq *AppQuerier) QueryRecursiveLatestValset(ctx context.Context, height uint64) (*celestiatypes.Valset, error) { currentHeight := height + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() for currentHeight >= 1 { - latestNonce, err := aq.QueryHistoricalLatestAttestationNonce(ctx, currentHeight) - if err != nil { - return nil, err - } + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + latestNonce, err := aq.QueryHistoricalLatestAttestationNonce(ctx, currentHeight) + if err != nil { + return nil, err + } - if vs, err := aq.QueryHistoricalValsetByNonce(ctx, latestNonce, currentHeight); err == nil { - return vs, nil - } + if vs, err := aq.QueryHistoricalValsetByNonce(ctx, latestNonce, currentHeight); err == nil { + return vs, nil + } - latestValset, err := aq.QueryHistoricalLastValsetBeforeNonce(ctx, latestNonce, currentHeight) - if err == nil { - return latestValset, nil - } + latestValset, err := aq.QueryHistoricalLastValsetBeforeNonce(ctx, latestNonce, currentHeight) + if err == nil { + return latestValset, nil + } - aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) - currentHeight -= uint64(BlocksIn20DaysPeriod) + aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) + currentHeight -= uint64(BlocksIn20DaysPeriod) + } } return nil, ErrNotFound } @@ -308,18 +323,25 @@ func (aq *AppQuerier) QueryRecursiveHistoricalLastValsetBeforeNonce(ctx context. queryClient := celestiatypes.NewQueryClient(aq.clientConn) currentHeight := height + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() for currentHeight >= 1 { - var header metadata.MD - resp, err := queryClient.LatestValsetRequestBeforeNonce( - metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), - &celestiatypes.QueryLatestValsetRequestBeforeNonceRequest{Nonce: nonce}, - grpc.Header(&header), - ) - if err == nil { - return resp.Valset, err + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + var header metadata.MD + resp, err := queryClient.LatestValsetRequestBeforeNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), + &celestiatypes.QueryLatestValsetRequestBeforeNonceRequest{Nonce: nonce}, + grpc.Header(&header), + ) + if err == nil { + return resp.Valset, err + } + aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) + currentHeight -= uint64(BlocksIn20DaysPeriod) } - aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) - currentHeight -= uint64(BlocksIn20DaysPeriod) } return nil, ErrNotFound }