Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory Only Storage Option #102

Merged
merged 14 commits into from
Oct 3, 2024
25 changes: 14 additions & 11 deletions lib/countly-bulk.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ var cc = require("./countly-common");
var BulkUser = require("./countly-bulk-user");
var CountlyStorage = require("./countly-storage");

var storageMethod;

/**
* @lends module:lib/countly-bulk
* Initialize CountlyBulk server object
Expand Down Expand Up @@ -102,7 +104,8 @@ function CountlyBulk(conf) {
conf.maxStackTraceLinesPerThread = conf.max_stack_trace_lines_per_thread || maxStackTraceLinesPerThread;
conf.maxStackTraceLineLength = conf.max_stack_trace_line_length || maxStackTraceLineLength;

CountlyStorage.setStoragePath(conf.storage_path, true, conf.persist_queue);
// config time memory only option will be added
storageMethod = CountlyStorage.setStorage(conf.storage_path, false, true, conf.persist_queue);

this.conf = conf;
/**
Expand Down Expand Up @@ -142,7 +145,7 @@ function CountlyBulk(conf) {

requestQueue.push(query);
cc.log(cc.logLevelEnums.INFO, "CountlyBulk add_request, Adding request to the queue.");
CountlyStorage.storeSet("cly_req_queue", requestQueue);
storageMethod.storeSet("cly_req_queue", requestQueue);
}
else {
cc.log(cc.logLevelEnums.INFO, "CountlyBulk add_request, Sending message to the parent process. Adding the raw request to the queue.");
Expand Down Expand Up @@ -190,7 +193,7 @@ function CountlyBulk(conf) {
cc.log(cc.logLevelEnums.INFO, "CountlyBulk add_bulk_request, adding the request into queue.");
requestQueue.push(query);
}
CountlyStorage.storeSet("cly_req_queue", requestQueue);
storageMethod.storeSet("cly_req_queue", requestQueue);
}
else {
cc.log(cc.logLevelEnums.INFO, "CountlyBulk add_bulk_request, Sending message to the parent process. Adding the raw request to the queue.");
Expand Down Expand Up @@ -245,7 +248,7 @@ function CountlyBulk(conf) {
eventQueue[device_id] = [];
}
eventQueue[device_id].push(e);
CountlyStorage.storeSet("cly_bulk_event", eventQueue);
storageMethod.storeSet("cly_bulk_event", eventQueue);
}
else {
cc.log(cc.logLevelEnums.INFO, `CountlyBulk add_event, Sending message to the parent process. Adding event: [${event.key}].`);
Expand Down Expand Up @@ -343,7 +346,7 @@ function CountlyBulk(conf) {
*/
function toBulkRequestQueue(bulkRequest) {
bulkQueue.push(bulkRequest);
CountlyStorage.storeSet("cly_bulk_queue", bulkQueue);
storageMethod.storeSet("cly_bulk_queue", bulkQueue);
}
var self = this;

Expand All @@ -369,7 +372,7 @@ function CountlyBulk(conf) {
}
if (eventChanges) {
isEmpty = false;
CountlyStorage.storeSet("cly_bulk_event", eventQueue);
storageMethod.storeSet("cly_bulk_event", eventQueue);
}

// process request queue into bulk requests
Expand All @@ -383,7 +386,7 @@ function CountlyBulk(conf) {
var requests = requestQueue.splice(0, conf.bulk_size);
toBulkRequestQueue({ app_key: conf.app_key, requests: JSON.stringify(requests) });
}
CountlyStorage.storeSet("cly_req_queue", requestQueue);
storageMethod.storeSet("cly_req_queue", requestQueue);
}

// process bulk request queue
Expand All @@ -398,7 +401,7 @@ function CountlyBulk(conf) {
bulkQueue.unshift(res);
failTimeout = cc.getTimestamp() + conf.fail_timeout;
}
CountlyStorage.storeSet("cly_bulk_queue", bulkQueue);
storageMethod.storeSet("cly_bulk_queue", bulkQueue);
readyToProcess = true;
}, "heartBeat", false);
}
Expand Down Expand Up @@ -591,9 +594,9 @@ function CountlyBulk(conf) {
worker.on("message", handleWorkerMessage);
});

var requestQueue = CountlyStorage.storeGet("cly_req_queue", []);
var eventQueue = CountlyStorage.storeGet("cly_bulk_event", {});
var bulkQueue = CountlyStorage.storeGet("cly_bulk_queue", []);
var requestQueue = storageMethod.storeGet("cly_req_queue", []);
var eventQueue = storageMethod.storeGet("cly_bulk_event", {});
var bulkQueue = storageMethod.storeGet("cly_bulk_queue", []);
}

module.exports = CountlyBulk;
167 changes: 116 additions & 51 deletions lib/countly-storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,121 @@ var defaultPath = "../data/"; // Default path
var defaultBulkPath = "../bulk_data/"; // Default path
var asyncWriteLock = false;
var asyncWriteQueue = [];
let storageMethod = {};

