Skip to content

Commit

Permalink
[Functions] Rename a few secrets-specific objects to more generic nam…
Browse files Browse the repository at this point in the history
…es (#11340)

These structs/functions are generic enough to be used for non-secrets related requests.
  • Loading branch information
bolekk authored Nov 20, 2023
1 parent e42562c commit 7e0fa23
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 22 deletions.
2 changes: 1 addition & 1 deletion core/services/functions/connector_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (h *functionsConnectorHandler) HandleGatewayMessage(ctx context.Context, ga
case functions.MethodSecretsSet:
if balance, err := h.subscriptions.GetMaxUserBalance(fromAddr); err != nil || balance.Cmp(h.minimumBalance.ToInt()) < 0 {
h.lggr.Errorw("user subscription has insufficient balance", "id", gatewayId, "address", fromAddr, "balance", balance, "minBalance", h.minimumBalance)
response := functions.SecretsResponseBase{
response := functions.ResponseBase{
Success: false,
ErrorMessage: "user subscription has insufficient balance",
}
Expand Down
10 changes: 5 additions & 5 deletions core/services/gateway/handlers/functions/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ type SecretsSetRequest struct {

// SecretsListRequest has empty payload

type SecretsResponseBase struct {
type ResponseBase struct {
Success bool `json:"success"`
ErrorMessage string `json:"error_message,omitempty"`
}

type SecretsSetResponse struct {
SecretsResponseBase
ResponseBase
}

type SecretsListResponse struct {
SecretsResponseBase
ResponseBase
Rows []SecretsListRow `json:"rows,omitempty"`
}

Expand All @@ -38,7 +38,7 @@ type SecretsListRow struct {
}

// Gateway -> User response, which combines responses from several nodes
type CombinedSecretsResponse struct {
SecretsResponseBase
type CombinedResponse struct {
ResponseBase
NodeResponses []*api.Message `json:"node_responses"`
}
28 changes: 14 additions & 14 deletions core/services/gateway/handlers/functions/handler.functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type functionsHandler struct {
handlerConfig FunctionsHandlerConfig
donConfig *config.DONConfig
don handlers.DON
pendingRequests hc.RequestCache[PendingSecretsRequest]
pendingRequests hc.RequestCache[PendingRequest]
allowlist OnchainAllowlist
subscriptions OnchainSubscriptions
minimumBalance *assets.Link
Expand All @@ -85,7 +85,7 @@ type functionsHandler struct {
lggr logger.Logger
}

type PendingSecretsRequest struct {
type PendingRequest struct {
request *api.Message
responses map[string]*api.Message
successful []*api.Message
Expand Down Expand Up @@ -136,15 +136,15 @@ func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *con
return nil, err2
}
}
pendingRequestsCache := hc.NewRequestCache[PendingSecretsRequest](time.Millisecond*time.Duration(cfg.RequestTimeoutMillis), cfg.MaxPendingRequests)
pendingRequestsCache := hc.NewRequestCache[PendingRequest](time.Millisecond*time.Duration(cfg.RequestTimeoutMillis), cfg.MaxPendingRequests)
return NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, subscriptions, cfg.MinimumSubscriptionBalance, userRateLimiter, nodeRateLimiter, lggr), nil
}

func NewFunctionsHandler(
cfg FunctionsHandlerConfig,
donConfig *config.DONConfig,
don handlers.DON,
pendingRequestsCache hc.RequestCache[PendingSecretsRequest],
pendingRequestsCache hc.RequestCache[PendingRequest],
allowlist OnchainAllowlist,
subscriptions OnchainSubscriptions,
minimumBalance *assets.Link,
Expand Down Expand Up @@ -193,27 +193,27 @@ func (h *functionsHandler) HandleUserMessage(ctx context.Context, msg *api.Messa
}
switch msg.Body.Method {
case MethodSecretsSet, MethodSecretsList:
return h.handleSecretsRequest(ctx, msg, callbackCh)
return h.handleRequest(ctx, msg, callbackCh)
default:
h.lggr.Debugw("unsupported method", "method", msg.Body.Method)
promHandlerError.WithLabelValues(h.donConfig.DonId, ErrUnsupportedMethod.Error()).Inc()
return ErrUnsupportedMethod
}
}

func (h *functionsHandler) handleSecretsRequest(ctx context.Context, msg *api.Message, callbackCh chan<- handlers.UserCallbackPayload) error {
h.lggr.Debugw("handleSecretsRequest: processing message", "sender", msg.Body.Sender, "messageId", msg.Body.MessageId)
err := h.pendingRequests.NewRequest(msg, callbackCh, &PendingSecretsRequest{request: msg, responses: make(map[string]*api.Message)})
func (h *functionsHandler) handleRequest(ctx context.Context, msg *api.Message, callbackCh chan<- handlers.UserCallbackPayload) error {
h.lggr.Debugw("handleRequest: processing message", "sender", msg.Body.Sender, "messageId", msg.Body.MessageId)
err := h.pendingRequests.NewRequest(msg, callbackCh, &PendingRequest{request: msg, responses: make(map[string]*api.Message)})
if err != nil {
h.lggr.Warnw("handleSecretsRequest: error adding new request", "sender", msg.Body.Sender, "err", err)
h.lggr.Warnw("handleRequest: error adding new request", "sender", msg.Body.Sender, "err", err)
promHandlerError.WithLabelValues(h.donConfig.DonId, err.Error()).Inc()
return err
}
// Send to all nodes.
for _, member := range h.donConfig.Members {
err := h.don.SendToNode(ctx, member.Address, msg)
if err != nil {
h.lggr.Debugw("handleSecretsRequest: failed to send to a node", "node", member.Address, "err", err)
h.lggr.Debugw("handleRequest: failed to send to a node", "node", member.Address, "err", err)
}
}
return nil
Expand All @@ -234,16 +234,16 @@ func (h *functionsHandler) HandleNodeMessage(ctx context.Context, msg *api.Messa
}
}

// Conforms to ResponseProcessor[*PendingSecretsRequest]
func (h *functionsHandler) processSecretsResponse(response *api.Message, responseData *PendingSecretsRequest) (*handlers.UserCallbackPayload, *PendingSecretsRequest, error) {
// Conforms to ResponseProcessor[*PendingRequest]
func (h *functionsHandler) processSecretsResponse(response *api.Message, responseData *PendingRequest) (*handlers.UserCallbackPayload, *PendingRequest, error) {
if _, exists := responseData.responses[response.Body.Sender]; exists {
return nil, nil, errors.New("duplicate response")
}
if response.Body.Method != responseData.request.Body.Method {
return nil, responseData, errors.New("invalid method")
}
responseData.responses[response.Body.Sender] = response
var responsePayload SecretsResponseBase
var responsePayload ResponseBase
err := json.Unmarshal(response.Body.Payload, &responsePayload)
if err != nil {
responseData.errors = append(responseData.errors, response)
Expand All @@ -270,7 +270,7 @@ func (h *functionsHandler) processSecretsResponse(response *api.Message, respons
}

func newSecretsResponse(request *api.Message, success bool, responses []*api.Message) (*handlers.UserCallbackPayload, error) {
payload := CombinedSecretsResponse{SecretsResponseBase: SecretsResponseBase{Success: success}, NodeResponses: responses}
payload := CombinedResponse{ResponseBase: ResponseBase{Success: success}, NodeResponses: responses}
payloadJson, err := json.Marshal(payload)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func newFunctionsHandlerForATestDON(t *testing.T, nodes []gc.TestNode, requestTi
require.NoError(t, err)
nodeRateLimiter, err := hc.NewRateLimiter(hc.RateLimiterConfig{GlobalRPS: 100.0, GlobalBurst: 100, PerSenderRPS: 100.0, PerSenderBurst: 100})
require.NoError(t, err)
pendingRequestsCache := hc.NewRequestCache[functions.PendingSecretsRequest](requestTimeout, 1000)
pendingRequestsCache := hc.NewRequestCache[functions.PendingRequest](requestTimeout, 1000)
handler := functions.NewFunctionsHandler(cfg, donConfig, don, pendingRequestsCache, allowlist, subscriptions, minBalance, userRateLimiter, nodeRateLimiter, logger.TestLogger(t))
return handler, don, allowlist, subscriptions
}
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestFunctionsHandler_HandleUserMessage_SecretsSet(t *testing.T) {
response := <-callbachCh
require.Equal(t, api.NoError, response.ErrCode)
require.Equal(t, userRequestMsg.Body.MessageId, response.Msg.Body.MessageId)
var payload functions.CombinedSecretsResponse
var payload functions.CombinedResponse
require.NoError(t, json.Unmarshal(response.Msg.Body.Payload, &payload))
require.Equal(t, test.expectedGatewayResult, payload.Success)
require.Equal(t, test.expectedNodeMessageCount, len(payload.NodeResponses))
Expand Down

0 comments on commit 7e0fa23

Please sign in to comment.