From 509bbb2325c2c92d3842e0ea22805fa1c12e6dfc Mon Sep 17 00:00:00 2001 From: Bogdan Prodan Date: Sun, 8 Sep 2024 13:21:36 +0300 Subject: [PATCH 1/4] Added solver part --- Makefile | 12 +++ go.mod | 8 +- go.sum | 12 ++- logger/logger.go | 51 ++++++++-- relay/server/dapp.go | 29 +++--- relay/server/model.go | 9 ++ relay/server/router.go | 2 +- relay/server/server.go | 46 +++++++-- relay/server/solver.go | 21 +++- relay/server/solver_handler.go | 169 +++++++++++++++++++++++++++++++++ relay/service/intent.go | 56 ++++++++--- relay/service/subscription.go | 121 +++++++++++++++++++++++ 12 files changed, 487 insertions(+), 49 deletions(-) create mode 100644 Makefile create mode 100644 relay/server/model.go create mode 100644 relay/server/solver_handler.go create mode 100644 relay/service/subscription.go diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..b60f68b --- /dev/null +++ b/Makefile @@ -0,0 +1,12 @@ +GOLANGCI_LINT_VERSION=v1.59.1 + +.PHONY: lint +lint: golangci-lint + @golangci-lint run --timeout 10m0s + +.PHONY: golangci-lint +golangci-lint: + @if ! command -v golangci-lint >/dev/null 2>&1; then \ + echo "golangci-lint not found, installing..."; \ + $(GO) install github.com/golangci/golangci-lint/cmd/golangci-lint@${GOLANGCI_LINT_VERSION}; \ + fi \ No newline at end of file diff --git a/go.mod b/go.mod index b99fc24..61a435e 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,14 @@ go 1.22.5 require ( github.com/FastLane-Labs/atlas-sdk-go v0.0.0-20240905084332-938389daf445 - github.com/bloXroute-Labs/bloxroute-sdk-go v1.5.0 + github.com/bloXroute-Labs/bloxroute-sdk-go v1.5.1 + github.com/cornelk/hashmap v1.0.8 github.com/ethereum/go-ethereum v1.14.8 github.com/go-playground/validator/v10 v10.19.0 + github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.1 github.com/gorilla/websocket v1.5.1 + github.com/sourcegraph/jsonrpc2 v0.2.0 github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.19.0 github.com/valyala/fastjson v1.6.4 @@ -20,7 +23,7 @@ require ( github.com/andybalholm/brotli v1.1.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.13.0 // indirect - github.com/bloXroute-Labs/gateway/v2 v2.129.2 // indirect + github.com/bloXroute-Labs/gateway/v2 v2.129.19 // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -65,7 +68,6 @@ require ( github.com/satori/go.uuid v1.2.1-0.20181016170032-d91630c85102 // indirect github.com/savsgio/gotils v0.0.0-20240303185622-093b76447511 // indirect github.com/sourcegraph/conc v0.3.0 // indirect - github.com/sourcegraph/jsonrpc2 v0.2.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.6.0 // indirect github.com/spf13/pflag v1.0.5 // indirect diff --git a/go.sum b/go.sum index 0ac24a3..f412126 100644 --- a/go.sum +++ b/go.sum @@ -10,10 +10,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bits-and-blooms/bitset v1.13.0 h1:bAQ9OPNFYbGHV6Nez0tmNI0RiEu7/hxlYJRUA0wFAVE= github.com/bits-and-blooms/bitset v1.13.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= -github.com/bloXroute-Labs/bloxroute-sdk-go v1.5.0 h1:ntvWgXSMYqNIct0RBp7mhdHRGXNQEENpncFP7ql6IDI= -github.com/bloXroute-Labs/bloxroute-sdk-go v1.5.0/go.mod h1:mHSTWuy3XgU+V2l1gFRjSKCe5zinZmnGZp6cM++6MO0= -github.com/bloXroute-Labs/gateway/v2 v2.129.2 h1:cMvcktKaAXqb5LzGaqOjKI5g+RDs4+vc2KoTjpDDICk= -github.com/bloXroute-Labs/gateway/v2 v2.129.2/go.mod h1:rd5kyusOE0Q+HeIGi8ii8BzzmVSdHRflLl5B0Ps2vZQ= +github.com/bloXroute-Labs/bloxroute-sdk-go v1.5.1 h1:EdeXQuXmxyjadtA/CPaYdyFXHjIRMBDfexQUozhFWRE= +github.com/bloXroute-Labs/bloxroute-sdk-go v1.5.1/go.mod h1:YkrHVnCJaJaic3hAXVrKy3v7derreK2LbFmUWphukjY= +github.com/bloXroute-Labs/gateway/v2 v2.129.19 h1:KSDIynNIAAFwIWvNj3OITQNTlEQZTKQS2QAZ3K6e4tA= +github.com/bloXroute-Labs/gateway/v2 v2.129.19/go.mod h1:rd5kyusOE0Q+HeIGi8ii8BzzmVSdHRflLl5B0Ps2vZQ= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/btcsuite/btcd/btcec/v2 v2.3.4 h1:3EJjcN70HCu/mwqlUsGK8GcNVyLVxFDlWurTXGPFfiQ= @@ -41,6 +41,8 @@ github.com/consensys/bavard v0.1.13/go.mod h1:9ItSMtA/dXMAiL7BG6bqW2m3NdSEObYWoH github.com/consensys/gnark-crypto v0.12.1 h1:lHH39WuuFgVHONRl3J0LRBtuYdQTumFSDtJF7HpyG8M= github.com/consensys/gnark-crypto v0.12.1/go.mod h1:v2Gy7L/4ZRosZ7Ivs+9SfUDr0f5UlG+EM5t7MPHiLuY= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/cornelk/hashmap v1.0.8 h1:nv0AWgw02n+iDcawr5It4CjQIAcdMMKRrs10HOJYlrc= +github.com/cornelk/hashmap v1.0.8/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/crate-crypto/go-ipa v0.0.0-20240223125850-b1e8a79f509c h1:uQYC5Z1mdLRPrZhHjHxufI8+2UG/i25QG92j0Er9p6I= github.com/crate-crypto/go-ipa v0.0.0-20240223125850-b1e8a79f509c/go.mod h1:geZJZH3SzKCqnz5VT0q/DyIG/tvu/dZk+VIfXicupJs= @@ -100,6 +102,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/logger/logger.go b/logger/logger.go index d06c000..976355f 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -1,6 +1,7 @@ package logger import ( + "fmt" "log/slog" "os" "strings" @@ -13,20 +14,54 @@ func InitLogger(level string) { log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stdout, parseLevel(strings.ToLower(level)), true))) } -func Debug(format string, v ...interface{}) { - log.Debug(format, v...) +func Debug(msg string, v ...interface{}) { + log.Debug(msg, v...) } -func Info(format string, v ...interface{}) { - log.Info(format, v...) +func Info(msg string, v ...interface{}) { + log.Info(msg, v...) } -func Warn(format string, v ...interface{}) { - log.Warn(format, v...) +func Warn(msg string, v ...interface{}) { + log.Warn(msg, v...) } -func Error(format string, v ...interface{}) { - log.Error(format, v...) +func Error(msg string, v ...interface{}) { + log.Error(msg, v...) +} + +type Instance struct{} + +func (l *Instance) Debug(args ...interface{}) { + Debug(fmt.Sprint(args...)) +} + +func (l *Instance) Debugf(msg string, args ...interface{}) { + Debug(fmt.Sprintf(msg, args...)) +} + +func (l *Instance) Info(args ...interface{}) { + Info(fmt.Sprint(args...)) +} + +func (l *Instance) Infof(msg string, args ...interface{}) { + Info(fmt.Sprintf(msg, args...)) +} + +func (l *Instance) Warn(args ...interface{}) { + Warn(fmt.Sprint(args...)) +} + +func (l *Instance) Warnf(msg string, args ...interface{}) { + Warn(fmt.Sprintf(msg, args...)) +} + +func (l *Instance) Error(args ...interface{}) { + Error(fmt.Sprint(args...)) +} + +func (l *Instance) Errorf(msg string, args ...interface{}) { + Error(fmt.Sprintf(msg, args...)) } func parseLevel(level string) slog.Level { diff --git a/relay/server/dapp.go b/relay/server/dapp.go index 62f59ba..7fec42a 100644 --- a/relay/server/dapp.go +++ b/relay/server/dapp.go @@ -1,6 +1,8 @@ package server import ( + "encoding/json" + "fmt" "net/http" "github.com/FastLane-Labs/atlas-sdk-go/types" @@ -11,24 +13,30 @@ func (s *Server) userOperation(w http.ResponseWriter, r *http.Request) { var req types.UserOperationWithHintsRaw err := parseRequest(r, &req) if err != nil { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(err.Error())) + log.Error("failed to parse request", "error", err) + writeErrResponse(w, http.StatusBadRequest, fmt.Sprintf("invalid request: %v", err)) return } chainID, userOp, hints := req.Decode() partialOperation, err := types.NewUserOperationPartialRaw(chainID, userOp, hints) if err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(err.Error())) + log.Error("failed to create user operation partial", "error", err) + writeErrResponse(w, http.StatusBadRequest, fmt.Sprintf("invalid user operation parameters: %v", err)) return } - intentID, err := s.intentService.SubmitIntent(r.Context(), partialOperation) + data, err := json.Marshal(partialOperation) + if err != nil { + log.Error("failed to marshal user operation partial", "error", err) + writeInternalErrResponse(w) + return + } + + intentID, err := s.intentService.SubmitIntent(r.Context(), data) if err != nil { log.Error("failed to submit intent", "error", err) - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(err.Error())) + writeInternalErrResponse(w) return } @@ -41,16 +49,15 @@ func (s *Server) solverOperations(w http.ResponseWriter, r *http.Request) { q := r.URL.Query() intentID := q.Get("intent_id") if intentID == "" { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte("intent_id is required")) + log.Error("intent_id is required") + writeErrResponse(w, http.StatusBadRequest, "intent_id is required") return } resp, err := s.intentService.GetIntentSolutions(r.Context(), intentID) if err != nil { log.Error("failed to get intent solutions", "error", err) - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(err.Error())) + writeInternalErrResponse(w) return } diff --git a/relay/server/model.go b/relay/server/model.go new file mode 100644 index 0000000..e63474a --- /dev/null +++ b/relay/server/model.go @@ -0,0 +1,9 @@ +package server + +type pingResponse struct { + Pong string `json:"pong"` +} + +type subscribeResponse struct { + SubscriptionID string `json:"subscription_id"` +} diff --git a/relay/server/router.go b/relay/server/router.go index 47f383d..556af07 100644 --- a/relay/server/router.go +++ b/relay/server/router.go @@ -65,7 +65,7 @@ func (s *Server) buildRoutes() []route { func (s *Server) dAppRoutes() []route { return []route{ { - name: "SubmitUserOperation", + name: "SubmitIntent", method: http.MethodPost, pattern: "/userOperation", handlerFunc: s.userOperation, diff --git a/relay/server/server.go b/relay/server/server.go index 9a6ac48..0b81e68 100644 --- a/relay/server/server.go +++ b/relay/server/server.go @@ -17,21 +17,30 @@ import ( // Server handler http calls type Server struct { - server *http.Server - cfg *config.Config - intentService *service.Intent + server *http.Server + cfg *config.Config + intentService *service.Intent + subscriptionService *service.SubscriptionManager } // NewServer creates and returns a new websocket server managed by feedManager func NewServer(ctx context.Context, cfg *config.Config) (*Server, error) { - intentService, err := service.NewIntent(ctx, cfg) + subsManager := service.NewSubscriptionManager() + intentService, err := service.NewIntent(ctx, cfg, subsManager) if err != nil { return nil, fmt.Errorf("failed to create intent service: %v", err) } + // subscribe to intents right away + err = intentService.SubscribeToIntents(ctx) + if err != nil { + return nil, fmt.Errorf("failed to subscribe to intents: %v", err) + } + return &Server{ - cfg: cfg, - intentService: intentService, + cfg: cfg, + intentService: intentService, + subscriptionService: subsManager, }, nil } @@ -76,13 +85,32 @@ func (s *Server) Shutdown() { if err != nil { logger.Error("failed to close intent service", "error", err) } + + s.subscriptionService.Close() } func writeResponseData(w http.ResponseWriter, data interface{}) { b, err := json.Marshal(data) if err != nil { logger.Error("failed to marshal response data", "error", err) + writeErrResponse(w, http.StatusInternalServerError, err.Error()) + } + + w.Header().Set("Content-Type", "application/json; charset=UTF-8") + _, err = w.Write(b) + if err != nil { + logger.Error("failed to write response", "error", err) + } +} + +func writeErrResponse(w http.ResponseWriter, status int, errMessage string) { + resp := map[string]string{ + "error": errMessage, + } + b, err := json.Marshal(resp) + if err != nil { + logger.Error("failed to marshal error response data", "error", err) w.WriteHeader(http.StatusInternalServerError) _, err = w.Write([]byte(err.Error())) if err != nil { @@ -91,13 +119,17 @@ func writeResponseData(w http.ResponseWriter, data interface{}) { return } - w.Header().Set("Content-Type", "application/json; charset=UTF-8") + w.WriteHeader(status) _, err = w.Write(b) if err != nil { logger.Error("failed to write response", "error", err) } } +func writeInternalErrResponse(w http.ResponseWriter) { + writeErrResponse(w, http.StatusInternalServerError, "something went wrong, please try again later") +} + func parseRequest(r *http.Request, v interface{}) error { err := json.NewDecoder(r.Body).Decode(v) if err != nil { diff --git a/relay/server/solver.go b/relay/server/solver.go index 496d307..79346a2 100644 --- a/relay/server/solver.go +++ b/relay/server/solver.go @@ -4,6 +4,10 @@ import ( "net/http" "github.com/gorilla/websocket" + "github.com/sourcegraph/jsonrpc2" + jsonrpc2_ws "github.com/sourcegraph/jsonrpc2/websocket" + + "github.com/bloXroute-Labs/bdn-operations-relay/logger" ) const ( @@ -19,6 +23,21 @@ var upgrader = websocket.Upgrader{ }, } -func (s *Server) websocketSolver(w http.ResponseWriter, _ *http.Request) { +func (s *Server) websocketSolver(w http.ResponseWriter, r *http.Request) { + connection, err := upgrader.Upgrade(w, r, nil) + if err != nil { + logger.Error("failed upgrading connection", "err", err) + writeInternalErrResponse(w) + + return + } + + h := &wsConnHandler{ + remoteAddress: r.RemoteAddr, + intentService: s.intentService, + subscriptionService: s.subscriptionService, + } + asyncHandler := jsonrpc2.AsyncHandler(h) + _ = jsonrpc2.NewConn(r.Context(), jsonrpc2_ws.NewObjectStream(connection), asyncHandler) } diff --git a/relay/server/solver_handler.go b/relay/server/solver_handler.go new file mode 100644 index 0000000..213d647 --- /dev/null +++ b/relay/server/solver_handler.go @@ -0,0 +1,169 @@ +package server + +import ( + "context" + "fmt" + "time" + + "github.com/sourcegraph/jsonrpc2" + "github.com/valyala/fastjson" + + "github.com/bloXroute-Labs/bdn-operations-relay/logger" + "github.com/bloXroute-Labs/bdn-operations-relay/relay/service" +) + +const ( + methodPing = "ping" + methodSubscribe = "subscribe" + methodUnsubscribe = "unsubscribe" + methodSubmitSolverOperation = "submitSolverOperation" + + microSecTimeFormat = "2006-01-02 15:04:05.000000" +) + +type wsConnHandler struct { + remoteAddress string + intentService *service.Intent + subscriptionService *service.SubscriptionManager +} + +func (h *wsConnHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { + method := req.Method + + switch method { + case methodPing: + response := pingResponse{ + Pong: time.Now().UTC().Format(microSecTimeFormat), + } + if err := conn.Reply(ctx, req.ID, response); err != nil { + logger.Error("error replying to client", "err", err, "reqID", req.ID, "caller", h.remoteAddress) + } + case methodSubscribe: + h.handleSubscribe(ctx, conn, req) + case methodUnsubscribe: + h.handleUnsubscribe(ctx, conn, req) + case methodSubmitSolverOperation: + h.handleSubmitIntentSolution(ctx, conn, req) + default: + h.sendErrorMsg(ctx, jsonrpc2.CodeMethodNotFound, "unsupported method name: "+method, conn, req.ID) + } +} + +// handleSubscribe handles the subscribe method +func (h *wsConnHandler) handleSubscribe(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { + if req.Params == nil { + h.sendErrorMsg(ctx, jsonrpc2.CodeInvalidParams, "params value is missing", conn, req.ID) + return + } + + var p fastjson.Parser + v, err := p.ParseBytes(*req.Params) + if err != nil { + h.sendErrorMsg(ctx, jsonrpc2.CodeInvalidParams, fmt.Sprintf("failed to parse params: %v", err), conn, req.ID) + return + } + + subscriptionType := v.GetStringBytes("subscription_type") + subscription, err := h.subscriptionService.Subscribe(h.remoteAddress, service.SubscriptionType(subscriptionType)) + if err != nil { + h.sendErrorMsg(ctx, jsonrpc2.CodeInvalidRequest, fmt.Sprintf("failed to subscribe: %v", err), conn, req.ID) + return + } + + response := subscribeResponse{ + SubscriptionID: subscription.ID, + } + + if err = conn.Reply(ctx, req.ID, response); err != nil { + logger.Error("error replying to client", "err", err, "reqID", req.ID, "caller", h.remoteAddress) + return + } + + logger.Info("client subscribed", "subscription_type", string(subscriptionType), "caller", h.remoteAddress) + + h.handlerSubscriptionMessages(ctx, conn, subscription) +} + +// handleUnsubscribe handles the unsubscribe method +func (h *wsConnHandler) handleUnsubscribe(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { + if req.Params == nil { + h.sendErrorMsg(ctx, jsonrpc2.CodeInvalidParams, "params value is missing", conn, req.ID) + return + } + + var p fastjson.Parser + v, err := p.ParseBytes(*req.Params) + if err != nil { + h.sendErrorMsg(ctx, jsonrpc2.CodeInvalidParams, fmt.Sprintf("failed to parse params: %v", err), conn, req.ID) + return + } + + subscriptionID := v.GetStringBytes("subscription_id") + err = h.subscriptionService.Unsubscribe(h.remoteAddress, string(subscriptionID)) + if err != nil { + h.sendErrorMsg(ctx, jsonrpc2.CodeInvalidRequest, fmt.Sprintf("failed to unsubscribe: %v", err), conn, req.ID) + return + } + + if err = conn.Reply(ctx, req.ID, "true"); err != nil { + logger.Error("error replying to client", "err", err, "reqID", req.ID, "caller", h.remoteAddress) + return + } + + logger.Info("client unsubscribed", "subscriptionID", string(subscriptionID), "caller", h.remoteAddress) +} + +// handleSubmitIntentSolution handles the submitSolverOperation method +func (h *wsConnHandler) handleSubmitIntentSolution(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { + if req.Params == nil { + h.sendErrorMsg(ctx, jsonrpc2.CodeInvalidParams, "params value is missing", conn, req.ID) + return + } + + var p fastjson.Parser + v, err := p.ParseBytes(*req.Params) + if err != nil { + h.sendErrorMsg(ctx, jsonrpc2.CodeInvalidParams, fmt.Sprintf("failed to parse params: %v", err), conn, req.ID) + return + } + + operationID := v.GetStringBytes("operation_id") + operation := v.GetObject("operation") + + err = h.intentService.SubmitIntentSolution(ctx, string(operationID), operation.MarshalTo(nil)) + if err != nil { + h.sendErrorMsg(ctx, jsonrpc2.CodeInternalError, fmt.Sprintf("failed to submit intent solution: %v", err), conn, req.ID) + } +} + +// sendErrorMsg formats and sends an RPC error message back to the client +func (h *wsConnHandler) sendErrorMsg(ctx context.Context, code int, message string, conn *jsonrpc2.Conn, reqID jsonrpc2.ID) { + rpcError := &jsonrpc2.Error{ + Code: int64(code), + Message: message, + } + + err := conn.ReplyWithError(ctx, reqID, rpcError) + if err != nil { + logger.Error("could not respond to client with error message", "err", err, "reqID", reqID, "caller", h.remoteAddress) + } +} + +func (h *wsConnHandler) handlerSubscriptionMessages(ctx context.Context, conn *jsonrpc2.Conn, subscription *service.Subscription) { + for { + select { + case <-ctx.Done(): + return + case msg, ok := <-subscription.NotificationChannel: + if !ok { + return + } + + err := conn.Notify(ctx, "subscribe", msg) + if err != nil { + logger.Error("error replying to client", "err", err, "caller", h.remoteAddress) + return + } + } + } +} diff --git a/relay/service/intent.go b/relay/service/intent.go index 26f1455..a6e9b35 100644 --- a/relay/service/intent.go +++ b/relay/service/intent.go @@ -20,14 +20,16 @@ import ( // Intent is a service for interacting with the BDN intent network type Intent struct { - client *sdk.Client - cfg *config.Config + client *sdk.Client + cfg *config.Config + subscriptionManager *SubscriptionManager } // NewIntent creates a new Intent service -func NewIntent(ctx context.Context, cfg *config.Config) (*Intent, error) { +func NewIntent(ctx context.Context, cfg *config.Config, subscriptionManager *SubscriptionManager) (*Intent, error) { sdkConfig := &sdk.Config{ AuthHeader: cfg.BDN.AuthHeader, + Logger: new(logger.Instance), } if cfg.BDN.GRPCURL != "" { @@ -51,8 +53,9 @@ func NewIntent(ctx context.Context, cfg *config.Config) (*Intent, error) { } return &Intent{ - client: client, - cfg: cfg, + client: client, + cfg: cfg, + subscriptionManager: subscriptionManager, }, nil } @@ -62,12 +65,7 @@ func (i *Intent) Close() error { } // SubmitIntent submits an intent to the BDN -func (i *Intent) SubmitIntent(ctx context.Context, userOp *types.UserOperationPartialRaw) (string, error) { - intent, err := json.Marshal(userOp) - if err != nil { - return "", fmt.Errorf("failed to marshal user operation: %w", err) - } - +func (i *Intent) SubmitIntent(ctx context.Context, intent []byte) (string, error) { params := &sdk.SubmitIntentParams{ DappAddress: i.cfg.DAppAddress, SenderPrivateKey: i.cfg.DAppPrivateKey, @@ -82,18 +80,48 @@ func (i *Intent) SubmitIntent(ctx context.Context, userOp *types.UserOperationPa var p fastjson.Parser v, err := p.ParseBytes(*resp) if err != nil { - fmt.Println(err) + return "", fmt.Errorf("failed to parse message: %w", err) } return string(v.GetStringBytes("intent_id")), nil } +func (i *Intent) SubscribeToIntents(ctx context.Context) error { + params := &sdk.IntentsParams{ + SolverPrivateKey: i.cfg.SolverPrivateKey, + // TODO uncomment when the BDN supports filtering by DApp address + // DappAddress: i.cfg.DAppAddress, + } + + err := i.client.OnIntents(ctx, params, func(ctx context.Context, err error, result *sdk.OnIntentsNotification) { + if err != nil { + logger.Error("error receiving intent", "error", err) + return + } + logger.Debug("received intent", "dapp_address", result.DappAddress, "sender_address", result.SenderAddress, + "intent_id", result.IntentID) + + // TODO uncomment when the BDN supports filtering by DApp address + if result.DappAddress != i.cfg.DAppAddress { + logger.Debug("ignoring intent from different DApp address", "dapp_address", result.DappAddress) + return + } + + i.subscriptionManager.Notify(result) + }) + if err != nil { + return fmt.Errorf("failed to subscribe to intents: %w", err) + } + + return nil +} + // SubmitIntentSolution submits an intent solution to the BDN -func (i *Intent) SubmitIntentSolution(ctx context.Context, intentID string, intentSolution []byte) error { +func (i *Intent) SubmitIntentSolution(ctx context.Context, intentID string, intent []byte) error { params := &sdk.SubmitIntentSolutionParams{ SolverPrivateKey: i.cfg.SolverPrivateKey, IntentID: intentID, - IntentSolution: intentSolution, + IntentSolution: intent, } _, err := i.client.SubmitIntentSolution(ctx, params) diff --git a/relay/service/subscription.go b/relay/service/subscription.go new file mode 100644 index 0000000..f948ef9 --- /dev/null +++ b/relay/service/subscription.go @@ -0,0 +1,121 @@ +package service + +import ( + "fmt" + + sdk "github.com/bloXroute-Labs/bloxroute-sdk-go" + "github.com/cornelk/hashmap" + "github.com/google/uuid" + + "github.com/bloXroute-Labs/bdn-operations-relay/logger" +) + +const ( + notificationChannelSize = 10000 + + SubscriptionTypeIntent SubscriptionType = "intent" +) + +var ( + validSubscriptionTypes = map[SubscriptionType]struct{}{ + SubscriptionTypeIntent: {}, + } +) + +type Subscription struct { + ID string + NotificationChannel chan interface{} + Type SubscriptionType +} + +type SubscriptionType string + +type SubscriptionManager struct { + intentsSubscriptions *hashmap.Map[string, []Subscription] +} + +func NewSubscriptionManager() *SubscriptionManager { + return &SubscriptionManager{ + intentsSubscriptions: hashmap.New[string, []Subscription](), + } +} + +func (s *SubscriptionManager) Subscribe(remoteAddress string, subscriptionType SubscriptionType) (*Subscription, error) { + _, valid := validSubscriptionTypes[subscriptionType] + if !valid { + return nil, fmt.Errorf("invalid subscription type: %s", subscriptionType) + } + + subs, exists := s.intentsSubscriptions.Get(remoteAddress) + if exists { + for i := range subs { + if subs[i].Type == subscriptionType { + return nil, fmt.Errorf("subscription already exists for type: %s", subscriptionType) + } + } + } + + sub := Subscription{ + ID: uuid.New().String(), + NotificationChannel: make(chan interface{}, notificationChannelSize), + Type: subscriptionType, + } + subs = append(subs, sub) + s.intentsSubscriptions.Set(remoteAddress, subs) + + return &sub, nil +} + +func (s *SubscriptionManager) Unsubscribe(remoteAddress, subscriptionID string) error { + exists := false + subs, _ := s.intentsSubscriptions.Get(remoteAddress) + for i := range subs { + if subs[i].ID == subscriptionID { + close(subs[i].NotificationChannel) + subs = append(subs[:i], subs[i+1:]...) + exists = true + break + } + } + + if !exists { + return fmt.Errorf("subscription not found for id: %s", subscriptionID) + } + + s.intentsSubscriptions.Set(remoteAddress, subs) + + return nil +} + +func (s *SubscriptionManager) Notify(n interface{}) { + var subType SubscriptionType + + switch n.(type) { + case *sdk.OnIntentsNotification: + subType = SubscriptionTypeIntent + default: + return + } + + s.intentsSubscriptions.Range(func(key string, value []Subscription) bool { + for _, subscription := range value { + if subscription.Type == subType { + select { + case subscription.NotificationChannel <- n: + default: + logger.Warn("notification channel for subscription is full, dropping notification") + } + } + } + return true + }) +} + +func (s *SubscriptionManager) Close() { + s.intentsSubscriptions.Range(func(key string, value []Subscription) bool { + for _, subscription := range value { + close(subscription.NotificationChannel) + } + return true + }) +} From e4d0fc3b66aa4c781f3ab20c0e7ab907f821d1d8 Mon Sep 17 00:00:00 2001 From: Bogdan Prodan Date: Mon, 9 Sep 2024 13:47:23 +0300 Subject: [PATCH 2/4] Fixed subscription and solver operations --- relay/server/router.go | 2 +- relay/server/solver_handler.go | 28 ++++++++++++++++++---------- relay/service/intent.go | 20 +++++++++++++++++--- relay/service/subscription.go | 16 ++++++++++++---- 4 files changed, 48 insertions(+), 18 deletions(-) diff --git a/relay/server/router.go b/relay/server/router.go index 556af07..47f383d 100644 --- a/relay/server/router.go +++ b/relay/server/router.go @@ -65,7 +65,7 @@ func (s *Server) buildRoutes() []route { func (s *Server) dAppRoutes() []route { return []route{ { - name: "SubmitIntent", + name: "SubmitUserOperation", method: http.MethodPost, pattern: "/userOperation", handlerFunc: s.userOperation, diff --git a/relay/server/solver_handler.go b/relay/server/solver_handler.go index 213d647..6bf5a01 100644 --- a/relay/server/solver_handler.go +++ b/relay/server/solver_handler.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/ethereum/go-ethereum/log" "github.com/sourcegraph/jsonrpc2" "github.com/valyala/fastjson" @@ -43,7 +44,7 @@ func (h *wsConnHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *js case methodUnsubscribe: h.handleUnsubscribe(ctx, conn, req) case methodSubmitSolverOperation: - h.handleSubmitIntentSolution(ctx, conn, req) + h.handleSubmitSolverOperation(ctx, conn, req) default: h.sendErrorMsg(ctx, jsonrpc2.CodeMethodNotFound, "unsupported method name: "+method, conn, req.ID) } @@ -64,17 +65,21 @@ func (h *wsConnHandler) handleSubscribe(ctx context.Context, conn *jsonrpc2.Conn } subscriptionType := v.GetStringBytes("subscription_type") - subscription, err := h.subscriptionService.Subscribe(h.remoteAddress, service.SubscriptionType(subscriptionType)) + subscription, err := h.subscriptionService.Subscribe(h.remoteAddress, service.SubscriptionType(subscriptionType), conn) if err != nil { h.sendErrorMsg(ctx, jsonrpc2.CodeInvalidRequest, fmt.Sprintf("failed to subscribe: %v", err), conn, req.ID) return } + defer func() { + _ = h.subscriptionService.Unsubscribe(h.remoteAddress, subscription.ID) + }() + response := subscribeResponse{ SubscriptionID: subscription.ID, } - if err = conn.Reply(ctx, req.ID, response); err != nil { + if err = conn.Notify(ctx, "subscribe", response); err != nil { logger.Error("error replying to client", "err", err, "reqID", req.ID, "caller", h.remoteAddress) return } @@ -113,8 +118,8 @@ func (h *wsConnHandler) handleUnsubscribe(ctx context.Context, conn *jsonrpc2.Co logger.Info("client unsubscribed", "subscriptionID", string(subscriptionID), "caller", h.remoteAddress) } -// handleSubmitIntentSolution handles the submitSolverOperation method -func (h *wsConnHandler) handleSubmitIntentSolution(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { +// handleSubmitSolverOperation handles the submitSolverOperation method +func (h *wsConnHandler) handleSubmitSolverOperation(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { if req.Params == nil { h.sendErrorMsg(ctx, jsonrpc2.CodeInvalidParams, "params value is missing", conn, req.ID) return @@ -127,12 +132,14 @@ func (h *wsConnHandler) handleSubmitIntentSolution(ctx context.Context, conn *js return } - operationID := v.GetStringBytes("operation_id") - operation := v.GetObject("operation") + intentID := v.GetStringBytes("intent_id") + intentSolution := v.GetObject("intent_solution") + + log.Debug("client submitted solver operation", "intent_id", string(intentID), "caller", h.remoteAddress) - err = h.intentService.SubmitIntentSolution(ctx, string(operationID), operation.MarshalTo(nil)) + err = h.intentService.SubmitIntentSolution(context.Background(), string(intentID), intentSolution.MarshalTo(nil)) if err != nil { - h.sendErrorMsg(ctx, jsonrpc2.CodeInternalError, fmt.Sprintf("failed to submit intent solution: %v", err), conn, req.ID) + h.sendErrorMsg(ctx, jsonrpc2.CodeInternalError, fmt.Sprintf("failed to submit solver opertaion: %v", err), conn, req.ID) } } @@ -152,7 +159,8 @@ func (h *wsConnHandler) sendErrorMsg(ctx context.Context, code int, message stri func (h *wsConnHandler) handlerSubscriptionMessages(ctx context.Context, conn *jsonrpc2.Conn, subscription *service.Subscription) { for { select { - case <-ctx.Done(): + case <-conn.DisconnectNotify(): + logger.Info("client disconnected", "caller", h.remoteAddress) return case msg, ok := <-subscription.NotificationChannel: if !ok { diff --git a/relay/service/intent.go b/relay/service/intent.go index a6e9b35..423d9b9 100644 --- a/relay/service/intent.go +++ b/relay/service/intent.go @@ -3,6 +3,7 @@ package service import ( "context" "crypto/tls" + "encoding/base64" "encoding/json" "fmt" "time" @@ -107,6 +108,12 @@ func (i *Intent) SubscribeToIntents(ctx context.Context) error { return } + rawIntent := make([]byte, base64.StdEncoding.DecodedLen(len(result.Intent))) + _, err = base64.StdEncoding.Decode(rawIntent, result.Intent) + if err == nil { + result.Intent = rawIntent + } + i.subscriptionManager.Notify(result) }) if err != nil { @@ -155,15 +162,22 @@ func (i *Intent) GetIntentSolutions(ctx context.Context, intentID string) ([]typ for _, obj := range v.GetArray() { intentSolution := obj.Get("intent_solution").GetStringBytes() - var solverOperation types.SolverOperationRaw - err = json.Unmarshal(intentSolution, &solverOperation) // TODO use var p fastjson.Parser + out := make([]byte, base64.StdEncoding.DecodedLen(len(intentSolution))) + n, err := base64.StdEncoding.Decode(out, intentSolution) + if err != nil { + logger.Error("failed to decode intent solution from base64", "error", err, "intent_solution", string(intentSolution)) + continue + } + + var solverOperation *types.SolverOperationRaw + err = json.Unmarshal(out[:n], &solverOperation) // TODO use var p fastjson.Parser if err != nil { logger.Error("failed to unmarshal intent solution into SolverOperationRaw", "error", err, "intent_solution", string(intentSolution)) continue } - result = append(result, solverOperation) + result = append(result, *solverOperation) } return result, nil diff --git a/relay/service/subscription.go b/relay/service/subscription.go index f948ef9..d81e6f0 100644 --- a/relay/service/subscription.go +++ b/relay/service/subscription.go @@ -6,6 +6,7 @@ import ( sdk "github.com/bloXroute-Labs/bloxroute-sdk-go" "github.com/cornelk/hashmap" "github.com/google/uuid" + "github.com/sourcegraph/jsonrpc2" "github.com/bloXroute-Labs/bdn-operations-relay/logger" ) @@ -20,12 +21,17 @@ var ( validSubscriptionTypes = map[SubscriptionType]struct{}{ SubscriptionTypeIntent: {}, } + + validSubscriptionTypeList = []SubscriptionType{ + SubscriptionTypeIntent, + } ) type Subscription struct { ID string NotificationChannel chan interface{} Type SubscriptionType + conn *jsonrpc2.Conn } type SubscriptionType string @@ -40,17 +46,17 @@ func NewSubscriptionManager() *SubscriptionManager { } } -func (s *SubscriptionManager) Subscribe(remoteAddress string, subscriptionType SubscriptionType) (*Subscription, error) { +func (s *SubscriptionManager) Subscribe(remoteAddress string, subscriptionType SubscriptionType, conn *jsonrpc2.Conn) (*Subscription, error) { _, valid := validSubscriptionTypes[subscriptionType] if !valid { - return nil, fmt.Errorf("invalid subscription type: %s", subscriptionType) + return nil, fmt.Errorf("invalid 'subscription_type' param: '%s', valid values are: %v", subscriptionType, validSubscriptionTypeList) } subs, exists := s.intentsSubscriptions.Get(remoteAddress) if exists { for i := range subs { if subs[i].Type == subscriptionType { - return nil, fmt.Errorf("subscription already exists for type: %s", subscriptionType) + return nil, fmt.Errorf("subscription already exists for type: %s, id: %s", subscriptionType, subs[i].ID) } } } @@ -59,6 +65,7 @@ func (s *SubscriptionManager) Subscribe(remoteAddress string, subscriptionType S ID: uuid.New().String(), NotificationChannel: make(chan interface{}, notificationChannelSize), Type: subscriptionType, + conn: conn, } subs = append(subs, sub) s.intentsSubscriptions.Set(remoteAddress, subs) @@ -71,6 +78,7 @@ func (s *SubscriptionManager) Unsubscribe(remoteAddress, subscriptionID string) subs, _ := s.intentsSubscriptions.Get(remoteAddress) for i := range subs { if subs[i].ID == subscriptionID { + fmt.Println("closing channel") close(subs[i].NotificationChannel) subs = append(subs[:i], subs[i+1:]...) exists = true @@ -114,7 +122,7 @@ func (s *SubscriptionManager) Notify(n interface{}) { func (s *SubscriptionManager) Close() { s.intentsSubscriptions.Range(func(key string, value []Subscription) bool { for _, subscription := range value { - close(subscription.NotificationChannel) + _ = subscription.conn.Close() } return true }) From 331d5863f09358931682f3a348ff9c7abd0f6a87 Mon Sep 17 00:00:00 2001 From: Bogdan Prodan Date: Tue, 10 Sep 2024 11:30:54 +0300 Subject: [PATCH 3/4] Added params checks --- go.mod | 2 +- go.sum | 4 ++-- relay/server/solver_handler.go | 25 +++++++++++++++++++++++++ relay/service/subscription.go | 1 - 4 files changed, 28 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 61a435e..cc31699 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/go-playground/validator/v10 v10.19.0 github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.1 - github.com/gorilla/websocket v1.5.1 + github.com/gorilla/websocket v1.5.3 github.com/sourcegraph/jsonrpc2 v0.2.0 github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.19.0 diff --git a/go.sum b/go.sum index f412126..f0e6833 100644 --- a/go.sum +++ b/go.sum @@ -107,8 +107,8 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= -github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.1 h1:RoziI+96HlQWrbaVhgOOdFYUHtX81pwA6tCgDS9FNRo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.1/go.mod h1:Rj8lEaVgLiPn1jTMVXEhATiZhuyXJq167bMYPbJM1CY= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= diff --git a/relay/server/solver_handler.go b/relay/server/solver_handler.go index 6bf5a01..f4ceed4 100644 --- a/relay/server/solver_handler.go +++ b/relay/server/solver_handler.go @@ -22,6 +22,13 @@ const ( microSecTimeFormat = "2006-01-02 15:04:05.000000" ) +var ( + subscriptionTypeMissingErrMsg = "subscription_type value is missing" + subscriptionIDMissingErrMsg = "subscription_id value is missing" + intentIDMissingErrMsg = "intent_id value is missing" + intentSolutionMissingErrMsg = "intent_solution value is missing" +) + type wsConnHandler struct { remoteAddress string intentService *service.Intent @@ -65,6 +72,11 @@ func (h *wsConnHandler) handleSubscribe(ctx context.Context, conn *jsonrpc2.Conn } subscriptionType := v.GetStringBytes("subscription_type") + if len(subscriptionType) == 0 { + h.sendErrorMsg(ctx, jsonrpc2.CodeInvalidParams, subscriptionTypeMissingErrMsg, conn, req.ID) + return + } + subscription, err := h.subscriptionService.Subscribe(h.remoteAddress, service.SubscriptionType(subscriptionType), conn) if err != nil { h.sendErrorMsg(ctx, jsonrpc2.CodeInvalidRequest, fmt.Sprintf("failed to subscribe: %v", err), conn, req.ID) @@ -104,6 +116,11 @@ func (h *wsConnHandler) handleUnsubscribe(ctx context.Context, conn *jsonrpc2.Co } subscriptionID := v.GetStringBytes("subscription_id") + if len(subscriptionID) == 0 { + h.sendErrorMsg(ctx, jsonrpc2.CodeInvalidParams, subscriptionIDMissingErrMsg, conn, req.ID) + return + } + err = h.subscriptionService.Unsubscribe(h.remoteAddress, string(subscriptionID)) if err != nil { h.sendErrorMsg(ctx, jsonrpc2.CodeInvalidRequest, fmt.Sprintf("failed to unsubscribe: %v", err), conn, req.ID) @@ -133,7 +150,15 @@ func (h *wsConnHandler) handleSubmitSolverOperation(ctx context.Context, conn *j } intentID := v.GetStringBytes("intent_id") + if intentID == nil { + h.sendErrorMsg(ctx, jsonrpc2.CodeInvalidParams, intentIDMissingErrMsg, conn, req.ID) + return + } intentSolution := v.GetObject("intent_solution") + if intentSolution == nil { + h.sendErrorMsg(ctx, jsonrpc2.CodeInvalidParams, intentSolutionMissingErrMsg, conn, req.ID) + return + } log.Debug("client submitted solver operation", "intent_id", string(intentID), "caller", h.remoteAddress) diff --git a/relay/service/subscription.go b/relay/service/subscription.go index d81e6f0..9c82c58 100644 --- a/relay/service/subscription.go +++ b/relay/service/subscription.go @@ -78,7 +78,6 @@ func (s *SubscriptionManager) Unsubscribe(remoteAddress, subscriptionID string) subs, _ := s.intentsSubscriptions.Get(remoteAddress) for i := range subs { if subs[i].ID == subscriptionID { - fmt.Println("closing channel") close(subs[i].NotificationChannel) subs = append(subs[:i], subs[i+1:]...) exists = true From fe6a45c26ed9701013c6b5c7b6488e7a83bcf7bf Mon Sep 17 00:00:00 2001 From: Bogdan Prodan Date: Wed, 11 Sep 2024 09:56:25 +0300 Subject: [PATCH 4/4] Fixed context --- relay/server/solver.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/relay/server/solver.go b/relay/server/solver.go index 79346a2..6242a49 100644 --- a/relay/server/solver.go +++ b/relay/server/solver.go @@ -1,6 +1,7 @@ package server import ( + "context" "net/http" "github.com/gorilla/websocket" @@ -39,5 +40,5 @@ func (s *Server) websocketSolver(w http.ResponseWriter, r *http.Request) { } asyncHandler := jsonrpc2.AsyncHandler(h) - _ = jsonrpc2.NewConn(r.Context(), jsonrpc2_ws.NewObjectStream(connection), asyncHandler) + _ = jsonrpc2.NewConn(context.Background(), jsonrpc2_ws.NewObjectStream(connection), asyncHandler) }