Skip to content

Commit

Permalink
Memory Only Storage Option
Browse files Browse the repository at this point in the history
  • Loading branch information
Ali Rıza Kat committed Sep 18, 2024
1 parent cd6c767 commit 3695932
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 94 deletions.
167 changes: 116 additions & 51 deletions lib/countly-storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,121 @@ var defaultPath = "../data/"; // Default path
var defaultBulkPath = "../bulk_data/"; // Default path
var asyncWriteLock = false;
var asyncWriteQueue = [];
let storageMethod = {};

// Memory-only storage methods
const memoryStorage = {
/**
* Save value in memory
* @param {String} key - key for value to store
* @param {varies} value - value to store
* @param {Function} callback - callback to call when done storing
*/
storeSet(key, value, callback) {
__data[key] = value;
if (typeof callback === "function") {
callback(null); // No errors for memory storage
}
},
/**
* Get value from memory
* @param {String} key - key of value to get
* @param {varies} def - default value to use if not set
* @returns {varies} value for the key
*/
storeGet(key, def) {
cc.log(cc.logLevelEnums.DEBUG, `storeGet, Fetching item from memory with key: [${key}].`);
return typeof __data[key] !== "undefined" ? __data[key] : def;
},
/**
* Remove value from memory
* @param {String} key - key of value to remove
*/
storeRemove(key) {
delete __data[key];
},
};

// File storage methods
const fileStorage = {
/**
* Save value in storage
* @param {String} key - key for value to store
* @param {varies} value - value to store
* @param {Function} callback - callback to call when done storing
*/
storeSet(key, value, callback) {
__data[key] = value;
if (!asyncWriteLock) {
asyncWriteLock = true;
writeFile(key, value, callback);
}
else {
asyncWriteQueue.push([key, value, callback]);
}
},
/**
* Get value from storage
* @param {String} key - key of value to get
* @param {varies} def - default value to use if not set
* @returns {varies} value for the key
*/
storeGet(key, def) {
cc.log(cc.logLevelEnums.DEBUG, `storeGet, Fetching item from storage with key: [${key}].`);
if (typeof __data[key] === "undefined") {
var ob = readFile(key);
var obLen;
// check if the 'read object' is empty or not
try {
obLen = Object.keys(ob).length;
}
catch (error) {
// if we can not even asses length set it to 0 so we can return the default value
obLen = 0;
}

// if empty or falsy set default value
if (!ob || obLen === 0) {
__data[key] = def;
}
// else set the value read file has
else {
__data[key] = ob[key];
}
}
return __data[key];
},
storeRemove(key) {
delete __data[key];
var dir = path.resolve(__dirname, `${getStoragePath()}__${key}.json`);
fs.unlink(dir, (err) => {
if (err) {
cc.log(cc.logLevelEnums.ERROR, `storeRemove, Failed to remove file with key: [${key}]. Error: [${err}].`);
}
});
},
};

/**
* Sets the storage method, by default sets file storage and storage path.
* @param {String} userPath - User provided storage path
* @param {Boolean} memoryOnly - Whether to use memory only storage or not
* @param {Boolean} isBulk - Whether the storage is for bulk data
* @param {Boolean} persistQueue - Whether to persist the queue until processed
* @returns {storageMethod} Prefered storage method
*/
var setStorage = function(userPath, memoryOnly = false, isBulk = false, persistQueue = false) {
if (memoryOnly) {
cc.log(cc.logLevelEnums.DEBUG, "Using memory-only storage.");
storageMethod = memoryStorage; // Assign memory-only methods
}
else {
cc.log(cc.logLevelEnums.DEBUG, "Using persistent file storage.");
storageMethod = fileStorage; // Assign file storage methods
setStoragePath(userPath, isBulk, persistQueue);
}
return storageMethod;
};

