diff --git a/go/common/subscription/utils.go b/go/common/subscription/utils.go index 4179cf476c..cbd055f580 100644 --- a/go/common/subscription/utils.go +++ b/go/common/subscription/utils.go @@ -17,7 +17,7 @@ import ( func ForwardFromChannels[R any](inputChannels []chan R, onMessage func(R) error, onBackendDisconnect func(), backendDisconnected *atomic.Bool, stopped *atomic.Bool, timeoutInterval time.Duration, logger gethlog.Logger) { inputCases := make([]reflect.SelectCase, len(inputChannels)+1) - cleanupTicker := time.NewTicker(2 * time.Second) + cleanupTicker := time.NewTicker(1 * time.Second) defer cleanupTicker.Stop() // create a ticker to handle cleanup, check the "stopped" flag and exit the goroutine inputCases[0] = reflect.SelectCase{ @@ -36,7 +36,7 @@ loop: // this mechanism removes closed input channels. When there is none left, the subscription is considered "disconnected". _, value, ok := reflect.Select(inputCases) if !ok { - logger.Error("Failed to read from the channel") + logger.Debug("Failed to read from the channel") break loop } diff --git a/go/enclave/enclave.go b/go/enclave/enclave.go index bbc7e46210..e8013a3e8a 100644 --- a/go/enclave/enclave.go +++ b/go/enclave/enclave.go @@ -347,7 +347,11 @@ func (e *enclaveImpl) StopClient() common.SystemError { } func (e *enclaveImpl) sendBatch(batch *core.Batch, outChannel chan common.StreamL2UpdatesResponse) { - e.logger.Info("Streaming batch to host", log.BatchHashKey, batch.Hash(), log.BatchSeqNoKey, batch.SeqNo()) + if batch.SeqNo().Uint64()%10 == 0 { + e.logger.Info("Streaming batch to host", log.BatchHashKey, batch.Hash(), log.BatchSeqNoKey, batch.SeqNo()) + } else { + e.logger.Debug("Streaming batch to host", log.BatchHashKey, batch.Hash(), log.BatchSeqNoKey, batch.SeqNo()) + } extBatch, err := batch.ToExtBatch(e.dataEncryptionService, e.dataCompressionService) if err != nil { // this error is unrecoverable diff --git a/go/enclave/l2chain/l2_chain.go b/go/enclave/l2chain/l2_chain.go index f01d825a9b..b0c34c4c32 100644 --- a/go/enclave/l2chain/l2_chain.go +++ b/go/enclave/l2chain/l2_chain.go @@ -104,7 +104,7 @@ func (oc *obscuroChain) GetBalanceAtBlock(ctx context.Context, accountAddr gethc func (oc *obscuroChain) ObsCall(ctx context.Context, apiArgs *gethapi.TransactionArgs, blockNumber *gethrpc.BlockNumber) (*gethcore.ExecutionResult, error) { result, err := oc.ObsCallAtBlock(ctx, apiArgs, blockNumber) if err != nil { - oc.logger.Info(fmt.Sprintf("Obs_Call: failed to execute contract %s.", apiArgs.To), log.CtrErrKey, err.Error()) + oc.logger.Debug(fmt.Sprintf("Obs_Call: failed to execute contract %s.", apiArgs.To), log.CtrErrKey, err.Error()) return nil, err } diff --git a/go/host/events/logs.go b/go/host/events/logs.go index b0f25a246d..ac57b69fdf 100644 --- a/go/host/events/logs.go +++ b/go/host/events/logs.go @@ -63,16 +63,18 @@ func (l *LogEventManager) Subscribe(id rpc.ID, encryptedLogSubscription common.E func (l *LogEventManager) Unsubscribe(id rpc.ID) { enclaveUnsubErr := l.sl.Enclaves().Unsubscribe(id) if enclaveUnsubErr != nil { - // this can happen when the client passes a invalid subscription id + // this can happen when the client passes an invalid subscription id l.logger.Debug("Could not terminate enclave subscription", log.SubIDKey, id, log.ErrKey, enclaveUnsubErr) } - l.subscriptionMutex.Lock() - defer l.subscriptionMutex.Unlock() - + l.subscriptionMutex.RLock() logSubscription, found := l.subscriptions[id] + close(logSubscription.ch) + l.subscriptionMutex.RUnlock() + if found { - close(logSubscription.ch) + l.subscriptionMutex.Lock() delete(l.subscriptions, id) + l.subscriptionMutex.Unlock() if enclaveUnsubErr != nil { l.logger.Error("The subscription management between the host and the enclave is out of sync", log.SubIDKey, id, log.ErrKey, enclaveUnsubErr) } diff --git a/go/host/rpc/clientapi/client_api_filter.go b/go/host/rpc/clientapi/client_api_filter.go index f061a9bee0..64746208af 100644 --- a/go/host/rpc/clientapi/client_api_filter.go +++ b/go/host/rpc/clientapi/client_api_filter.go @@ -77,8 +77,11 @@ func (api *FilterAPI) Logs(ctx context.Context, encryptedParams common.Encrypted api.logger, ) go subscriptioncommon.HandleUnsubscribe(subscription, func() { - api.host.UnsubscribeLogs(subscription.ID) + // first exit the forwarding go-routine unsubscribed.Store(true) + time.Sleep(100 * time.Millisecond) + // and then close the channel + api.host.UnsubscribeLogs(subscription.ID) }) return subscription, nil }