Skip to content

Commit

Permalink
Merge pull request #34 from vtalas/drive
Browse files Browse the repository at this point in the history
Google.Drive: NewFile update
  • Loading branch information
martin-krcmar authored Mar 12, 2024
2 parents 5b48d3b + b953ee5 commit 97b22a7
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 74 deletions.
141 changes: 71 additions & 70 deletions src/appmixer/google/drive/NewFile/NewFile.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
'use strict';
const Promise = require('bluebird');
const moment = require('moment');
const { google } = require('googleapis');
const commons = require('../drive-commons');
const { URL, URLSearchParams } = require('url');

const getNewFiles = async (lock, drive, folder, pageToken, newFiles = []) => {

const { data: { changes, newStartPageToken, nextPageToken } } = await drive.changes.list({ pageToken, fields: '*', includeRemoved: false, includeDeleted: false });
const { data: { changes, newStartPageToken, nextPageToken } } = await drive.changes.list({
pageToken,
fields: '*',
includeRemoved: false
});

changes.forEach(change => {
if (change.changeType === 'file' && !change.removed && !change.file?.trashed && new Date(change.file?.createdTime) >= new Date(change.file?.modifiedTime)) {
Expand All @@ -29,6 +30,62 @@ const getNewFiles = async (lock, drive, folder, pageToken, newFiles = []) => {
return { newFiles, newStartPageToken };
};

const detectNewFiles = async function(context) {

const DEBUG = context.config.DEBUG !== 'false' || false;
const { folder = {} } = context.properties;

let lock = null;
try {
lock = await context.lock(context.componentId, { maxRetryCount: 0 });
} catch (err) {
await context.stateSet('hasSkippedMessage', true);
return;
}

try {
const { startPageToken, processedFiles = [], debugInfo = {} } = await context.loadState();
const auth = commons.getOauth2Client(context.auth);
const drive = google.drive({ version: 'v3', auth });

await context.stateSet('hasSkippedMessage', false);

const { newFiles, newStartPageToken } = await getNewFiles(lock, drive, folder.id, startPageToken);

const processedFilesSet = commons.processedItemsBuffer(processedFiles);

for (let file of newFiles) {
if (!processedFilesSet.has(file.id)) {
processedFilesSet.add(newStartPageToken, file.id);
await context.sendJson(file, 'file');
await context.stateSet('processedFiles', processedFilesSet.export());

if (DEBUG) {
debugInfo[file.name] = debugInfo[file.name] || [];
debugInfo[file.name].push(newStartPageToken);
}
}
}

if (DEBUG) {
await context.log({ 'DEBUG': debugInfo });
if (Object.keys(debugInfo).length > 10000) {
await context.log({ 'DEBUG': 'Clearing debug log, maximum count of records reached.' });
await context.stateSet('debugInfo', {});
} else {
await context.stateSet('debugInfo', debugInfo);
}
}

await context.stateSet('startPageToken', newStartPageToken);

} finally {
if (lock) {
await lock.unlock();
}
}
};

module.exports = {

async start(context) {
Expand All @@ -52,77 +109,21 @@ module.exports = {
return context.response();
}

// there's a bug in the drive changes API. Looks like they have a replica there and when the
// webhook arrives the change may not get to all the replicas yet, therefore the getChanges
// api sometimes does not correctly return that change (it's not there yet), component can handle
// this situation (line 86), but seems that delaying the webhook a bit helps.
await new Promise(resolve => {
setTimeout(() => {
resolve();
}, 1000);
});

const { folder = {} } = context.properties;

let lock = null;
try {
try {
lock = await context.lock(context.componentId, { retryDelay: 4000 });
} catch (e) {
return;
}
const { startPageToken, lastChangeFileIDs } = await context.loadState();
const lastFiles = Array.isArray(lastChangeFileIDs) ? new Set(lastChangeFileIDs) : new Set();

const changesResourceURL = new URL(context.messages.webhook.content['headers']['x-goog-resource-uri']);
const params = new URLSearchParams(changesResourceURL.searchParams);

const auth = commons.getOauth2Client(context.auth);
const drive = google.drive({ version: 'v3', auth });

if (params.get('pageToken') < startPageToken) {
// if the Google Drive changes API worked properly, this section wouldn't be needed,
// but a pageToken with the same ID may be sent many times (case when multiple files
// are created at the same time) through the webhook and return different results
// when getting the list of changes starting from the same pageToken. That seems to
// be a bug, that should not happen.
const { newFiles } = await getNewFiles(lock, drive, folder.id, params.get('pageToken'));

await Promise.map(newFiles, file => {
if (!lastFiles.has(file.id)) {
lastFiles.add(file.id);
return context.sendJson(file, 'file');
}
}, { concurrency: 5 });

await context.stateSet('lastChangeFileIDs', Array.from(lastFiles));

// we have already processed that change
return context.response();
}

const { newFiles, newStartPageToken } = await getNewFiles(lock, drive, folder.id, startPageToken);

const newFileIDs = [];
await Promise.map(newFiles, file => {
newFileIDs.push(file.id);
return context.sendJson(file, 'file');
}, { concurrency: 5 });

await context.stateSet('startPageToken', newStartPageToken);
await context.stateSet('lastChangeFileIDs', newFileIDs);
} finally {
if (lock) {
await lock.unlock();
}
}
await detectNewFiles(context);

return context.response();
},

async tick(context) {

const { expiration } = await context.loadState();
const { expiration, hasSkippedMessage } = await context.loadState();

if (hasSkippedMessage) {
// a message came when we were processing results,
// we have to check for new files again
await detectNewFiles(context);
}

if (expiration) {
const renewDate = moment(expiration).subtract(5, 'hours');
if (moment().isSameOrAfter(renewDate)) {
Expand Down Expand Up @@ -162,7 +163,7 @@ module.exports = {
includeRemoved: false,
pageToken,
requestBody: {
address: context.getWebhookUrl(),
address: context.getWebhookUrl() + '?enqueueOnly=true',
id: context.componentId,
payload: true,
token: context.componentId,
Expand Down
2 changes: 1 addition & 1 deletion src/appmixer/google/drive/NewFile/component.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "appmixer.google.drive.NewFile",
"author": "Jimoh Damilola <[email protected]>",
"description": "This trigger fires every time there's new file. It does not download the file! If you need the content of the file use the ExportFile component.",
"version": "1.0.3",
"version": "1.0.4",
"private": false,
"webhook": true,
"tick": true,
Expand Down
6 changes: 3 additions & 3 deletions src/appmixer/google/drive/bundle.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "appmixer.google.drive",
"version": "1.4.3",
"version": "1.4.4",
"changelog": {
"1.0.0": [
"Initial version"
Expand Down Expand Up @@ -40,8 +40,8 @@
"1.4.1": [
"Fixed 'ListFiles' component outPort options."
],
"1.4.3": [
"Fixed 'NewFile' trigger."
"1.4.4": [
"NewFile: fixes, improvements. Added DEBUG info messages. Set the DEBUG configuration key in the Backoffice to true to see the messages."
]
}
}
24 changes: 24 additions & 0 deletions src/appmixer/google/drive/drive-commons.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,32 @@ let defaultExportFormats = {
}
};

const processedItemsBuffer = function(data = []) {

const MAX_GROUP_COUNT = 3;
return {
has(id) {
return data.find(group => group.ids[id]);
},
add(group, id) {
const groupData = data.find(groupData => groupData.group === group);
if (!groupData) {
const ids = {};
ids[id] = true;
data.push({ group, ids });
} else {
groupData.ids[id] = true;
}
},
export() {
return data.slice(-MAX_GROUP_COUNT);
}
};
};

module.exports = {

processedItemsBuffer,
defaultExportFormats,

getOauth2Client(auth) {
Expand Down
1 change: 1 addition & 0 deletions src/appmixer/google/drive/module.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"name": "appmixer.google.drive",
"label": "Google Drive",
"category": "applications",
"version": "1.1.0",
"description": "Google Drive is a file storage and synchronization service. It allows users to store files on their servers, synchronize files across devices, and share files.",
"icon": ""
}

0 comments on commit 97b22a7

Please sign in to comment.