/**
* Sets the storage path, defaulting to a specified path if none is provided.
Expand Down Expand Up @@ -139,59 +254,9 @@ var writeFile = function(key, value, callback) {
});
};

/**
* Save value in storage
* @param {String} key - key for value to store
* @param {varies} value - value to store
* @param {Function} callback - callback to call when done storing
*/
var storeSet = function(key, value, callback) {
__data[key] = value;
if (!asyncWriteLock) {
asyncWriteLock = true;
writeFile(key, value, callback);
}
else {
asyncWriteQueue.push([key, value, callback]);
}
};

/**
* Get value from storage
* @param {String} key - key of value to get
* @param {varies} def - default value to use if not set
* @returns {varies} value for the key
*/
var storeGet = function(key, def) {
cc.log(cc.logLevelEnums.DEBUG, `storeGet, Fetching item from storage with key: [${key}].`);
if (typeof __data[key] === "undefined") {
var ob = readFile(key);
var obLen;
// check if the 'read object' is empty or not
try {
obLen = Object.keys(ob).length;
}
catch (error) {
// if we can not even asses length set it to 0 so we can return the default value
obLen = 0;
}

// if empty or falsy set default value
if (!ob || obLen === 0) {
__data[key] = def;
}
// else set the value read file has
else {
__data[key] = ob[key];
}
}
return __data[key];
};

