Skip to content

Commit

Permalink
Merge branch 'georgi/logging-improvements' into georgi/configurable-l…
Browse files Browse the repository at this point in the history
…ate-commit
  • Loading branch information
goshawk-3 authored Dec 19, 2024
2 parents 01356c7 + 6e7a043 commit 662a74e
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 31 deletions.
69 changes: 38 additions & 31 deletions fhevm-engine/fhevm-go-native/fhevm/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"

_ "github.com/mattn/go-sqlite3"
grpc "google.golang.org/grpc"
Expand Down Expand Up @@ -126,7 +125,10 @@ type ExecutorApi interface {
// the cache prepared to be inserted when commit block comes.
// We pass current block number to know at which
// block ciphertext should be materialized inside blockchain state.
CreateSession(blockNumber int64) ExecutorSession
//
// HostLogger is an implementation of FHELogger from the host chain,
// used to delegate logging. If set to nil, logging will be disabled.
CreateSession(blockNumber int64, hostLogger FHELogger) ExecutorSession
// Preload ciphertexts into cache and perform initial computations,
// should be called once after blockchain node initialization
PreloadCiphertexts(blockNumber int64, api ChainStorageApi) error
Expand Down Expand Up @@ -201,10 +203,10 @@ type ApiImpl struct {
executorUrl string
contractStorageAddress common.Address
cache *CiphertextCache

// The offset from the current block number for committing the FHE computations.
// If set to 0, the computations are committed in the current block.
commitBlockOffset uint8
logger ProxyLogger
}

// Notify the executor that there is work available
Expand All @@ -215,6 +217,7 @@ func (s *ApiImpl) notifyWorkAvailable() {
}
}


