Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
4rthem committed Dec 7, 2023
1 parent 6d8532a commit 36e7cb3
Show file tree
Hide file tree
Showing 35 changed files with 181 additions and 163 deletions.
5 changes: 5 additions & 0 deletions databox/indexer/loader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import {isBuiltin} from 'node:module'

// noinspection JSUnusedGlobalSymbols
export const resolve = (specifier, context, nextResolve) => // This function can be `async` too
nextResolve(isBuiltin(specifier) || specifier.endsWith('.js') ? specifier : `${specifier}.js`, context)
1 change: 1 addition & 0 deletions databox/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"name": "@phrasea/databox-indexer",
"version": "1.0.0",
"description": "Consumes S3 events from AMQP and synchronize Databox",
"type": "module",
"main": "dist/index.js",
"license": "MIT",
"scripts": {
Expand Down
2 changes: 1 addition & 1 deletion databox/indexer/src/alternateUrl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export function getAlternateUrls(
return alternateUrls.map((c): AlternateUrl => {
return {
type: c.name,
url: c.pathPattern.replace(/\${(.+)}/g, (m, m1) => {
url: c.pathPattern.replace(/\${(.+)}/g, (_m, m1) => {
return dict[m1];
}),
}
Expand Down
11 changes: 7 additions & 4 deletions databox/indexer/src/amqp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ export function listenToQueue(
): void {
logger.info(`AMQP: Connecting...`);

const connect = () => {
const connect = (): Promise<{
channel: Channel,
connection: Connection
}> => {
return amqplib
.connect(dsn)
.then(function (conn) {
Expand Down Expand Up @@ -53,17 +56,17 @@ export function listenToQueue(
});

logger.debug('AMQP: prefetching channel...');
await channel.prefetch(parseInt(getEnv('DATABOX_CONCURRENCY', '2')));
await channel.prefetch(parseInt(getEnv('DATABOX_CONCURRENCY', '2')!));

return channel;
})
.then(function (ch) {
return ch.assertQueue(queueName)
.then(function (ok) {
.then(function () {
logger.debug('AMQP: wait for events...');
return ch
.consume(queueName, function (msg) {
if (msg !== null) {
if (msg) {
callback(msg.content.toString())
.then(() => {
ch.ack(msg)
Expand Down
10 changes: 5 additions & 5 deletions databox/indexer/src/command/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {createDataboxClientFromConfig} from "../databox/client";
import {createLogger} from "../lib/logger";
import {indexers} from "../indexers";
import {getLocation} from "../locations";
import {consume} from "../databox/entrypoint";
import {createDataboxClientFromConfig} from "../databox/client.js";
import {createLogger} from "../lib/logger.js";
import {indexers} from "../indexers.js";
import {getLocation} from "../locations.js";
import {consume} from "../databox/entrypoint.js";

export type IndexOptions = {
createNewWorkspace?: boolean;
Expand Down
6 changes: 5 additions & 1 deletion databox/indexer/src/configLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ function loadConfig(): object {
function replaceEnv(str: string): string | boolean | number | undefined {
let transform;
let hasEnv = false;
let result: string | undefined = str.replace(/%env\(([^^)]+)\)%/g, (match, varName: string) => {
let result: string | undefined = str.replace(/%env\(([^^)]+)\)%/g, (_match, varName: string) => {
const s = varName;
hasEnv = true;

Expand Down Expand Up @@ -122,6 +122,10 @@ export function castToInt(value: string | number | null | undefined): number | u
return value;
}

if (!value) {
return;
}

const n = parseInt(value);

if (!isNaN(n)) {
Expand Down
2 changes: 1 addition & 1 deletion databox/indexer/src/console.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {Command} from 'commander';
import indexCommand from "./command";
import indexCommand from "./command/index.js";

const program = new Command();

Expand Down
28 changes: 14 additions & 14 deletions databox/indexer/src/databox/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import {createHttpClient} from "../lib/axios";
import {configureClientCredentialsGrantType, OAuthClient} from "@alchemy/auth";
import {MemoryStorage} from "@alchemy/storage";

function createApiClient(baseURL: string, clientId: string, clientSecret: string, verifySSL: boolean) {
function createApiClient(baseURL: string, clientId: string, clientSecret: string, verifySSL: boolean, scope?: string) {
const oauthClient = new OAuthClient({
clientId,
clientSecret,
scope,
baseUrl: `${baseURL}/oauth/v2`,
storage: new MemoryStorage(),
})
Expand All @@ -33,7 +34,7 @@ type ClientParameters = {
apiUrl: string;
clientId: string;
clientSecret: string;
scope: string;
scope?: string;
verifySSL: boolean;
ownerId: string;
}
Expand All @@ -46,11 +47,7 @@ export class DataboxClient {
private readonly client: AxiosInstance;
private readonly oauthClient: OAuthClient;
private readonly logger: Logger;
private readonly clientId: string;
private readonly clientSecret: string;
private readonly ownerId: string;
private readonly scope: string;
private authPromise?: Promise<void>;

constructor({
apiUrl,
Expand All @@ -60,13 +57,16 @@ export class DataboxClient {
ownerId,
verifySSL = true,
}: ClientParameters, logger: Logger) {
const {client, oauthClient} = createApiClient(apiUrl, clientId, clientSecret, verifySSL);
const {client, oauthClient} = createApiClient(
apiUrl,
clientId,
clientSecret,
verifySSL,
scope,
);
this.client = client;
this.oauthClient = oauthClient;
this.clientId = clientId;
this.clientSecret = clientSecret;
this.ownerId = ownerId;
this.scope = scope;
this.logger = logger;
}

Expand Down Expand Up @@ -128,10 +128,10 @@ export class DataboxClient {
}

async createCollectionTreeBranch(data: CollectionInput[]): Promise<string> {
let parentId: string = undefined;
const previousKeys = [];
let parentId: string | undefined = undefined;
const previousKeys: string[] = [];
for (let i = 0; i < data.length; ++i) {
previousKeys.push(data[i].key);
previousKeys.push(data[i].key!);

const key = previousKeys.join('/');
const id = await this.createCollection(key, {
Expand All @@ -142,7 +142,7 @@ export class DataboxClient {
parentId = `/collections/${id}`;
}

return parentId;
return parentId!;
}

async createAttributeDefinition(key: string, data: Partial<AttributeDefinition>): Promise<AttributeDefinition> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export const collectionBasedOnPathStrategy: IndexAsset = async (
key: k,
title: k
})));
} catch (e) {
} catch (e: any) {
logger.error(`Failed to create collection branch "${branch.join('/')}": ${e.toString()}`);
throw e;
}
Expand All @@ -43,7 +43,7 @@ export const collectionBasedOnPathStrategy: IndexAsset = async (
attributes: asset.attributes,
renditions: asset.renditions,
});
} catch (e) {
} catch (e: any) {
logger.error(`Failed to create asset "${path}": ${e.toString()}`);
throw e;
}
Expand Down
4 changes: 2 additions & 2 deletions databox/indexer/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ export function getEnvStrict(name: string): string {
process.exit(1);
}

return v;
return v!;
}

export function getEnv(name: string, defaultValue: string = undefined): string | undefined {
export function getEnv(name: string, defaultValue?: string): string | undefined {
if (process.env.hasOwnProperty(name)) {
return process.env[name] || defaultValue;
}
Expand Down
6 changes: 3 additions & 3 deletions databox/indexer/src/eventHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export async function handlePutObject(asset: Asset, location: IndexLocation<any>

try {
await collectionBasedOnPathStrategy(asset, location, databoxClient, logger);
} catch (error) {
} catch (error: any) {
if (error.response) {
console.error(error.response.data);
}
Expand All @@ -21,10 +21,10 @@ export async function handlePutObject(asset: Asset, location: IndexLocation<any>
}
}

export async function handleDeleteObject(asset: Asset, databoxClient: DataboxClient, logger: Logger) {
export async function handleDeleteObject(asset: Asset, databoxClient: DataboxClient, _logger: Logger) {
try {
await databoxClient.deleteAsset(asset.workspaceId, asset.path);
} catch (error) {
} catch (error: any) {
if (error.response) {
console.error(error.response.data);
}
Expand Down
2 changes: 1 addition & 1 deletion databox/indexer/src/handlers/fs/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {getStrict} from "../../configLoader";

export const fsIndexer: IndexIterator<FsConfig> = async function* (
location,
logger,
_logger,
databoxClient
) {
const {
Expand Down
2 changes: 1 addition & 1 deletion databox/indexer/src/handlers/fs/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export const fsAssetServerFactory: AssetServerFactory<FsConfig> = function (loca
dirPrefix,
} = getDirConfig(location.options);

return async (path, res, query) => {
return async (path, res) => {
const storagePath = dirPrefix ? watchDir + path.substring(dirPrefix.length) : path;
if (!fs.existsSync(storagePath)) {
return notFound(res, `"${storagePath}" not found`, logger);
Expand Down
16 changes: 9 additions & 7 deletions databox/indexer/src/handlers/fs/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export async function fsWatcher(location: IndexLocation<FsConfig>, databoxClient

const workspaceId = await databoxClient.getWorkspaceIdFromSlug(getStrict('workspaceSlug', location.options));

function storeEvent(eventType: string, path: string): Promise<void> {
async function storeEvent(eventType: string, path: string): Promise<void> {
logger.debug(`${eventType}: ${path}`);

const asset = createAsset(
Expand All @@ -30,9 +30,11 @@ export async function fsWatcher(location: IndexLocation<FsConfig>, databoxClient

switch (eventType) {
case 'add':
return handlePutObject(asset, location, databoxClient, logger);
handlePutObject(asset, location, databoxClient, logger);
break;
case 'unlink':
return handleDeleteObject(asset, databoxClient, logger);
handleDeleteObject(asset, databoxClient, logger);
break;
}
}

Expand All @@ -41,9 +43,9 @@ export async function fsWatcher(location: IndexLocation<FsConfig>, databoxClient
chokidar.watch(watchDir, {
ignoreInitial: true,
}).on('all', storeEvent);
} catch (err) {
if (err.name === 'AbortError')
return;
throw err;
} catch (err: any) {
if (err.name !== 'AbortError') {
throw err;
}
}
}
2 changes: 1 addition & 1 deletion databox/indexer/src/handlers/phraseanet/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {AssetServerFactory} from "../../server";
import {PhraseanetConfig} from "./types";

export const phraseanetAssetServerFactory: AssetServerFactory<PhraseanetConfig> = function () {
return async (path, res) => {
return async (_path, res) => {
res.redirect(307, 'http://localhost');
}
}
4 changes: 2 additions & 2 deletions databox/indexer/src/handlers/phraseanet/shared.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {Asset} from "../../indexers";
import {PhraseanetRecord, SubDef} from "./types";
import {escapeSlashes} from "../../lib/pathUtils";
import {AttributeClass, AttributeInput} from "../../databox/types";
import {AttributeClass, AttributeInput, RenditionInput} from "../../databox/types";

const renditionDefinitionMapping = {
document: 'original',
Expand Down Expand Up @@ -67,7 +67,7 @@ export function createAsset(
type: s.mime_type,
}
};
}).filter(s => Boolean(s)),
}).filter(s => Boolean(s)) as RenditionInput[],
};
}

Expand Down
2 changes: 1 addition & 1 deletion databox/indexer/src/handlers/phraseanet/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ import {DataboxClient} from "../../databox/client";
import {Logger} from "winston";
import {PhraseanetConfig} from "./types";

export function phraseanetWatcher(location: IndexLocation<PhraseanetConfig>, databoxClient: DataboxClient, logger: Logger) {
export function phraseanetWatcher(_location: IndexLocation<PhraseanetConfig>, _databoxClient: DataboxClient, _logger: Logger) {
}
3 changes: 1 addition & 2 deletions databox/indexer/src/handlers/s3_amqp/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ export function createS3ClientFromConfig(config: S3AmqpConfig) {
} = url.parse(getStrict('s3.endpoint', config));

return createS3Client({
type: 's3',
useSSL: protocol === 'https:',
insecure: true,
endPoint: hostname,
endPoint: hostname!,
port: port ? parseInt(port) : undefined,
accessKey: getStrict('s3.accessKey', config),
secretKey: getStrict('s3.secretKey', config),
Expand Down
7 changes: 5 additions & 2 deletions databox/indexer/src/lib/axios.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ export function createHttpClient({
shouldResetTimeout: true,
retryCondition: (error) => {
const {config} = error;
logger.warn(`Request "${config.method.toUpperCase()} ${config.url}" failed, retrying...`);
if (!config) {
return false;
}
logger.warn(`Request "${config.method?.toUpperCase()} ${config.url}" failed, retrying...`);

if (error.response) {
if ([500, 400, 422, 404, 403, 401].includes(error.response.status)) {
return false;
}

logger.debug(`Request "${config.method.toUpperCase()} ${config.url}" response ${error.response.status}: ${JSON.stringify(error.response.data)}`);
logger.debug(`Request "${config.method?.toUpperCase()} ${config.url}" response ${error.response.status}: ${JSON.stringify(error.response.data)}`);
}

return true;
Expand Down
1 change: 1 addition & 0 deletions databox/indexer/src/lib/promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
const promises: Record<string, Promise<any>> = {};

export function lockPromise<T>(key: string, handler: () => Promise<T>): Promise<T> {
// @ts-expect-error wrong TS interpretation
if (promises[key]) {
return promises[key];
}
Expand Down
2 changes: 1 addition & 1 deletion databox/indexer/src/lib/streamify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export async function* streamify(stream: Readable, event: string, endEvent: stri
break;
}

yield r.value;
yield r.value!;
stream.resume();
}
}
Expand Down
3 changes: 2 additions & 1 deletion databox/indexer/src/lib/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ export function forceArray<D = any, T = undefined | null>(object: object | Array
}

if (typeof object === 'object') {
return Object.keys(object).map(k => object[k]);
// @ts-expect-error object can be null
return Object.keys(object).map((k) => object[k]);
}

return object;
Expand Down
Loading

0 comments on commit 36e7cb3

Please sign in to comment.