module.exports = {
setStorage,
writeFile,
storeGet,
storeSet,
forceStore,
getStoragePath,
setStoragePath,
Expand Down
41 changes: 21 additions & 20 deletions lib/countly.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var Bulk = require("./countly-bulk");
var CountlyStorage = require("./countly-storage");

var Countly = {};
var storageMethod;

Countly.Bulk = Bulk;
(function() {
Expand Down Expand Up @@ -171,13 +172,13 @@ Countly.Bulk = Bulk;
cc.debug = conf.debug;

// Set the storage path
CountlyStorage.setStoragePath(conf.storage_path);
storageMethod = CountlyStorage.setStorage(conf.storage_path, true);

// clear stored device ID if flag is set
if (conf.clear_stored_device_id) {
cc.log(cc.logLevelEnums.WARNING, "init, clear_stored_device_id is true, erasing the stored ID.");
CountlyStorage.storeSet("cly_id", null);
CountlyStorage.storeSet("cly_id_type", null);
storageMethod.storeSet("cly_id", null);
storageMethod.storeSet("cly_id_type", null);
}

if (Countly.url === "") {
Expand Down Expand Up @@ -223,8 +224,8 @@ Countly.Bulk = Bulk;

if (cluster.isMaster) {
// fetch stored ID and ID type
var storedId = CountlyStorage.storeGet("cly_id", null);
var storedIdType = CountlyStorage.storeGet("cly_id_type", null);
var storedId = storageMethod.storeGet("cly_id", null);
var storedIdType = storageMethod.storeGet("cly_id_type", null);
// if there was a stored ID
if (storedId !== null) {
Countly.device_id = storedId;
Expand Down Expand Up @@ -253,12 +254,12 @@ Countly.Bulk = Bulk;
deviceIdType = cc.deviceIdTypeEnums.SDK_GENERATED;
}
// save the ID and ID type
CountlyStorage.storeSet("cly_id", Countly.device_id);
CountlyStorage.storeSet("cly_id_type", deviceIdType);
storageMethod.storeSet("cly_id", Countly.device_id);
storageMethod.storeSet("cly_id_type", deviceIdType);
// create queues
requestQueue = CountlyStorage.storeGet("cly_queue", []);
eventQueue = CountlyStorage.storeGet("cly_event", []);
remoteConfigs = CountlyStorage.storeGet("cly_remote_configs", {});
requestQueue = storageMethod.storeGet("cly_queue", []);
eventQueue = storageMethod.storeGet("cly_event", []);
remoteConfigs = storageMethod.storeGet("cly_remote_configs", {});
heartBeat();
// listen to current workers
if (cluster.workers) {
Expand Down Expand Up @@ -635,7 +636,7 @@ Countly.Bulk = Bulk;
if (eventQueue.length > 0) {
toRequestQueue({ events: JSON.stringify(eventQueue) });
eventQueue = [];
CountlyStorage.storeSet("cly_event", eventQueue);
storageMethod.storeSet("cly_event", eventQueue);
}
// end current session
Countly.end_session();
Expand All @@ -647,8 +648,8 @@ Countly.Bulk = Bulk;
var oldId = Countly.device_id;
Countly.device_id = newId;
deviceIdType = cc.deviceIdTypeEnums.DEVELOPER_SUPPLIED;
CountlyStorage.storeSet("cly_id", Countly.device_id);
CountlyStorage.storeSet("cly_id_type", deviceIdType);
storageMethod.storeSet("cly_id", Countly.device_id);
storageMethod.storeSet("cly_id_type", deviceIdType);
if (merge) {
if (Countly.check_any_consent()) {
toRequestQueue({ old_device_id: oldId });
Expand All @@ -664,7 +665,7 @@ Countly.Bulk = Bulk;
if (Countly.remote_config) {
remoteConfigs = {};
if (cluster.isMaster) {
CountlyStorage.storeSet("cly_remote_configs", remoteConfigs);
storageMethod.storeSet("cly_remote_configs", remoteConfigs);
}
Countly.fetch_remote_config(Countly.remote_config);
}
Expand Down Expand Up @@ -744,7 +745,7 @@ Countly.Bulk = Bulk;
e.dow = date.getDay();
cc.log(cc.logLevelEnums.DEBUG, "add_cly_events, Adding event: ", event);
eventQueue.push(e);
CountlyStorage.storeSet("cly_event", eventQueue);
storageMethod.storeSet("cly_event", eventQueue);
}
else {
process.send({ cly: { event } });
Expand Down Expand Up @@ -1155,7 +1156,7 @@ Countly.Bulk = Bulk;
remoteConfigs = configs;
}
if (cluster.isMaster) {
CountlyStorage.storeSet("cly_remote_configs", remoteConfigs);
storageMethod.storeSet("cly_remote_configs", remoteConfigs);
cc.log(cc.logLevelEnums.INFO, `fetch_remote_config, Fetched remote config: [${remoteConfigs}].`);
}
}
Expand Down Expand Up @@ -1339,7 +1340,7 @@ Countly.Bulk = Bulk;
if (cluster.isMaster) {
cc.log(cc.logLevelEnums.INFO, "request, Adding the raw request to the queue.");
requestQueue.push(request);
CountlyStorage.storeSet("cly_queue", requestQueue);
storageMethod.storeSet("cly_queue", requestQueue);
}
else {
cc.log(cc.logLevelEnums.INFO, "request, Sending message to the parent process. Adding the raw request to the queue.");
Expand Down Expand Up @@ -1426,7 +1427,7 @@ Countly.Bulk = Bulk;

cc.log(cc.logLevelEnums.INFO, "toRequestQueue, Adding request to the queue.");
requestQueue.push(request);
CountlyStorage.storeSet("cly_queue", requestQueue);
storageMethod.storeSet("cly_queue", requestQueue);
}
else {
cc.log(cc.logLevelEnums.INFO, "toRequestQueue, Sending message to the parent process. Adding request to the queue.");
Expand Down Expand Up @@ -1457,7 +1458,7 @@ Countly.Bulk = Bulk;
var events = eventQueue.splice(0, maxEventBatch);
toRequestQueue({ events: JSON.stringify(events) });
}
CountlyStorage.storeSet("cly_event", eventQueue);
storageMethod.storeSet("cly_event", eventQueue);
}

// process request queue with event queue
Expand All @@ -1472,7 +1473,7 @@ Countly.Bulk = Bulk;
failTimeout = cc.getTimestamp() + failTimeoutAmount;
cc.log(cc.logLevelEnums.ERROR, `makeRequest, Encountered a problem while making the request: [${err}]`);
}
CountlyStorage.storeSet("cly_queue", requestQueue);
storageMethod.storeSet("cly_queue", requestQueue);
readyToProcess = true;
}, "heartBeat", false);
}
Expand Down
Loading

0 comments on commit 3695932

Please sign in to comment.