Skip to content

Commit

Permalink
Merge pull request #62 from dinusha92/master
Browse files Browse the repository at this point in the history
Adding analytics filter for microgateway
  • Loading branch information
Rajith90 authored Jun 15, 2018
2 parents 002c633 + 1034d30 commit 1fc0ed0
Show file tree
Hide file tree
Showing 12 changed files with 399 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
@final public string AUTHZ_FILTER = "AUTHZ_FILTER";
@final public string SUBSCRIPTION_FILTER = "SUBSCRIPTION_FILTER";
@final public string THROTTLE_FILTER = "THROTTLE_FILTER";
@final public string ANALYTICS_FILTER = "ANALYTICS_FILTER";



Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright (c) WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/io;
import ballerina/http;

@final
public string KVT = "--KVS--";
public string EVS = "--EVT--";
public string OBJ = "--OBJ--";

int initializingTime = 0;
//streams associated with DTOs
stream<EventDTO> eventStream;

function getPayload(AnalyticsRequestStream requestStream) returns (string) {
return requestStream.consumerKey + OBJ + requestStream.context + OBJ + requestStream.apiVersion + OBJ +
requestStream.api + OBJ + requestStream.resourcePath + OBJ +requestStream.resourceTemplate + OBJ +
requestStream.method + OBJ + requestStream.apiVersion + OBJ + requestStream.requestCount + OBJ +
requestStream.requestTime + OBJ + requestStream.username + OBJ + requestStream.tenantDomain + OBJ +
requestStream.hostName + OBJ + requestStream.apiPublisher + OBJ + requestStream.applicationName + OBJ
+ requestStream.applicationId + OBJ + requestStream.userAgent + OBJ + requestStream.tier +
OBJ + requestStream.continuedOnThrottleOut + OBJ + requestStream.clientIp + requestStream.applicationOwner;
}

function getMetaData(AnalyticsRequestStream requestStream) returns (string) {
return "{\"keyType\":\"" + requestStream.keyType + "\",\"correlationID\":\"" + requestStream.correlationID + "\"}";
}

function getCorrelationData(AnalyticsRequestStream request) returns (string) {
return request.correlationID;
}

function generateRequestEvent(http:Request request, http:FilterContext context) returns (AnalyticsRequestStream){
//ready authentication context to get values
AnalyticsRequestStream requestStream;
AuthenticationContext authContext = check <AuthenticationContext>context.attributes[AUTHENTICATION_CONTEXT];
if ( authContext != null) {
requestStream.consumerKey = authContext.consumerKey;
requestStream.username = authContext.username;
requestStream.applicationId = authContext.applicationId;
requestStream.applicationName = authContext.applicationName;
requestStream.applicationOwner = authContext.subscriber;
requestStream.tier = authContext.tier;
requestStream.continuedOnThrottleOut = !authContext.stopOnQuotaReach;
requestStream.apiPublisher = authContext.apiPublisher;
requestStream.keyType = authContext.keyType;
}
requestStream.userAgent = request.userAgent;
requestStream.clientIp = getClientIp(request);
requestStream.context = getContext(context);
requestStream.tenantDomain = getTenantDomain(context);
requestStream.api = getApiName(context);
requestStream.apiVersion = getAPIDetailsFromServiceAnnotation(reflect:getServiceAnnotations(context.serviceType)).
apiVersion;

//todo: hostname verify
requestStream.hostName = "localhost"; //todo:get the host properl
requestStream.method = request.method;
//todo:verify resourcepath and resourceTemplate
requestStream.resourceTemplate = "resourcePath";
requestStream.resourcePath = getResourceConfigAnnotation
(reflect:getResourceAnnotations(context.serviceType, context.resourceName)).path;
//todo:random uuid taken from throttle filter
requestStream.correlationID = "71c60dbd-b2be-408d-9e2e-4fd11f60cfbc";
requestStream.requestCount = 1;
//todo:get request time from authentication filter
time:Time time = time:currentTime();
int currentTimeMills = time.time;
requestStream.requestTime = currentTimeMills;
return requestStream;

}

