diff --git a/fhevm-engine/fhevm-go-native/fhevm/api.go b/fhevm-engine/fhevm-go-native/fhevm/api.go index 9423850..21b2fd7 100644 --- a/fhevm-engine/fhevm-go-native/fhevm/api.go +++ b/fhevm-engine/fhevm-go-native/fhevm/api.go @@ -8,6 +8,7 @@ import ( "math/big" "os" "sort" + "strconv" "strings" "sync" "time" @@ -202,9 +203,21 @@ type ApiImpl struct { executorUrl string contractStorageAddress common.Address cache *CiphertextCache - logger ProxyLogger + // The offset from the current block number for committing the FHE computations. + // If set to 0, the computations are committed in the current block. + commitBlockOffset uint8 + logger ProxyLogger +} + +// Notify the executor that there is work available +func (s *ApiImpl) notifyWorkAvailable() { + select { + case s.cache.workAvailableChan <- true: + default: + } } + type SessionImpl struct { sessionStore *SessionComputationStore apiImpl *ApiImpl @@ -263,7 +276,9 @@ type SessionComputationStore struct { blockNumber int64 cache *CiphertextCache contractStorageAddress common.Address + commitBlockOffset uint8 logger ProxyLogger + } type EvmStorageComputationStore struct { @@ -286,6 +301,7 @@ func (executorApi *ApiImpl) CreateSession(blockNumber int64, hostLogger FHELogge blockNumber: blockNumber, cache: executorApi.cache, contractStorageAddress: executorApi.contractStorageAddress, + commitBlockOffset: executorApi.commitBlockOffset, logger: executorApi.logger, }, } @@ -413,6 +429,20 @@ func (sessionApi *SessionImpl) Commit(blockNumber int64, storage ChainStorageApi return err } + // Compute pending computations + if sessionApi.apiImpl.commitBlockOffset == 0 { + // Late commit is disabled, send compute gRPC request and waits for it to finish + err := executorProcessPendingComputations(sessionApi.apiImpl) + if err != nil { + log.Error("Executor failed", "block", blockNumber, "error", err) + return err + } + } else { + // Signal the executor thread that work is ready. + sessionApi.apiImpl.notifyWorkAvailable() + } + + // Flush the computation results to the state err = sessionApi.apiImpl.flushFheResultsToState(blockNumber, storage) if err != nil { return err @@ -510,7 +540,7 @@ func (dbApi *SessionComputationStore) InsertComputation(computation ComputationT // hardcode late commit for now to be 5 blocks from current block // in future we can implement dynamic compute, if user pays more // he can have faster commit - computation.CommitBlockId = dbApi.blockNumber + 5 + computation.CommitBlockId = dbApi.blockNumber + int64(dbApi.commitBlockOffset) dbApi.inserts = append(dbApi.inserts, computation) log.Info("Insert computation", "inserts count", len(dbApi.inserts), "computation", computation) @@ -760,12 +790,6 @@ func (dbApi *EvmStorageComputationStore) InsertComputationBatch(evmStorage Chain } - // notify about work available - select { - case dbApi.cache.workAvailableChan <- true: - default: - } - return nil } @@ -1007,6 +1031,17 @@ func InitExecutor(hostLogger FHELogger) (ExecutorApi, error) { // pick hardcoded value in the beginning, we can change later storageAddress := common.HexToAddress("0x0000000000000000000000000000000000000070") + + commitBlockOffset := uint8(0) + offset, hasOffset := os.LookupEnv("FHEVM_COMMIT_BLOCK_OFFSET") + if hasOffset { + parsedOffset, err := strconv.ParseUint(offset, 10, 8) + if err != nil { + logger("init").Crit("Invalid FHEVM_COMMIT_BLOCK_OFFSET", "error", err.Error()) + } + commitBlockOffset = uint8(parsedOffset) + } + log.Info("FHEVM initialized", "Executor addr", executorUrl, "FHEVM contract", contractAddr, @@ -1029,10 +1064,13 @@ func InitExecutor(hostLogger FHELogger) (ExecutorApi, error) { contractStorageAddress: storageAddress, executorUrl: executorUrl, cache: cache, + commitBlockOffset: uint8(commitBlockOffset), } // run executor worker in the background - go executorWorkerThread(apiImpl) + if commitBlockOffset > 0 { + go executorWorkerThread(apiImpl) + } return apiImpl, nil }