diff --git a/core/services/functions/connector_handler.go b/core/services/functions/connector_handler.go index 343980afdd5..76608b8ada3 100644 --- a/core/services/functions/connector_handler.go +++ b/core/services/functions/connector_handler.go @@ -84,7 +84,7 @@ func (h *functionsConnectorHandler) HandleGatewayMessage(ctx context.Context, ga case functions.MethodSecretsSet: if balance, err := h.subscriptions.GetMaxUserBalance(fromAddr); err != nil || balance.Cmp(h.minimumBalance.ToInt()) < 0 { h.lggr.Errorw("user subscription has insufficient balance", "id", gatewayId, "address", fromAddr, "balance", balance, "minBalance", h.minimumBalance) - response := functions.SecretsResponseBase{ + response := functions.ResponseBase{ Success: false, ErrorMessage: "user subscription has insufficient balance", } diff --git a/core/services/gateway/handlers/functions/api.go b/core/services/gateway/handlers/functions/api.go index 979cdb939b6..202fa99e414 100644 --- a/core/services/gateway/handlers/functions/api.go +++ b/core/services/gateway/handlers/functions/api.go @@ -17,17 +17,17 @@ type SecretsSetRequest struct { // SecretsListRequest has empty payload -type SecretsResponseBase struct { +type ResponseBase struct { Success bool `json:"success"` ErrorMessage string `json:"error_message,omitempty"` } type SecretsSetResponse struct { - SecretsResponseBase + ResponseBase } type SecretsListResponse struct { - SecretsResponseBase + ResponseBase Rows []SecretsListRow `json:"rows,omitempty"` } @@ -38,7 +38,7 @@ type SecretsListRow struct { } // Gateway -> User response, which combines responses from several nodes -type CombinedSecretsResponse struct { - SecretsResponseBase +type CombinedResponse struct { + ResponseBase NodeResponses []*api.Message `json:"node_responses"` } diff --git a/core/services/gateway/handlers/functions/handler.functions.go b/core/services/gateway/handlers/functions/handler.functions.go index 32a132c075f..6cc4581a505 100644 --- a/core/services/gateway/handlers/functions/handler.functions.go +++ b/core/services/gateway/handlers/functions/handler.functions.go @@ -75,7 +75,7 @@ type functionsHandler struct { handlerConfig FunctionsHandlerConfig donConfig *config.DONConfig don handlers.DON - pendingRequests hc.RequestCache[PendingSecretsRequest] + pendingRequests hc.RequestCache[PendingRequest] allowlist OnchainAllowlist subscriptions OnchainSubscriptions minimumBalance *assets.Link @@ -85,7 +85,7 @@ type functionsHandler struct { lggr logger.Logger } -type PendingSecretsRequest struct { +type PendingRequest struct { request *api.Message responses map[string]*api.Message successful []*api.Message @@ -136,7 +136,7 @@ func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *con return nil, err2 } } - pendingRequestsCache := hc.NewRequestCache[PendingSecretsRequest](time.Millisecond*time.Duration(cfg.RequestTimeoutMillis), cfg.MaxPendingRequests) + pendingRequestsCache := hc.NewRequestCache[PendingRequest](time.Millisecond*time.Duration(cfg.RequestTimeoutMillis), cfg.MaxPendingRequests) return NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, subscriptions, cfg.MinimumSubscriptionBalance, userRateLimiter, nodeRateLimiter, lggr), nil } @@ -144,7 +144,7 @@ func NewFunctionsHandler( cfg FunctionsHandlerConfig, donConfig *config.DONConfig, don handlers.DON, - pendingRequestsCache hc.RequestCache[PendingSecretsRequest], + pendingRequestsCache hc.RequestCache[PendingRequest], allowlist OnchainAllowlist, subscriptions OnchainSubscriptions, minimumBalance *assets.Link, @@ -193,7 +193,7 @@ func (h *functionsHandler) HandleUserMessage(ctx context.Context, msg *api.Messa } switch msg.Body.Method { case MethodSecretsSet, MethodSecretsList: - return h.handleSecretsRequest(ctx, msg, callbackCh) + return h.handleRequest(ctx, msg, callbackCh) default: h.lggr.Debugw("unsupported method", "method", msg.Body.Method) promHandlerError.WithLabelValues(h.donConfig.DonId, ErrUnsupportedMethod.Error()).Inc() @@ -201,11 +201,11 @@ func (h *functionsHandler) HandleUserMessage(ctx context.Context, msg *api.Messa } } -func (h *functionsHandler) handleSecretsRequest(ctx context.Context, msg *api.Message, callbackCh chan<- handlers.UserCallbackPayload) error { - h.lggr.Debugw("handleSecretsRequest: processing message", "sender", msg.Body.Sender, "messageId", msg.Body.MessageId) - err := h.pendingRequests.NewRequest(msg, callbackCh, &PendingSecretsRequest{request: msg, responses: make(map[string]*api.Message)}) +func (h *functionsHandler) handleRequest(ctx context.Context, msg *api.Message, callbackCh chan<- handlers.UserCallbackPayload) error { + h.lggr.Debugw("handleRequest: processing message", "sender", msg.Body.Sender, "messageId", msg.Body.MessageId) + err := h.pendingRequests.NewRequest(msg, callbackCh, &PendingRequest{request: msg, responses: make(map[string]*api.Message)}) if err != nil { - h.lggr.Warnw("handleSecretsRequest: error adding new request", "sender", msg.Body.Sender, "err", err) + h.lggr.Warnw("handleRequest: error adding new request", "sender", msg.Body.Sender, "err", err) promHandlerError.WithLabelValues(h.donConfig.DonId, err.Error()).Inc() return err } @@ -213,7 +213,7 @@ func (h *functionsHandler) handleSecretsRequest(ctx context.Context, msg *api.Me for _, member := range h.donConfig.Members { err := h.don.SendToNode(ctx, member.Address, msg) if err != nil { - h.lggr.Debugw("handleSecretsRequest: failed to send to a node", "node", member.Address, "err", err) + h.lggr.Debugw("handleRequest: failed to send to a node", "node", member.Address, "err", err) } } return nil @@ -234,8 +234,8 @@ func (h *functionsHandler) HandleNodeMessage(ctx context.Context, msg *api.Messa } } -// Conforms to ResponseProcessor[*PendingSecretsRequest] -func (h *functionsHandler) processSecretsResponse(response *api.Message, responseData *PendingSecretsRequest) (*handlers.UserCallbackPayload, *PendingSecretsRequest, error) { +// Conforms to ResponseProcessor[*PendingRequest] +func (h *functionsHandler) processSecretsResponse(response *api.Message, responseData *PendingRequest) (*handlers.UserCallbackPayload, *PendingRequest, error) { if _, exists := responseData.responses[response.Body.Sender]; exists { return nil, nil, errors.New("duplicate response") } @@ -243,7 +243,7 @@ func (h *functionsHandler) processSecretsResponse(response *api.Message, respons return nil, responseData, errors.New("invalid method") } responseData.responses[response.Body.Sender] = response - var responsePayload SecretsResponseBase + var responsePayload ResponseBase err := json.Unmarshal(response.Body.Payload, &responsePayload) if err != nil { responseData.errors = append(responseData.errors, response) @@ -270,7 +270,7 @@ func (h *functionsHandler) processSecretsResponse(response *api.Message, respons } func newSecretsResponse(request *api.Message, success bool, responses []*api.Message) (*handlers.UserCallbackPayload, error) { - payload := CombinedSecretsResponse{SecretsResponseBase: SecretsResponseBase{Success: success}, NodeResponses: responses} + payload := CombinedResponse{ResponseBase: ResponseBase{Success: success}, NodeResponses: responses} payloadJson, err := json.Marshal(payload) if err != nil { return nil, err diff --git a/core/services/gateway/handlers/functions/handler.functions_test.go b/core/services/gateway/handlers/functions/handler.functions_test.go index 00334d30682..402823df173 100644 --- a/core/services/gateway/handlers/functions/handler.functions_test.go +++ b/core/services/gateway/handlers/functions/handler.functions_test.go @@ -47,7 +47,7 @@ func newFunctionsHandlerForATestDON(t *testing.T, nodes []gc.TestNode, requestTi require.NoError(t, err) nodeRateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100}) require.NoError(t, err) - pendingRequestsCache := hc.NewRequestCache[functions.PendingSecretsRequest](requestTimeout, 1000) + pendingRequestsCache := hc.NewRequestCache[functions.PendingRequest](requestTimeout, 1000) handler := functions.NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, subscriptions, minBalance, userRateLimiter, nodeRateLimiter, logger.TestLogger(t)) return handler, don, allowlist, subscriptions } @@ -128,7 +128,7 @@ func TestFunctionsHandler_HandleUserMessage_SecretsSet(t *testing.T) { response := <-callbachCh require.Equal(t, api.NoError, response.ErrCode) require.Equal(t, userRequestMsg.Body.MessageId, response.Msg.Body.MessageId) - var payload functions.CombinedSecretsResponse + var payload functions.CombinedResponse require.NoError(t, json.Unmarshal(response.Msg.Body.Payload, &payload)) require.Equal(t, test.expectedGatewayResult, payload.Success) require.Equal(t, test.expectedNodeMessageCount, len(payload.NodeResponses))