From e52caea2be8f9d27b4eb1df11a66276f6a604a86 Mon Sep 17 00:00:00 2001 From: David Durman Date: Wed, 8 Jan 2025 16:53:10 +0100 Subject: [PATCH] NewAttachment, NewEmail: improvements --- .../google/gmail/FindEmails/component.json | 2 +- .../gmail/NewAttachment/NewAttachment.js | 136 +++++------------- .../google/gmail/NewAttachment/component.json | 66 ++++----- .../google/gmail/NewEmail/NewEmail.js | 44 ++---- .../google/gmail/NewEmail/component.json | 106 +++++++------- src/appmixer/google/gmail/gmail-commons.js | 118 +++++++++++---- 6 files changed, 212 insertions(+), 260 deletions(-) diff --git a/src/appmixer/google/gmail/FindEmails/component.json b/src/appmixer/google/gmail/FindEmails/component.json index 011bea63e..376e61c8d 100644 --- a/src/appmixer/google/gmail/FindEmails/component.json +++ b/src/appmixer/google/gmail/FindEmails/component.json @@ -26,7 +26,7 @@ "type": "text", "label": "Search Query", "index": 1, - "tooltip": "The search query to find emails." + "tooltip": "The search query to find emails. See Google Documentation for more info. Example: from:amy@example.com AND subject:dinner." }, "outputType": { "type": "select", diff --git a/src/appmixer/google/gmail/NewAttachment/NewAttachment.js b/src/appmixer/google/gmail/NewAttachment/NewAttachment.js index 71ce940db..9c079df57 100644 --- a/src/appmixer/google/gmail/NewAttachment/NewAttachment.js +++ b/src/appmixer/google/gmail/NewAttachment/NewAttachment.js @@ -1,116 +1,58 @@ 'use strict'; + const emailCommons = require('../gmail-commons'); const Promise = require('bluebird'); module.exports = { - async tick(context) { - let newState = {}; - - const { labels: { AND: labels } = { AND: [] } } = context.properties; - const isLabelsEmpty = !labels.some(label => label.name); - // Fetch new messages from the inbox - const data = await emailCommons.listNewMessages( - { context, userId: 'me' }, - context.state.id || null - ); - - // Update the state with the latest message ID - newState.id = data.lastMessageId; - - // Fetch the full email data for new messages - const emails = await Promise.map(data.newMessages, async message => { - return emailCommons.callEndpoint(context, `/users/me/messages/${message.id}`, { - method: 'GET', - params: { format: 'full' } - }).then(response => response.data).catch(err => { - // email can be deleted (permanently) in gmail between listNewMessages call and - // this getMessage call, in such case - ignore it and return null - if (err && err.response && err.response.status === 404) { - return null; - } - throw err; - }); - }, { concurrency: 10 }); + async tick(context) { - // Extract attachments from emails - let attachments = await Promise.map(emails, email => { - if (!email || !email.labelIds) { - // Skip if the email was deleted or labelIds is missing - return []; + const { download } = context.properties; + const state = context.state; + let query = context.properties.query; + query = (query ? query + ' AND ' : '') + 'has:attachment'; + const { emails, state: newState } = await emailCommons.listNewMessages(context, query, state); + + // Fetch attachments from emails. + const output = []; + await Promise.map(emails, async (email) => { + if (!email) { + // Skip if the email was deleted. + return; } - - // Filter emails based on selected labels - if (isLabelsEmpty || labels.some(label => email.labelIds.includes(label.name))) { - return downloadAttachments(context, email); + if (!emailCommons.isNewInboxEmail(email.labelIds || [])) { + // Skip SENT and DRAFT emails. + return; } - return []; - }); - - // Flatten the array of attachments - attachments = attachments.reduce((a, b) => a.concat(b), []); - // Save attachments and send them to the output port - let attachmentsOutput; - - if (context.properties.download) { - attachmentsOutput = await Promise.map(attachments, attachment => { - const buffer = Buffer.from(attachment.data, 'base64'); - return context.saveFileStream( - attachment.filename, - buffer - ).then(res => { - return Object.assign(res, { - email: attachment.email, - attachment - }); - }); - }); - } else { - attachmentsOutput = attachments.map(attachment => { - return Object.assign(attachment, { - email: attachment.email, + return Promise.map(email.attachments || [], async (attachment) => { + const out = { + email, attachment - }); + }; + if (download) { + const savedFile = await downloadAttachment(context, email.id, attachment.id, attachment.filename); + out.fileId = savedFile.fileId; + out.filename = savedFile.filename; + out.contentType = savedFile.contentType; + } + output.push(out); }); - } - - await Promise.map(attachmentsOutput, out => { - return context.sendJson(out, 'attachment'); }); - await context.saveState(newState); + await context.sendArray(output, 'attachment'); + if (JSON.stringify(state != JSON.stringify(newState))) { + return context.saveState(newState); + } } }; -/** - * Download attachments from an email. - * @param {Context} context - * @param {Object} email - * @return {Array} returns array with attachments - */ -const downloadAttachments = async (context, email) => { - if (!emailCommons.isNewInboxEmail(email.labelIds)) { - return []; // skip SENT and DRAFT emails - } - - // Parse the email content to extract attachments - const parsedEmail = emailCommons.normalizeEmail(email); - - return Promise.map(parsedEmail.attachments || [], async (attachment) => { - const out = { - filename: attachment.filename, - mimetype: attachment.mimeType || 'application/octet-stream', // Ensure mimetype is set - size: attachment.size, - email: parsedEmail, - attachment: attachment - }; - if (context.properties.download) { - const response = await emailCommons.callEndpoint(context, `/users/me/messages/${email.id}/attachments/${attachment.id}`, { - method: 'GET' - }); - out.data = response.data.data; - } - return out; +const downloadAttachment = async (context, emailId, attachmentId, filename) => { + const response = await emailCommons.callEndpoint(context, `/users/me/messages/${emailId}/attachments/${attachmentId}`, { + method: 'GET' }); + const base64 = response.data.data; + const buffer = Buffer.from(base64, 'base64'); + const savedFile = await context.saveFileStream(filename, buffer); + return savedFile; }; diff --git a/src/appmixer/google/gmail/NewAttachment/component.json b/src/appmixer/google/gmail/NewAttachment/component.json index 051ddffa0..a70c50d2a 100644 --- a/src/appmixer/google/gmail/NewAttachment/component.json +++ b/src/appmixer/google/gmail/NewAttachment/component.json @@ -21,37 +21,17 @@ "properties": { "schema": { "properties": { - "labels": { "type": "object" }, + "query": { "type": "string" }, "download": { "type": "boolean" } } }, "inspector": { "inputs": { - "labels": { - "type": "expression", - "label": "Labels", - "levels": [ - "AND" - ], + "query": { + "type": "text", + "label": "Email Query", "index": 1, - "tooltip": "Select one or more labels to filter the emails that contain attachments. The trigger will activate if an attachment is received in an email that matches any of the selected labels. If no labels are selected, the trigger will activate for all attachments in all received emails.", - "fields": { - "name": { - "type": "select", - "label": "Name", - "index": 1, - "tooltip": "Select a name of the existing label.", - "source": { - "url": "/component/appmixer/google/gmail/ListLabels?outPort=out", - "data": { - "properties": { - "sendWholeArray": true - }, - "transform": "./ListLabels#labelsToSelectArrayFiltered" - } - } - } - } + "tooltip": "The search query to find new emails. This allows you to only consider email messages that can be found using the query. See Google Documentation for more info. Example: from:amy@example.com AND subject:dinner." }, "download": { "type": "toggle", @@ -67,36 +47,38 @@ { "name": "attachment", "options": [ - { "label": "File ID", "value": "fileId" }, - { "label": "File Name", "value": "filename" }, - { "label": "Content Type", "value": "contentType" }, + { "label": "File ID", "value": "fileId", "schema": { "type": "string", "format": "appmixer-file-id" } }, + { "label": "File Name", "value": "filename", "schema": { "type": "string" } }, + { "label": "Content Type", "value": "contentType", "schema": { "type": "string" } }, { "label": "Attachment", "value": "attachment", "schema": { "type": "object", "properties": { - "id": { "type": "string" }, - "filename": { "type": "string" }, - "size": { "type": "integer" }, - "mimeType": { "type": "string" } + "id": { "type": "string", "title": "Attachment ID" }, + "filename": { "type": "string", "title": "Attachment File Name" }, + "size": { "type": "integer", "title":"Attachment File Size" }, + "mimeType": { "type": "string", "title": "Attachment MIME Type" } } } }, { "label": "Email", "value": "email", "schema": { "type": "object", "properties": { - "id": { "type": "string" }, - "threadId": { "type": "string" }, - "labelIds": { "type": "array", "items": { "type": "string" } }, - "snippet": { "type": "string" }, - "sizeEstimate": { "type": "integer" }, + "id": { "type": "string", "title": "Email ID" }, + "threadId": { "type": "string", "title": "Email Thread ID" }, + "labelIds": { "type": "array", "items": { "type": "string" }, "title": "Email Label IDs" }, + "snippet": { "type": "string", "title": "Email Snippet" }, + "sizeEstimate": { "type": "integer", "title": "Email Size Estimate" }, "payload": { "type": "object", + "title": "Email Payload", "properties": { - "date": { "type": "string", "format": "date-time" }, - "subject": { "type": "string" }, - "text": { "type": "string" }, - "html": { "type": "string" }, + "date": { "type": "string", "format": "date-time", "title": "Email Date" }, + "subject": { "type": "string", "title": "Email Subject" }, + "text": { "type": "string", "title": "Email Text" }, + "html": { "type": "string", "title": "Email HTML" }, "from": { "type": "array", + "title": "Email Senders", "items": { "type": "object", "properties": { @@ -107,6 +89,7 @@ }, "to": { "type": "array", + "title": "Email Recipients", "items": { "type": "object", "properties": { @@ -119,6 +102,7 @@ }, "attachments": { "type": "array", + "title": "Email Attachments", "items": { "type": "object", "properties": { diff --git a/src/appmixer/google/gmail/NewEmail/NewEmail.js b/src/appmixer/google/gmail/NewEmail/NewEmail.js index 5efac35cf..e28b5244b 100644 --- a/src/appmixer/google/gmail/NewEmail/NewEmail.js +++ b/src/appmixer/google/gmail/NewEmail/NewEmail.js @@ -3,44 +3,16 @@ const emailCommons = require('../gmail-commons'); const Promise = require('bluebird'); module.exports = { - async tick(context) { - let newState = {}; - - const { labels: { AND: labels } = { AND: [] } } = context.properties; - const isLabelsEmpty = !labels.some(label => label.name); - - // Fetch all messages without sending labelIds - const data = await emailCommons.listNewMessages( - { context, userId: 'me' }, - context.state.id || null - ); - newState.id = data.lastMessageId; - - const emails = await Promise.map(data.newMessages, async message => { - return emailCommons.callEndpoint(context, `/users/me/messages/${message.id}`, { - method: 'GET', - params: { format: 'full' } - }).then(response => response.data).catch(err => { - // email can be deleted (permanently) in gmail between listNewMessages call and - // this getMessage call, in such case - ignore it and return null - if (err && err.response && err.response.status === 404) { - return null; - } - throw err; - }); - }, { concurrency: 10 }); - // Filter the emails based on selected labels, if any - await Promise.each(emails || [], async email => { - if (!email || !email.labelIds) { - throw new context.CancelError('Invalid email or email label'); - } + async tick(context) { - if (isLabelsEmpty || labels.some(label => email.labelIds.includes(label.name))) { - await context.sendJson(emailCommons.normalizeEmail(email), 'out'); - } - }); + const { query } = context.properties; + const state = context.state; + const { emails, state: newState } = await emailCommons.listNewMessages(context, query, state); - return context.saveState(newState); + await context.sendArray(emails, 'out'); + if (JSON.stringify(state != JSON.stringify(newState))) { + return context.saveState(newState); + } } }; diff --git a/src/appmixer/google/gmail/NewEmail/component.json b/src/appmixer/google/gmail/NewEmail/component.json index a2f3b3a99..5b57b78ef 100644 --- a/src/appmixer/google/gmail/NewEmail/component.json +++ b/src/appmixer/google/gmail/NewEmail/component.json @@ -24,38 +24,18 @@ "properties": { "schema": { "properties": { - "labels": { - "type": "object" + "query": { + "type": "string" } } }, "inspector": { "inputs": { - "labels": { - "type": "expression", - "label": "Labels", - "levels": [ - "AND" - ], - "index": 2, - "tooltip": "Select one or more labels to filter the emails that will trigger this event. The trigger will activate if an email matches any of the selected labels. If no labels are selected, the trigger will activate for all emails, including drafts and sent messages.", - "fields": { - "name": { - "type": "select", - "label": "Name", - "index": 1, - "tooltip": "Select a name of the existing label.", - "source": { - "url": "/component/appmixer/google/gmail/ListLabels?outPort=out", - "data": { - "properties": { - "sendWholeArray": true - }, - "transform": "./ListLabels#labelsToSelectArrayFiltered" - } - } - } - } + "query": { + "type": "text", + "label": "Email Query", + "index": 1, + "tooltip": "The search query to find new emails. This allows you to only consider email messages that can be found using the query. See Google Documentation for more info. Example: from:amy@example.com AND subject:dinner." } } } @@ -66,53 +46,71 @@ "options": [ { "label": "Email Message ID", - "value": "id" + "value": "id", + "schema": { "type": "string" } }, { "label": "Thread ID", - "value": "threadId" + "value": "threadId", + "schema": { "type": "string" } }, { "label": "Label IDs", - "value": "labelIds" + "value": "labelIds", + "schema": { "type": "array", "items": { "type": "string" } } }, { "label": "Snippet", - "value": "snippet" + "value": "snippet", + "schema": { "type": "string" } }, { "label": "SizeEstimate", - "value": "sizeEstimate" - }, - { - "label": "Date", - "value": "payload.date" - }, - { - "label": "From", - "value": "payload.from.[0].address" + "value": "sizeEstimate", + "schema": { "type": "integer" } }, { - "label": "To", - "value": "payload.to.[0].address" - }, - { - "label": "Subject", - "value": "payload.subject" - }, - { - "label": "Body", - "value": "payload.text" - }, - { - "label": "HTML", - "value": "payload.html" + "label": "Payload", + "value": "payload", + "schema": { + "type": "object", + "title": "Email Payload", + "properties": { + "date": { "type": "string", "format": "date-time", "title": "Email Date" }, + "subject": { "type": "string", "title": "Email Subject" }, + "text": { "type": "string", "title": "Email Text" }, + "html": { "type": "string", "title": "Email HTML" }, + "from": { + "type": "array", + "title": "Email Senders", + "items": { + "type": "object", + "properties": { + "address": { "type": "string" }, + "name": { "type": "string" } + } + } + }, + "to": { + "type": "array", + "title": "Email Recipients", + "items": { + "type": "object", + "properties": { + "address": { "type": "string" }, + "name": { "type": "string" } + } + } + } + } + } }, { "label": "Attachments", "value": "attachments", "schema": { "type": "array", + "title": "Email Attachments", "items": { "type": "object", "properties": { diff --git a/src/appmixer/google/gmail/gmail-commons.js b/src/appmixer/google/gmail/gmail-commons.js index a888617e6..a415be9f3 100644 --- a/src/appmixer/google/gmail/gmail-commons.js +++ b/src/appmixer/google/gmail/gmail-commons.js @@ -2,6 +2,7 @@ const mailcomposer = require('mailcomposer'); const Promise = require('bluebird'); const mimelib = require('mimelib'); +const { last } = require('lodash'); const BASE_URL = 'https://gmail.googleapis.com/gmail/v1'; @@ -33,7 +34,7 @@ function getGmailPartContent(part, _content) { } module.exports = { - async callEndpoint(context, endpoint, { method = 'GET', params = {}, data = null, headers = {} } = {}) { + async callEndpoint(context, endpoint, { method = 'GET', params = {}, data = null, headers = {}, responseType = 'json' } = {}) { const options = { method, url: `${BASE_URL}${endpoint}`, @@ -41,7 +42,8 @@ module.exports = { Authorization: `Bearer ${context.auth.accessToken}`, ...headers }, - params + params, + responseType }; if (data) { @@ -107,6 +109,7 @@ module.exports = { labelIds: gmailMessageResource.labelIds, snippet: gmailMessageResource.snippet, sizeEstimate: gmailMessageResource.sizeEstimate, + internalDate: gmailMessageResource.internalDate, payload: { date: new Date(parseInt(gmailMessageResource.internalDate, 10)), to: [], @@ -188,47 +191,100 @@ module.exports = { return labelIds.length !== 1 || (labelIds.indexOf('SENT') === -1 && labelIds.indexOf('DRAFT') === -1); }, - async listNewMessages(options, lastMessageId, result = {}, limit = null) { - const maxResults = options.maxResults || 300; - const endpoint = `/users/${options.userId}/messages`; - - do { - const response = await this.callEndpoint(options.context, endpoint, { - params: { - maxResults, - pageToken: options.pageToken || '' - } + async listNewMessages(context, query, state) { + + const newState = {}; + let maxResults; + + if (state.lastMessageInternalDate) { + // The size of the max results is not that important since + // if not all messages are fetched, the next run will fetch them + // (or the run before which the influx of emails stabilized). + maxResults = 50; + // Fetch only new messages that we haven't seen. Subtract 2 seconds to avoid + // missing messages due to the internalDate resolution (the 'after' term only + // accepts seconds while the internal date is in milliseconds). + const internalDateSeconds = Math.floor(state.lastMessageInternalDate / 1000) - 2; + query = (query ? query + ' AND ' : '') + 'after:' + internalDateSeconds; + } else { + maxResults = 1; + await context.log({ + step: 'initialization', + message: 'Fetching the latest message internal date from the inbox to be able to detect new incoming messages.', + query }); + // During the initialization phase, we only need to get the latest message + // regardless of whether it matches our query. Otherwise, we would always + // miss the first email matching the query. + query = ''; + } - if (!lastMessageId) { - return { - lastMessageId: response.data.messages ? response.data.messages[0].id : null, - newMessages: [] - }; - } + const endpoint = '/users/me/messages'; + const params = { + maxResults, + q: query + }; + const response = await this.callEndpoint(context, endpoint, { params }); + let messages = response.data.messages || []; + await context.log({ + step: 'query', + query, + messagesReturned: messages.length, + lastMessageInternalDate: state.lastMessageInternalDate, + lastMessageId: state.lastMessageId + }); - if (!result.hasOwnProperty('lastMessageId')) { - result.lastMessageId = response.data.messages ? response.data.messages[0].id : undefined; + if (state.lastMessageId) { + // Get emails that we haven't seen. state.lastMessageId contains the ID of the last + // message we have processed in the previous run. + for (let i = 0; i < messages.length; i++) { + if (messages[i].id === state.lastMessageId) { + messages = messages.slice(0, i); + break; + } } + } + + if (messages.length === 0) { + // No new messages found. + return { emails: [], state }; + } - if (!result.hasOwnProperty('newMessages')) { - result.newMessages = []; + // Fetch the full email data for new messages. + let emails = await Promise.map(messages, async (message) => { + try { + const response = await this.callEndpoint(context, `/users/me/messages/${message.id}`, { + method: 'GET', + params: { format: 'full' } + }); + return this.normalizeEmail(response.data); + } catch (err) { + // email can be deleted (permanently) in gmail between listNewMessages call and + // this getMessage call, in such case - ignore it and return null. + if (err && err.response && err.response.status === 404) { + return null; + } + throw err; } + }, { concurrency: 10 }); - const diff = this.getNewMessages(lastMessageId, response.data.messages || []); - result.newMessages = result.newMessages.concat(diff); + // Update the state with the latest message ID. + const lastMessage = emails[0]; + newState.lastMessageInternalDate = lastMessage.internalDate; + newState.lastMessageId = lastMessage.id; - if (limit && result.newMessages.length >= limit) { - return result; - } + await context.log({ step: 'emails-fetched', count: emails.length, lastMessage }); - options.pageToken = response.data.nextPageToken; - } while (options.pageToken); + if (!state.lastMessageId) { + // Init phase. Just remember the last internalDate; + await context.log({ step: 'initialized', lastMessage }); + return { emails: [], state: newState }; + } - return result; + return { emails, state: newState }; }, - getNewMessages(latestMessageId, messages) { + getNewMessages(latestMessageInternalDate, messages) { let differences = []; messages.sort((a, b) => {