Skip to content

Commit

Permalink
Merge pull request #2001 from menakaj/main
Browse files Browse the repository at this point in the history
[main] Add Logging improvements
  • Loading branch information
menakajl authored May 4, 2021
2 parents 5e3abe6 + 4ed78ad commit cd09bec
Show file tree
Hide file tree
Showing 15 changed files with 196 additions and 107 deletions.
22 changes: 11 additions & 11 deletions adapter/internal/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
subscriptionservice "github.com/wso2/adapter/internal/discovery/api/wso2/discovery/service/subscription"
throttleservice "github.com/wso2/adapter/internal/discovery/api/wso2/discovery/service/throtlle"
wso2_server "github.com/wso2/adapter/internal/discovery/protocol/server/v3"
enforcerCallbacks "github.com/wso2/adapter/internal/discovery/xds/enforcercallbacks"
routercb "github.com/wso2/adapter/internal/discovery/xds/routercallbacks"
"github.com/wso2/adapter/internal/health"
healthservice "github.com/wso2/adapter/internal/health/api/wso2/health/service"
Expand All @@ -48,7 +49,6 @@ import (
"github.com/fsnotify/fsnotify"
"github.com/wso2/adapter/config"
"github.com/wso2/adapter/internal/discovery/xds"
cb "github.com/wso2/adapter/internal/discovery/xds"
"github.com/wso2/adapter/internal/eventhub"
"github.com/wso2/adapter/internal/messaging"
"github.com/wso2/adapter/internal/synchronizer"
Expand Down Expand Up @@ -180,16 +180,16 @@ func Run(conf *config.Config) {
enforcerThrottleDataCache := xds.GetEnforcerThrottleDataCache()

srv := xdsv3.NewServer(ctx, cache, &routercb.Callbacks{})
enforcerXdsSrv := wso2_server.NewServer(ctx, enforcerCache, &cb.Callbacks{})
enforcerSdsSrv := wso2_server.NewServer(ctx, enforcerSubscriptionCache, &cb.Callbacks{})
enforcerAppDsSrv := wso2_server.NewServer(ctx, enforcerApplicationCache, &cb.Callbacks{})
enforcerAPIDsSrv := wso2_server.NewServer(ctx, enforcerAPICache, &cb.Callbacks{})
enforcerAppPolicyDsSrv := wso2_server.NewServer(ctx, enforcerApplicationPolicyCache, &cb.Callbacks{})
enforcerSubPolicyDsSrv := wso2_server.NewServer(ctx, enforcerSubscriptionPolicyCache, &cb.Callbacks{})
enforcerAppKeyMappingDsSrv := wso2_server.NewServer(ctx, enforcerApplicationKeyMappingCache, &cb.Callbacks{})
enforcerKeyManagerDsSrv := wso2_server.NewServer(ctx, enforcerKeyManagerCache, &cb.Callbacks{})
enforcerRevokedTokenDsSrv := wso2_server.NewServer(ctx, enforcerRevokedTokenCache, &cb.Callbacks{})
enforcerThrottleDataDsSrv := wso2_server.NewServer(ctx, enforcerThrottleDataCache, &cb.Callbacks{})
enforcerXdsSrv := wso2_server.NewServer(ctx, enforcerCache, &enforcerCallbacks.Callbacks{})
enforcerSdsSrv := wso2_server.NewServer(ctx, enforcerSubscriptionCache, &enforcerCallbacks.Callbacks{})
enforcerAppDsSrv := wso2_server.NewServer(ctx, enforcerApplicationCache, &enforcerCallbacks.Callbacks{})
enforcerAPIDsSrv := wso2_server.NewServer(ctx, enforcerAPICache, &enforcerCallbacks.Callbacks{})
enforcerAppPolicyDsSrv := wso2_server.NewServer(ctx, enforcerApplicationPolicyCache, &enforcerCallbacks.Callbacks{})
enforcerSubPolicyDsSrv := wso2_server.NewServer(ctx, enforcerSubscriptionPolicyCache, &enforcerCallbacks.Callbacks{})
enforcerAppKeyMappingDsSrv := wso2_server.NewServer(ctx, enforcerApplicationKeyMappingCache, &enforcerCallbacks.Callbacks{})
enforcerKeyManagerDsSrv := wso2_server.NewServer(ctx, enforcerKeyManagerCache, &enforcerCallbacks.Callbacks{})
enforcerRevokedTokenDsSrv := wso2_server.NewServer(ctx, enforcerRevokedTokenCache, &enforcerCallbacks.Callbacks{})
enforcerThrottleDataDsSrv := wso2_server.NewServer(ctx, enforcerThrottleDataCache, &enforcerCallbacks.Callbacks{})

runManagementServer(conf, srv, enforcerXdsSrv, enforcerSdsSrv, enforcerAppDsSrv, enforcerAPIDsSrv,
enforcerAppPolicyDsSrv, enforcerSubPolicyDsSrv, enforcerAppKeyMappingDsSrv, enforcerKeyManagerDsSrv,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
*
*/

package xds
package enforcercallbacks

import (
"context"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/wso2/adapter/internal/discovery/protocol/resource/v3"
xds "github.com/wso2/adapter/internal/discovery/xds"
logger "github.com/wso2/adapter/loggers"
)

Expand All @@ -34,24 +35,24 @@ func (cb *Callbacks) Report() {}

// OnStreamOpen prints debug logs
func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error {
logger.LoggerXdsCallbacks.Debugf("stream %d open for %s\n", id, typ)
logger.LoggerEnforcerXdsCallbacks.Debugf("stream %d open for %s\n", id, typ)
return nil
}

// OnStreamClosed prints debug logs
func (cb *Callbacks) OnStreamClosed(id int64) {
logger.LoggerXdsCallbacks.Debugf("stream %d closed\n", id)
logger.LoggerEnforcerXdsCallbacks.Debugf("stream %d closed\n", id)
}

// OnStreamRequest prints debug logs
func (cb *Callbacks) OnStreamRequest(id int64, request *discovery.DiscoveryRequest) error {
logger.LoggerXdsCallbacks.Debugf("stream request on stream id: %d, from node: %s, version: %s, for type: %s",
logger.LoggerEnforcerXdsCallbacks.Debugf("stream request on stream id: %d, from node: %s, version: %s, for type: %s",
id, request.GetNode(), request.GetVersionInfo(), request.GetTypeUrl())
requestEventChannel := GetRequestEventChannel()
requestEventChannel := xds.GetRequestEventChannel()
if resource.APIType == request.GetTypeUrl() {
requestEvent := NewRequestEvent()
requestEvent := xds.NewRequestEvent()
if request.ErrorDetail != nil {
logger.LoggerXdsCallbacks.Errorf("stream request on stream id: %d Error: %s", id, request.ErrorDetail.Message)
logger.LoggerEnforcerXdsCallbacks.Errorf("stream request on stream id: %d Error: %s", id, request.ErrorDetail.Message)
requestEvent.IsError = true
}
requestEvent.Node = request.GetNode().GetId()
Expand All @@ -63,17 +64,17 @@ func (cb *Callbacks) OnStreamRequest(id int64, request *discovery.DiscoveryReque

// OnStreamResponse prints debug logs
func (cb *Callbacks) OnStreamResponse(id int64, request *discovery.DiscoveryRequest, response *discovery.DiscoveryResponse) {
logger.LoggerXdsCallbacks.Debugf("stream request on stream id: %d node: %s for type: %s version: %s",
logger.LoggerEnforcerXdsCallbacks.Debugf("stream response on stream id: %d node: %s for type: %s version: %s",
id, request.GetNode(), request.GetTypeUrl(), response.GetVersionInfo())
}

// OnFetchRequest prints debug logs
func (cb *Callbacks) OnFetchRequest(_ context.Context, req *discovery.DiscoveryRequest) error {
logger.LoggerXdsCallbacks.Debugf("fetch request from node: %s, version: %s, for type: %s", req.Node.Id, req.VersionInfo, req.TypeUrl)
logger.LoggerEnforcerXdsCallbacks.Debugf("fetch request from node: %s, version: %s, for type: %s", req.Node.Id, req.VersionInfo, req.TypeUrl)
return nil
}

// OnFetchResponse prints debug logs
func (cb *Callbacks) OnFetchResponse(req *discovery.DiscoveryRequest, res *discovery.DiscoveryResponse) {
logger.LoggerXdsCallbacks.Debugf("fetch response to node: %s, version: %s, for type: %s", req.Node.Id, req.VersionInfo, res.TypeUrl)
logger.LoggerEnforcerXdsCallbacks.Debugf("fetch response to node: %s, version: %s, for type: %s", req.Node.Id, req.VersionInfo, res.TypeUrl)
}
54 changes: 38 additions & 16 deletions adapter/internal/messaging/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,35 @@ func connectToRabbitMQ(url string) (*amqp.Connection, error) {
// is closed unexpectedly.
func (c *Consumer) reconnect(key string) {
var err error
conErr := <-c.conn.NotifyClose(make(chan *amqp.Error))
if conErr != nil {
shouldReconnect := false
connClose := <-c.conn.NotifyClose(make(chan *amqp.Error))
connBlocked := c.conn.NotifyBlocked(make(chan amqp.Blocking))
chClose := c.channel.NotifyClose(make(chan *amqp.Error))

if connClose != nil {
shouldReconnect = true
logger.LoggerMsg.Errorf("CRITICAL: Connection dropped for %s, reconnecting...", key)
}

if connBlocked != nil {
shouldReconnect = true
logger.LoggerMsg.Errorf("CRITICAL: Connection blocked for %s, reconnecting...", key)
}

if chClose != nil {
shouldReconnect = true
logger.LoggerMsg.Errorf("CRITICAL: Channel closed for %s, reconnecting...", key)
}

if shouldReconnect {
c.conn.Close()
c, rabbitConn, err = connectionRetry(key)
if err != nil {
logger.LoggerMsg.Errorf("Cannot establish connection for topic %s", key)
}
} else {
logger.LoggerMsg.Infof("NotifyClose from the connection and channel are %v and %v respectively, NotifyBlocked from the connection is %v",
connClose, chClose, connBlocked)
}
}

Expand Down Expand Up @@ -115,22 +136,23 @@ func retrieveAMQPURLList() []amqpFailoverURL {
amqpConnectionURL := strings.Split(conURL, "?")[0]
u, err := url.Parse(conURL)
if err != nil {
panic(err)
}
m, _ := url.ParseQuery(u.RawQuery)
if m["connectdelay"] != nil {
connectdelay := m["connectdelay"][0]
delay, err = strconv.Atoi(connectdelay[1 : len(connectdelay)-1])
}
logger.LoggerMsg.Errorf("Error occured %v", err)
} else {
m, _ := url.ParseQuery(u.RawQuery)
if m["connectdelay"] != nil {
connectdelay := m["connectdelay"][0]
delay, _ = strconv.Atoi(connectdelay[1 : len(connectdelay)-1])
}

if m["retries"] != nil {
retrycount := m["retries"][0]
retries, err = strconv.Atoi(retrycount[1 : len(retrycount)-1])
}
if m["retries"] != nil {
retrycount := m["retries"][0]
retries, _ = strconv.Atoi(retrycount[1 : len(retrycount)-1])
}

failoverurlObj := amqpFailoverURL{url: amqpConnectionURL, retryCount: retries,
connectionDelay: delay}
amqlURLList = append(amqlURLList, failoverurlObj)
failoverurlObj := amqpFailoverURL{url: amqpConnectionURL, retryCount: retries,
connectionDelay: delay}
amqlURLList = append(amqlURLList, failoverurlObj)
}
}
return amqlURLList
}
Expand Down
16 changes: 8 additions & 8 deletions adapter/internal/messaging/event_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,14 @@ type ApplicationRegistrationEvent struct {

// ApplicationEvent for struct application events
type ApplicationEvent struct {
UUID string `json:"uuid"`
ApplicationID int32 `json:"applicationId"`
ApplicationName string `json:"applicationName"`
TokenType string `json:"tokenType"`
ApplicationPolicy string `json:"applicationPolicy"`
Attributes []string `json:"attributes"`
Subscriber string `json:"subscriber"`
GroupID []string `json:"groupIds"`
UUID string `json:"uuid"`
ApplicationID int32 `json:"applicationId"`
ApplicationName string `json:"applicationName"`
TokenType string `json:"tokenType"`
ApplicationPolicy string `json:"applicationPolicy"`
Attributes interface{} `json:"attributes"`
Subscriber string `json:"subscriber"`
GroupID []string `json:"groupIds"`
Event
}

Expand Down
24 changes: 16 additions & 8 deletions adapter/internal/messaging/km_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ func handleKMConfiguration(deliveries <-chan amqp.Delivery, done chan error) {
var notification EventKeyManagerNotification
// var keyManagerConfig resourceTypes.KeymanagerConfig
var kmConfigMap map[string]interface{}

// var eventType string
json.Unmarshal([]byte(string(d.Body)), &notification)


unmarshalErr := json.Unmarshal([]byte(string(d.Body)), &notification)
if unmarshalErr != nil {
logger.LoggerMsg.Errorf("Error occurred while unmarshalling key manager event data %v", unmarshalErr.Error())
return
}
logger.LoggerMsg.Infof("Event %s is received", notification.Event.PayloadData.EventType)
for i := range xds.KeyManagerList {
if strings.EqualFold(notification.Event.PayloadData.Name, xds.KeyManagerList[i].Name) {
isFound = true
Expand All @@ -62,11 +63,14 @@ func handleKMConfiguration(deliveries <-chan amqp.Delivery, done chan error) {
}

var decodedByte, err = base64.StdEncoding.DecodeString(notification.Event.PayloadData.Value)

if err != nil {
if _, ok := err.(base64.CorruptInputError); ok {
panic("\nbase64 input is corrupt, check the provided key")
logger.LoggerMsg.Error("\nbase64 input is corrupt, check the provided key")
}
panic(err)

logger.LoggerMsg.Errorf("Error occurred while decoding the notification event %v", err)
return
}

if strings.EqualFold(keyManagerConfigEvent, notification.Event.PayloadData.EventType) {
Expand All @@ -80,7 +84,11 @@ func handleKMConfiguration(deliveries <-chan amqp.Delivery, done chan error) {
xds.GenerateAndUpdateKeyManagerList()
} else if decodedByte != nil {
logger.LoggerMsg.Infof("decoded stream %s", string(decodedByte))
json.Unmarshal([]byte(string(decodedByte)), &kmConfigMap)
kmConfigMapErr := json.Unmarshal([]byte(string(decodedByte)), &kmConfigMap)
if kmConfigMapErr != nil {
logger.LoggerMsg.Errorf("Error occurred while unmarshalling key manager config map %v", kmConfigMapErr)
return
}

if strings.EqualFold(actionAdd, notification.Event.PayloadData.Action) ||
strings.EqualFold(actionUpdate, notification.Event.PayloadData.Action) {
Expand Down
57 changes: 44 additions & 13 deletions adapter/internal/messaging/notification_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,22 @@ func handleNotification(deliveries <-chan amqp.Delivery, done chan error) {
for d := range deliveries {
var notification EventNotification
var eventType string
json.Unmarshal([]byte(string(d.Body)), &notification)
notificationErr := json.Unmarshal([]byte(string(d.Body)), &notification)
if notificationErr != nil {
logger.LoggerMsg.Errorf("Error occurred while unmarshalling event data %v", notificationErr)
continue
}
logger.LoggerMsg.Infof("Event %s is received", notification.Event.PayloadData.EventType)
var decodedByte, err = base64.StdEncoding.DecodeString(notification.Event.PayloadData.Event)
if err != nil {
if _, ok := err.(base64.CorruptInputError); ok {
panic("\nbase64 input is corrupt, check the provided key")
logger.LoggerMsg.Error("\nbase64 input is corrupt, check the provided key")
}
panic(err)
logger.LoggerMsg.Errorf("Error occurred while decoding the notification event %v", err)
continue
}
logger.LoggerMsg.Debugf("\n\n[%s]", decodedByte)
eventType = notification.Event.PayloadData.EventType
logger.LoggerMsg.Debugf("Event type : %s", eventType)

if strings.Contains(eventType, apiLifeCycleChange) {
handleLifeCycleEvents(decodedByte)
} else if strings.Contains(eventType, apiEventType) {
Expand All @@ -108,7 +112,11 @@ func handleAPIEvents(data []byte, eventType string) {
currentTimeStamp int64 = apiEvent.Event.TimeStamp
)

json.Unmarshal([]byte(string(data)), &apiEvent)
apiEventErr := json.Unmarshal([]byte(string(data)), &apiEvent)
if apiEventErr != nil {
logger.LoggerMsg.Errorf("Error occurred while unmarshalling API event data %v", apiEventErr)
return
}
if !belongsToTenant(apiEvent.TenantDomain) {
apiName := apiEvent.APIName
if apiEvent.APIName == "" {
Expand Down Expand Up @@ -179,7 +187,11 @@ func handleAPIEvents(data []byte, eventType string) {

func handleLifeCycleEvents(data []byte) {
var apiEvent APIEvent
json.Unmarshal([]byte(string(data)), &apiEvent)
apiLCEventErr := json.Unmarshal([]byte(string(data)), &apiEvent)
if apiLCEventErr != nil {
logger.LoggerMsg.Errorf("Error occurred while unmarshalling Lifecycle event data %v", apiLCEventErr)
return
}
if !belongsToTenant(apiEvent.TenantDomain) {
logger.LoggerMsg.Debugf("API Lifecycle event for the API %s:%s is dropped due to having non related tenantDomain : %s",
apiEvent.APIName, apiEvent.APIVersion, apiEvent.TenantDomain)
Expand Down Expand Up @@ -220,7 +232,11 @@ func handleApplicationEvents(data []byte, eventType string) {
if strings.EqualFold(applicationRegistration, eventType) ||
strings.EqualFold(removeApplicationKeyMapping, eventType) {
var applicationRegistrationEvent ApplicationRegistrationEvent
json.Unmarshal([]byte(string(data)), &applicationRegistrationEvent)
appRegEventErr := json.Unmarshal([]byte(string(data)), &applicationRegistrationEvent)
if appRegEventErr != nil {
logger.LoggerMsg.Errorf("Error occurred while unmarshalling Application Registration event data %v", appRegEventErr)
return
}

if !belongsToTenant(applicationRegistrationEvent.TenantDomain) {
logger.LoggerMsg.Debugf("Application Registration event for the Consumer Key : %s is dropped due to having non related tenantDomain : %s",
Expand All @@ -242,7 +258,11 @@ func handleApplicationEvents(data []byte, eventType string) {
xds.UpdateEnforcerApplicationKeyMappings(xds.MarshalKeyMappingList(eh.AppKeyMappingList))
} else {
var applicationEvent ApplicationEvent
json.Unmarshal([]byte(string(data)), &applicationEvent)
appEventErr := json.Unmarshal([]byte(string(data)), &applicationEvent)
if appEventErr != nil {
logger.LoggerMsg.Errorf("Error occurred while unmarshalling Application event data %v", appEventErr)
return
}

if !belongsToTenant(applicationEvent.TenantDomain) {
logger.LoggerMsg.Debugf("Application event for the Application : %s (with uuid %s) is dropped due to having non related tenantDomain : %s",
Expand Down Expand Up @@ -277,7 +297,11 @@ func handleApplicationEvents(data []byte, eventType string) {
// handleSubscriptionRelatedEvents to process subscription related events
func handleSubscriptionEvents(data []byte, eventType string) {
var subscriptionEvent SubscriptionEvent
json.Unmarshal([]byte(string(data)), &subscriptionEvent)
subEventErr := json.Unmarshal([]byte(string(data)), &subscriptionEvent)
if subEventErr != nil {
logger.LoggerMsg.Errorf("Error occurred while unmarshalling Subscription event data %v", subEventErr)
return
}
if !belongsToTenant(subscriptionEvent.TenantDomain) {
logger.LoggerMsg.Debugf("Subscription event for the Application : %s and API %s is dropped due to having non related tenantDomain : %s",
subscriptionEvent.ApplicationUUID, subscriptionEvent.APIUUID, subscriptionEvent.TenantDomain)
Expand Down Expand Up @@ -307,8 +331,11 @@ func handleSubscriptionEvents(data []byte, eventType string) {
// handlePolicyRelatedEvents to process policy related events
func handlePolicyEvents(data []byte, eventType string) {
var policyEvent PolicyInfo
json.Unmarshal([]byte(string(data)), &policyEvent)

policyEventErr := json.Unmarshal([]byte(string(data)), &policyEvent)
if policyEventErr != nil {
logger.LoggerMsg.Errorf("Error occurred while unmarshalling Throttling Policy event data %v", policyEventErr)
return
}
// TODO: Handle policy events
if strings.EqualFold(eventType, policyCreate) {
logger.LoggerMsg.Infof("Policy: %s for policy type: %s", policyEvent.PolicyName, policyEvent.PolicyType)
Expand Down Expand Up @@ -339,7 +366,11 @@ func handlePolicyEvents(data []byte, eventType string) {

} else if strings.EqualFold(subscriptionEventType, policyEvent.PolicyType) {
var subscriptionPolicyEvent SubscriptionPolicyEvent
json.Unmarshal([]byte(string(data)), &subscriptionPolicyEvent)
subPolicyErr := json.Unmarshal([]byte(string(data)), &subscriptionPolicyEvent)
if subPolicyErr != nil {
logger.LoggerMsg.Errorf("Error occurred while unmarshalling Subscription Policy event data %v", subPolicyErr)
return
}

subscriptionPolicy := types.SubscriptionPolicy{ID: subscriptionPolicyEvent.PolicyID, TenantID: -1,
Name: subscriptionPolicyEvent.PolicyName, QuotaType: subscriptionPolicyEvent.QuotaType,
Expand Down
7 changes: 6 additions & 1 deletion adapter/internal/messaging/revoked_token_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ func handleTokenRevocation(deliveries <-chan amqp.Delivery, done chan error) {

for d := range deliveries {
var notification EventTokenRevocationNotification
json.Unmarshal([]byte(string(d.Body)), &notification)
unmarshalErr := json.Unmarshal([]byte(string(d.Body)), &notification)
if unmarshalErr != nil {
logger.LoggerMsg.Errorf("Error occurred while unmarshalling revoked token event data %v", unmarshalErr)
continue
}
logger.LoggerMsg.Infof("Event %s is received", notification.Event.PayloadData.Type)
logger.LoggerMsg.Printf("RevokedToken: %s, Token Type: %s", notification.Event.PayloadData.RevokedToken,
notification.Event.PayloadData.Type)
var stokens []types.Resource
Expand Down
Loading

0 comments on commit cd09bec

Please sign in to comment.