Skip to content

Commit

Permalink
🚑 Ingest data
Browse files Browse the repository at this point in the history
  • Loading branch information
naelob committed Jun 26, 2024
1 parent 7c5c29f commit 3d358cb
Show file tree
Hide file tree
Showing 206 changed files with 1,017 additions and 1,247 deletions.
2 changes: 1 addition & 1 deletion packages/api/scripts/commonObject.sh
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ import { Unified${ObjectCap}Output } from '../types/model.unified';
import { I${ObjectCap}Service } from '../types';
@Injectable()
export class SyncService implements OnModuleInit {
export class SyncService implements OnModuleInit, IBaseSync {
constructor(
private prisma: PrismaService,
private logger: LoggerService,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import { IBaseSync } from '@@core/utils/types/interface';
import { Injectable } from '@nestjs/common';

@Injectable()
export class CoreSyncRegistry {
private serviceMap: Map<string, any>;
private serviceMap: Map<string, IBaseSync>;

constructor() {
this.serviceMap = new Map<string, any>();
this.serviceMap = new Map<string, IBaseSync>();
}

// Register a service with a composite key
registerService(
category_vertical: string,
common_object: string,
service: any,
service: IBaseSync,
) {
const compositeKey = this.createCompositeKey(
category_vertical,
Expand All @@ -22,7 +23,7 @@ export class CoreSyncRegistry {
}

// Retrieve a service using the composite key
getService(category_vertical: string, common_object: string): any {
getService(category_vertical: string, common_object: string): IBaseSync {
const compositeKey = this.createCompositeKey(
category_vertical,
common_object,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { Injectable } from '@nestjs/common';
import { CoreSyncRegistry } from '../registries/core-sync.registry';
import { CoreUnification } from './core-unification.service';
import { v4 as uuidv4 } from 'uuid';
import { PrismaService } from '../prisma/prisma.service';
import { TargetObject } from '@@core/utils/types';
import { UnifySourceType } from '@@core/utils/types/unify.output';
import { WebhookService } from '../webhooks/panora-webhooks/webhook.service';
import { ConnectionUtils } from '@@core/connections/@utils';

@Injectable()
export class IngestDataService {
constructor(
private syncRegistry: CoreSyncRegistry,
private coreUnification: CoreUnification,
private webhook: WebhookService,
private prisma: PrismaService,
private connectionUtils: ConnectionUtils,
) {}

async ingestData<T, U = UnifySourceType>(
sourceObject: U[],
integrationId: string,
connectionId: string,
vertical: string,
commonObject: string,
customFieldMappings?: {
slug: string;
remote_id: string;
}[],
): Promise<any[]> {
const unifiedObject = (await this.coreUnification.unify<U[]>({
sourceObject,
targetType: commonObject as TargetObject,
providerName: integrationId,
vertical: vertical,
connectionId: connectionId,
customFieldMappings,
})) as T[];

const { linkedUserId, projectId } =
await this.connectionUtils.getConnectionMetadataFromConnectionId(
connectionId,
);

// insert the data in the DB with the fieldMappings (value table)
const data = await this.syncRegistry
.getService(vertical, commonObject)
.saveToDb(
connectionId,
linkedUserId,
unifiedObject,
integrationId,
sourceObject,
);
const event = await this.prisma.events.create({
data: {
id_event: uuidv4(),
status: 'success',
type: `${vertical}.${commonObject}.synced`,
method: 'SYNC',
url: '/sync',
provider: integrationId,
direction: '0',
timestamp: new Date(),
id_linked_user: linkedUserId,
},
});
await this.webhook.dispatchWebhook(
data,
`${vertical}.${commonObject}.pulled`,
projectId,
event.id_event,
);
return data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export class WebhookProcessor {
});

//re-insert the webhook in the queue
await this.webhookService.handleFailedWebhook(id_webhook_delivery);
await this.webhookService.dispatchFailedWebhook(id_webhook_delivery);

this.logger.log(
'Webhook delivery failed. Job reinserted in the queue for retry.',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ export class WebhookService {
}
}

async handleWebhook(
async dispatchWebhook(
data: any,
eventType: string,
projectId: string,
Expand Down Expand Up @@ -144,7 +144,7 @@ export class WebhookService {
}
}

async handlePriorityWebhook(
async deliverWebhook(
data: any,
eventType: string,
projectId: string,
Expand Down Expand Up @@ -258,7 +258,7 @@ export class WebhookService {
});

//re-insert the webhook in the queue
await this.handleFailedWebhook(
await this.dispatchFailedWebhook(
w_delivery.id_webhook_delivery_attempt,
);
}
Expand All @@ -268,7 +268,7 @@ export class WebhookService {
}
}

async handleFailedWebhook(failed_id_delivery_webhook: string) {
async dispatchFailedWebhook(failed_id_delivery_webhook: string) {
try {
await this.queues.getPanoraWebhookSender().add(
{
Expand Down
14 changes: 14 additions & 0 deletions packages/api/src/@core/connections/@utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ export class ConnectionUtils {
}
}

async getConnectionMetadataFromConnectionId(uuid: string) {
try {
const conn = await this.prisma.connections.findUnique({
where: {
id_connection: uuid,
},
});
return {
linkedUserId: conn.id_linked_user,
projectId: conn.id_project,
};
} catch (error) {}
}

async getLinkedUserId(
projectId: string,
linkedUserId: string,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export class AccountingConnectionsService implements IConnectionCategory {
},
});
//directly send the webhook
await this.webhook.handlePriorityWebhook(
await this.webhook.deliverWebhook(
data,
'connection.created',
callbackOpts.projectId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export class AtsConnectionsService implements IConnectionCategory {
},
});
//directly send the webhook
await this.webhook.handlePriorityWebhook(
await this.webhook.deliverWebhook(
data,
'connection.created',
callbackOpts.projectId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export class CrmConnectionsService implements IConnectionCategory {
},
});
//directly send the webhook
await this.webhook.handlePriorityWebhook(
await this.webhook.deliverWebhook(
data,
'connection.created',
callbackOpts.projectId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export class FilestorageConnectionsService implements IConnectionCategory {
},
});
//directly send the webhook
await this.webhook.handlePriorityWebhook(
await this.webhook.deliverWebhook(
data,
'connection.created',
callbackOpts.projectId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export class HrisConnectionsService implements IConnectionCategory {
},
});
//directly send the webhook
await this.webhook.handlePriorityWebhook(
await this.webhook.deliverWebhook(
data,
'connection.created',
callbackOpts.projectId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export class ManagementConnectionsService implements IConnectionCategory {
},
});
//directly send the webhook
await this.webhook.handlePriorityWebhook(
await this.webhook.deliverWebhook(
data,
'connection.created',
callbackOpts.projectId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export class MarketingAutomationConnectionsService
},
});
//directly send the webhook
await this.webhook.handlePriorityWebhook(
await this.webhook.deliverWebhook(
data,
'connection.created',
callbackOpts.projectId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export class TicketingConnectionsService implements IConnectionCategory {
},
});
//directly send the webhook
await this.webhook.handlePriorityWebhook(
await this.webhook.deliverWebhook(
data,
'connection.created',
callbackOpts.projectId,
Expand Down
2 changes: 2 additions & 0 deletions packages/api/src/@core/core.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { BullQueueModule } from './@core-services/queues/queue.module';
import { WebhookModule } from './@core-services/webhooks/panora-webhooks/webhook.module';
import { ManagedWebhooksModule } from './@core-services/webhooks/third-parties-webhooks/managed-webhooks.module';
import { CategoryConnectionRegistry } from './@core-services/registries/connections-categories.registry';
import { IngestDataService } from './@core-services/unification/ingest-data.service';

@Module({
imports: [
Expand Down Expand Up @@ -71,6 +72,7 @@ import { CategoryConnectionRegistry } from './@core-services/registries/connecti
UnificationRegistry,
CoreUnification,
CoreSyncRegistry,
IngestDataService,
CategoryConnectionRegistry,
],
})
Expand Down
10 changes: 10 additions & 0 deletions packages/api/src/@core/utils/types/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,13 @@ export interface IUnification {
}[];
}): Promise<UnifyReturnType>;
}

export interface IBaseSync {
saveToDb(
connection_id: string,
linkedUserId: string,
data: any[],
originSource: string,
remote_data: Record<string, any>[],
): Promise<any[]>;
}
2 changes: 2 additions & 0 deletions packages/api/src/accounting/account/account.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { AccountController } from './account.controller';
import { AccountService } from './services/account.service';
import { ServiceRegistry } from './services/registry.service';
import { SyncService } from './sync/sync.service';
import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service';

@Module({
controllers: [AccountController],
Expand All @@ -20,6 +21,7 @@ import { SyncService } from './sync/sync.service';
FieldMappingService,
ServiceRegistry,
ConnectionUtils,
IngestDataService,
/* PROVIDERS SERVICES */
],
exports: [SyncService],
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/accounting/account/sync/sync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Injectable, OnModuleInit } from '@nestjs/common';
import { ServiceRegistry } from '../services/registry.service';

@Injectable()
export class SyncService implements OnModuleInit {
export class SyncService implements OnModuleInit, IBaseSync {
constructor(
private prisma: PrismaService,
private logger: LoggerService,
Expand Down
2 changes: 2 additions & 0 deletions packages/api/src/accounting/address/address.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { AddressController } from './address.controller';
import { AddressService } from './services/address.service';
import { ServiceRegistry } from './services/registry.service';
import { SyncService } from './sync/sync.service';
import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service';

@Module({
controllers: [AddressController],
Expand All @@ -20,6 +21,7 @@ import { SyncService } from './sync/sync.service';
FieldMappingService,
ServiceRegistry,
ConnectionUtils,
IngestDataService,
/* PROVIDERS SERVICES */
],
exports: [SyncService],
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/accounting/address/sync/sync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { UnifiedAddressOutput } from '../types/model.unified';
import { IAddressService } from '../types';

@Injectable()
export class SyncService implements OnModuleInit {
export class SyncService implements OnModuleInit, IBaseSync {
constructor(
private prisma: PrismaService,
private logger: LoggerService,
Expand Down
2 changes: 2 additions & 0 deletions packages/api/src/accounting/attachment/attachment.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { AttachmentController } from './attachment.controller';
import { AttachmentService } from './services/attachment.service';
import { ServiceRegistry } from './services/registry.service';
import { SyncService } from './sync/sync.service';
import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service';

@Module({
controllers: [AttachmentController],
Expand All @@ -20,6 +21,7 @@ import { SyncService } from './sync/sync.service';
FieldMappingService,
ServiceRegistry,
ConnectionUtils,
IngestDataService,
/* PROVIDERS SERVICES */
],
exports: [SyncService],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { UnifiedAttachmentOutput } from '../types/model.unified';
import { IAttachmentService } from '../types';

@Injectable()
export class SyncService implements OnModuleInit {
export class SyncService implements OnModuleInit, IBaseSync {
constructor(
private prisma: PrismaService,
private logger: LoggerService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { BalanceSheetController } from './balancesheet.controller';
import { BalanceSheetService } from './services/balancesheet.service';
import { ServiceRegistry } from './services/registry.service';
import { SyncService } from './sync/sync.service';
import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service';

@Module({
controllers: [BalanceSheetController],
Expand All @@ -20,6 +21,7 @@ import { SyncService } from './sync/sync.service';
FieldMappingService,
ServiceRegistry,
ConnectionUtils,
IngestDataService,
/* PROVIDERS SERVICES */
],
exports: [SyncService],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { UnifiedBalanceSheetOutput } from '../types/model.unified';
import { IBalanceSheetService } from '../types';

@Injectable()
export class SyncService implements OnModuleInit {
export class SyncService implements OnModuleInit, IBaseSync {
constructor(
private prisma: PrismaService,
private logger: LoggerService,
Expand Down
Loading

0 comments on commit 3d358cb

Please sign in to comment.