Skip to content

Commit

Permalink
Merge pull request #96 from Countly/Storage
Browse files Browse the repository at this point in the history
Added Storage Module
  • Loading branch information
turtledreams authored Aug 15, 2024
2 parents 9f72459 + 4b81696 commit f57f741
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 307 deletions.
146 changes: 13 additions & 133 deletions lib/countly-bulk.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
* server.add_request({begin_session:1, metrics:{_os:"Linux"}, device_id:"users_device_id", events:[{key:"Test", count:1}]});
*/

var fs = require("fs");
var path = require("path");
var http = require("http");
var https = require("https");
var cluster = require("cluster");
var cc = require("./countly-common");
var BulkUser = require("./countly-bulk-user");
var CountlyStorage = require("./countly-storage");

/**
* @lends module:lib/countly-bulk
Expand All @@ -40,7 +39,7 @@ var BulkUser = require("./countly-bulk-user");
* @param {number} [conf.max_events=100] - maximum amount of events to send in one batch
* @param {boolean} [conf.persist_queue=false] - persistently store queue until processed, default is false if you want to keep queue in memory and process all in one process run
* @param {boolean} [conf.force_post=false] - force using post method for all requests
* @param {string} [conf.storage_path="../bulk_data/"] - where SDK would store data, including id, queues, etc
* @param {string} [conf.storage_path] - where SDK would store data, including id, queues, etc
* @param {string} [conf.http_options=] - function to get http options by reference and overwrite them, before running each request
* @param {number} [conf.max_key_length=128] - maximum size of all string keys
* @param {number} [conf.max_value_size=256] - maximum size of all values in our key-value pairs (Except "picture" field, that has a limit of 4096 chars)
Expand Down Expand Up @@ -73,7 +72,6 @@ function CountlyBulk(conf) {
var maxBreadcrumbCount = 100;
var maxStackTraceLinesPerThread = 30;
var maxStackTraceLineLength = 200;
var __data = {};

cc.debugBulk = conf.debug || false;
if (!conf.app_key) {
Expand All @@ -96,7 +94,6 @@ function CountlyBulk(conf) {
conf.max_events = conf.max_events || 100;
conf.force_post = conf.force_post || false;
conf.persist_queue = conf.persist_queue || false;
conf.storage_path = conf.storage_path || "../bulk_data/";
conf.http_options = conf.http_options || null;
conf.maxKeyLength = conf.max_key_length || maxKeyLength;
conf.maxValueSize = conf.max_value_size || maxValueSize;
Expand All @@ -105,19 +102,7 @@ function CountlyBulk(conf) {
conf.maxStackTraceLinesPerThread = conf.max_stack_trace_lines_per_thread || maxStackTraceLinesPerThread;
conf.maxStackTraceLineLength = conf.max_stack_trace_line_length || maxStackTraceLineLength;

var mainDir = path.resolve(__dirname, conf.storage_path);
if (conf.persist_queue) {
try {
if (!fs.existsSync(mainDir)) {
fs.mkdirSync(mainDir);
}
}
catch (ex) {
// problem creating directory
// eslint-disable-next-line no-console
cc.log(cc.logLevelEnums.ERROR, "CountlyBulk, Failed white creating the '/data' directory. Error: ", ex.stack);
}
}
CountlyStorage.setBulkDataPath(conf.storage_path, conf.persist_queue);

this.conf = conf;
/**
Expand Down Expand Up @@ -157,7 +142,7 @@ function CountlyBulk(conf) {

requestQueue.push(query);
cc.log(cc.logLevelEnums.INFO, "CountlyBulk add_request, Adding request to the queue.");
storeSet("cly_req_queue", requestQueue);
CountlyStorage.storeSet("cly_req_queue", requestQueue);
}
else {
cc.log(cc.logLevelEnums.INFO, "CountlyBulk add_request, Sending message to the parent process. Adding the raw request to the queue.");
Expand Down Expand Up @@ -205,7 +190,7 @@ function CountlyBulk(conf) {
cc.log(cc.logLevelEnums.INFO, "CountlyBulk add_bulk_request, adding the request into queue.");
requestQueue.push(query);
}
storeSet("cly_req_queue", requestQueue);
CountlyStorage.storeSet("cly_req_queue", requestQueue);
}
else {
cc.log(cc.logLevelEnums.INFO, "CountlyBulk add_bulk_request, Sending message to the parent process. Adding the raw request to the queue.");
Expand Down Expand Up @@ -260,7 +245,7 @@ function CountlyBulk(conf) {
eventQueue[device_id] = [];
}
eventQueue[device_id].push(e);
storeSet("cly_bulk_event", eventQueue);
CountlyStorage.storeSet("cly_bulk_event", eventQueue);
}
else {
cc.log(cc.logLevelEnums.INFO, `CountlyBulk add_event, Sending message to the parent process. Adding event: [${event.key}].`);
Expand Down Expand Up @@ -358,7 +343,7 @@ function CountlyBulk(conf) {
*/
/**
 * Append a prepared bulk payload to the bulk request queue and persist the queue.
 * NOTE(review): this span shows both the old `storeSet` call and the new
 * `CountlyStorage.storeSet` call back to back — this looks like diff residue
 * (removed + added line pair); only one persistence call should remain in the
 * final file. Confirm against the merged revision.
 * @param {Object} bulkRequest - object with app_key and stringified requests to enqueue
 */
function toBulkRequestQueue(bulkRequest) {
bulkQueue.push(bulkRequest);
storeSet("cly_bulk_queue", bulkQueue);
CountlyStorage.storeSet("cly_bulk_queue", bulkQueue);
}
var self = this;

