Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
vtalas committed Feb 29, 2024
1 parent 29618cc commit de2558c
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 73 deletions.
127 changes: 60 additions & 67 deletions src/appmixer/google/drive/NewFile/NewFile.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
'use strict';
const Promise = require('bluebird');

Check failure on line 1 in src/appmixer/google/drive/NewFile/NewFile.js

View workflow job for this annotation

GitHub Actions / build

'Promise' is assigned a value but never used
const moment = require('moment');
const { google } = require('googleapis');
const commons = require('../drive-commons');
const { URL, URLSearchParams } = require('url');

const getNewFiles = async (lock, drive, folder, pageToken, newFiles = []) => {

const { data: { changes, newStartPageToken, nextPageToken } } = await drive.changes.list({ pageToken, fields: '*', includeRemoved: false, includeDeleted: false });
const { data: { changes, newStartPageToken, nextPageToken } } = await drive.changes.list({
pageToken,
fields: '*'
});

changes.forEach(change => {
if (change.changeType === 'file' && !change.removed && !change.file?.trashed && new Date(change.file?.createdTime) >= new Date(change.file?.modifiedTime)) {
Expand All @@ -29,6 +30,53 @@ const getNewFiles = async (lock, drive, folder, pageToken, newFiles = []) => {
return { newFiles, newStartPageToken };
};

const detectNewFiles = async function(context) {

const { folder = {} } = context.properties;

let lock = null;
try {
lock = await context.lock(context.componentId, { maxRetryCount: 0 });
} catch (err) {
context.log({ stage: 'lock error' });
await context.stateSet('hasSkippedMessage', true);
return context.response();
}

try {

const { startPageToken, lastChangeFileIDs = {}, processedFiles = [] } = await context.loadState();

const auth = commons.getOauth2Client(context.auth);
const drive = google.drive({ version: 'v3', auth });

const { newFiles, newStartPageToken } = await getNewFiles(lock, drive, folder.id, startPageToken);

await context.stateSet('hasSkippedMessage', false);

const processedFilesSet = x(processedFiles);

for (let file of newFiles) {
lastChangeFileIDs[file.name] = lastChangeFileIDs[file.name] || [];
lastChangeFileIDs[file.name].push(newStartPageToken);
if (!processedFilesSet.has(file.id)) {
processedFilesSet.add(newStartPageToken, file.id)

Check failure on line 63 in src/appmixer/google/drive/NewFile/NewFile.js

View workflow job for this annotation

GitHub Actions / build

Missing semicolon
lastChangeFileIDs[file.name].push('triggered!!!!');
await context.sendJson(file, 'file');
}
}

await context.stateSet('startPageToken', newStartPageToken);
await context.stateSet('lastChangeFileIDs', lastChangeFileIDs);
await context.stateSet('processedFiles', processedFilesSet.export());

} finally {
if (lock) {
await lock.unlock();
}
}
};

module.exports = {

async start(context) {
Expand All @@ -52,77 +100,22 @@ module.exports = {
return context.response();
}

// there's a bug in the drive changes API. Looks like they have a replica there and when the
// webhook arrives the change may not get to all the replicas yet, therefore the getChanges
// api sometimes does not correctly return that change (it's not there yet), component can handle
// this situation (line 86), but seems that delaying the webhook a bit helps.
await new Promise(resolve => {
setTimeout(() => {
resolve();
}, 1000);
});
context.log({ w: context.messages.webhook });

const { folder = {} } = context.properties;

let lock = null;
try {
try {
lock = await context.lock(context.componentId, { retryDelay: 4000 });
} catch (e) {
return;
}
const { startPageToken, lastChangeFileIDs } = await context.loadState();
const lastFiles = Array.isArray(lastChangeFileIDs) ? new Set(lastChangeFileIDs) : new Set();

const changesResourceURL = new URL(context.messages.webhook.content['headers']['x-goog-resource-uri']);
const params = new URLSearchParams(changesResourceURL.searchParams);

const auth = commons.getOauth2Client(context.auth);
const drive = google.drive({ version: 'v3', auth });

if (params.get('pageToken') < startPageToken) {
// if the Google Drive changes API worked properly, this section wouldn't be needed,
// but a pageToken with the same ID may be sent many times (case when multiple files
// are created at the same time) through the webhook and return different results
// when getting the list of changes starting from the same pageToken. That seems to
// be a bug, that should not happen.
const { newFiles } = await getNewFiles(lock, drive, folder.id, params.get('pageToken'));

await Promise.map(newFiles, file => {
if (!lastFiles.has(file.id)) {
lastFiles.add(file.id);
return context.sendJson(file, 'file');
}
}, { concurrency: 5 });

await context.stateSet('lastChangeFileIDs', Array.from(lastFiles));

// we have already processed that change
return context.response();
}

const { newFiles, newStartPageToken } = await getNewFiles(lock, drive, folder.id, startPageToken);

const newFileIDs = [];
await Promise.map(newFiles, file => {
newFileIDs.push(file.id);
return context.sendJson(file, 'file');
}, { concurrency: 5 });

await context.stateSet('startPageToken', newStartPageToken);
await context.stateSet('lastChangeFileIDs', newFileIDs);
} finally {
if (lock) {
await lock.unlock();
}
}
await detectNewFiles(context);

return context.response();
},

async tick(context) {

const { expiration } = await context.loadState();
const { expiration, hasSkippedMessage } = await context.loadState();

if (hasSkippedMessage) {
context.log({ stage: 'XXXXXXXXXXXXXXXXXXXXX', });

Check failure on line 115 in src/appmixer/google/drive/NewFile/NewFile.js

View workflow job for this annotation

GitHub Actions / build

Unexpected trailing comma
await detectNewFiles(context);
}

if (expiration) {
const renewDate = moment(expiration).subtract(5, 'hours');
if (moment().isSameOrAfter(renewDate)) {
Expand Down
7 changes: 2 additions & 5 deletions src/appmixer/google/drive/bundle.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "appmixer.google.drive",
"version": "1.4.3",
"version": "1.4.1",
"changelog": {
"1.0.0": [
"Initial version"
Expand Down Expand Up @@ -38,10 +38,7 @@
"Added new component DownloadFile to retrieve the content of the file."
],
"1.4.1": [
"Fixed 'ListFiles' component outPort options."
],
"1.4.3": [
"Fixed 'NewFile' trigger."
"Fixes 'ListFiles' component outPort options."
]
}
}
25 changes: 25 additions & 0 deletions src/appmixer/google/drive/drive-commons.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,33 @@ let defaultExportFormats = {
}
};

const x = function(data = []) {

const MAX_GROUP_COUNT = 3;
return {
has(id) {
return data.find(group => group.ids[id]);
},
add(group, id) {
const groupData = data.find(groupData => groupData.group === group);
if (!groupData) {
const ids = {};
ids[id] = true;
data.push({ group, ids });
} else {
groupData.ids[id] = true;
}
},
export() {
return data.slice(-MAX_GROUP_COUNT);
}

};
};

module.exports = {

x,
defaultExportFormats,

getOauth2Client(auth) {
Expand Down
2 changes: 1 addition & 1 deletion src/appmixer/openai/artifacts/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@
},
"max_tokens": {
"description": "The maximum number of [tokens](/tokenizer) to generate in the chat completion.\n\nThe total length of input tokens and generated tokens is limited by the model's context length. [Example Python code](https://cookbook.openai.com/examples/how_to_count_tokens_with_tiktoken) for counting tokens.\n",
"default": "inf",
"default": "Infinity",
"type": "integer",
"nullable": true
},
Expand Down
Binary file not shown.

0 comments on commit de2558c

Please sign in to comment.