// Memory-only storage methods
const memoryStorage = {
/**
* Save value in memory
* @param {String} key - key for value to store
* @param {varies} value - value to store
* @param {Function} callback - callback to call when done storing
*/
storeSet(key, value, callback) {
__data[key] = value;
if (typeof callback === "function") {
callback(null); // No errors for memory storage
}
},
/**
* Get value from memory
* @param {String} key - key of value to get
* @param {varies} def - default value to use if not set
* @returns {varies} value for the key
*/
storeGet(key, def) {
cc.log(cc.logLevelEnums.DEBUG, `storeGet, Fetching item from memory with key: [${key}].`);
return typeof __data[key] !== "undefined" ? __data[key] : def;
},
/**
* Remove value from memory
* @param {String} key - key of value to remove
*/
storeRemove(key) {
delete __data[key];
},
};

// File storage methods
const fileStorage = {
/**
* Save value in storage
* @param {String} key - key for value to store
* @param {varies} value - value to store
* @param {Function} callback - callback to call when done storing
*/
storeSet(key, value, callback) {
__data[key] = value;
if (!asyncWriteLock) {
asyncWriteLock = true;
writeFile(key, value, callback);
}
else {
asyncWriteQueue.push([key, value, callback]);
}
},
/**
* Get value from storage
* @param {String} key - key of value to get
* @param {varies} def - default value to use if not set
* @returns {varies} value for the key
*/
storeGet(key, def) {
cc.log(cc.logLevelEnums.DEBUG, `storeGet, Fetching item from storage with key: [${key}].`);
if (typeof __data[key] === "undefined") {
var ob = readFile(key);
var obLen;
// check if the 'read object' is empty or not
try {
obLen = Object.keys(ob).length;
}
catch (error) {
// if we can not even asses length set it to 0 so we can return the default value
obLen = 0;
}

// if empty or falsy set default value
if (!ob || obLen === 0) {
__data[key] = def;
}
// else set the value read file has
else {
__data[key] = ob[key];
}
}
return __data[key];
},
storeRemove(key) {
delete __data[key];
var dir = path.resolve(__dirname, `${getStoragePath()}__${key}.json`);
fs.unlink(dir, (err) => {
if (err) {
cc.log(cc.logLevelEnums.ERROR, `storeRemove, Failed to remove file with key: [${key}]. Error: [${err}].`);
}
});
},
};

/**
* Sets the storage method, by default sets file storage and storage path.
* @param {String} userPath - User provided storage path
* @param {Boolean} memoryOnly - Whether to use memory only storage or not
* @param {Boolean} isBulk - Whether the storage is for bulk data
* @param {Boolean} persistQueue - Whether to persist the queue until processed
* @returns {storageMethod} Prefered storage method
*/
var setStorage = function(userPath, memoryOnly = false, isBulk = false, persistQueue = false) {
if (memoryOnly) {
cc.log(cc.logLevelEnums.DEBUG, "Using memory-only storage.");
storageMethod = memoryStorage; // Assign memory-only methods
}
else {
cc.log(cc.logLevelEnums.DEBUG, "Using persistent file storage.");
storageMethod = fileStorage; // Assign file storage methods
setStoragePath(userPath, isBulk, persistQueue);
}
return storageMethod;
};

/**
* Sets the storage path, defaulting to a specified path if none is provided.
Expand Down Expand Up @@ -139,59 +254,9 @@ var writeFile = function(key, value, callback) {
});
};

/**
* Save value in storage
* @param {String} key - key for value to store
* @param {varies} value - value to store
* @param {Function} callback - callback to call when done storing
*/
var storeSet = function(key, value, callback) {
__data[key] = value;
if (!asyncWriteLock) {
asyncWriteLock = true;
writeFile(key, value, callback);
}
else {
asyncWriteQueue.push([key, value, callback]);
}
};

/**
* Get value from storage
* @param {String} key - key of value to get
* @param {varies} def - default value to use if not set
* @returns {varies} value for the key
*/
var storeGet = function(key, def) {
cc.log(cc.logLevelEnums.DEBUG, `storeGet, Fetching item from storage with key: [${key}].`);
if (typeof __data[key] === "undefined") {
var ob = readFile(key);
var obLen;
// check if the 'read object' is empty or not
try {
obLen = Object.keys(ob).length;
}
catch (error) {
// if we can not even asses length set it to 0 so we can return the default value
obLen = 0;
}

// if empty or falsy set default value
if (!ob || obLen === 0) {
__data[key] = def;
}
// else set the value read file has
else {
__data[key] = ob[key];
}
}
return __data[key];
};

module.exports = {
setStorage,
writeFile,
storeGet,
storeSet,
forceStore,
getStoragePath,
setStoragePath,
Expand Down
Loading
Loading