From fd780a991a73354e91b4db466dd8abbec1ececdf Mon Sep 17 00:00:00 2001 From: hiranya Date: Sun, 2 May 2021 16:24:43 +0530 Subject: [PATCH 01/11] Improving eventhub closing connection logic --- adapter/internal/messaging/connection.go | 25 ++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/adapter/internal/messaging/connection.go b/adapter/internal/messaging/connection.go index 7ac293dffc..6675284e2c 100644 --- a/adapter/internal/messaging/connection.go +++ b/adapter/internal/messaging/connection.go @@ -44,14 +44,35 @@ func connectToRabbitMQ(url string) (*amqp.Connection, error) { // is closed unexpectedly. func (c *Consumer) reconnect(key string) { var err error - conErr := <-c.conn.NotifyClose(make(chan *amqp.Error)) - if conErr != nil { + shouldReconnect := false + connClose := <-c.conn.NotifyClose(make(chan *amqp.Error)) + connBlocked := c.conn.NotifyBlocked(make(chan amqp.Blocking)) + chClose := c.channel.NotifyClose(make(chan *amqp.Error)) + + if connClose != nil { + shouldReconnect = true logger.LoggerMsg.Errorf("CRITICAL: Connection dropped for %s, reconnecting...", key) + } + + if connBlocked != nil { + shouldReconnect = true + logger.LoggerMsg.Errorf("CRITICAL: Connection blocked for %s, reconnecting...", key) + } + + if chClose != nil { + shouldReconnect = true + logger.LoggerMsg.Errorf("CRITICAL: Channel closed for %s, reconnecting...", key) + } + + if shouldReconnect { c.conn.Close() c, rabbitConn, err = connectionRetry(key) if err != nil { logger.LoggerMsg.Errorf("Cannot establish connection for topic %s", key) } + } else { + logger.LoggerMsg.Infof("NotifyClose from the connection and channel are %v and %v respectively, NotifyBlocked from the connection is %v", + connClose, connBlocked, chClose) } } From c8912eab78b95c9e02d687796fa05e57fc8b099c Mon Sep 17 00:00:00 2001 From: hiranya Date: Mon, 3 May 2021 11:27:59 +0530 Subject: [PATCH 02/11] Improving logs --- adapter/internal/messaging/connection.go | 29 ++++++++++--------- adapter/internal/messaging/km_listener.go | 17 ++++++----- .../messaging/notification_listener.go | 14 +++++---- .../messaging/revoked_token_listener.go | 7 ++++- 4 files changed, 40 insertions(+), 27 deletions(-) diff --git a/adapter/internal/messaging/connection.go b/adapter/internal/messaging/connection.go index 6675284e2c..98219c74de 100644 --- a/adapter/internal/messaging/connection.go +++ b/adapter/internal/messaging/connection.go @@ -136,22 +136,23 @@ func retrieveAMQPURLList() []amqpFailoverURL { amqpConnectionURL := strings.Split(conURL, "?")[0] u, err := url.Parse(conURL) if err != nil { - panic(err) - } - m, _ := url.ParseQuery(u.RawQuery) - if m["connectdelay"] != nil { - connectdelay := m["connectdelay"][0] - delay, err = strconv.Atoi(connectdelay[1 : len(connectdelay)-1]) - } + logger.LoggerMsg.Errorf("Error occured %v", err) + } else { + m, _ := url.ParseQuery(u.RawQuery) + if m["connectdelay"] != nil { + connectdelay := m["connectdelay"][0] + delay, _ = strconv.Atoi(connectdelay[1 : len(connectdelay)-1]) + } - if m["retries"] != nil { - retrycount := m["retries"][0] - retries, err = strconv.Atoi(retrycount[1 : len(retrycount)-1]) - } + if m["retries"] != nil { + retrycount := m["retries"][0] + retries, _ = strconv.Atoi(retrycount[1 : len(retrycount)-1]) + } - failoverurlObj := amqpFailoverURL{url: amqpConnectionURL, retryCount: retries, - connectionDelay: delay} - amqlURLList = append(amqlURLList, failoverurlObj) + failoverurlObj := amqpFailoverURL{url: amqpConnectionURL, retryCount: retries, + connectionDelay: delay} + amqlURLList = append(amqlURLList, failoverurlObj) + } } return amqlURLList } diff --git a/adapter/internal/messaging/km_listener.go b/adapter/internal/messaging/km_listener.go index c0ba2dd38b..7c547799c3 100644 --- a/adapter/internal/messaging/km_listener.go +++ b/adapter/internal/messaging/km_listener.go @@ -48,11 +48,12 @@ func handleKMConfiguration(deliveries <-chan amqp.Delivery, done chan error) { var notification EventKeyManagerNotification // var keyManagerConfig resourceTypes.KeymanagerConfig var kmConfigMap map[string]interface{} - - // var eventType string - json.Unmarshal([]byte(string(d.Body)), ¬ification) - - + unmarshalErr := json.Unmarshal([]byte(string(d.Body)), ¬ification) + if unmarshalErr != nil { + logger.LoggerMsg.Errorf("Error occured while unmarshalling event data %v", unmarshalErr) + return + } + logger.LoggerMsg.Infof("Event %s is received", notification.Event.PayloadData.EventType) for i := range xds.KeyManagerList { if strings.EqualFold(notification.Event.PayloadData.Name, xds.KeyManagerList[i].Name) { isFound = true @@ -62,11 +63,13 @@ func handleKMConfiguration(deliveries <-chan amqp.Delivery, done chan error) { } var decodedByte, err = base64.StdEncoding.DecodeString(notification.Event.PayloadData.Value) + if err != nil { if _, ok := err.(base64.CorruptInputError); ok { - panic("\nbase64 input is corrupt, check the provided key") + logger.LoggerMsg.Error("\nbase64 input is corrupt, check the provided key") } - panic(err) + logger.LoggerMsg.Errorf("Error occured %v", err) + return } if strings.EqualFold(keyManagerConfigEvent, notification.Event.PayloadData.EventType) { diff --git a/adapter/internal/messaging/notification_listener.go b/adapter/internal/messaging/notification_listener.go index 25d5ba2b9b..63ed8420c4 100644 --- a/adapter/internal/messaging/notification_listener.go +++ b/adapter/internal/messaging/notification_listener.go @@ -72,18 +72,22 @@ func handleNotification(deliveries <-chan amqp.Delivery, done chan error) { for d := range deliveries { var notification EventNotification var eventType string - json.Unmarshal([]byte(string(d.Body)), ¬ification) + unmarshalErr := json.Unmarshal([]byte(string(d.Body)), ¬ification) + if unmarshalErr != nil { + logger.LoggerMsg.Errorf("Error occured while unmarshalling event data %v", unmarshalErr) + return + } + logger.LoggerMsg.Infof("Event %s is received", notification.Event.PayloadData.EventType) var decodedByte, err = base64.StdEncoding.DecodeString(notification.Event.PayloadData.Event) if err != nil { if _, ok := err.(base64.CorruptInputError); ok { - panic("\nbase64 input is corrupt, check the provided key") + logger.LoggerMsg.Error("\nbase64 input is corrupt, check the provided key") } - panic(err) + logger.LoggerMsg.Errorf("Error occured %v", err) + continue } logger.LoggerMsg.Debugf("\n\n[%s]", decodedByte) eventType = notification.Event.PayloadData.EventType - logger.LoggerMsg.Debugf("Event type : %s", eventType) - if strings.Contains(eventType, apiLifeCycleChange) { handleLifeCycleEvents(decodedByte) } else if strings.Contains(eventType, apiEventType) { diff --git a/adapter/internal/messaging/revoked_token_listener.go b/adapter/internal/messaging/revoked_token_listener.go index 4631c375df..f8283b3df7 100644 --- a/adapter/internal/messaging/revoked_token_listener.go +++ b/adapter/internal/messaging/revoked_token_listener.go @@ -32,7 +32,12 @@ func handleTokenRevocation(deliveries <-chan amqp.Delivery, done chan error) { for d := range deliveries { var notification EventTokenRevocationNotification - json.Unmarshal([]byte(string(d.Body)), ¬ification) + unmarshalErr := json.Unmarshal([]byte(string(d.Body)), ¬ification) + if unmarshalErr != nil { + logger.LoggerMsg.Errorf("Error occured while unmarshalling event data %v", unmarshalErr) + return + } + logger.LoggerMsg.Infof("Event %s is received", notification.Event.PayloadData.Type) logger.LoggerMsg.Printf("RevokedToken: %s, Token Type: %s", notification.Event.PayloadData.RevokedToken, notification.Event.PayloadData.Type) var stokens []types.Resource From 360384df36aa5e2a5d9ef66f6d44b5b90b1edc60 Mon Sep 17 00:00:00 2001 From: Menaka Jayawardena Date: Mon, 3 May 2021 16:52:48 +0530 Subject: [PATCH 03/11] Improve logs --- adapter/internal/adapter/adapter.go | 22 ++++---- .../enforcer_callbacks.go | 21 ++++---- adapter/internal/messaging/connection.go | 2 +- adapter/internal/messaging/km_listener.go | 11 ++-- .../messaging/notification_listener.go | 46 ++++++++++++---- .../messaging/revoked_token_listener.go | 2 +- .../synchronizer/keymanagers_fetcher.go | 6 ++- adapter/loggers/logger.go | 54 +++++++++---------- 8 files changed, 101 insertions(+), 63 deletions(-) rename adapter/internal/discovery/xds/{ => enforcercallbacks}/enforcer_callbacks.go (70%) diff --git a/adapter/internal/adapter/adapter.go b/adapter/internal/adapter/adapter.go index 4fad6fc22f..664c883812 100644 --- a/adapter/internal/adapter/adapter.go +++ b/adapter/internal/adapter/adapter.go @@ -32,6 +32,7 @@ import ( subscriptionservice "github.com/wso2/adapter/internal/discovery/api/wso2/discovery/service/subscription" throttleservice "github.com/wso2/adapter/internal/discovery/api/wso2/discovery/service/throtlle" wso2_server "github.com/wso2/adapter/internal/discovery/protocol/server/v3" + enforcerCallbacks "github.com/wso2/adapter/internal/discovery/xds/enforcercallbacks" routercb "github.com/wso2/adapter/internal/discovery/xds/routercallbacks" "github.com/wso2/adapter/internal/health" healthservice "github.com/wso2/adapter/internal/health/api/wso2/health/service" @@ -48,7 +49,6 @@ import ( "github.com/fsnotify/fsnotify" "github.com/wso2/adapter/config" "github.com/wso2/adapter/internal/discovery/xds" - cb "github.com/wso2/adapter/internal/discovery/xds" "github.com/wso2/adapter/internal/eventhub" "github.com/wso2/adapter/internal/messaging" "github.com/wso2/adapter/internal/synchronizer" @@ -180,16 +180,16 @@ func Run(conf *config.Config) { enforcerThrottleDataCache := xds.GetEnforcerThrottleDataCache() srv := xdsv3.NewServer(ctx, cache, &routercb.Callbacks{}) - enforcerXdsSrv := wso2_server.NewServer(ctx, enforcerCache, &cb.Callbacks{}) - enforcerSdsSrv := wso2_server.NewServer(ctx, enforcerSubscriptionCache, &cb.Callbacks{}) - enforcerAppDsSrv := wso2_server.NewServer(ctx, enforcerApplicationCache, &cb.Callbacks{}) - enforcerAPIDsSrv := wso2_server.NewServer(ctx, enforcerAPICache, &cb.Callbacks{}) - enforcerAppPolicyDsSrv := wso2_server.NewServer(ctx, enforcerApplicationPolicyCache, &cb.Callbacks{}) - enforcerSubPolicyDsSrv := wso2_server.NewServer(ctx, enforcerSubscriptionPolicyCache, &cb.Callbacks{}) - enforcerAppKeyMappingDsSrv := wso2_server.NewServer(ctx, enforcerApplicationKeyMappingCache, &cb.Callbacks{}) - enforcerKeyManagerDsSrv := wso2_server.NewServer(ctx, enforcerKeyManagerCache, &cb.Callbacks{}) - enforcerRevokedTokenDsSrv := wso2_server.NewServer(ctx, enforcerRevokedTokenCache, &cb.Callbacks{}) - enforcerThrottleDataDsSrv := wso2_server.NewServer(ctx, enforcerThrottleDataCache, &cb.Callbacks{}) + enforcerXdsSrv := wso2_server.NewServer(ctx, enforcerCache, &enforcerCallbacks.Callbacks{}) + enforcerSdsSrv := wso2_server.NewServer(ctx, enforcerSubscriptionCache, &enforcerCallbacks.Callbacks{}) + enforcerAppDsSrv := wso2_server.NewServer(ctx, enforcerApplicationCache, &enforcerCallbacks.Callbacks{}) + enforcerAPIDsSrv := wso2_server.NewServer(ctx, enforcerAPICache, &enforcerCallbacks.Callbacks{}) + enforcerAppPolicyDsSrv := wso2_server.NewServer(ctx, enforcerApplicationPolicyCache, &enforcerCallbacks.Callbacks{}) + enforcerSubPolicyDsSrv := wso2_server.NewServer(ctx, enforcerSubscriptionPolicyCache, &enforcerCallbacks.Callbacks{}) + enforcerAppKeyMappingDsSrv := wso2_server.NewServer(ctx, enforcerApplicationKeyMappingCache, &enforcerCallbacks.Callbacks{}) + enforcerKeyManagerDsSrv := wso2_server.NewServer(ctx, enforcerKeyManagerCache, &enforcerCallbacks.Callbacks{}) + enforcerRevokedTokenDsSrv := wso2_server.NewServer(ctx, enforcerRevokedTokenCache, &enforcerCallbacks.Callbacks{}) + enforcerThrottleDataDsSrv := wso2_server.NewServer(ctx, enforcerThrottleDataCache, &enforcerCallbacks.Callbacks{}) runManagementServer(conf, srv, enforcerXdsSrv, enforcerSdsSrv, enforcerAppDsSrv, enforcerAPIDsSrv, enforcerAppPolicyDsSrv, enforcerSubPolicyDsSrv, enforcerAppKeyMappingDsSrv, enforcerKeyManagerDsSrv, diff --git a/adapter/internal/discovery/xds/enforcer_callbacks.go b/adapter/internal/discovery/xds/enforcercallbacks/enforcer_callbacks.go similarity index 70% rename from adapter/internal/discovery/xds/enforcer_callbacks.go rename to adapter/internal/discovery/xds/enforcercallbacks/enforcer_callbacks.go index 13984b9fbc..22680c8745 100644 --- a/adapter/internal/discovery/xds/enforcer_callbacks.go +++ b/adapter/internal/discovery/xds/enforcercallbacks/enforcer_callbacks.go @@ -15,13 +15,14 @@ * */ -package xds +package enforcercallbacks import ( "context" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/wso2/adapter/internal/discovery/protocol/resource/v3" + xds "github.com/wso2/adapter/internal/discovery/xds" logger "github.com/wso2/adapter/loggers" ) @@ -34,24 +35,24 @@ func (cb *Callbacks) Report() {} // OnStreamOpen prints debug logs func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error { - logger.LoggerXdsCallbacks.Debugf("stream %d open for %s\n", id, typ) + logger.LoggerEnforcerXdsCallbacks.Debugf("stream %d open for %s\n", id, typ) return nil } // OnStreamClosed prints debug logs func (cb *Callbacks) OnStreamClosed(id int64) { - logger.LoggerXdsCallbacks.Debugf("stream %d closed\n", id) + logger.LoggerEnforcerXdsCallbacks.Debugf("stream %d closed\n", id) } // OnStreamRequest prints debug logs func (cb *Callbacks) OnStreamRequest(id int64, request *discovery.DiscoveryRequest) error { - logger.LoggerXdsCallbacks.Debugf("stream request on stream id: %d, from node: %s, version: %s, for type: %s", + logger.LoggerEnforcerXdsCallbacks.Debugf("stream request on stream id: %d, from node: %s, version: %s, for type: %s", id, request.GetNode(), request.GetVersionInfo(), request.GetTypeUrl()) - requestEventChannel := GetRequestEventChannel() + requestEventChannel := xds.GetRequestEventChannel() if resource.APIType == request.GetTypeUrl() { - requestEvent := NewRequestEvent() + requestEvent := xds.NewRequestEvent() if request.ErrorDetail != nil { - logger.LoggerXdsCallbacks.Errorf("stream request on stream id: %d Error: %s", id, request.ErrorDetail.Message) + logger.LoggerEnforcerXdsCallbacks.Errorf("stream request on stream id: %d Error: %s", id, request.ErrorDetail.Message) requestEvent.IsError = true } requestEvent.Node = request.GetNode().GetId() @@ -63,17 +64,17 @@ func (cb *Callbacks) OnStreamRequest(id int64, request *discovery.DiscoveryReque // OnStreamResponse prints debug logs func (cb *Callbacks) OnStreamResponse(id int64, request *discovery.DiscoveryRequest, response *discovery.DiscoveryResponse) { - logger.LoggerXdsCallbacks.Debugf("stream request on stream id: %d node: %s for type: %s version: %s", + logger.LoggerEnforcerXdsCallbacks.Debugf("stream request on stream id: %d node: %s for type: %s version: %s", id, request.GetNode(), request.GetTypeUrl(), response.GetVersionInfo()) } // OnFetchRequest prints debug logs func (cb *Callbacks) OnFetchRequest(_ context.Context, req *discovery.DiscoveryRequest) error { - logger.LoggerXdsCallbacks.Debugf("fetch request from node: %s, version: %s, for type: %s", req.Node.Id, req.VersionInfo, req.TypeUrl) + logger.LoggerEnforcerXdsCallbacks.Debugf("fetch request from node: %s, version: %s, for type: %s", req.Node.Id, req.VersionInfo, req.TypeUrl) return nil } // OnFetchResponse prints debug logs func (cb *Callbacks) OnFetchResponse(req *discovery.DiscoveryRequest, res *discovery.DiscoveryResponse) { - logger.LoggerXdsCallbacks.Debugf("fetch response to node: %s, version: %s, for type: %s", req.Node.Id, req.VersionInfo, res.TypeUrl) + logger.LoggerEnforcerXdsCallbacks.Debugf("fetch response to node: %s, version: %s, for type: %s", req.Node.Id, req.VersionInfo, res.TypeUrl) } diff --git a/adapter/internal/messaging/connection.go b/adapter/internal/messaging/connection.go index 98219c74de..8818dc19bc 100644 --- a/adapter/internal/messaging/connection.go +++ b/adapter/internal/messaging/connection.go @@ -72,7 +72,7 @@ func (c *Consumer) reconnect(key string) { } } else { logger.LoggerMsg.Infof("NotifyClose from the connection and channel are %v and %v respectively, NotifyBlocked from the connection is %v", - connClose, connBlocked, chClose) + connClose, chClose, connBlocked) } } diff --git a/adapter/internal/messaging/km_listener.go b/adapter/internal/messaging/km_listener.go index 7c547799c3..f82b782491 100644 --- a/adapter/internal/messaging/km_listener.go +++ b/adapter/internal/messaging/km_listener.go @@ -50,7 +50,7 @@ func handleKMConfiguration(deliveries <-chan amqp.Delivery, done chan error) { var kmConfigMap map[string]interface{} unmarshalErr := json.Unmarshal([]byte(string(d.Body)), ¬ification) if unmarshalErr != nil { - logger.LoggerMsg.Errorf("Error occured while unmarshalling event data %v", unmarshalErr) + logger.LoggerMsg.Errorf("Error occurred while unmarshalling key manager event data %v", unmarshalErr.Error()) return } logger.LoggerMsg.Infof("Event %s is received", notification.Event.PayloadData.EventType) @@ -68,7 +68,8 @@ func handleKMConfiguration(deliveries <-chan amqp.Delivery, done chan error) { if _, ok := err.(base64.CorruptInputError); ok { logger.LoggerMsg.Error("\nbase64 input is corrupt, check the provided key") } - logger.LoggerMsg.Errorf("Error occured %v", err) + + logger.LoggerMsg.Errorf("Error occurred while decoding the notification event %v", err) return } @@ -83,7 +84,11 @@ func handleKMConfiguration(deliveries <-chan amqp.Delivery, done chan error) { xds.GenerateAndUpdateKeyManagerList() } else if decodedByte != nil { logger.LoggerMsg.Infof("decoded stream %s", string(decodedByte)) - json.Unmarshal([]byte(string(decodedByte)), &kmConfigMap) + err := json.Unmarshal([]byte(string(decodedByte)), &kmConfigMap) + if err != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling key manager config map %v", err) + return + } if strings.EqualFold(actionAdd, notification.Event.PayloadData.Action) || strings.EqualFold(actionUpdate, notification.Event.PayloadData.Action) { diff --git a/adapter/internal/messaging/notification_listener.go b/adapter/internal/messaging/notification_listener.go index 63ed8420c4..3d30032b69 100644 --- a/adapter/internal/messaging/notification_listener.go +++ b/adapter/internal/messaging/notification_listener.go @@ -83,7 +83,8 @@ func handleNotification(deliveries <-chan amqp.Delivery, done chan error) { if _, ok := err.(base64.CorruptInputError); ok { logger.LoggerMsg.Error("\nbase64 input is corrupt, check the provided key") } - logger.LoggerMsg.Errorf("Error occured %v", err) + // + logger.LoggerMsg.Errorf("Error occurred while decoding the notification event %v", err) continue } logger.LoggerMsg.Debugf("\n\n[%s]", decodedByte) @@ -112,7 +113,11 @@ func handleAPIEvents(data []byte, eventType string) { currentTimeStamp int64 = apiEvent.Event.TimeStamp ) - json.Unmarshal([]byte(string(data)), &apiEvent) + err := json.Unmarshal([]byte(string(data)), &apiEvent) + if err != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling API event data %v", err) + return + } if !belongsToTenant(apiEvent.TenantDomain) { apiName := apiEvent.APIName if apiEvent.APIName == "" { @@ -183,7 +188,11 @@ func handleAPIEvents(data []byte, eventType string) { func handleLifeCycleEvents(data []byte) { var apiEvent APIEvent - json.Unmarshal([]byte(string(data)), &apiEvent) + err := json.Unmarshal([]byte(string(data)), &apiEvent) + if err != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling Lifecycle event data %v", err) + return + } if !belongsToTenant(apiEvent.TenantDomain) { logger.LoggerMsg.Debugf("API Lifecycle event for the API %s:%s is dropped due to having non related tenantDomain : %s", apiEvent.APIName, apiEvent.APIVersion, apiEvent.TenantDomain) @@ -224,7 +233,11 @@ func handleApplicationEvents(data []byte, eventType string) { if strings.EqualFold(applicationRegistration, eventType) || strings.EqualFold(removeApplicationKeyMapping, eventType) { var applicationRegistrationEvent ApplicationRegistrationEvent - json.Unmarshal([]byte(string(data)), &applicationRegistrationEvent) + err := json.Unmarshal([]byte(string(data)), &applicationRegistrationEvent) + if err != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling Application Registration event data %v", err) + return + } if !belongsToTenant(applicationRegistrationEvent.TenantDomain) { logger.LoggerMsg.Debugf("Application Registration event for the Consumer Key : %s is dropped due to having non related tenantDomain : %s", @@ -246,7 +259,11 @@ func handleApplicationEvents(data []byte, eventType string) { xds.UpdateEnforcerApplicationKeyMappings(xds.MarshalKeyMappingList(eh.AppKeyMappingList)) } else { var applicationEvent ApplicationEvent - json.Unmarshal([]byte(string(data)), &applicationEvent) + err := json.Unmarshal([]byte(string(data)), &applicationEvent) + if err != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling Application event data %v", err) + return + } if !belongsToTenant(applicationEvent.TenantDomain) { logger.LoggerMsg.Debugf("Application event for the Application : %s (with uuid %s) is dropped due to having non related tenantDomain : %s", @@ -281,7 +298,11 @@ func handleApplicationEvents(data []byte, eventType string) { // handleSubscriptionRelatedEvents to process subscription related events func handleSubscriptionEvents(data []byte, eventType string) { var subscriptionEvent SubscriptionEvent - json.Unmarshal([]byte(string(data)), &subscriptionEvent) + err := json.Unmarshal([]byte(string(data)), &subscriptionEvent) + if err != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling Subscription event data %v", err) + return + } if !belongsToTenant(subscriptionEvent.TenantDomain) { logger.LoggerMsg.Debugf("Subscription event for the Application : %s and API %s is dropped due to having non related tenantDomain : %s", subscriptionEvent.ApplicationUUID, subscriptionEvent.APIUUID, subscriptionEvent.TenantDomain) @@ -311,8 +332,11 @@ func handleSubscriptionEvents(data []byte, eventType string) { // handlePolicyRelatedEvents to process policy related events func handlePolicyEvents(data []byte, eventType string) { var policyEvent PolicyInfo - json.Unmarshal([]byte(string(data)), &policyEvent) - + err := json.Unmarshal([]byte(string(data)), &policyEvent) + if err != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling Throttling Policy event data %v", err) + return + } // TODO: Handle policy events if strings.EqualFold(eventType, policyCreate) { logger.LoggerMsg.Infof("Policy: %s for policy type: %s", policyEvent.PolicyName, policyEvent.PolicyType) @@ -343,7 +367,11 @@ func handlePolicyEvents(data []byte, eventType string) { } else if strings.EqualFold(subscriptionEventType, policyEvent.PolicyType) { var subscriptionPolicyEvent SubscriptionPolicyEvent - json.Unmarshal([]byte(string(data)), &subscriptionPolicyEvent) + subPolicyErr := json.Unmarshal([]byte(string(data)), &subscriptionPolicyEvent) + if err != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling Subscription Policy event data %v", subPolicyErr) + return + } subscriptionPolicy := types.SubscriptionPolicy{ID: subscriptionPolicyEvent.PolicyID, TenantID: -1, Name: subscriptionPolicyEvent.PolicyName, QuotaType: subscriptionPolicyEvent.QuotaType, diff --git a/adapter/internal/messaging/revoked_token_listener.go b/adapter/internal/messaging/revoked_token_listener.go index f8283b3df7..a6dcaef179 100644 --- a/adapter/internal/messaging/revoked_token_listener.go +++ b/adapter/internal/messaging/revoked_token_listener.go @@ -34,7 +34,7 @@ func handleTokenRevocation(deliveries <-chan amqp.Delivery, done chan error) { var notification EventTokenRevocationNotification unmarshalErr := json.Unmarshal([]byte(string(d.Body)), ¬ification) if unmarshalErr != nil { - logger.LoggerMsg.Errorf("Error occured while unmarshalling event data %v", unmarshalErr) + logger.LoggerMsg.Errorf("Error occurred while unmarshalling revoked token event data %v", unmarshalErr) return } logger.LoggerMsg.Infof("Event %s is received", notification.Event.PayloadData.Type) diff --git a/adapter/internal/synchronizer/keymanagers_fetcher.go b/adapter/internal/synchronizer/keymanagers_fetcher.go index 8b68274243..2b6871ae10 100644 --- a/adapter/internal/synchronizer/keymanagers_fetcher.go +++ b/adapter/internal/synchronizer/keymanagers_fetcher.go @@ -120,7 +120,11 @@ func FetchKeyManagersOnStartUp(conf *config.Config) { if resp.StatusCode == http.StatusOK { var keyManagers []eventhubTypes.KeyManager - json.Unmarshal(responseBytes, &keyManagers) + err := json.Unmarshal(responseBytes, &keyManagers) + if err != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshelling Key Managers event data %v", err) + return + } for _, kmConfig := range keyManagers { xds.KeyManagerList = append(xds.KeyManagerList, kmConfig) diff --git a/adapter/loggers/logger.go b/adapter/loggers/logger.go index 1ef4a6a014..4c030af83e 100644 --- a/adapter/loggers/logger.go +++ b/adapter/loggers/logger.go @@ -32,36 +32,36 @@ When you add a new logger instance add the related package name as a constant // package name constants const ( - pkgAPI = "github.com/wso2/adapter/internal/api" - pkgAuth = "github.com/wso2/adapter/internal/auth" - pkgMgw = "github.com/wso2/adapter/internal/adapter" - pkgOasparser = "github.com/wso2/adapter/internal/oasparser" - pkgXds = "github.com/wso2/adapter/internal/discovery/xds" - pkgSync = "github.com/wso2/adapter/internal/synchronizer" - pkgMsg = "github.com/wso2/adapter/internal/messaging" - pkgSvcDiscovery = "github.com/wso2/adapter/internal/svcDiscovery" - pkgTLSUtils = "github.com/wso2/adapter/internal/tlsutils" - pkgSubscription = "github.com/wso2/adapter/internal/subscription" - pkgXdsCallbacks = "github.com/wso2/adapter/internal/discovery/xds/enforcercallbacks" - pkgHealth = "github.com/wso2/adapter/internal/health" - pkgRouterXdsCallbacks = "github.com/wso2/adapter/internal/discovery/xds/routercallbacks" + pkgAPI = "github.com/wso2/adapter/internal/api" + pkgAuth = "github.com/wso2/adapter/internal/auth" + pkgMgw = "github.com/wso2/adapter/internal/adapter" + pkgOasparser = "github.com/wso2/adapter/internal/oasparser" + pkgXds = "github.com/wso2/adapter/internal/discovery/xds" + pkgSync = "github.com/wso2/adapter/internal/synchronizer" + pkgMsg = "github.com/wso2/adapter/internal/messaging" + pkgSvcDiscovery = "github.com/wso2/adapter/internal/svcDiscovery" + pkgTLSUtils = "github.com/wso2/adapter/internal/tlsutils" + pkgSubscription = "github.com/wso2/adapter/internal/subscription" + pkgHealth = "github.com/wso2/adapter/internal/health" + pkgRouterXdsCallbacks = "github.com/wso2/adapter/internal/discovery/xds/routercallbacks" + pkgEnforcerXdsCallbacks = "github.com/wso2/adapter/internal/discovery/xds/enforcercallbacks" ) // logger package references var ( - LoggerAPI *logrus.Logger - LoggerAuth *logrus.Logger - LoggerMgw *logrus.Logger - LoggerOasparser *logrus.Logger - LoggerXds *logrus.Logger - LoggerSync *logrus.Logger - LoggerMsg *logrus.Logger - LoggerSvcDiscovery *logrus.Logger - LoggerTLSUtils *logrus.Logger - LoggerSubscription *logrus.Logger - LoggerXdsCallbacks *logrus.Logger - LoggerHealth *logrus.Logger - LoggerRouterXdsCallbacks *logrus.Logger + LoggerAPI *logrus.Logger + LoggerAuth *logrus.Logger + LoggerMgw *logrus.Logger + LoggerOasparser *logrus.Logger + LoggerXds *logrus.Logger + LoggerSync *logrus.Logger + LoggerMsg *logrus.Logger + LoggerSvcDiscovery *logrus.Logger + LoggerTLSUtils *logrus.Logger + LoggerSubscription *logrus.Logger + LoggerHealth *logrus.Logger + LoggerRouterXdsCallbacks *logrus.Logger + LoggerEnforcerXdsCallbacks *logrus.Logger ) func init() { @@ -81,8 +81,8 @@ func UpdateLoggers() { LoggerSvcDiscovery = logging.InitPackageLogger(pkgSvcDiscovery) LoggerTLSUtils = logging.InitPackageLogger(pkgTLSUtils) LoggerSubscription = logging.InitPackageLogger(pkgSubscription) - LoggerXdsCallbacks = logging.InitPackageLogger(pkgXdsCallbacks) LoggerHealth = logging.InitPackageLogger(pkgHealth) LoggerRouterXdsCallbacks = logging.InitPackageLogger(pkgRouterXdsCallbacks) + LoggerEnforcerXdsCallbacks = logging.InitPackageLogger(pkgEnforcerXdsCallbacks) logrus.Info("Updated loggers") } From 50ec3eee2943613a5c125351b2dabc737c0c1820 Mon Sep 17 00:00:00 2001 From: Menaka Jayawardena Date: Mon, 3 May 2021 17:14:21 +0530 Subject: [PATCH 04/11] Update adapter/internal/discovery/xds/enforcercallbacks/enforcer_callbacks.go Co-authored-by: Rajith Roshan --- .../discovery/xds/enforcercallbacks/enforcer_callbacks.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adapter/internal/discovery/xds/enforcercallbacks/enforcer_callbacks.go b/adapter/internal/discovery/xds/enforcercallbacks/enforcer_callbacks.go index 22680c8745..a5cb40b9b3 100644 --- a/adapter/internal/discovery/xds/enforcercallbacks/enforcer_callbacks.go +++ b/adapter/internal/discovery/xds/enforcercallbacks/enforcer_callbacks.go @@ -64,7 +64,7 @@ func (cb *Callbacks) OnStreamRequest(id int64, request *discovery.DiscoveryReque // OnStreamResponse prints debug logs func (cb *Callbacks) OnStreamResponse(id int64, request *discovery.DiscoveryRequest, response *discovery.DiscoveryResponse) { - logger.LoggerEnforcerXdsCallbacks.Debugf("stream request on stream id: %d node: %s for type: %s version: %s", + logger.LoggerEnforcerXdsCallbacks.Debugf("stream response on stream id: %d node: %s for type: %s version: %s", id, request.GetNode(), request.GetTypeUrl(), response.GetVersionInfo()) } From b2a0b18b48e0c58e087449ca86cec1dc9e67c8b7 Mon Sep 17 00:00:00 2001 From: Menaka Jayawardena Date: Mon, 3 May 2021 17:18:47 +0530 Subject: [PATCH 05/11] Fix error handling in loops. (review suggestions) --- adapter/internal/messaging/notification_listener.go | 2 +- adapter/internal/messaging/revoked_token_listener.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/adapter/internal/messaging/notification_listener.go b/adapter/internal/messaging/notification_listener.go index 3d30032b69..6a70c31def 100644 --- a/adapter/internal/messaging/notification_listener.go +++ b/adapter/internal/messaging/notification_listener.go @@ -75,7 +75,7 @@ func handleNotification(deliveries <-chan amqp.Delivery, done chan error) { unmarshalErr := json.Unmarshal([]byte(string(d.Body)), ¬ification) if unmarshalErr != nil { logger.LoggerMsg.Errorf("Error occured while unmarshalling event data %v", unmarshalErr) - return + continue } logger.LoggerMsg.Infof("Event %s is received", notification.Event.PayloadData.EventType) var decodedByte, err = base64.StdEncoding.DecodeString(notification.Event.PayloadData.Event) diff --git a/adapter/internal/messaging/revoked_token_listener.go b/adapter/internal/messaging/revoked_token_listener.go index a6dcaef179..9020acd8af 100644 --- a/adapter/internal/messaging/revoked_token_listener.go +++ b/adapter/internal/messaging/revoked_token_listener.go @@ -35,7 +35,7 @@ func handleTokenRevocation(deliveries <-chan amqp.Delivery, done chan error) { unmarshalErr := json.Unmarshal([]byte(string(d.Body)), ¬ification) if unmarshalErr != nil { logger.LoggerMsg.Errorf("Error occurred while unmarshalling revoked token event data %v", unmarshalErr) - return + continue } logger.LoggerMsg.Infof("Event %s is received", notification.Event.PayloadData.Type) logger.LoggerMsg.Printf("RevokedToken: %s, Token Type: %s", notification.Event.PayloadData.RevokedToken, From 5f9209368211dae5a03b4667b16003463cb52124 Mon Sep 17 00:00:00 2001 From: Menaka Jayawardena Date: Tue, 4 May 2021 02:26:33 +0530 Subject: [PATCH 06/11] Revert handling marshalling errors due to test failure --- adapter/internal/messaging/km_listener.go | 12 +---- .../messaging/notification_listener.go | 50 ++++--------------- .../messaging/revoked_token_listener.go | 6 +-- .../synchronizer/keymanagers_fetcher.go | 6 +-- 4 files changed, 13 insertions(+), 61 deletions(-) diff --git a/adapter/internal/messaging/km_listener.go b/adapter/internal/messaging/km_listener.go index f82b782491..7e667ec10a 100644 --- a/adapter/internal/messaging/km_listener.go +++ b/adapter/internal/messaging/km_listener.go @@ -48,11 +48,7 @@ func handleKMConfiguration(deliveries <-chan amqp.Delivery, done chan error) { var notification EventKeyManagerNotification // var keyManagerConfig resourceTypes.KeymanagerConfig var kmConfigMap map[string]interface{} - unmarshalErr := json.Unmarshal([]byte(string(d.Body)), ¬ification) - if unmarshalErr != nil { - logger.LoggerMsg.Errorf("Error occurred while unmarshalling key manager event data %v", unmarshalErr.Error()) - return - } + json.Unmarshal([]byte(string(d.Body)), ¬ification) logger.LoggerMsg.Infof("Event %s is received", notification.Event.PayloadData.EventType) for i := range xds.KeyManagerList { if strings.EqualFold(notification.Event.PayloadData.Name, xds.KeyManagerList[i].Name) { @@ -84,11 +80,7 @@ func handleKMConfiguration(deliveries <-chan amqp.Delivery, done chan error) { xds.GenerateAndUpdateKeyManagerList() } else if decodedByte != nil { logger.LoggerMsg.Infof("decoded stream %s", string(decodedByte)) - err := json.Unmarshal([]byte(string(decodedByte)), &kmConfigMap) - if err != nil { - logger.LoggerMsg.Errorf("Error occurred while unmarshalling key manager config map %v", err) - return - } + json.Unmarshal([]byte(string(decodedByte)), &kmConfigMap) if strings.EqualFold(actionAdd, notification.Event.PayloadData.Action) || strings.EqualFold(actionUpdate, notification.Event.PayloadData.Action) { diff --git a/adapter/internal/messaging/notification_listener.go b/adapter/internal/messaging/notification_listener.go index 6a70c31def..0999882327 100644 --- a/adapter/internal/messaging/notification_listener.go +++ b/adapter/internal/messaging/notification_listener.go @@ -72,18 +72,13 @@ func handleNotification(deliveries <-chan amqp.Delivery, done chan error) { for d := range deliveries { var notification EventNotification var eventType string - unmarshalErr := json.Unmarshal([]byte(string(d.Body)), ¬ification) - if unmarshalErr != nil { - logger.LoggerMsg.Errorf("Error occured while unmarshalling event data %v", unmarshalErr) - continue - } + json.Unmarshal([]byte(string(d.Body)), ¬ification) logger.LoggerMsg.Infof("Event %s is received", notification.Event.PayloadData.EventType) var decodedByte, err = base64.StdEncoding.DecodeString(notification.Event.PayloadData.Event) if err != nil { if _, ok := err.(base64.CorruptInputError); ok { logger.LoggerMsg.Error("\nbase64 input is corrupt, check the provided key") } - // logger.LoggerMsg.Errorf("Error occurred while decoding the notification event %v", err) continue } @@ -113,11 +108,7 @@ func handleAPIEvents(data []byte, eventType string) { currentTimeStamp int64 = apiEvent.Event.TimeStamp ) - err := json.Unmarshal([]byte(string(data)), &apiEvent) - if err != nil { - logger.LoggerMsg.Errorf("Error occurred while unmarshalling API event data %v", err) - return - } + json.Unmarshal([]byte(string(data)), &apiEvent) if !belongsToTenant(apiEvent.TenantDomain) { apiName := apiEvent.APIName if apiEvent.APIName == "" { @@ -188,11 +179,7 @@ func handleAPIEvents(data []byte, eventType string) { func handleLifeCycleEvents(data []byte) { var apiEvent APIEvent - err := json.Unmarshal([]byte(string(data)), &apiEvent) - if err != nil { - logger.LoggerMsg.Errorf("Error occurred while unmarshalling Lifecycle event data %v", err) - return - } + json.Unmarshal([]byte(string(data)), &apiEvent) if !belongsToTenant(apiEvent.TenantDomain) { logger.LoggerMsg.Debugf("API Lifecycle event for the API %s:%s is dropped due to having non related tenantDomain : %s", apiEvent.APIName, apiEvent.APIVersion, apiEvent.TenantDomain) @@ -233,11 +220,7 @@ func handleApplicationEvents(data []byte, eventType string) { if strings.EqualFold(applicationRegistration, eventType) || strings.EqualFold(removeApplicationKeyMapping, eventType) { var applicationRegistrationEvent ApplicationRegistrationEvent - err := json.Unmarshal([]byte(string(data)), &applicationRegistrationEvent) - if err != nil { - logger.LoggerMsg.Errorf("Error occurred while unmarshalling Application Registration event data %v", err) - return - } + json.Unmarshal([]byte(string(data)), &applicationRegistrationEvent) if !belongsToTenant(applicationRegistrationEvent.TenantDomain) { logger.LoggerMsg.Debugf("Application Registration event for the Consumer Key : %s is dropped due to having non related tenantDomain : %s", @@ -259,11 +242,7 @@ func handleApplicationEvents(data []byte, eventType string) { xds.UpdateEnforcerApplicationKeyMappings(xds.MarshalKeyMappingList(eh.AppKeyMappingList)) } else { var applicationEvent ApplicationEvent - err := json.Unmarshal([]byte(string(data)), &applicationEvent) - if err != nil { - logger.LoggerMsg.Errorf("Error occurred while unmarshalling Application event data %v", err) - return - } + json.Unmarshal([]byte(string(data)), &applicationEvent) if !belongsToTenant(applicationEvent.TenantDomain) { logger.LoggerMsg.Debugf("Application event for the Application : %s (with uuid %s) is dropped due to having non related tenantDomain : %s", @@ -298,11 +277,7 @@ func handleApplicationEvents(data []byte, eventType string) { // handleSubscriptionRelatedEvents to process subscription related events func handleSubscriptionEvents(data []byte, eventType string) { var subscriptionEvent SubscriptionEvent - err := json.Unmarshal([]byte(string(data)), &subscriptionEvent) - if err != nil { - logger.LoggerMsg.Errorf("Error occurred while unmarshalling Subscription event data %v", err) - return - } + json.Unmarshal([]byte(string(data)), &subscriptionEvent) if !belongsToTenant(subscriptionEvent.TenantDomain) { logger.LoggerMsg.Debugf("Subscription event for the Application : %s and API %s is dropped due to having non related tenantDomain : %s", subscriptionEvent.ApplicationUUID, subscriptionEvent.APIUUID, subscriptionEvent.TenantDomain) @@ -332,11 +307,8 @@ func handleSubscriptionEvents(data []byte, eventType string) { // handlePolicyRelatedEvents to process policy related events func handlePolicyEvents(data []byte, eventType string) { var policyEvent PolicyInfo - err := json.Unmarshal([]byte(string(data)), &policyEvent) - if err != nil { - logger.LoggerMsg.Errorf("Error occurred while unmarshalling Throttling Policy event data %v", err) - return - } + json.Unmarshal([]byte(string(data)), &policyEvent) + // TODO: Handle policy events if strings.EqualFold(eventType, policyCreate) { logger.LoggerMsg.Infof("Policy: %s for policy type: %s", policyEvent.PolicyName, policyEvent.PolicyType) @@ -367,11 +339,7 @@ func handlePolicyEvents(data []byte, eventType string) { } else if strings.EqualFold(subscriptionEventType, policyEvent.PolicyType) { var subscriptionPolicyEvent SubscriptionPolicyEvent - subPolicyErr := json.Unmarshal([]byte(string(data)), &subscriptionPolicyEvent) - if err != nil { - logger.LoggerMsg.Errorf("Error occurred while unmarshalling Subscription Policy event data %v", subPolicyErr) - return - } + json.Unmarshal([]byte(string(data)), &subscriptionPolicyEvent) subscriptionPolicy := types.SubscriptionPolicy{ID: subscriptionPolicyEvent.PolicyID, TenantID: -1, Name: subscriptionPolicyEvent.PolicyName, QuotaType: subscriptionPolicyEvent.QuotaType, diff --git a/adapter/internal/messaging/revoked_token_listener.go b/adapter/internal/messaging/revoked_token_listener.go index 9020acd8af..7dea443a91 100644 --- a/adapter/internal/messaging/revoked_token_listener.go +++ b/adapter/internal/messaging/revoked_token_listener.go @@ -32,11 +32,7 @@ func handleTokenRevocation(deliveries <-chan amqp.Delivery, done chan error) { for d := range deliveries { var notification EventTokenRevocationNotification - unmarshalErr := json.Unmarshal([]byte(string(d.Body)), ¬ification) - if unmarshalErr != nil { - logger.LoggerMsg.Errorf("Error occurred while unmarshalling revoked token event data %v", unmarshalErr) - continue - } + json.Unmarshal([]byte(string(d.Body)), ¬ification) logger.LoggerMsg.Infof("Event %s is received", notification.Event.PayloadData.Type) logger.LoggerMsg.Printf("RevokedToken: %s, Token Type: %s", notification.Event.PayloadData.RevokedToken, notification.Event.PayloadData.Type) diff --git a/adapter/internal/synchronizer/keymanagers_fetcher.go b/adapter/internal/synchronizer/keymanagers_fetcher.go index 2b6871ae10..8b68274243 100644 --- a/adapter/internal/synchronizer/keymanagers_fetcher.go +++ b/adapter/internal/synchronizer/keymanagers_fetcher.go @@ -120,11 +120,7 @@ func FetchKeyManagersOnStartUp(conf *config.Config) { if resp.StatusCode == http.StatusOK { var keyManagers []eventhubTypes.KeyManager - err := json.Unmarshal(responseBytes, &keyManagers) - if err != nil { - logger.LoggerMsg.Errorf("Error occurred while unmarshelling Key Managers event data %v", err) - return - } + json.Unmarshal(responseBytes, &keyManagers) for _, kmConfig := range keyManagers { xds.KeyManagerList = append(xds.KeyManagerList, kmConfig) From 7f2c9f3bcc142330da59154199a34e9edf493237 Mon Sep 17 00:00:00 2001 From: Menaka Jayawardena Date: Tue, 4 May 2021 10:52:28 +0530 Subject: [PATCH 07/11] Fix test case failure --- adapter/internal/messaging/event_types.go | 16 +++--- adapter/internal/messaging/km_listener.go | 12 ++++- .../messaging/notification_listener.go | 49 +++++++++++++++---- .../messaging/revoked_token_listener.go | 6 ++- .../synchronizer/keymanagers_fetcher.go | 6 ++- 5 files changed, 68 insertions(+), 21 deletions(-) diff --git a/adapter/internal/messaging/event_types.go b/adapter/internal/messaging/event_types.go index b1b7526d47..e5015bcfe8 100644 --- a/adapter/internal/messaging/event_types.go +++ b/adapter/internal/messaging/event_types.go @@ -113,14 +113,14 @@ type ApplicationRegistrationEvent struct { // ApplicationEvent for struct application events type ApplicationEvent struct { - UUID string `json:"uuid"` - ApplicationID int32 `json:"applicationId"` - ApplicationName string `json:"applicationName"` - TokenType string `json:"tokenType"` - ApplicationPolicy string `json:"applicationPolicy"` - Attributes []string `json:"attributes"` - Subscriber string `json:"subscriber"` - GroupID []string `json:"groupIds"` + UUID string `json:"uuid"` + ApplicationID int32 `json:"applicationId"` + ApplicationName string `json:"applicationName"` + TokenType string `json:"tokenType"` + ApplicationPolicy string `json:"applicationPolicy"` + Attributes interface{} `json:"attributes"` + Subscriber string `json:"subscriber"` + GroupID []string `json:"groupIds"` Event } diff --git a/adapter/internal/messaging/km_listener.go b/adapter/internal/messaging/km_listener.go index 7e667ec10a..60b4ed065a 100644 --- a/adapter/internal/messaging/km_listener.go +++ b/adapter/internal/messaging/km_listener.go @@ -48,7 +48,11 @@ func handleKMConfiguration(deliveries <-chan amqp.Delivery, done chan error) { var notification EventKeyManagerNotification // var keyManagerConfig resourceTypes.KeymanagerConfig var kmConfigMap map[string]interface{} - json.Unmarshal([]byte(string(d.Body)), ¬ification) + unmarshalErr := json.Unmarshal([]byte(string(d.Body)), ¬ification) + if unmarshalErr != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling key manager event data %v", unmarshalErr.Error()) + return + } logger.LoggerMsg.Infof("Event %s is received", notification.Event.PayloadData.EventType) for i := range xds.KeyManagerList { if strings.EqualFold(notification.Event.PayloadData.Name, xds.KeyManagerList[i].Name) { @@ -80,7 +84,11 @@ func handleKMConfiguration(deliveries <-chan amqp.Delivery, done chan error) { xds.GenerateAndUpdateKeyManagerList() } else if decodedByte != nil { logger.LoggerMsg.Infof("decoded stream %s", string(decodedByte)) - json.Unmarshal([]byte(string(decodedByte)), &kmConfigMap) + kmConfigMapErr := json.Unmarshal([]byte(string(decodedByte)), &kmConfigMap) + if kmConfigMapErr != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling key manager config map %v", kmConfigMapErr) + return + } if strings.EqualFold(actionAdd, notification.Event.PayloadData.Action) || strings.EqualFold(actionUpdate, notification.Event.PayloadData.Action) { diff --git a/adapter/internal/messaging/notification_listener.go b/adapter/internal/messaging/notification_listener.go index 0999882327..585bdb0439 100644 --- a/adapter/internal/messaging/notification_listener.go +++ b/adapter/internal/messaging/notification_listener.go @@ -72,7 +72,11 @@ func handleNotification(deliveries <-chan amqp.Delivery, done chan error) { for d := range deliveries { var notification EventNotification var eventType string - json.Unmarshal([]byte(string(d.Body)), ¬ification) + notificationErr := json.Unmarshal([]byte(string(d.Body)), ¬ification) + if notificationErr != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling event data %v", notificationErr) + continue + } logger.LoggerMsg.Infof("Event %s is received", notification.Event.PayloadData.EventType) var decodedByte, err = base64.StdEncoding.DecodeString(notification.Event.PayloadData.Event) if err != nil { @@ -108,7 +112,11 @@ func handleAPIEvents(data []byte, eventType string) { currentTimeStamp int64 = apiEvent.Event.TimeStamp ) - json.Unmarshal([]byte(string(data)), &apiEvent) + apiEventErr := json.Unmarshal([]byte(string(data)), &apiEvent) + if apiEventErr != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling API event data %v", apiEventErr) + return + } if !belongsToTenant(apiEvent.TenantDomain) { apiName := apiEvent.APIName if apiEvent.APIName == "" { @@ -179,7 +187,11 @@ func handleAPIEvents(data []byte, eventType string) { func handleLifeCycleEvents(data []byte) { var apiEvent APIEvent - json.Unmarshal([]byte(string(data)), &apiEvent) + apiLCEventErr := json.Unmarshal([]byte(string(data)), &apiEvent) + if apiLCEventErr != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling Lifecycle event data %v", apiLCEventErr) + return + } if !belongsToTenant(apiEvent.TenantDomain) { logger.LoggerMsg.Debugf("API Lifecycle event for the API %s:%s is dropped due to having non related tenantDomain : %s", apiEvent.APIName, apiEvent.APIVersion, apiEvent.TenantDomain) @@ -220,7 +232,11 @@ func handleApplicationEvents(data []byte, eventType string) { if strings.EqualFold(applicationRegistration, eventType) || strings.EqualFold(removeApplicationKeyMapping, eventType) { var applicationRegistrationEvent ApplicationRegistrationEvent - json.Unmarshal([]byte(string(data)), &applicationRegistrationEvent) + appRegEventErr := json.Unmarshal([]byte(string(data)), &applicationRegistrationEvent) + if appRegEventErr != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling Application Registration event data %v", appRegEventErr) + return + } if !belongsToTenant(applicationRegistrationEvent.TenantDomain) { logger.LoggerMsg.Debugf("Application Registration event for the Consumer Key : %s is dropped due to having non related tenantDomain : %s", @@ -242,7 +258,11 @@ func handleApplicationEvents(data []byte, eventType string) { xds.UpdateEnforcerApplicationKeyMappings(xds.MarshalKeyMappingList(eh.AppKeyMappingList)) } else { var applicationEvent ApplicationEvent - json.Unmarshal([]byte(string(data)), &applicationEvent) + appEventErr := json.Unmarshal([]byte(string(data)), &applicationEvent) + if appEventErr != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling Application event data %v", appEventErr) + return + } if !belongsToTenant(applicationEvent.TenantDomain) { logger.LoggerMsg.Debugf("Application event for the Application : %s (with uuid %s) is dropped due to having non related tenantDomain : %s", @@ -277,7 +297,11 @@ func handleApplicationEvents(data []byte, eventType string) { // handleSubscriptionRelatedEvents to process subscription related events func handleSubscriptionEvents(data []byte, eventType string) { var subscriptionEvent SubscriptionEvent - json.Unmarshal([]byte(string(data)), &subscriptionEvent) + subEventErr := json.Unmarshal([]byte(string(data)), &subscriptionEvent) + if subEventErr != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling Subscription event data %v", subEventErr) + return + } if !belongsToTenant(subscriptionEvent.TenantDomain) { logger.LoggerMsg.Debugf("Subscription event for the Application : %s and API %s is dropped due to having non related tenantDomain : %s", subscriptionEvent.ApplicationUUID, subscriptionEvent.APIUUID, subscriptionEvent.TenantDomain) @@ -307,8 +331,11 @@ func handleSubscriptionEvents(data []byte, eventType string) { // handlePolicyRelatedEvents to process policy related events func handlePolicyEvents(data []byte, eventType string) { var policyEvent PolicyInfo - json.Unmarshal([]byte(string(data)), &policyEvent) - + policyEventErr := json.Unmarshal([]byte(string(data)), &policyEvent) + if policyEventErr != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling Throttling Policy event data %v", policyEventErr) + return + } // TODO: Handle policy events if strings.EqualFold(eventType, policyCreate) { logger.LoggerMsg.Infof("Policy: %s for policy type: %s", policyEvent.PolicyName, policyEvent.PolicyType) @@ -339,7 +366,11 @@ func handlePolicyEvents(data []byte, eventType string) { } else if strings.EqualFold(subscriptionEventType, policyEvent.PolicyType) { var subscriptionPolicyEvent SubscriptionPolicyEvent - json.Unmarshal([]byte(string(data)), &subscriptionPolicyEvent) + subPolicyErr := json.Unmarshal([]byte(string(data)), &subscriptionPolicyEvent) + if err != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling Subscription Policy event data %v", subPolicyErr) + return + } subscriptionPolicy := types.SubscriptionPolicy{ID: subscriptionPolicyEvent.PolicyID, TenantID: -1, Name: subscriptionPolicyEvent.PolicyName, QuotaType: subscriptionPolicyEvent.QuotaType, diff --git a/adapter/internal/messaging/revoked_token_listener.go b/adapter/internal/messaging/revoked_token_listener.go index 7dea443a91..9020acd8af 100644 --- a/adapter/internal/messaging/revoked_token_listener.go +++ b/adapter/internal/messaging/revoked_token_listener.go @@ -32,7 +32,11 @@ func handleTokenRevocation(deliveries <-chan amqp.Delivery, done chan error) { for d := range deliveries { var notification EventTokenRevocationNotification - json.Unmarshal([]byte(string(d.Body)), ¬ification) + unmarshalErr := json.Unmarshal([]byte(string(d.Body)), ¬ification) + if unmarshalErr != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshalling revoked token event data %v", unmarshalErr) + continue + } logger.LoggerMsg.Infof("Event %s is received", notification.Event.PayloadData.Type) logger.LoggerMsg.Printf("RevokedToken: %s, Token Type: %s", notification.Event.PayloadData.RevokedToken, notification.Event.PayloadData.Type) diff --git a/adapter/internal/synchronizer/keymanagers_fetcher.go b/adapter/internal/synchronizer/keymanagers_fetcher.go index 8b68274243..2b6871ae10 100644 --- a/adapter/internal/synchronizer/keymanagers_fetcher.go +++ b/adapter/internal/synchronizer/keymanagers_fetcher.go @@ -120,7 +120,11 @@ func FetchKeyManagersOnStartUp(conf *config.Config) { if resp.StatusCode == http.StatusOK { var keyManagers []eventhubTypes.KeyManager - json.Unmarshal(responseBytes, &keyManagers) + err := json.Unmarshal(responseBytes, &keyManagers) + if err != nil { + logger.LoggerMsg.Errorf("Error occurred while unmarshelling Key Managers event data %v", err) + return + } for _, kmConfig := range keyManagers { xds.KeyManagerList = append(xds.KeyManagerList, kmConfig) From 601bbf9077802b6d454c418b38a1a746f761c9f3 Mon Sep 17 00:00:00 2001 From: Menaka Jayawardena Date: Tue, 4 May 2021 11:52:33 +0530 Subject: [PATCH 08/11] Fix build failure --- adapter/internal/messaging/notification_listener.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adapter/internal/messaging/notification_listener.go b/adapter/internal/messaging/notification_listener.go index 585bdb0439..290b6a720c 100644 --- a/adapter/internal/messaging/notification_listener.go +++ b/adapter/internal/messaging/notification_listener.go @@ -367,7 +367,7 @@ func handlePolicyEvents(data []byte, eventType string) { } else if strings.EqualFold(subscriptionEventType, policyEvent.PolicyType) { var subscriptionPolicyEvent SubscriptionPolicyEvent subPolicyErr := json.Unmarshal([]byte(string(data)), &subscriptionPolicyEvent) - if err != nil { + if subPolicyErr != nil { logger.LoggerMsg.Errorf("Error occurred while unmarshalling Subscription Policy event data %v", subPolicyErr) return } From 2048baa46448cedd12d7f11c76ccda250d3014c2 Mon Sep 17 00:00:00 2001 From: Menaka Jayawardena Date: Tue, 4 May 2021 16:32:41 +0530 Subject: [PATCH 09/11] Update enforcer docker base image --- enforcer/src/main/resources/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/enforcer/src/main/resources/Dockerfile b/enforcer/src/main/resources/Dockerfile index 3c4d71c8a4..bed588e6f3 100644 --- a/enforcer/src/main/resources/Dockerfile +++ b/enforcer/src/main/resources/Dockerfile @@ -14,7 +14,7 @@ # limitations under the License. # ----------------------------------------------------------------------- -FROM adoptopenjdk/openjdk11:jre-11.0.10_9-alpine +FROM adoptopenjdk/openjdk11:jre-11.0.11_9-alpine RUN apk update && apk upgrade --no-cache LABEL maintainer="WSO2 Docker Maintainers " From 7518d5d3cc683f21ce667c5aae4f190192534c9a Mon Sep 17 00:00:00 2001 From: Menaka Jayawardena Date: Tue, 4 May 2021 17:11:33 +0530 Subject: [PATCH 10/11] Update the version to 0.9.1-SNAPSHOT in docker-compose and k8s artefacts. --- resources/docker-compose/apim/docker-compose.yaml | 6 +++--- resources/docker-compose/docker-compose.yaml | 6 +++--- .../k8s-artifacts/choreo-connect/adapter-deployment.yaml | 2 +- .../choreo-connect/choreo-connect-deployment.yaml | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/resources/docker-compose/apim/docker-compose.yaml b/resources/docker-compose/apim/docker-compose.yaml index bb8bf416ab..0715b9a7f7 100644 --- a/resources/docker-compose/apim/docker-compose.yaml +++ b/resources/docker-compose/apim/docker-compose.yaml @@ -14,7 +14,7 @@ services: volumes: - ./conf/deployment.toml:/home/wso2carbon/wso2am-4.0.0/repository/conf/deployment.toml router: - image: wso2/choreo-connect-router:0.9.0 + image: wso2/choreo-connect-router:0.9.1-SNAPSHOT logging: options: max-size: "20m" @@ -42,7 +42,7 @@ services: - adapter - enforcer adapter: - image: wso2/choreo-connect-adapter:0.9.0 + image: wso2/choreo-connect-adapter:0.9.1-SNAPSHOT logging: options: max-size: "20m" @@ -65,7 +65,7 @@ services: links: - apim enforcer: - image: wso2/choreo-connect-enforcer:0.9.0 + image: wso2/choreo-connect-enforcer:0.9.1-SNAPSHOT logging: options: max-size: "20m" diff --git a/resources/docker-compose/docker-compose.yaml b/resources/docker-compose/docker-compose.yaml index 6eb6d0a509..0cb7c15a06 100644 --- a/resources/docker-compose/docker-compose.yaml +++ b/resources/docker-compose/docker-compose.yaml @@ -1,7 +1,7 @@ version: "3.7" services: router: - image: wso2/choreo-connect-router:0.9.0 + image: wso2/choreo-connect-router:0.9.1-SNAPSHOT logging: options: max-size: "20m" @@ -30,7 +30,7 @@ services: - adapter - enforcer adapter: - image: wso2/choreo-connect-adapter:0.9.0 + image: wso2/choreo-connect-adapter:0.9.1-SNAPSHOT logging: options: max-size: "20m" @@ -48,7 +48,7 @@ services: - "18000:18000" - "9843:9843" enforcer: - image: wso2/choreo-connect-enforcer:0.9.0 + image: wso2/choreo-connect-enforcer:0.9.1-SNAPSHOT logging: options: max-size: "20m" diff --git a/resources/k8s-artifacts/choreo-connect/adapter-deployment.yaml b/resources/k8s-artifacts/choreo-connect/adapter-deployment.yaml index 9dfdfb50ad..f8a5ec44e3 100644 --- a/resources/k8s-artifacts/choreo-connect/adapter-deployment.yaml +++ b/resources/k8s-artifacts/choreo-connect/adapter-deployment.yaml @@ -45,7 +45,7 @@ spec: - mountPath: /home/wso2/conf/log_config.toml subPath: log_config.toml name: logconfig-toml-vol - image: wso2/choreo-connect-adapter:0.9.0 + image: wso2/choreo-connect-adapter:0.9.1-SNAPSHOT imagePullPolicy: IfNotPresent env: - name: ADAPTER_PRIVATE_KEY_PATH diff --git a/resources/k8s-artifacts/choreo-connect/choreo-connect-deployment.yaml b/resources/k8s-artifacts/choreo-connect/choreo-connect-deployment.yaml index 32e48dc64e..2e8916695f 100644 --- a/resources/k8s-artifacts/choreo-connect/choreo-connect-deployment.yaml +++ b/resources/k8s-artifacts/choreo-connect/choreo-connect-deployment.yaml @@ -46,7 +46,7 @@ spec: name: log4j2-vol - mountPath: /home/wso2/lib/dropins name: dropins-vol - image: wso2/choreo-connect-enforcer:0.9.0 + image: wso2/choreo-connect-enforcer:0.9.1-SNAPSHOT imagePullPolicy: IfNotPresent env: - name: ENFORCER_PRIVATE_KEY_PATH @@ -112,7 +112,7 @@ spec: name: router-keystore-vol - mountPath: /home/wso2/security/truststore name: router-truststore-vol - image: wso2/choreo-connect-router:0.9.0 + image: wso2/choreo-connect-router:0.9.1-SNAPSHOT imagePullPolicy: IfNotPresent env: - name: ROUTER_ADMIN_HOST From 4ed78ad6ac96c872febb4d6d4e20db19173025a3 Mon Sep 17 00:00:00 2001 From: Menaka Jayawardena Date: Tue, 4 May 2021 19:27:43 +0530 Subject: [PATCH 11/11] Update dependency versions --- pom.xml | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index f97bd38b04..513d5ceb65 100644 --- a/pom.xml +++ b/pom.xml @@ -331,11 +331,23 @@ org.wso2.carbon.apimgt org.wso2.carbon.apimgt.common.gateway ${carbon.apimgt.version} + + + com.fasterxml.jackson.core + jackson-databind + + org.wso2.carbon.apimgt org.wso2.carbon.apimgt.common.analytics ${carbon.apimgt.version} + + + com.fasterxml.jackson.core + jackson-databind + + org.wso2.carbon.apimgt @@ -358,6 +370,11 @@ org.wso2.am.analytics.publisher.client ${analytics.publisher.client.version} + + com.fasterxml.jackson.core + jackson-databind + ${jackson.databind.version} + org.awaitility awaitility @@ -379,14 +396,14 @@ 9.0.174 3.2.0 3.9 - 2.6 + 2.7 0.1.28 1.1.1.wso2v1 1.32.1 1.32.1 1.32.1 2.8.6 - 27.0-jre + 30.0-jre 0.34.1 4.1.63.Final 2.3 @@ -419,5 +436,6 @@ 1.0.0 3.1.2 4.5.13 + 2.10.5.1