Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
chore: use timeout for recursive lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
rach-id committed Oct 31, 2023
1 parent 0dec09b commit e1d3468
Showing 1 changed file with 58 additions and 36 deletions.
94 changes: 58 additions & 36 deletions rpc/app_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rpc
import (
"context"
"strconv"
"time"

"github.com/celestiaorg/celestia-app/pkg/appconsts"
cosmosgrpc "github.com/cosmos/cosmos-sdk/types/grpc"
Expand Down Expand Up @@ -101,22 +102,29 @@ func (aq *AppQuerier) QueryRecursiveHistoricalAttestationByNonce(ctx context.Con
queryClient := celestiatypes.NewQueryClient(aq.clientConn)

currentHeight := height
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
for currentHeight >= 1 {
var header metadata.MD
atResp, err := queryClient.AttestationRequestByNonce(
metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(currentHeight, 10)), // Add metadata to request
&celestiatypes.QueryAttestationRequestByNonceRequest{Nonce: nonce},
grpc.Header(&header), // Retrieve header from response
)
if err == nil {
unmarshalledAttestation, err := aq.unmarshallAttestation(atResp.Attestation)
if err != nil {
return nil, err
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
var header metadata.MD
atResp, err := queryClient.AttestationRequestByNonce(
metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(currentHeight, 10)), // Add metadata to request
&celestiatypes.QueryAttestationRequestByNonceRequest{Nonce: nonce},
grpc.Header(&header), // Retrieve header from response
)
if err == nil {
unmarshalledAttestation, err := aq.unmarshallAttestation(atResp.Attestation)
if err != nil {
return nil, err
}
return unmarshalledAttestation, nil
}
return unmarshalledAttestation, nil
aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error())
currentHeight -= uint64(BlocksIn20DaysPeriod)
}
aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error())
currentHeight -= uint64(BlocksIn20DaysPeriod)
}
return nil, ErrNotFound
}
Expand Down Expand Up @@ -249,23 +257,30 @@ func (aq *AppQuerier) QueryLatestValset(ctx context.Context) (*celestiatypes.Val
// QueryRecursiveLatestValset query the latest recorded valset in the state machine in history.
func (aq *AppQuerier) QueryRecursiveLatestValset(ctx context.Context, height uint64) (*celestiatypes.Valset, error) {
currentHeight := height
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
for currentHeight >= 1 {
latestNonce, err := aq.QueryHistoricalLatestAttestationNonce(ctx, currentHeight)
if err != nil {
return nil, err
}
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
latestNonce, err := aq.QueryHistoricalLatestAttestationNonce(ctx, currentHeight)
if err != nil {
return nil, err
}

if vs, err := aq.QueryHistoricalValsetByNonce(ctx, latestNonce, currentHeight); err == nil {
return vs, nil
}
if vs, err := aq.QueryHistoricalValsetByNonce(ctx, latestNonce, currentHeight); err == nil {
return vs, nil
}

latestValset, err := aq.QueryHistoricalLastValsetBeforeNonce(ctx, latestNonce, currentHeight)
if err == nil {
return latestValset, nil
}
latestValset, err := aq.QueryHistoricalLastValsetBeforeNonce(ctx, latestNonce, currentHeight)
if err == nil {
return latestValset, nil
}

aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error())
currentHeight -= uint64(BlocksIn20DaysPeriod)
aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error())
currentHeight -= uint64(BlocksIn20DaysPeriod)
}
}
return nil, ErrNotFound
}
Expand Down Expand Up @@ -308,18 +323,25 @@ func (aq *AppQuerier) QueryRecursiveHistoricalLastValsetBeforeNonce(ctx context.
queryClient := celestiatypes.NewQueryClient(aq.clientConn)

currentHeight := height
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
for currentHeight >= 1 {
var header metadata.MD
resp, err := queryClient.LatestValsetRequestBeforeNonce(
metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)),
&celestiatypes.QueryLatestValsetRequestBeforeNonceRequest{Nonce: nonce},
grpc.Header(&header),
)
if err == nil {
return resp.Valset, err
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
var header metadata.MD
resp, err := queryClient.LatestValsetRequestBeforeNonce(
metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)),
&celestiatypes.QueryLatestValsetRequestBeforeNonceRequest{Nonce: nonce},
grpc.Header(&header),
)
if err == nil {
return resp.Valset, err
}
aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error())
currentHeight -= uint64(BlocksIn20DaysPeriod)
}
aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error())
currentHeight -= uint64(BlocksIn20DaysPeriod)
}
return nil, ErrNotFound
}
Expand Down

0 comments on commit e1d3468

Please sign in to comment.