Expand All @@ -384,7 +369,7 @@ function CountlyBulk(conf) {
}
if (eventChanges) {
isEmpty = false;
storeSet("cly_bulk_event", eventQueue);
CountlyStorage.storeSet("cly_bulk_event", eventQueue);
}

// process request queue into bulk requests
Expand All @@ -398,7 +383,7 @@ function CountlyBulk(conf) {
var requests = requestQueue.splice(0, conf.bulk_size);
toBulkRequestQueue({ app_key: conf.app_key, requests: JSON.stringify(requests) });
}
storeSet("cly_req_queue", requestQueue);
CountlyStorage.storeSet("cly_req_queue", requestQueue);
}

// process bulk request queue
Expand All @@ -413,7 +398,7 @@ function CountlyBulk(conf) {
bulkQueue.unshift(res);
failTimeout = cc.getTimestamp() + conf.fail_timeout;
}
storeSet("cly_bulk_queue", bulkQueue);
CountlyStorage.storeSet("cly_bulk_queue", bulkQueue);
readyToProcess = true;
}, "heartBeat", false);
}
Expand Down Expand Up @@ -594,111 +579,6 @@ function CountlyBulk(conf) {
}
}

/**
 * Load the persisted value for a storage key from its backing JSON file.
 * Returns undefined when persistence is disabled; returns null when the
 * file is missing (likely a first init) or unparsable — a corrupted file
 * is backed up under a timestamped name before starting clean.
 * @param {String} key - key for file
 * @returns {varies} value in file
 */
var readFile = function(key) {
    if (!conf.persist_queue) {
        // nothing on disk to consult when persistence is off
        return undefined;
    }
    var filePath = path.resolve(__dirname, `${conf.storage_path}__${key}.json`);
    var raw = null;
    try {
        raw = fs.readFileSync(filePath);
    }
    catch (ex) {
        // no backing file yet — most likely a fresh init
        cc.log(cc.logLevelEnums.ERROR, "CountlyBulk readFile, Nothing to read. Might be first init. Error: ", ex);
    }
    var parsed;
    try {
        parsed = JSON.parse(raw);
    }
    catch (ex) {
        // corrupted contents — keep a timestamped backup, then start clean
        cc.log(cc.logLevelEnums.ERROR, "CountlyBulk readFile, Problem while parsing. Error:", ex.stack);
        fs.writeFile(path.resolve(__dirname, `${conf.storage_path}__${key}.${cc.getTimestamp()}${Math.random()}.json`), raw, () => {});
        parsed = null;
    }
    return parsed;
};

var asyncWriteLock = false;
var asyncWriteQueue = [];

/**
 * Persist a key/value pair to its backing JSON file, then either drain the
 * next pending write from the async queue or release the write lock.
 * @param {String} key - key for value to store
 * @param {varies} value - value to store
 * @param {Function} callback - callback to call when done storing
 */
var writeFile = function(key, value, callback) {
    var wrapper = {};
    wrapper[key] = value;
    var filePath = path.resolve(__dirname, `${conf.storage_path}__${key}.json`);
    fs.writeFile(filePath, JSON.stringify(wrapper), (err) => {
        if (err) {
            cc.log(cc.logLevelEnums.ERROR, "CountlyBulk writeFile, Problem while writing. Error:", err);
        }
        if (typeof callback === "function") {
            callback(err);
        }
        if (asyncWriteQueue.length) {
            // more writes are pending — schedule the next one on a fresh tick
            setTimeout(() => {
                var next = asyncWriteQueue.shift();
                writeFile(next[0], next[1], next[2]);
            }, 0);
        }
        else {
            asyncWriteLock = false;
        }
    });
};

/**
 * Cache a value in memory and schedule it to be written to disk.
 * Writes are serialized: while one is in flight (asyncWriteLock held),
 * later writes queue up and are drained by writeFile.
 * @param {String} key - key for value to store
 * @param {varies} value - value to store
 * @param {Function} callback - callback to call when done storing
 */
var storeSet = function(key, value, callback) {
    __data[key] = value;
    if (asyncWriteLock) {
        // a write is already in flight — queue this one behind it
        asyncWriteQueue.push([key, value, callback]);
    }
    else {
        asyncWriteLock = true;
        writeFile(key, value, callback);
    }
};

/**
 * Fetch a value from the in-memory cache, lazily loading it from disk on
 * first access; falls back to the provided default when nothing is stored.
 * @param {String} key - key of value to get
 * @param {varies} def - default value to use if not set
 * @returns {varies} value for the key
 */
var storeGet = function(key, def) {
    if (typeof __data[key] !== "undefined") {
        // cache hit — no disk access needed
        return __data[key];
    }
    var stored = readFile(key);
    __data[key] = stored ? stored[key] : def;
    return __data[key];
};

// listen to current workers
if (cluster.workers) {
for (var id in cluster.workers) {
Expand All @@ -711,9 +591,9 @@ function CountlyBulk(conf) {
worker.on("message", handleWorkerMessage);
});

var requestQueue = storeGet("cly_req_queue", []);
var eventQueue = storeGet("cly_bulk_event", {});
var bulkQueue = storeGet("cly_bulk_queue", []);
var requestQueue = CountlyStorage.storeGet("cly_req_queue", []);
var eventQueue = CountlyStorage.storeGet("cly_bulk_event", {});
var bulkQueue = CountlyStorage.storeGet("cly_bulk_queue", []);
}

module.exports = CountlyBulk;
Loading

0 comments on commit f57f741

Please sign in to comment.