Skip to content

Commit

Permalink
improve log subscription unsubscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
tudor-malene committed May 15, 2024
1 parent d0a93ef commit 373f6b9
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 10 deletions.
4 changes: 2 additions & 2 deletions go/common/subscription/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
func ForwardFromChannels[R any](inputChannels []chan R, onMessage func(R) error, onBackendDisconnect func(), backendDisconnected *atomic.Bool, stopped *atomic.Bool, timeoutInterval time.Duration, logger gethlog.Logger) {
inputCases := make([]reflect.SelectCase, len(inputChannels)+1)

cleanupTicker := time.NewTicker(2 * time.Second)
cleanupTicker := time.NewTicker(1 * time.Second)
defer cleanupTicker.Stop()
// create a ticker to handle cleanup, check the "stopped" flag and exit the goroutine
inputCases[0] = reflect.SelectCase{
Expand All @@ -36,7 +36,7 @@ loop:
// this mechanism removes closed input channels. When there is none left, the subscription is considered "disconnected".
_, value, ok := reflect.Select(inputCases)
if !ok {
logger.Error("Failed to read from the channel")
logger.Debug("Failed to read from the channel")
break loop
}

Expand Down
6 changes: 5 additions & 1 deletion go/enclave/enclave.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,11 @@ func (e *enclaveImpl) StopClient() common.SystemError {
}

func (e *enclaveImpl) sendBatch(batch *core.Batch, outChannel chan common.StreamL2UpdatesResponse) {
e.logger.Info("Streaming batch to host", log.BatchHashKey, batch.Hash(), log.BatchSeqNoKey, batch.SeqNo())
if batch.SeqNo().Uint64()%10 == 0 {
e.logger.Info("Streaming batch to host", log.BatchHashKey, batch.Hash(), log.BatchSeqNoKey, batch.SeqNo())
} else {
e.logger.Debug("Streaming batch to host", log.BatchHashKey, batch.Hash(), log.BatchSeqNoKey, batch.SeqNo())
}
extBatch, err := batch.ToExtBatch(e.dataEncryptionService, e.dataCompressionService)
if err != nil {
// this error is unrecoverable
Expand Down
2 changes: 1 addition & 1 deletion go/enclave/l2chain/l2_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (oc *obscuroChain) GetBalanceAtBlock(ctx context.Context, accountAddr gethc
func (oc *obscuroChain) ObsCall(ctx context.Context, apiArgs *gethapi.TransactionArgs, blockNumber *gethrpc.BlockNumber) (*gethcore.ExecutionResult, error) {
result, err := oc.ObsCallAtBlock(ctx, apiArgs, blockNumber)
if err != nil {
oc.logger.Info(fmt.Sprintf("Obs_Call: failed to execute contract %s.", apiArgs.To), log.CtrErrKey, err.Error())
oc.logger.Debug(fmt.Sprintf("Obs_Call: failed to execute contract %s.", apiArgs.To), log.CtrErrKey, err.Error())
return nil, err
}

Expand Down
12 changes: 7 additions & 5 deletions go/host/events/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,18 @@ func (l *LogEventManager) Subscribe(id rpc.ID, encryptedLogSubscription common.E
func (l *LogEventManager) Unsubscribe(id rpc.ID) {
enclaveUnsubErr := l.sl.Enclaves().Unsubscribe(id)
if enclaveUnsubErr != nil {
// this can happen when the client passes a invalid subscription id
// this can happen when the client passes an invalid subscription id
l.logger.Debug("Could not terminate enclave subscription", log.SubIDKey, id, log.ErrKey, enclaveUnsubErr)
}
l.subscriptionMutex.Lock()
defer l.subscriptionMutex.Unlock()

l.subscriptionMutex.RLock()
logSubscription, found := l.subscriptions[id]
close(logSubscription.ch)
l.subscriptionMutex.RUnlock()

if found {
close(logSubscription.ch)
l.subscriptionMutex.Lock()
delete(l.subscriptions, id)
l.subscriptionMutex.Unlock()
if enclaveUnsubErr != nil {
l.logger.Error("The subscription management between the host and the enclave is out of sync", log.SubIDKey, id, log.ErrKey, enclaveUnsubErr)
}
Expand Down
5 changes: 4 additions & 1 deletion go/host/rpc/clientapi/client_api_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@ func (api *FilterAPI) Logs(ctx context.Context, encryptedParams common.Encrypted
api.logger,
)
go subscriptioncommon.HandleUnsubscribe(subscription, func() {
api.host.UnsubscribeLogs(subscription.ID)
// first exit the forwarding go-routine
unsubscribed.Store(true)
time.Sleep(100 * time.Millisecond)
// and then close the channel
api.host.UnsubscribeLogs(subscription.ID)
})
return subscription, nil
}
Expand Down

0 comments on commit 373f6b9

Please sign in to comment.