function generateEventFromRequest(AnalyticsRequestStream requestStream) returns EventDTO {
EventDTO eventDTO;
eventDTO.streamId = "org.wso2.apimgt.statistics.request:1.1.0";
eventDTO.timeStamp = getCurrentTime();
eventDTO.metaData = getMetaData(requestStream);
eventDTO.correlationData = getCorrelationData(requestStream);
eventDTO.payloadData = getPayload(requestStream);
return eventDTO;
}

function getEventData(EventDTO dto) returns string {
string output = "streamId" + KVT + dto.streamId + EVS + "timestamp" + KVT + dto.timeStamp + EVS +
"metadata" + KVT + dto.metaData + EVS + "correlationData" + KVT + dto.correlationData + EVS +
"payLoadData" + KVT + dto.payloadData + "\n";
return output;
}

function writeEventToFile(EventDTO eventDTO) {
//todo:batch events to reduce IO cost
int currentTime = getCurrentTime();
if (initializingTime == 0) {
initializingTime = getCurrentTime();
}
if (currentTime - initializingTime > 60*1000*10) {
var result = rotateFile("api-usage-data.dat");
initializingTime = getCurrentTime();
match result {
string name => {
log:printInfo("File rotated successfully.");
}
error err => {
log:printError("Error occurred while rotating the file: ", err = err);
}
}
}
io:ByteChannel channel = io:openFile("api-usage-data.dat", io:APPEND);
io:CharacterChannel charChannel = new(channel, "UTF-8");
try {
match charChannel.write(getEventData(eventDTO),0) {
int numberOfCharsWritten => {
log:printInfo("Event is getting written");
}
error err => {
throw err;
}
}

} finally {
match charChannel.close() {
error sourceCloseError => {
log:printError("Error occured while closing the channel: ", err = sourceCloseError);
}
() => {
log:printDebug("Source channel closed successfully.");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


import ballerina/io;

public type AnalyticsRequestStream {
string consumerKey;
string username;
string tenantDomain;
string context;
string apiVersion;
string api;
string resourcePath;
string resourceTemplate;
string method;
string hostName;
string apiPublisher;
string applicationName;
string applicationId;
string protocol;
string clientIp;
string applicationOwner;
string keyType;
string correlationID;
int requestTime;
string userAgent;
string tier;
boolean continuedOnThrottleOut;
int requestCount;

};

public type EventDTO {
string streamId;
int timeStamp;
string metaData;
string correlationData;
string payloadData;
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/io;
import ballerina/http;
import ballerina/time;


public type AnalyticsRequestFilter object {

public function filterRequest(http:Request request, http:FilterContext context) returns http:FilterResult {
match <boolean> context.attributes[FILTER_FAILED] {
boolean failed => {
if (failed) {
return createFilterResult(true, 200, "Skipping filter due to parent filter has returned false");
}
} error err => {
//Nothing to handle
}
}
http:FilterResult requestFilterResult;
AnalyticsRequestStream requestStream = generateRequestEvent(request, context);
EventDTO eventDto = generateEventFromRequest(requestStream);
eventStream.publish(eventDto);
requestFilterResult = { canProceed: true, statusCode: 200, message: "Analytics filter processed." };
return requestFilterResult;

}

};
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ function createAuthFiltersForSecureListener (EndpointConfiguration config) retur

ThrottleFilter throttleFilter = new();
SubscriptionFilter subscriptionFilter = new;
AnalyticsRequestFilter analyticsFilter = new();

// use the ballerina in built scope(authz) filter
cache:Cache authzCache = new(expiryTimeMillis = getConfigIntValue(CACHING_ID, TOKEN_CACHE_EXPIRY,
Expand All @@ -157,10 +158,11 @@ function createAuthFiltersForSecureListener (EndpointConfiguration config) retur
http:AuthzFilter authzFilter = new(authzHandler);
// wraps the ballerina authz filter in new gateway filter
OAuthzFilter authzFilterWrapper = new(authzFilter);
map defaultMap = {AUTHN_FILTER: true, AUTHZ_FILTER: true, SUBSCRIPTION_FILTER:true, THROTTLE_FILTER: true};
map defaultMap = {AUTHN_FILTER: true, AUTHZ_FILTER: true, SUBSCRIPTION_FILTER:true, THROTTLE_FILTER: true,
ANALYTICS_FILTER: true};
map filterConfig = getConfigMapValue(FILTERS);
if(lengthof filterConfig == 0) {
filterConfig = {AUTHN_FILTER: true, AUTHZ_FILTER: true, SUBSCRIPTION_FILTER:true, THROTTLE_FILTER: true};
filterConfig = defaultMap;
}
int i=0;
if(check <boolean> filterConfig[AUTHN_FILTER]) {
Expand All @@ -179,6 +181,10 @@ function createAuthFiltersForSecureListener (EndpointConfiguration config) retur
authFilters[i] = < http:Filter> throttleFilter;
i++;
}
if(check <boolean> filterConfig[ANALYTICS_FILTER]) {
authFilters[i] = < http:Filter> analyticsFilter;
i++;
}

return authFilters;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import ballerina/io;
import ballerina/internal;
import ballerina/task;
import ballerina/math;
import ballerina/runtime;
import ballerina/log;

task:Timer? timer;

future timerFtr = start timerTask();

function searchFilesToUpload() returns error? {
int cnt = 0;
internal:Path ex = new("");
internal:Path[] pathList = check ex.list();
foreach pathEntry in pathList {
string fileName = pathEntry.getName();
if ( fileName.contains("zip")) {
http:Response response = multipartSender(pathEntry.getName());
if (response.statusCode == 201) {
var result = pathEntry.delete();
} else {
log:printError("Error occurred while uploading file");
}
cnt++;
}
}
if ( cnt == 0 ) {
error er = {message: "No files present to upload."};
return er;
} else {
return ();
}
}

function informError(error e) {
log:printInfo("File were not present to upload yet:" + e.message);
}

function timerTask() {
(function() returns error?) onTriggerFunction = searchFilesToUpload;
function(error) onErrorFunction = informError;
timer = new task:Timer(onTriggerFunction, onErrorFunction, 300000, delay = 5000);
timer.start();
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import ballerina/http;
import ballerina/log;
import ballerina/mime;

stream<string> filesToUpload;

endpoint http:Client clientEP {
url: "https://localhost:9443"
};

function multipartSender(string file) returns http:Response {
mime:Entity filePart = new;
filePart.setContentDisposition(getContentDispositionForFormData("file"));
filePart.setFileAsEntityBody(file);
mime:Entity[] bodyParts = [filePart];
http:Request request = new;

request.addHeader("Authorization", getBasicAuthHeaderValue("admin", "admin"));
request.addHeader("FileName", file);
request.addHeader("Accept", "application/json");
request.setBodyParts(bodyParts);
var returnResponse = clientEP->post("/micro-gateway/v0.9/usage/upload-file",request);

match returnResponse {
error err => {
http:Response response = new;
string errorMessage = "Error occurred while sending multipart request: SC " + 500;
response.setPayload(errorMessage);
response.statusCode = 500;
log:printError(errorMessage, err = err);
return response;
}
http:Response returnResult => {
log:printInfo("successfully uploaded the file: " + file);
return returnResult;
}
}
}


function getContentDispositionForFormData(string partName)
returns (mime:ContentDisposition) {
mime:ContentDisposition contentDisposition = new;
contentDisposition.name = partName;
contentDisposition.disposition = "form-data";
return contentDisposition;
}

function getBasicAuthHeaderValue(string username, string password) returns string {
string credentials = username + ":" + password;
match credentials.base64Encode() {
string encodedVal => {
return "Basic " + encodedVal;
}
error err => {
throw err;
}
}
}
Loading

0 comments on commit 1fc0ed0

Please sign in to comment.