From 1e59444e674431130361588eb96302d84b23447e Mon Sep 17 00:00:00 2001 From: dinusha92 Date: Fri, 15 Jun 2018 12:49:25 +0530 Subject: [PATCH 1/4] Analytics filter for mgw. --- .../ballerina/gateway/constants/constants.bal | 1 + .../main/ballerina/gateway/data_writer.bal | 149 ++++++++++++++++++ .../ballerina/gateway/dtos/analytics_dtos.bal | 53 +++++++ .../filters/analytics_request_filter.bal | 34 ++++ .../listeners/api_gateway_listener.bal | 10 +- .../ballerina/gateway/uploade_files_timer.bal | 64 ++++++++ .../ballerina/gateway/usagedata_uploader.bal | 60 +++++++ .../main/ballerina/gateway/utils/utils.bal | 43 +++++ distribution/resources/conf/micro-gw.conf | 1 + tests/src/test/resources/confs/base.conf | 1 + .../resources/confs/default-test-config.conf | 1 + tests/src/test/resources/confs/startup.conf | 1 + 12 files changed, 416 insertions(+), 2 deletions(-) create mode 100644 components/micro-gateway-core/src/main/ballerina/gateway/data_writer.bal create mode 100644 components/micro-gateway-core/src/main/ballerina/gateway/dtos/analytics_dtos.bal create mode 100644 components/micro-gateway-core/src/main/ballerina/gateway/filters/analytics_request_filter.bal create mode 100644 components/micro-gateway-core/src/main/ballerina/gateway/uploade_files_timer.bal create mode 100644 components/micro-gateway-core/src/main/ballerina/gateway/usagedata_uploader.bal diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/constants/constants.bal b/components/micro-gateway-core/src/main/ballerina/gateway/constants/constants.bal index 3d02e73c49..70edb8b4af 100644 --- a/components/micro-gateway-core/src/main/ballerina/gateway/constants/constants.bal +++ b/components/micro-gateway-core/src/main/ballerina/gateway/constants/constants.bal @@ -105,6 +105,7 @@ @final public string AUTHZ_FILTER = "AUTHZ_FILTER"; @final public string SUBSCRIPTION_FILTER = "SUBSCRIPTION_FILTER"; @final public string THROTTLE_FILTER = "THROTTLE_FILTER"; +@final public string ANALYTICS_FILTER = "ANALYTICS_FILTER"; diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/data_writer.bal b/components/micro-gateway-core/src/main/ballerina/gateway/data_writer.bal new file mode 100644 index 0000000000..8c0490c981 --- /dev/null +++ b/components/micro-gateway-core/src/main/ballerina/gateway/data_writer.bal @@ -0,0 +1,149 @@ +// Copyright (c) WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/io; +import ballerina/http; + +@final +public string KVT = "--KVS--"; +public string EVS = "--EVT--"; +public string OBJ = "--OBJ--"; + +int initializingTime = 0; +//streams associated with DTOs +stream eventStream; + +function getPayload(AnalyticsRequestStream requestStream) returns (string) { + return requestStream.consumerKey + OBJ + requestStream.context + OBJ + requestStream.apiVersion + OBJ + + requestStream.api + OBJ + requestStream.resourcePath + OBJ +requestStream.resourceTemplate + OBJ + + requestStream.method + OBJ + requestStream.apiVersion + OBJ + requestStream.requestCount + OBJ + + requestStream.requestTime + OBJ + requestStream.username + OBJ + requestStream.tenantDomain + OBJ + + requestStream.hostName + OBJ + requestStream.apiPublisher + OBJ + requestStream.applicationName + OBJ + + requestStream.applicationId + OBJ + requestStream.userAgent + OBJ + requestStream.tier + + OBJ + requestStream.continuedOnThrottleOut + OBJ + requestStream.clientIp + requestStream.applicationOwner; +} + +function getMetaData(AnalyticsRequestStream requestStream) returns (string) { + return "{\"keyType\":\"" + requestStream.keyType + "\",\"correlationID\":\"" + requestStream.correlationID + "\"}"; +} + +function getCorrelationData(AnalyticsRequestStream request) returns (string) { + return request.correlationID; +} + +function generateRequestEvent(http:Request request, http:FilterContext context) returns (AnalyticsRequestStream){ + //ready authentication context to get values + AnalyticsRequestStream requestStream; + AuthenticationContext authContext = check context.attributes[AUTHENTICATION_CONTEXT]; + if ( authContext != null) { + requestStream.consumerKey = authContext.consumerKey; + requestStream.username = authContext.username; + requestStream.applicationId = authContext.applicationId; + requestStream.applicationName = authContext.applicationName; + requestStream.applicationOwner = authContext.subscriber; + requestStream.tier = authContext.tier; + requestStream.continuedOnThrottleOut = !authContext.stopOnQuotaReach; + } + requestStream.userAgent = request.userAgent; + requestStream.clientIp = getClientIp(request); + requestStream.context = getContext(context); + requestStream.tenantDomain = getTenantDomain(context); + requestStream.api = getApiName(context); + requestStream.apiVersion = getAPIDetailsFromServiceAnnotation(reflect:getServiceAnnotations(context.serviceType)). + apiVersion; + + //todo: hostname verify + requestStream.hostName = "localhost"; //todo:get the host properl + //todo:check if apiPublisher comes in keyValidation context + requestStream.apiPublisher = "admin@carbon.super"; //todo:get publisher properly + requestStream.method = request.method; + //todo:verify resourcepath and resourceTemplate + requestStream.resourceTemplate = "resourcePath"; + requestStream.resourcePath = getResourceConfigAnnotation + (reflect:getResourceAnnotations(context.serviceType, context.resourceName)).path; + requestStream.keyType = "PRODUCTION"; + //todo:random uuid taken from throttle filter + requestStream.correlationID = "71c60dbd-b2be-408d-9e2e-4fd11f60cfbc"; + requestStream.requestCount = 1; + //todo:get request time from authentication filter + time:Time time = time:currentTime(); + int currentTimeMills = time.time; + requestStream.requestTime = currentTimeMills; + return requestStream; + +} + +function generateEventFromRequest(AnalyticsRequestStream requestStream) returns EventDTO { + EventDTO eventDTO; + eventDTO.streamId = "org.wso2.apimgt.statistics.request:1.1.0"; + eventDTO.timeStamp = getCurrentTime(); + eventDTO.metaData = getMetaData(requestStream); + eventDTO.correlationData = getCorrelationData(requestStream); + eventDTO.payloadData = getPayload(requestStream); + return eventDTO; +} + +function getEventData(EventDTO dto) returns string { + string output = "streamId" + KVT + dto.streamId + EVS + "timestamp" + KVT + dto.timeStamp + EVS + + "metadata" + KVT + dto.metaData + EVS + "correlationData" + KVT + dto.correlationData + EVS + + "payLoadData" + KVT + dto.payloadData + "\n"; + return output; +} + +function writeEventToFile(EventDTO eventDTO) { + //todo:batch events to reduce IO cost + int currentTime = getCurrentTime(); + if (initializingTime == 0 ) { + initializingTime = getCurrentTime(); + io:println("initialTime: " + initializingTime); + } + if ( currentTime - initializingTime > 60*1000) { + var result = rotateFile("api-usage-data.dat"); + initializingTime = getCurrentTime(); + match result { + string name => { + io:println("File rotated successfully."); + } + error err => { + io:println("Error occurred while rotating the file."); + } + } + } + io:ByteChannel channel = io:openFile("api-usage-data.dat", io:APPEND); + io:CharacterChannel charChannel = new(channel, "UTF-8"); + try { + io:println("writing to events to a file"); + match charChannel.write(getEventData(eventDTO),0) { + int numberOfCharsWritten => { + io:println(" No of characters written : " + numberOfCharsWritten); + } + error err => { + throw err; + } + } + + } finally { + match charChannel.close() { + error sourceCloseError => { + io:println("Error occured while closing the channel: " + + sourceCloseError.message); + } + () => { + io:println("Source channel closed successfully."); + } + } + } +} \ No newline at end of file diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/dtos/analytics_dtos.bal b/components/micro-gateway-core/src/main/ballerina/gateway/dtos/analytics_dtos.bal new file mode 100644 index 0000000000..262f35ee81 --- /dev/null +++ b/components/micro-gateway-core/src/main/ballerina/gateway/dtos/analytics_dtos.bal @@ -0,0 +1,53 @@ +// Copyright (c) WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import ballerina/io; + +public type AnalyticsRequestStream { + string consumerKey; + string username; + string tenantDomain; + string context; + string apiVersion; + string api; + string resourcePath; + string resourceTemplate; + string method; + string hostName; + string apiPublisher; + string applicationName; + string applicationId; + string protocol; + string clientIp; + string applicationOwner; + string keyType; + string correlationID; + int requestTime; + string userAgent; + string tier; + boolean continuedOnThrottleOut; + int requestCount; + +}; + +public type EventDTO { + string streamId; + int timeStamp; + string metaData; + string correlationData; + string payloadData; +}; \ No newline at end of file diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/filters/analytics_request_filter.bal b/components/micro-gateway-core/src/main/ballerina/gateway/filters/analytics_request_filter.bal new file mode 100644 index 0000000000..b926007cc9 --- /dev/null +++ b/components/micro-gateway-core/src/main/ballerina/gateway/filters/analytics_request_filter.bal @@ -0,0 +1,34 @@ +// Copyright (c) WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/io; +import ballerina/http; +import ballerina/time; + + +public type AnalyticsRequestFilter object { + + public function filterRequest(http:Request request, http:FilterContext context) returns http:FilterResult { + http:FilterResult requestFilterResult; + AnalyticsRequestStream requestStream = generateRequestEvent(request, context); + EventDTO eventDto = generateEventFromRequest(requestStream); + eventStream.publish(eventDto); + requestFilterResult = { canProceed: true, statusCode: 200, message: "Analytics filter processed." }; + return requestFilterResult; + + } + +}; \ No newline at end of file diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/listeners/api_gateway_listener.bal b/components/micro-gateway-core/src/main/ballerina/gateway/listeners/api_gateway_listener.bal index 842a8a0bc4..0fd68d5b2f 100644 --- a/components/micro-gateway-core/src/main/ballerina/gateway/listeners/api_gateway_listener.bal +++ b/components/micro-gateway-core/src/main/ballerina/gateway/listeners/api_gateway_listener.bal @@ -146,6 +146,7 @@ function createAuthFiltersForSecureListener (EndpointConfiguration config) retur ThrottleFilter throttleFilter = new(); SubscriptionFilter subscriptionFilter = new; + AnalyticsRequestFilter analyticsFilter = new(); // use the ballerina in built scope(authz) filter cache:Cache authzCache = new(expiryTimeMillis = getConfigIntValue(CACHING_ID, TOKEN_CACHE_EXPIRY, @@ -157,10 +158,11 @@ function createAuthFiltersForSecureListener (EndpointConfiguration config) retur http:AuthzFilter authzFilter = new(authzHandler); // wraps the ballerina authz filter in new gateway filter OAuthzFilter authzFilterWrapper = new(authzFilter); - map defaultMap = {AUTHN_FILTER: true, AUTHZ_FILTER: true, SUBSCRIPTION_FILTER:true, THROTTLE_FILTER: true}; + map defaultMap = {AUTHN_FILTER: true, AUTHZ_FILTER: true, SUBSCRIPTION_FILTER:true, THROTTLE_FILTER: true, + ANALYTICS_FILTER: true}; map filterConfig = getConfigMapValue(FILTERS); if(lengthof filterConfig == 0) { - filterConfig = {AUTHN_FILTER: true, AUTHZ_FILTER: true, SUBSCRIPTION_FILTER:true, THROTTLE_FILTER: true}; + filterConfig = defaultMap; } int i=0; if(check filterConfig[AUTHN_FILTER]) { @@ -179,6 +181,10 @@ function createAuthFiltersForSecureListener (EndpointConfiguration config) retur authFilters[i] = < http:Filter> throttleFilter; i++; } + if(check filterConfig[ANALYTICS_FILTER]) { + authFilters[i] = < http:Filter> analyticsFilter; + i++; + } return authFilters; } diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/uploade_files_timer.bal b/components/micro-gateway-core/src/main/ballerina/gateway/uploade_files_timer.bal new file mode 100644 index 0000000000..e419918351 --- /dev/null +++ b/components/micro-gateway-core/src/main/ballerina/gateway/uploade_files_timer.bal @@ -0,0 +1,64 @@ +import ballerina/io; +import ballerina/internal; +import ballerina/task; +import ballerina/math; +import ballerina/runtime; + + +int count; +task:Timer? timer; + + +function searchFilesToUpload() returns error? { + io:println("task starting"); + int cnt = 0; + internal:Path ex = new(""); + internal:Path[] ar = check ex.list(); + io:println(ar); + foreach pathEntry in ar { + io:println("starting the path entries"); + string fileName = pathEntry.getName(); + io:println(fileName); + if ( fileName.contains("zip")) { + http:Response response = multipartSender(pathEntry.getName()); + if (response.statusCode == 201) { + var result = pathEntry.delete(); + } else { + io:println("File uploading failed"); + } + cnt++; + } + } + if ( cnt == 0 ) { + io:println("No files to upload"); + error er = {message: "No files present to upload."}; + return er; + } else { + io:println("Files were uploaded"); + return (); + } +} + + +function timerTask() { + io:println("Timer task demo"); + (function() returns error?) onTriggerFunction = searchFilesToUpload; + + function(error) onErrorFunction = cleanup; + + timer = new task:Timer(onTriggerFunction, onErrorFunction, + 3000, delay = 2000); + + timer.start(); + +} + + +function cleanup(error e) { + count = count + 1; + io:println("Cleaning up..."); + io:println(count); +} + + +//future timerFtr = start timerTask(); \ No newline at end of file diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/usagedata_uploader.bal b/components/micro-gateway-core/src/main/ballerina/gateway/usagedata_uploader.bal new file mode 100644 index 0000000000..35d88f3dc5 --- /dev/null +++ b/components/micro-gateway-core/src/main/ballerina/gateway/usagedata_uploader.bal @@ -0,0 +1,60 @@ +import ballerina/http; +import ballerina/log; +import ballerina/mime; + +stream filesToUpload; + +endpoint http:Client clientEP { + url: "https://localhost:9443" +}; + +function multipartSender(string file) returns http:Response { + io:println("starting uploading"); + mime:Entity filePart = new; + filePart.setContentDisposition(getContentDispositionForFormData("file")); + filePart.setFileAsEntityBody(file); + mime:Entity[] bodyParts = [filePart]; + http:Request request = new; + + request.addHeader("Authorization", getBasicAuthHeaderValue("admin", "admin")); + request.addHeader("FileName", file); + request.addHeader("Accept", "application/json"); + request.setBodyParts(bodyParts); + io:println(request); + var returnResponse = clientEP->post("/micro-gateway/v0.9/usage/upload-file",request); + + match returnResponse { + error err => { + http:Response response = new; + response.setPayload("Error occurred while sending multipart request!"); + response.statusCode = 500; + return response; + } + http:Response returnResult => { + io:println("successfully uploaded"); + io:println(returnResult); + return returnResult; + } + } +} + + +function getContentDispositionForFormData(string partName) + returns (mime:ContentDisposition) { + mime:ContentDisposition contentDisposition = new; + contentDisposition.name = partName; + contentDisposition.disposition = "form-data"; + return contentDisposition; +} + +function getBasicAuthHeaderValue(string username, string password) returns string { + string credentials = username + ":" + password; + match credentials.base64Encode() { + string encodedVal => { + return "Basic " + encodedVal; + } + error err => { + throw err; + } + } +} \ No newline at end of file diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/utils/utils.bal b/components/micro-gateway-core/src/main/ballerina/gateway/utils/utils.bal index aa02d245e0..304a1c329e 100644 --- a/components/micro-gateway-core/src/main/ballerina/gateway/utils/utils.bal +++ b/components/micro-gateway-core/src/main/ballerina/gateway/utils/utils.bal @@ -22,6 +22,7 @@ import ballerina/runtime; import ballerina/time; import ballerina/io; import ballerina/reflect; +import ballerina/internal; public function isResourceSecured(http:ListenerAuthConfig? resourceLevelAuthAnn, http:ListenerAuthConfig? serviceLevelAuthAnn) returns boolean { @@ -365,3 +366,45 @@ public function getAuthorizationHeader(reflect:annotationData[] annData) returns return authHeader; } + +public function getCurrentTime() returns int { + time:Time currentTime = time:currentTime(); + int time = currentTime.time; + return time; + +} + +public function rotateFile(string fileName) returns string|error { + int rotatingTimeStamp = getCurrentTime(); + string zipName = fileName + "." + rotatingTimeStamp + ".zip"; + internal:Path zipLocation = new(zipName); + internal:Path fileToZip = new(fileName); + match internal:compress(fileToZip, zipLocation) { + error compressError => { + io:println("Error occurred while compressing the file: " + compressError.message); + return compressError; + } + () => { + io:println("File compressed successfully"); + match fileToZip.delete() { + () => { + log:printInfo("File deleted successfully"); + } + error err => { + log:printError("Error occurred while deleting directory: " + fileName, err = err); + } + } + return zipName; + } + } +} + + + +function initStreamPublisher() { + io:println("subscribing events"); + eventStream.subscribe(writeEventToFile); + //filesToUpload.subscribe(multipartSender); +} + +future streamftr = start initStreamPublisher(); diff --git a/distribution/resources/conf/micro-gw.conf b/distribution/resources/conf/micro-gw.conf index bd02c79946..51ab8f7248 100644 --- a/distribution/resources/conf/micro-gw.conf +++ b/distribution/resources/conf/micro-gw.conf @@ -11,6 +11,7 @@ AUTHN_FILTER=true AUTHZ_FILTER=true SUBSCRIPTION_FILTER=true THROTTLE_FILTER=true +ANALYTICS_FILTER=true [authConfig] authorizationHeader="Authorization" diff --git a/tests/src/test/resources/confs/base.conf b/tests/src/test/resources/confs/base.conf index 8af2eaa515..ee5170aea8 100644 --- a/tests/src/test/resources/confs/base.conf +++ b/tests/src/test/resources/confs/base.conf @@ -12,6 +12,7 @@ AUTHN_FILTER=true AUTHZ_FILTER=true SUBSCRIPTION_FILTER=true THROTTLE_FILTER=true +ANALYTICS_FILTER=true [keyManager] serverUrl="http://localhost:9943" diff --git a/tests/src/test/resources/confs/default-test-config.conf b/tests/src/test/resources/confs/default-test-config.conf index 2508d4560c..2b2a41fba3 100644 --- a/tests/src/test/resources/confs/default-test-config.conf +++ b/tests/src/test/resources/confs/default-test-config.conf @@ -12,6 +12,7 @@ AUTHN_FILTER=true AUTHZ_FILTER=true SUBSCRIPTION_FILTER=true THROTTLE_FILTER=true +ANALYTICS_FILTER=true [keyManager] serverUrl="http://localhost:9443" diff --git a/tests/src/test/resources/confs/startup.conf b/tests/src/test/resources/confs/startup.conf index a34d357b44..25f9ebbffa 100644 --- a/tests/src/test/resources/confs/startup.conf +++ b/tests/src/test/resources/confs/startup.conf @@ -12,6 +12,7 @@ AUTHN_FILTER=true AUTHZ_FILTER=true SUBSCRIPTION_FILTER=true THROTTLE_FILTER=true +ANALYTICS_FILTER=true [keyManager] serverUrl="https://localhost:9443" From 1347f1bb8f6f6b2298ff75eea98c68a99e8a5ed2 Mon Sep 17 00:00:00 2001 From: dinusha92 Date: Fri, 15 Jun 2018 16:51:19 +0530 Subject: [PATCH 2/4] Adding logs for the analytics filter. --- .../main/ballerina/gateway/data_writer.bal | 13 +++--- .../filters/analytics_request_filter.bal | 9 ++++ .../ballerina/gateway/uploade_files_timer.bal | 41 ++++++------------- .../ballerina/gateway/usagedata_uploader.bal | 8 ++-- .../main/ballerina/gateway/utils/utils.bal | 11 +++-- 5 files changed, 36 insertions(+), 46 deletions(-) diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/data_writer.bal b/components/micro-gateway-core/src/main/ballerina/gateway/data_writer.bal index 8c0490c981..146902d538 100644 --- a/components/micro-gateway-core/src/main/ballerina/gateway/data_writer.bal +++ b/components/micro-gateway-core/src/main/ballerina/gateway/data_writer.bal @@ -108,27 +108,25 @@ function writeEventToFile(EventDTO eventDTO) { int currentTime = getCurrentTime(); if (initializingTime == 0 ) { initializingTime = getCurrentTime(); - io:println("initialTime: " + initializingTime); } if ( currentTime - initializingTime > 60*1000) { var result = rotateFile("api-usage-data.dat"); initializingTime = getCurrentTime(); match result { string name => { - io:println("File rotated successfully."); + log:printInfo("File rotated successfully."); } error err => { - io:println("Error occurred while rotating the file."); + log:printError("Error occurred while rotating the file: " + err.message); } } } io:ByteChannel channel = io:openFile("api-usage-data.dat", io:APPEND); io:CharacterChannel charChannel = new(channel, "UTF-8"); try { - io:println("writing to events to a file"); match charChannel.write(getEventData(eventDTO),0) { int numberOfCharsWritten => { - io:println(" No of characters written : " + numberOfCharsWritten); + log:printInfo("Event is getting written"); } error err => { throw err; @@ -138,11 +136,10 @@ function writeEventToFile(EventDTO eventDTO) { } finally { match charChannel.close() { error sourceCloseError => { - io:println("Error occured while closing the channel: " + - sourceCloseError.message); + log:printError("Error occured while closing the channel: " + sourceCloseError.message); } () => { - io:println("Source channel closed successfully."); + log:printInfo("Source channel closed successfully."); } } } diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/filters/analytics_request_filter.bal b/components/micro-gateway-core/src/main/ballerina/gateway/filters/analytics_request_filter.bal index b926007cc9..1dbb624d2e 100644 --- a/components/micro-gateway-core/src/main/ballerina/gateway/filters/analytics_request_filter.bal +++ b/components/micro-gateway-core/src/main/ballerina/gateway/filters/analytics_request_filter.bal @@ -22,6 +22,15 @@ import ballerina/time; public type AnalyticsRequestFilter object { public function filterRequest(http:Request request, http:FilterContext context) returns http:FilterResult { + match context.attributes[FILTER_FAILED] { + boolean failed => { + if (failed) { + return createFilterResult(true, 200, "Skipping filter due to parent filter has returned false"); + } + } error err => { + //Nothing to handle + } + } http:FilterResult requestFilterResult; AnalyticsRequestStream requestStream = generateRequestEvent(request, context); EventDTO eventDto = generateEventFromRequest(requestStream); diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/uploade_files_timer.bal b/components/micro-gateway-core/src/main/ballerina/gateway/uploade_files_timer.bal index e419918351..7b16b93685 100644 --- a/components/micro-gateway-core/src/main/ballerina/gateway/uploade_files_timer.bal +++ b/components/micro-gateway-core/src/main/ballerina/gateway/uploade_files_timer.bal @@ -3,62 +3,47 @@ import ballerina/internal; import ballerina/task; import ballerina/math; import ballerina/runtime; +import ballerina/log; - -int count; task:Timer? timer; +future timerFtr = start timerTask(); function searchFilesToUpload() returns error? { - io:println("task starting"); int cnt = 0; internal:Path ex = new(""); - internal:Path[] ar = check ex.list(); - io:println(ar); - foreach pathEntry in ar { - io:println("starting the path entries"); + internal:Path[] pathList = check ex.list(); + foreach pathEntry in pathList { string fileName = pathEntry.getName(); - io:println(fileName); if ( fileName.contains("zip")) { http:Response response = multipartSender(pathEntry.getName()); if (response.statusCode == 201) { var result = pathEntry.delete(); } else { - io:println("File uploading failed"); + log:printError("Error occurred while uploading file"); } cnt++; } } if ( cnt == 0 ) { - io:println("No files to upload"); error er = {message: "No files present to upload."}; return er; } else { - io:println("Files were uploaded"); - return (); + return (); } } -function timerTask() { - io:println("Timer task demo"); - (function() returns error?) onTriggerFunction = searchFilesToUpload; - - function(error) onErrorFunction = cleanup; - - timer = new task:Timer(onTriggerFunction, onErrorFunction, - 3000, delay = 2000); - timer.start(); +function informError(error e) { + log:printInfo("File were not present to upload yet:" + e.message); } - -function cleanup(error e) { - count = count + 1; - io:println("Cleaning up..."); - io:println(count); +function timerTask() { + (function() returns error?) onTriggerFunction = searchFilesToUpload; + function(error) onErrorFunction = informError; + timer = new task:Timer(onTriggerFunction, onErrorFunction, 60000, delay = 1000); + timer.start(); } - -//future timerFtr = start timerTask(); \ No newline at end of file diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/usagedata_uploader.bal b/components/micro-gateway-core/src/main/ballerina/gateway/usagedata_uploader.bal index 35d88f3dc5..69c4ed5b5d 100644 --- a/components/micro-gateway-core/src/main/ballerina/gateway/usagedata_uploader.bal +++ b/components/micro-gateway-core/src/main/ballerina/gateway/usagedata_uploader.bal @@ -9,7 +9,6 @@ endpoint http:Client clientEP { }; function multipartSender(string file) returns http:Response { - io:println("starting uploading"); mime:Entity filePart = new; filePart.setContentDisposition(getContentDispositionForFormData("file")); filePart.setFileAsEntityBody(file); @@ -26,13 +25,14 @@ function multipartSender(string file) returns http:Response { match returnResponse { error err => { http:Response response = new; - response.setPayload("Error occurred while sending multipart request!"); + string errorMessage = "Error occurred while sending multipart request: SC " + 500; + response.setPayload(errorMessage); response.statusCode = 500; + log:printError(errorMessage); return response; } http:Response returnResult => { - io:println("successfully uploaded"); - io:println(returnResult); + log:printInfo("successfully uploaded the file: " + file); return returnResult; } } diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/utils/utils.bal b/components/micro-gateway-core/src/main/ballerina/gateway/utils/utils.bal index 304a1c329e..87d85b4db0 100644 --- a/components/micro-gateway-core/src/main/ballerina/gateway/utils/utils.bal +++ b/components/micro-gateway-core/src/main/ballerina/gateway/utils/utils.bal @@ -381,17 +381,17 @@ public function rotateFile(string fileName) returns string|error { internal:Path fileToZip = new(fileName); match internal:compress(fileToZip, zipLocation) { error compressError => { - io:println("Error occurred while compressing the file: " + compressError.message); + log:printError("Error occurred while compressing the file: " + compressError.message); return compressError; } () => { - io:println("File compressed successfully"); + log:printInfo("File compressed successfully"); match fileToZip.delete() { () => { - log:printInfo("File deleted successfully"); + log:printInfo("Existed file deleted successfully"); } error err => { - log:printError("Error occurred while deleting directory: " + fileName, err = err); + log:printError("Error occurred while deleting file: " + fileName, err = err); } } return zipName; @@ -402,9 +402,8 @@ public function rotateFile(string fileName) returns string|error { function initStreamPublisher() { - io:println("subscribing events"); + log:printInfo("Subscribing writing method to event stream"); eventStream.subscribe(writeEventToFile); - //filesToUpload.subscribe(multipartSender); } future streamftr = start initStreamPublisher(); From a5f5dd9fcf5269175b3c75b9ac466923c40f7183 Mon Sep 17 00:00:00 2001 From: dinusha92 Date: Fri, 15 Jun 2018 16:52:07 +0530 Subject: [PATCH 3/4] Adding logs for the analytics filter. --- .../src/main/ballerina/gateway/data_writer.bal | 2 +- .../src/main/ballerina/gateway/uploade_files_timer.bal | 5 +---- .../src/main/ballerina/gateway/utils/utils.bal | 2 -- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/data_writer.bal b/components/micro-gateway-core/src/main/ballerina/gateway/data_writer.bal index 146902d538..57014f6ab3 100644 --- a/components/micro-gateway-core/src/main/ballerina/gateway/data_writer.bal +++ b/components/micro-gateway-core/src/main/ballerina/gateway/data_writer.bal @@ -109,7 +109,7 @@ function writeEventToFile(EventDTO eventDTO) { if (initializingTime == 0 ) { initializingTime = getCurrentTime(); } - if ( currentTime - initializingTime > 60*1000) { + if ( currentTime - initializingTime > 60*1000*10) { var result = rotateFile("api-usage-data.dat"); initializingTime = getCurrentTime(); match result { diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/uploade_files_timer.bal b/components/micro-gateway-core/src/main/ballerina/gateway/uploade_files_timer.bal index 7b16b93685..5de147db54 100644 --- a/components/micro-gateway-core/src/main/ballerina/gateway/uploade_files_timer.bal +++ b/components/micro-gateway-core/src/main/ballerina/gateway/uploade_files_timer.bal @@ -33,9 +33,6 @@ function searchFilesToUpload() returns error? { } } - - - function informError(error e) { log:printInfo("File were not present to upload yet:" + e.message); } @@ -43,7 +40,7 @@ function informError(error e) { function timerTask() { (function() returns error?) onTriggerFunction = searchFilesToUpload; function(error) onErrorFunction = informError; - timer = new task:Timer(onTriggerFunction, onErrorFunction, 60000, delay = 1000); + timer = new task:Timer(onTriggerFunction, onErrorFunction, 300000, delay = 5000); timer.start(); } diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/utils/utils.bal b/components/micro-gateway-core/src/main/ballerina/gateway/utils/utils.bal index 87d85b4db0..2c115524aa 100644 --- a/components/micro-gateway-core/src/main/ballerina/gateway/utils/utils.bal +++ b/components/micro-gateway-core/src/main/ballerina/gateway/utils/utils.bal @@ -399,8 +399,6 @@ public function rotateFile(string fileName) returns string|error { } } - - function initStreamPublisher() { log:printInfo("Subscribing writing method to event stream"); eventStream.subscribe(writeEventToFile); From 1034d30cfadd5cb951b0e80e3af6f59980bdc6b4 Mon Sep 17 00:00:00 2001 From: dinusha92 Date: Fri, 15 Jun 2018 17:47:56 +0530 Subject: [PATCH 4/4] Fixing review changes. --- .../src/main/ballerina/gateway/data_writer.bal | 15 +++++++-------- .../main/ballerina/gateway/usagedata_uploader.bal | 3 +-- .../src/main/ballerina/gateway/utils/utils.bal | 2 +- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/data_writer.bal b/components/micro-gateway-core/src/main/ballerina/gateway/data_writer.bal index 57014f6ab3..a06138ff14 100644 --- a/components/micro-gateway-core/src/main/ballerina/gateway/data_writer.bal +++ b/components/micro-gateway-core/src/main/ballerina/gateway/data_writer.bal @@ -56,6 +56,8 @@ function generateRequestEvent(http:Request request, http:FilterContext context) requestStream.applicationOwner = authContext.subscriber; requestStream.tier = authContext.tier; requestStream.continuedOnThrottleOut = !authContext.stopOnQuotaReach; + requestStream.apiPublisher = authContext.apiPublisher; + requestStream.keyType = authContext.keyType; } requestStream.userAgent = request.userAgent; requestStream.clientIp = getClientIp(request); @@ -67,14 +69,11 @@ function generateRequestEvent(http:Request request, http:FilterContext context) //todo: hostname verify requestStream.hostName = "localhost"; //todo:get the host properl - //todo:check if apiPublisher comes in keyValidation context - requestStream.apiPublisher = "admin@carbon.super"; //todo:get publisher properly requestStream.method = request.method; //todo:verify resourcepath and resourceTemplate requestStream.resourceTemplate = "resourcePath"; requestStream.resourcePath = getResourceConfigAnnotation (reflect:getResourceAnnotations(context.serviceType, context.resourceName)).path; - requestStream.keyType = "PRODUCTION"; //todo:random uuid taken from throttle filter requestStream.correlationID = "71c60dbd-b2be-408d-9e2e-4fd11f60cfbc"; requestStream.requestCount = 1; @@ -106,10 +105,10 @@ function getEventData(EventDTO dto) returns string { function writeEventToFile(EventDTO eventDTO) { //todo:batch events to reduce IO cost int currentTime = getCurrentTime(); - if (initializingTime == 0 ) { + if (initializingTime == 0) { initializingTime = getCurrentTime(); } - if ( currentTime - initializingTime > 60*1000*10) { + if (currentTime - initializingTime > 60*1000*10) { var result = rotateFile("api-usage-data.dat"); initializingTime = getCurrentTime(); match result { @@ -117,7 +116,7 @@ function writeEventToFile(EventDTO eventDTO) { log:printInfo("File rotated successfully."); } error err => { - log:printError("Error occurred while rotating the file: " + err.message); + log:printError("Error occurred while rotating the file: ", err = err); } } } @@ -136,10 +135,10 @@ function writeEventToFile(EventDTO eventDTO) { } finally { match charChannel.close() { error sourceCloseError => { - log:printError("Error occured while closing the channel: " + sourceCloseError.message); + log:printError("Error occured while closing the channel: ", err = sourceCloseError); } () => { - log:printInfo("Source channel closed successfully."); + log:printDebug("Source channel closed successfully."); } } } diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/usagedata_uploader.bal b/components/micro-gateway-core/src/main/ballerina/gateway/usagedata_uploader.bal index 69c4ed5b5d..b228eab4b8 100644 --- a/components/micro-gateway-core/src/main/ballerina/gateway/usagedata_uploader.bal +++ b/components/micro-gateway-core/src/main/ballerina/gateway/usagedata_uploader.bal @@ -19,7 +19,6 @@ function multipartSender(string file) returns http:Response { request.addHeader("FileName", file); request.addHeader("Accept", "application/json"); request.setBodyParts(bodyParts); - io:println(request); var returnResponse = clientEP->post("/micro-gateway/v0.9/usage/upload-file",request); match returnResponse { @@ -28,7 +27,7 @@ function multipartSender(string file) returns http:Response { string errorMessage = "Error occurred while sending multipart request: SC " + 500; response.setPayload(errorMessage); response.statusCode = 500; - log:printError(errorMessage); + log:printError(errorMessage, err = err); return response; } http:Response returnResult => { diff --git a/components/micro-gateway-core/src/main/ballerina/gateway/utils/utils.bal b/components/micro-gateway-core/src/main/ballerina/gateway/utils/utils.bal index 2c115524aa..89b4759284 100644 --- a/components/micro-gateway-core/src/main/ballerina/gateway/utils/utils.bal +++ b/components/micro-gateway-core/src/main/ballerina/gateway/utils/utils.bal @@ -381,7 +381,7 @@ public function rotateFile(string fileName) returns string|error { internal:Path fileToZip = new(fileName); match internal:compress(fileToZip, zipLocation) { error compressError => { - log:printError("Error occurred while compressing the file: " + compressError.message); + log:printError("Error occurred while compressing the file: ", err = compressError); return compressError; } () => {