Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Google.Drive: NewFile update #34

Merged
merged 9 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 71 additions & 70 deletions src/appmixer/google/drive/NewFile/NewFile.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
'use strict';
const Promise = require('bluebird');
const moment = require('moment');
const { google } = require('googleapis');
const commons = require('../drive-commons');
const { URL, URLSearchParams } = require('url');

const getNewFiles = async (lock, drive, folder, pageToken, newFiles = []) => {

const { data: { changes, newStartPageToken, nextPageToken } } = await drive.changes.list({ pageToken, fields: '*', includeRemoved: false, includeDeleted: false });
const { data: { changes, newStartPageToken, nextPageToken } } = await drive.changes.list({
pageToken,
vtalas marked this conversation as resolved.
Show resolved Hide resolved
fields: '*',
vtalas marked this conversation as resolved.
Show resolved Hide resolved
includeRemoved: false
});

changes.forEach(change => {
if (change.changeType === 'file' && !change.removed && !change.file?.trashed && new Date(change.file?.createdTime) >= new Date(change.file?.modifiedTime)) {
Expand All @@ -29,6 +30,62 @@ const getNewFiles = async (lock, drive, folder, pageToken, newFiles = []) => {
return { newFiles, newStartPageToken };
};

const detectNewFiles = async function(context) {

const DEBUG = context.config.DEBUG !== 'false' || false;
const { folder = {} } = context.properties;

let lock = null;
try {
lock = await context.lock(context.componentId, { maxRetryCount: 0 });
} catch (err) {
await context.stateSet('hasSkippedMessage', true);
return;
}

try {
const { startPageToken, processedFiles = [], debugInfo = {} } = await context.loadState();
const auth = commons.getOauth2Client(context.auth);
const drive = google.drive({ version: 'v3', auth });

await context.stateSet('hasSkippedMessage', false);

vtalas marked this conversation as resolved.
Show resolved Hide resolved
const { newFiles, newStartPageToken } = await getNewFiles(lock, drive, folder.id, startPageToken);

const processedFilesSet = commons.processedItemsBuffer(processedFiles);

for (let file of newFiles) {
if (!processedFilesSet.has(file.id)) {
processedFilesSet.add(newStartPageToken, file.id);
await context.sendJson(file, 'file');
await context.stateSet('processedFiles', processedFilesSet.export());
vtalas marked this conversation as resolved.
Show resolved Hide resolved

if (DEBUG) {
debugInfo[file.name] = debugInfo[file.name] || [];
debugInfo[file.name].push(newStartPageToken);
}
}
}

if (DEBUG) {
await context.log({ 'DEBUG': debugInfo });
if (Object.keys(debugInfo).length > 10000) {
await context.log({ 'DEBUG': 'Clearing debug log, maximum count of records reached.' });
await context.stateSet('debugInfo', {});
} else {
await context.stateSet('debugInfo', debugInfo);
}
}

await context.stateSet('startPageToken', newStartPageToken);

} finally {
if (lock) {
await lock.unlock();
}
}
};

module.exports = {

async start(context) {
Expand All @@ -52,77 +109,21 @@ module.exports = {
return context.response();
}

// there's a bug in the drive changes API. Looks like they have a replica there and when the
// webhook arrives the change may not get to all the replicas yet, therefore the getChanges
// api sometimes does not correctly return that change (it's not there yet), component can handle
// this situation (line 86), but seems that delaying the webhook a bit helps.
await new Promise(resolve => {
setTimeout(() => {
resolve();
}, 1000);
});

const { folder = {} } = context.properties;

let lock = null;
try {
try {
lock = await context.lock(context.componentId, { retryDelay: 4000 });
} catch (e) {
return;
}
const { startPageToken, lastChangeFileIDs } = await context.loadState();
const lastFiles = Array.isArray(lastChangeFileIDs) ? new Set(lastChangeFileIDs) : new Set();

const changesResourceURL = new URL(context.messages.webhook.content['headers']['x-goog-resource-uri']);
const params = new URLSearchParams(changesResourceURL.searchParams);

const auth = commons.getOauth2Client(context.auth);
const drive = google.drive({ version: 'v3', auth });

if (params.get('pageToken') < startPageToken) {
// if the Google Drive changes API worked properly, this section wouldn't be needed,
// but a pageToken with the same ID may be sent many times (case when multiple files
// are created at the same time) through the webhook and return different results
// when getting the list of changes starting from the same pageToken. That seems to
// be a bug, that should not happen.
const { newFiles } = await getNewFiles(lock, drive, folder.id, params.get('pageToken'));

await Promise.map(newFiles, file => {
if (!lastFiles.has(file.id)) {
lastFiles.add(file.id);
return context.sendJson(file, 'file');
}
}, { concurrency: 5 });

await context.stateSet('lastChangeFileIDs', Array.from(lastFiles));

// we have already processed that change
return context.response();
}

const { newFiles, newStartPageToken } = await getNewFiles(lock, drive, folder.id, startPageToken);

const newFileIDs = [];
await Promise.map(newFiles, file => {
newFileIDs.push(file.id);
return context.sendJson(file, 'file');
}, { concurrency: 5 });

await context.stateSet('startPageToken', newStartPageToken);
await context.stateSet('lastChangeFileIDs', newFileIDs);
} finally {
if (lock) {
await lock.unlock();
}
}
await detectNewFiles(context);

vtalas marked this conversation as resolved.
Show resolved Hide resolved
return context.response();
},

async tick(context) {

const { expiration } = await context.loadState();
const { expiration, hasSkippedMessage } = await context.loadState();

if (hasSkippedMessage) {
// a message came when we were processing results,
// we have to check for new files again
await detectNewFiles(context);
}

if (expiration) {
const renewDate = moment(expiration).subtract(5, 'hours');
if (moment().isSameOrAfter(renewDate)) {
Expand Down Expand Up @@ -162,7 +163,7 @@ module.exports = {
includeRemoved: false,
pageToken,
requestBody: {
address: context.getWebhookUrl(),
address: context.getWebhookUrl() + '?enqueueOnly=true',
id: context.componentId,
payload: true,
token: context.componentId,
Expand Down
2 changes: 1 addition & 1 deletion src/appmixer/google/drive/NewFile/component.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "appmixer.google.drive.NewFile",
"author": "Jimoh Damilola <[email protected]>",
"description": "This trigger fires every time there's new file. It does not download the file! If you need the content of the file use the ExportFile component.",
"version": "1.0.3",
"version": "1.0.4",
"private": false,
"webhook": true,
"tick": true,
Expand Down
6 changes: 3 additions & 3 deletions src/appmixer/google/drive/bundle.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "appmixer.google.drive",
"version": "1.4.3",
"version": "1.4.4",
"changelog": {
"1.0.0": [
"Initial version"
Expand Down Expand Up @@ -40,8 +40,8 @@
"1.4.1": [
"Fixed 'ListFiles' component outPort options."
],
"1.4.3": [
"Fixed 'NewFile' trigger."
"1.4.4": [
"NewFile: fixes, improvements. Added DEBUG info messages. Set the DEBUG configuration key in the Backoffice to true to see the messages."
]
}
}
24 changes: 24 additions & 0 deletions src/appmixer/google/drive/drive-commons.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,32 @@ let defaultExportFormats = {
}
};

const processedItemsBuffer = function(data = []) {

const MAX_GROUP_COUNT = 3;
return {
has(id) {
return data.find(group => group.ids[id]);
},
add(group, id) {
const groupData = data.find(groupData => groupData.group === group);
if (!groupData) {
const ids = {};
ids[id] = true;
data.push({ group, ids });
} else {
groupData.ids[id] = true;
}
},
export() {
return data.slice(-MAX_GROUP_COUNT);
}
};
};

module.exports = {

processedItemsBuffer,
defaultExportFormats,

getOauth2Client(auth) {
Expand Down
1 change: 1 addition & 0 deletions src/appmixer/google/drive/module.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"name": "appmixer.google.drive",
"label": "Google Drive",
"category": "applications",
"version": "1.1.0",
"description": "Google Drive is a file storage and synchronization service. It allows users to store files on their servers, synchronize files across devices, and share files.",
"icon": ""
}
Loading