Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: airtable integration #339

Closed
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions config/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"development": {
"username": "postgres",
"password": "postgres",
"database": "maxun",
"host": "localhost",
"port": 5432,
"dialect": "postgres"
}
}

35 changes: 35 additions & 0 deletions migrations/20250111140925-add-airtable-fields-to-robot.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
'use strict';

/** @type {import('sequelize-cli').Migration} */
module.exports = {
async up(queryInterface, Sequelize) {
// Add new Airtable-related columns to the 'robot' table
await queryInterface.addColumn('robot', 'airtable_base_id', {
type: Sequelize.STRING,
allowNull: true,
});

await queryInterface.addColumn('robot', 'airtable_table_name', {
type: Sequelize.STRING,
allowNull: true,
});

await queryInterface.addColumn('robot', 'airtable_api_key', {
type: Sequelize.STRING,
allowNull: true,
});

await queryInterface.addColumn('robot', 'airtable_access_token', {
type: Sequelize.STRING,
allowNull: true,
});
AmitChauhan63390 marked this conversation as resolved.
Show resolved Hide resolved
},

async down(queryInterface, Sequelize) {
// Remove Airtable-related columns from the 'robot' table
await queryInterface.removeColumn('robot', 'airtable_base_id');
await queryInterface.removeColumn('robot', 'airtable_table_name');
await queryInterface.removeColumn('robot', 'airtable_api_key');
await queryInterface.removeColumn('robot', 'airtable_access_token');
},
};
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"@types/react": "^18.0.5",
"@types/react-dom": "^18.0.1",
"@types/uuid": "^8.3.4",
"airtable": "^0.12.2",
"axios": "^0.26.0",
"bcrypt": "^5.1.1",
"body-parser": "^1.20.3",
Expand Down
31 changes: 28 additions & 3 deletions server/src/models/Robot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ interface RobotAttributes {
google_sheet_id?: string | null;
google_access_token?: string | null;
google_refresh_token?: string | null;
airtable_base_id?: string | null; // Airtable Base ID
airtable_table_name?: string | null; // Airtable Table Name
airtable_personal_access_token?: string | null; // Airtable Personal Access Token
airtable_access_token?: string | null; // Airtable OAuth Access Token (if using OAuth)
schedule?: ScheduleConfig | null;
}

Expand All @@ -41,18 +45,22 @@ interface ScheduleConfig {
cronExpression?: string;
}

interface RobotCreationAttributes extends Optional<RobotAttributes, 'id'> { }
interface RobotCreationAttributes extends Optional<RobotAttributes, 'id'> {}

class Robot extends Model<RobotAttributes, RobotCreationAttributes> implements RobotAttributes {
public id!: string;
public userId!: number;
public recording_meta!: RobotMeta;
public recording!: RobotWorkflow;
public google_sheet_email!: string | null;
public google_sheet_name?: string | null;
public google_sheet_id?: string | null;
public google_sheet_name!: string | null;
public google_sheet_id!: string | null;
public google_access_token!: string | null;
public google_refresh_token!: string | null;
public airtable_base_id!: string | null; // Airtable Base ID
public airtable_table_name!: string | null; // Airtable Table Name
public airtable_personal_access_token!: string | null; // Airtable Personal Access Token
public airtable_access_token!: string | null; // Airtable OAuth Access Token
public schedule!: ScheduleConfig | null;
}

Expand Down Expand Up @@ -95,6 +103,22 @@ Robot.init(
type: DataTypes.STRING,
allowNull: true,
},
airtable_base_id: {
type: DataTypes.STRING,
allowNull: true,
},
airtable_table_name: {
type: DataTypes.STRING,
allowNull: true,
},
airtable_personal_access_token: {
type: DataTypes.STRING,
allowNull: true,
},
airtable_access_token: {
type: DataTypes.STRING,
allowNull: true,
},
schedule: {
type: DataTypes.JSONB,
allowNull: true,
Expand All @@ -107,6 +131,7 @@ Robot.init(
}
);

// Uncomment and define relationships if needed
// Robot.hasMany(Run, {
// foreignKey: 'robotId',
// as: 'runs', // Alias for the relation
Expand Down
139 changes: 139 additions & 0 deletions server/src/workflow-management/integrations/airtableintegration.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import Airtable from 'airtable';
import logger from '../../logger';
import Run from '../../models/Run';
import Robot from '../../models/Robot';

interface AirtableUpdateTask {
robotId: string;
runId: string;
status: 'pending' | 'completed' | 'failed';
retries: number;
}

const MAX_RETRIES = 5;

export let airtableUpdateTasks: { [runId: string]: AirtableUpdateTask } = {};

/**
 * Pushes the output of a successful run to the Airtable table configured on its robot.
 *
 * The run is looked up by `runId`. When its status is 'success', the rows are taken
 * from `serializableOutput['item-0']`; if there is no serializable output at all,
 * `binaryOutput['item-0']` is wrapped in a single "Screenshot URL" row instead.
 * The rows are then written using the base ID, table name and personal access token
 * stored on the robot whose `recording_meta.id` equals `robotId`.
 *
 * Nothing is written when the run is not successful, when there are no rows, or when
 * the robot has no complete Airtable configuration.
 *
 * Failures are logged and NOT rethrown, so the returned promise always resolves.
 *
 * @param robotId - The ID of the robot.
 * @param runId - The ID of the run.
 */
export async function updateAirtable(robotId: string, runId: string): Promise<void> {
  try {
    const run = await Run.findOne({ where: { runId } });

    if (!run) {
      throw new Error(`Run not found for runId: ${runId}`);
    }

    const plainRun = run.toJSON();

    if (plainRun.status !== 'success') {
      console.log(`Run status is not success. Skipping Airtable update for Run: ${runId}`);
      return;
    }

    let data: { [key: string]: any }[] = [];
    if (plainRun.serializableOutput && Object.keys(plainRun.serializableOutput).length > 0) {
      // 'item-0' may be absent (or not a list) even when other keys exist; never let
      // `data` become undefined, which would crash the write with a TypeError.
      const rows = plainRun.serializableOutput['item-0'];
      data = Array.isArray(rows) ? (rows as { [key: string]: any }[]) : [];
    } else if (plainRun.binaryOutput && plainRun.binaryOutput['item-0']) {
      const binaryUrl = plainRun.binaryOutput['item-0'] as string;
      data = [{ "Screenshot URL": binaryUrl }];
    }

    if (data.length === 0) {
      // Avoid issuing a create request with no records.
      console.log(`No output rows to write to Airtable for Robot: ${robotId} and Run: ${runId}`);
      return;
    }

    const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } });

    if (!robot) {
      throw new Error(`Robot not found for robotId: ${robotId}`);
    }

    const plainRobot = robot.toJSON();

    const tableName = plainRobot.airtable_table_name;
    const baseId = plainRobot.airtable_base_id;
    const personalAccessToken = plainRobot.airtable_personal_access_token;

    if (!tableName || !baseId || !personalAccessToken) {
      console.log('Airtable integration not configured.');
      return;
    }

    console.log(`Preparing to write data to Airtable for robot: ${robotId}, table: ${tableName}`);

    await writeDataToAirtable(baseId, tableName, personalAccessToken, data);
    console.log(`Data written to Airtable successfully for Robot: ${robotId} and Run: ${runId}`);
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    console.error(`Failed to write data to Airtable for Robot: ${robotId} and Run: ${runId}: ${message}`);
  }
}

/**
 * Writes rows to an Airtable table, one record per row.
 *
 * Each element of `data` becomes a record whose `fields` are that element. Records
 * are sent sequentially in batches because Airtable accepts at most 10 records per
 * create request. If a batch fails, batches sent before it have already been
 * written; the error is logged and rethrown.
 *
 * Does nothing (beyond logging) when `data` is empty.
 *
 * @param baseId - The ID of the Airtable base.
 * @param tableName - The name of the Airtable table.
 * @param personalAccessToken - The Airtable Personal Access Token.
 * @param data - The rows to write to Airtable.
 * @throws Rethrows whatever error the Airtable client raised.
 */
export async function writeDataToAirtable(baseId: string, tableName: string, personalAccessToken: string, data: any[]): Promise<void> {
  // Airtable's create-records endpoint rejects requests with more than 10 records.
  const MAX_RECORDS_PER_REQUEST = 10;

  try {
    if (!Array.isArray(data) || data.length === 0) {
      logger.log(`info`, `No data to write to Airtable: ${tableName}`);
      return;
    }

    // Initialize Airtable with Personal Access Token
    const base = new Airtable({ apiKey: personalAccessToken }).base(baseId);
    const table = base(tableName);

    // Prepare records for Airtable
    const records = data.map((row) => ({ fields: row }));

    // Write data to Airtable in batches that respect the per-request limit
    for (let start = 0; start < records.length; start += MAX_RECORDS_PER_REQUEST) {
      const batch = records.slice(start, start + MAX_RECORDS_PER_REQUEST);
      await table.create(batch);
    }

    logger.log(`info`, `Data written to Airtable: ${tableName}`);
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    logger.log(`error`, `Error writing data to Airtable: ${message}`);
    throw error;
  }
}

/**
* Processes pending Airtable update tasks.
*/
export const processAirtableUpdates = async () => {
while (true) {
let hasPendingTasks = false;
for (const runId in airtableUpdateTasks) {
const task = airtableUpdateTasks[runId];
console.log(`Processing task for runId: ${runId}, status: ${task.status}`);

if (task.status === 'pending') {
hasPendingTasks = true;
try {
await updateAirtable(task.robotId, task.runId);
console.log(`Successfully updated Airtable for runId: ${runId}`);
delete airtableUpdateTasks[runId];
} catch (error: any) {
console.error(`Failed to update Airtable for run ${task.runId}:`, error);
if (task.retries < MAX_RETRIES) {
airtableUpdateTasks[runId].retries += 1;
console.log(`Retrying task for runId: ${runId}, attempt: ${task.retries}`);
} else {
airtableUpdateTasks[runId].status = 'failed';
console.log(`Max retries reached for runId: ${runId}. Marking task as failed.`);
}
}
}
}
Comment on lines +108 to +129
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid modifying airtableUpdateTasks while iterating over it.

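Deleting properties from airtableUpdateTasks while a for...in loop is iterating over it makes the loop harder to reason about, especially because the loop body awaits and other code can add or remove tasks in the meantime. Properties added to an object during enumeration are not guaranteed to be visited, so some tasks might be missed during a pass.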

Refactor the code to collect the tasks to delete and remove them after the iteration. Apply this diff:

 export const processAirtableUpdates = async () => {
   while (true) {
     let hasPendingTasks = false;
+    const tasksToDelete: string[] = [];

     for (const runId in airtableUpdateTasks) {
       const task = airtableUpdateTasks[runId];
       console.log(`Processing task for runId: ${runId}, status: ${task.status}`);

       if (task.status === 'pending') {
         hasPendingTasks = true;
         try {
           await updateAirtable(task.robotId, task.runId);
           console.log(`Successfully updated Airtable for runId: ${runId}`);
-          delete airtableUpdateTasks[runId];
+          tasksToDelete.push(runId);
         } catch (error: any) {
           console.error(`Failed to update Airtable for run ${task.runId}:`, error);
           if (task.retries < MAX_RETRIES) {
             airtableUpdateTasks[runId].retries += 1;
             console.log(`Retrying task for runId: ${runId}, attempt: ${task.retries}`);
           } else {
             airtableUpdateTasks[runId].status = 'failed';
             console.log(`Max retries reached for runId: ${runId}. Marking task as failed.`);
           }
         }
       }
     }

+    // Remove tasks after iteration
+    for (const runId of tasksToDelete) {
+      delete airtableUpdateTasks[runId];
+    }

     if (!hasPendingTasks) {
       console.log('No pending tasks. Exiting loop.');
       break;
     }

     console.log('Waiting for 5 seconds before checking again...');
     await new Promise((resolve) => setTimeout(resolve, 5000));
   }
 };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for (const runId in airtableUpdateTasks) {
const task = airtableUpdateTasks[runId];
console.log(`Processing task for runId: ${runId}, status: ${task.status}`);
if (task.status === 'pending') {
hasPendingTasks = true;
try {
await updateAirtable(task.robotId, task.runId);
console.log(`Successfully updated Airtable for runId: ${runId}`);
delete airtableUpdateTasks[runId];
} catch (error: any) {
console.error(`Failed to update Airtable for run ${task.runId}:`, error);
if (task.retries < MAX_RETRIES) {
airtableUpdateTasks[runId].retries += 1;
console.log(`Retrying task for runId: ${runId}, attempt: ${task.retries}`);
} else {
airtableUpdateTasks[runId].status = 'failed';
console.log(`Max retries reached for runId: ${runId}. Marking task as failed.`);
}
}
}
}
for (const runId in airtableUpdateTasks) {
const task = airtableUpdateTasks[runId];
console.log(`Processing task for runId: ${runId}, status: ${task.status}`);
if (task.status === 'pending') {
hasPendingTasks = true;
try {
await updateAirtable(task.robotId, task.runId);
console.log(`Successfully updated Airtable for runId: ${runId}`);
tasksToDelete.push(runId);
} catch (error: any) {
console.error(`Failed to update Airtable for run ${task.runId}:`, error);
if (task.retries < MAX_RETRIES) {
airtableUpdateTasks[runId].retries += 1;
console.log(`Retrying task for runId: ${runId}, attempt: ${task.retries}`);
} else {
airtableUpdateTasks[runId].status = 'failed';
console.log(`Max retries reached for runId: ${runId}. Marking task as failed.`);
}
}
}
}
// Remove tasks after iteration
for (const runId of tasksToDelete) {
delete airtableUpdateTasks[runId];
}


if (!hasPendingTasks) {
console.log('No pending tasks. Exiting loop.');
break;
}

console.log('Waiting for 5 seconds before checking again...');
await new Promise((resolve) => setTimeout(resolve, 5000));
}
};
14 changes: 13 additions & 1 deletion server/src/workflow-management/scheduler/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { createRemoteBrowserForRun, destroyRemoteBrowser } from '../../browser-m
import logger from '../../logger';
import { browserPool } from "../../server";
import { googleSheetUpdateTasks, processGoogleSheetUpdates } from "../integrations/gsheet";
import { airtableUpdateTasks, processAirtableUpdates } from "../integrations/airtableintegration"; // Import Airtable functions
import Robot from "../../models/Robot";
import Run from "../../models/Run";
import { getDecryptedProxyConfig } from "../../routes/proxy";
Expand Down Expand Up @@ -44,7 +45,7 @@ async function createWorkflowAndStoreMetadata(id: string, userId: string) {
};
}

const browserId = createRemoteBrowserForRun( userId);
const browserId = createRemoteBrowserForRun(userId);
const runId = uuid();

const run = await Run.create({
Expand Down Expand Up @@ -177,13 +178,24 @@ async function executeRun(id: string) {
}
);

// Add task for Google Sheets update
googleSheetUpdateTasks[id] = {
robotId: plainRun.robotMetaId,
runId: id,
status: 'pending',
retries: 5,
};
processGoogleSheetUpdates();

// Add task for Airtable update
airtableUpdateTasks[id] = {
robotId: plainRun.robotMetaId,
runId: id,
status: 'pending',
retries: 5,
};
processAirtableUpdates();

Comment on lines +181 to +198
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling and consider refactoring update tasks.

While the implementation mirrors the Google Sheets structure, there are opportunities for improvement:

  1. Error handling is missing for both processGoogleSheetUpdates() and processAirtableUpdates() calls
  2. The task creation logic is duplicated and could be abstracted

Consider refactoring to:

+ const createIntegrationTask = (type: 'googleSheet' | 'airtable', id: string, robotId: string) => {
+   const tasks = type === 'googleSheet' ? googleSheetUpdateTasks : airtableUpdateTasks;
+   tasks[id] = {
+     robotId,
+     runId: id,
+     status: 'pending',
+     retries: 5,
+   };
+ };

+ try {
+   // Create and process Google Sheets task
+   createIntegrationTask('googleSheet', id, plainRun.robotMetaId);
+   await processGoogleSheetUpdates();
+
+   // Create and process Airtable task
+   createIntegrationTask('airtable', id, plainRun.robotMetaId);
+   await processAirtableUpdates();
+ } catch (error) {
+   logger.error(`Failed to process integration updates: ${error.message}`);
+   // Consider whether to fail the run or just log the error
+ }
-    // Add task for Google Sheets update
-    googleSheetUpdateTasks[id] = {
-      robotId: plainRun.robotMetaId,
-      runId: id,
-      status: 'pending',
-      retries: 5,
-    };
-    processGoogleSheetUpdates();
-
-    // Add task for Airtable update
-    airtableUpdateTasks[id] = {
-      robotId: plainRun.robotMetaId,
-      runId: id,
-      status: 'pending',
-      retries: 5,
-    };
-    processAirtableUpdates();

Committable suggestion skipped: line range outside the PR's diff.

return true;
} catch (error: any) {
logger.log('info', `Error while running a robot with id: ${id} - ${error.message}`);
Expand Down
Loading