type SessionImpl struct {
sessionStore *SessionComputationStore
apiImpl *ApiImpl
Expand Down Expand Up @@ -274,15 +277,20 @@ type SessionComputationStore struct {
cache *CiphertextCache
contractStorageAddress common.Address
commitBlockOffset uint8
logger ProxyLogger

}

type EvmStorageComputationStore struct {
currentBlockNumber int64
contractStorageAddress common.Address
cache *CiphertextCache
logger ProxyLogger
}

func (executorApi *ApiImpl) CreateSession(blockNumber int64) ExecutorSession {
func (executorApi *ApiImpl) CreateSession(blockNumber int64, hostLogger FHELogger) ExecutorSession {
executorApi.logger = log(hostLogger, "module::fhevm")

return &SessionImpl{
apiImpl: executorApi,
sessionStore: &SessionComputationStore{
Expand All @@ -294,12 +302,13 @@ func (executorApi *ApiImpl) CreateSession(blockNumber int64) ExecutorSession {
cache: executorApi.cache,
contractStorageAddress: executorApi.contractStorageAddress,
commitBlockOffset: executorApi.commitBlockOffset,
logger: executorApi.logger,
},
}
}

func (executorApi *ApiImpl) PreloadCiphertexts(blockNumber int64, api ChainStorageApi) error {
log := logger("preload")
log := log(&executorApi.logger, "preload")

computations := executorApi.loadComputationsFromStateToCache(blockNumber, api)
log.Info("Preload ciphertexts", "block", blockNumber, "length", computations)
Expand All @@ -314,7 +323,7 @@ func (executorApi *ApiImpl) loadComputationsFromStateToCache(startBlockNumber in
loadStartTime := time.Now()
computations := 0
defer func() {
log := logger("preload")
log := log(&executorApi.logger, "preload")
duration := time.Since(loadStartTime)
log.Info("Preload done", "computations", computations, "duration", duration)
}()
Expand Down Expand Up @@ -411,7 +420,7 @@ func (executorApi *ApiImpl) loadComputationsFromStateToCache(startBlockNumber in
}

func (sessionApi *SessionImpl) Commit(blockNumber int64, storage ChainStorageApi) error {
log := logger("commit")
log := log(&sessionApi.apiImpl.logger, "commit")

log.Debug("Session store ciphertexts", "block", blockNumber)
err := sessionApi.sessionStore.Commit(storage)
Expand Down Expand Up @@ -442,7 +451,7 @@ func (sessionApi *SessionImpl) Commit(blockNumber int64, storage ChainStorageApi
}

func (sessionApi *SessionImpl) Execute(dataOrig []byte, ed ExtraData, outputOrig []byte) error {
log := logger("session::execute")
log := log(&sessionApi.apiImpl.logger, "session::execute")

if len(dataOrig) < 4 {
err := fmt.Errorf("input data must be at least 4 bytes for signature, got %d", len(dataOrig))
Expand Down Expand Up @@ -521,7 +530,7 @@ func (dbApi *SessionComputationStore) InsertComputationBatch(computations []Comp
}

func (dbApi *SessionComputationStore) InsertComputation(computation ComputationToInsert) error {
log := logger("store")
log := log(&dbApi.logger, "session::execute")

_, found := dbApi.insertedHandles[string(computation.OutputHandle)]
if !found {
Expand Down Expand Up @@ -681,16 +690,16 @@ func (dbApi *EvmStorageComputationStore) InsertComputationBatch(evmStorage Chain
// We create buckets, how many blocks in the future user wants
// his ciphertexts to be evaluated

log := logger("evm_store")
log := log(&dbApi.logger, "evm_store")
log.Info("Processing computations", "count", len(computations))

buckets := make(map[int64][]*ComputationToInsert)
// index the buckets
for _, comp := range computations {
for ind, comp := range computations {
if buckets[comp.CommitBlockId] == nil {
buckets[comp.CommitBlockId] = make([]*ComputationToInsert, 0)
}
buckets[comp.CommitBlockId] = append(buckets[comp.CommitBlockId], &comp)
buckets[comp.CommitBlockId] = append(buckets[comp.CommitBlockId], &computations[ind])
}

if len(buckets) != 0 {
Expand Down Expand Up @@ -785,7 +794,7 @@ func (dbApi *EvmStorageComputationStore) InsertComputationBatch(evmStorage Chain
}

func (dbApi *EvmStorageComputationStore) hydrateComputationFromEvmState(evmStorage ChainStorageApi, comp *ComputationToInsert) error {
log := logger("evm_store")
log := log(&dbApi.logger, "evm_store")

// hydrate operands from storage
for idx := range comp.Operands {
Expand Down Expand Up @@ -872,7 +881,7 @@ func ReadBytesToAddress(api ChainStorageApi, contractAddress common.Address, add
}

func (executorApi *ApiImpl) flushFheResultsToState(blockNumber int64, api ChainStorageApi) error {
log := logger("flush")
log := log(&executorApi.logger, "flush")

// cleanup the queue for the block number
countAddress := blockNumberToQueueItemCountAddress(blockNumber)
Expand Down Expand Up @@ -937,6 +946,8 @@ func (executorApi *ApiImpl) materializeHandlesInStorage(blockNumber int64, handl
executorApi.cache.lock.Unlock()
}()

log := log(&executorApi.logger, "materialize")

executorApi.cache.latestBlockFlushed = blockNumber

contractAddr := executorApi.contractStorageAddress
Expand All @@ -947,7 +958,6 @@ func (executorApi *ApiImpl) materializeHandlesInStorage(blockNumber int64, handl
return nil
}

log := logger("materialize")
for _, handle := range handles {
ciphertext, ok := blockData.materializedCiphertexts[string(handle[:])]
if !ok {
Expand Down Expand Up @@ -998,7 +1008,9 @@ func ciphertextCacheGc(cache *CiphertextCache) {
cache.lastCacheGc = time.Now()
}

func InitExecutor() (ExecutorApi, error) {
func InitExecutor(hostLogger FHELogger) (ExecutorApi, error) {
log := log(hostLogger, "module::fhevm")

executorUrl, hasUrl := os.LookupEnv("FHEVM_EXECUTOR_URL")
if !hasUrl {
return nil, errors.New("FHEVM_EXECUTOR_URL is not configured")
Expand All @@ -1019,6 +1031,7 @@ func InitExecutor() (ExecutorApi, error) {
// pick hardcoded value in the beginning, we can change later
storageAddress := common.HexToAddress("0x0000000000000000000000000000000000000070")


commitBlockOffset := uint8(0)
offset, hasOffset := os.LookupEnv("FHEVM_COMMIT_BLOCK_OFFSET")
if hasOffset {
Expand All @@ -1029,7 +1042,7 @@ func InitExecutor() (ExecutorApi, error) {
commitBlockOffset = uint8(parsedOffset)
}

logger("init").Info("FHEVM initialized",
log.Info("FHEVM initialized",
"Executor addr", executorUrl,
"FHEVM contract", contractAddr,
"ACL contract", aclContractAddressHex,
Expand Down Expand Up @@ -1063,7 +1076,8 @@ func InitExecutor() (ExecutorApi, error) {
}

func executorWorkerThread(impl *ApiImpl) {
log := logger("worker")
log := log(&impl.logger, "worker")

for {
// try reading notification from channel
<-impl.cache.workAvailableChan
Expand All @@ -1080,23 +1094,15 @@ func executorWorkerThread(impl *ApiImpl) {
}

func executorProcessPendingComputations(impl *ApiImpl) error {
log := logger("sync compute")
log := log(&impl.logger, "sync_compute")

startTime := time.Now()
impl.cache.lock.Lock()
defer func() {
impl.cache.lock.Unlock()
}()

availableCts := len(impl.cache.ciphertextsToCompute)

defer func() {
if availableCts > 0 {
duration := time.Since(startTime).Milliseconds()
log.Debug("Computations completed", "duration", duration)
}
}()

// empty channel from multiple notifications before processing
for len(impl.cache.workAvailableChan) > 0 {
<-impl.cache.workAvailableChan
Expand Down Expand Up @@ -1175,6 +1181,7 @@ func executorProcessPendingComputations(impl *ApiImpl) error {
}
}

startTime := time.Now()
client := NewFhevmExecutorClient(conn)
response, err := client.SyncCompute(context.Background(), &request)
if err != nil {
Expand All @@ -1186,6 +1193,10 @@ func executorProcessPendingComputations(impl *ApiImpl) error {
return errors.New(response.GetError().String())
}

if availableCts > 0 {
log.Debug("Computations completed", "duration", time.Since(startTime))
}

log.Info("Response", "ciphertexts count", len(ciphertexts.Ciphertexts))

outCts := ciphertexts.Ciphertexts
Expand Down Expand Up @@ -1213,7 +1224,3 @@ func executorProcessPendingComputations(impl *ApiImpl) error {

return nil
}

func logger(ctx string) log.Logger {
return log.Root().With("module", fmt.Sprintf("fhevm:%s", ctx))
}
128 changes: 128 additions & 0 deletions fhevm-engine/fhevm-go-native/fhevm/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package fhevm

import (
"fmt"
"os"
)

// Map FheOp to its string representations
var fheOpNames = map[FheOp]string{
FheAdd: "FheAdd",
FheSub: "FheSub",
FheMul: "FheMul",
FheDiv: "FheDiv",
FheRem: "FheRem",
FheBitAnd: "FheBitAnd",
FheBitOr: "FheBitOr",
FheBitXor: "FheBitXor",
FheShl: "FheShl",
FheShr: "FheShr",
FheRotl: "FheRotl",
FheRotr: "FheRotr",
FheEq: "FheEq",
FheNe: "FheNe",
FheGe: "FheGe",
FheGt: "FheGt",
FheLe: "FheLe",
FheLt: "FheLt",
FheMin: "FheMin",
FheMax: "FheMax",
FheNeg: "FheNeg",
FheNot: "FheNot",
FheCast: "FheCast",
TrivialEncrypt: "TrivialEncrypt",
FheIfThenElse: "FheIfThenElse",
FheRand: "FheRand",
FheRandBounded: "FheRandBounded",
}

// String implements the fmt.Stringer interface for FheOp
func (op FheOp) String() string {
if name, ok := fheOpNames[op]; ok {
return name
}
return fmt.Sprintf("UnknownFheOp(%d)", op)
}

// A FHELogger writes key/value pairs to a Handler
type FHELogger interface {
Trace(msg string, ctx ...interface{})
Debug(msg string, ctx ...interface{})
Info(msg string, ctx ...interface{})
Warn(msg string, ctx ...interface{})
Error(msg string, ctx ...interface{})
Crit(msg string, ctx ...interface{})
}

// ProxyLogger is a concrete implementation of FHELogger that adds extra context to all calls.
type ProxyLogger struct {
// logger is the underlying logger that ProxyLogger delegates to.
// This should be the concrete logger implementation of the Host
logger FHELogger
ctx []interface{}
}

// log creates a new ProxyLogger instance with the given logger and context.
func log(logger FHELogger, ctx ...interface{}) ProxyLogger {
return ProxyLogger{
logger: logger,
ctx: ctx,
}
}

// Trace adds extra context and delegates the call.
func (p *ProxyLogger) Trace(msg string, ctx ...interface{}) {
if p.logger == nil {
return
}

p.logger.Trace(msg, append(p.ctx, ctx...)...)
}

// Debug adds extra context and delegates the call.
func (p *ProxyLogger) Debug(msg string, ctx ...interface{}) {
if p.logger == nil {
return
}

p.logger.Debug(msg, append(p.ctx, ctx...)...)
}

// Info adds extra context and delegates the call.
func (p *ProxyLogger) Info(msg string, ctx ...interface{}) {
if p.logger == nil {
return
}

p.logger.Info(msg, append(p.ctx, ctx...)...)
}

// Warn adds extra context and delegates the call.
func (p *ProxyLogger) Warn(msg string, ctx ...interface{}) {
if p.logger == nil {
return
}

p.logger.Warn(msg, append(p.ctx, ctx...)...)
}

// Error adds extra context and delegates the call.
func (p *ProxyLogger) Error(msg string, ctx ...interface{}) {
if p.logger == nil {
return
}

p.logger.Error(msg, append(p.ctx, ctx...)...)
}

// Crit adds extra context and delegates the call.
// It terminates the process after logging the message.
// This is useful for fatal errors.
func (p *ProxyLogger) Crit(msg string, ctx ...interface{}) {
if p.logger != nil {
p.logger.Crit(msg, append(p.ctx, ctx...)...)
}

// Exit the process
os.Exit(1)
}

0 comments on commit 662a74e

Please sign in to comment.