Skip to content

Commit ce61c07

Browse files
committed
resolved issues with mediator registration
1 parent 79a9245 commit ce61c07

File tree

7 files changed

+228
-40
lines changed

7 files changed

+228
-40
lines changed

src/index.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import path from 'path';
33
import { getConfig } from './config/config';
44
import logger from './logger';
55
import routes from './routes/index';
6-
import { getRegisterBuckets, setupMediator } from './openhim/openhim';
6+
import { getRegisteredBuckets, setupMediator } from './openhim/openhim';
77
import { createMinioBucketListeners, ensureBucketExists } from './utils/minioClient';
88

99
const app = express();
@@ -14,10 +14,10 @@ app.listen(getConfig().port, async () => {
1414
logger.info(`Server is running on port - ${getConfig().port}`);
1515

1616
if (getConfig().runningMode !== 'testing' && getConfig().registerMediator) {
17-
setupMediator();
17+
await setupMediator();
1818
}
1919

20-
const buckets = await getRegisterBuckets();
20+
const buckets = await getRegisteredBuckets();
2121

2222
buckets.length === 0 && logger.warn('No buckets specified in the configuration');
2323

src/openhim/mediatorConfig.json

+6-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
"routes": [
1111
{
1212
"name": "Climate Mediator Endpoint",
13-
"host": "172.17.0.1",
13+
"host": "climate-mediator",
1414
"port": "3000",
1515
"primary": true,
1616
"type": "http"
@@ -29,7 +29,7 @@
2929
"endpoints": [
3030
{
3131
"name": "Climate Endpoint",
32-
"host": "172.17.0.1",
32+
"host": "climate-mediator",
3333
"path": "/test",
3434
"port": "3000",
3535
"primary": true,
@@ -57,5 +57,8 @@
5757
}
5858
]
5959
}
60-
]
60+
],
61+
"config": {
62+
"minio_buckets_registry": []
63+
}
6164
}

src/openhim/openhim.ts

+98-19
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,11 @@
11
import logger from '../logger';
2+
import { MinioBucketsRegistry, Mediator as OpenHimAPIMediator } from '../types';
23
import { MediatorConfig } from '../types/mediatorConfig';
34
import { RequestOptions } from '../types/request';
45
import { getConfig } from '../config/config';
56
import axios, { AxiosError } from 'axios';
67
import https from 'https';
7-
import {
8-
activateHeartbeat,
9-
fetchConfig,
10-
registerMediator,
11-
authenticate,
12-
genAuthHeaders,
13-
} from 'openhim-mediator-utils';
8+
import { activateHeartbeat, fetchConfig, registerMediator } from 'openhim-mediator-utils';
149
import { Bucket, createMinioBucketListeners, ensureBucketExists } from '../utils/minioClient';
1510
import path from 'path';
1611

@@ -23,7 +18,7 @@ const resolveMediatorConfig = (): MediatorConfig => {
2318
let mediatorConfigFile;
2419

2520
try {
26-
logger.info(`Loading mediator config from: ${mediatorConfigFilePath}`);
21+
logger.debug(`Loading mediator config from: ${mediatorConfigFilePath}`);
2722
mediatorConfigFile = require(mediatorConfigFilePath);
2823
} catch (error) {
2924
logger.error(`Failed to parse JSON: ${error}`);
@@ -43,12 +38,12 @@ const resolveOpenhimConfig = (urn: string): RequestOptions => {
4338
};
4439
};
4540

46-
export const setupMediator = () => {
41+
export const setupMediator = async () => {
4742
try {
4843
const mediatorConfig = resolveMediatorConfig();
4944
const openhimConfig = resolveOpenhimConfig(mediatorConfig.urn);
5045

51-
registerMediator(openhimConfig, mediatorConfig, (error: Error) => {
46+
await registerMediator(openhimConfig, mediatorConfig, (error: Error) => {
5247
if (error) {
5348
logger.error(`Failed to register mediator: ${JSON.stringify(error)}`);
5449
throw error;
@@ -87,8 +82,8 @@ export const setupMediator = () => {
8782
};
8883

8984
//TODO: Add Typing and error handling.
90-
async function getMediatorConfig() {
91-
logger.info('Fetching mediator config from OpenHIM');
85+
async function getMediatorConfig(): Promise<OpenHimAPIMediator | null> {
86+
logger.debug('Fetching mediator config from OpenHIM');
9287
const mediatorConfig = resolveMediatorConfig();
9388
const openhimConfig = resolveOpenhimConfig(mediatorConfig.urn);
9489

@@ -108,17 +103,65 @@ async function getMediatorConfig() {
108103
return request.data;
109104
} catch (e) {
110105
const error = e as AxiosError;
111-
logger.error(`Failed to fetch mediator config: ${JSON.stringify(error)}`);
112-
error.status === 404 && logger.warn('Mediator config not found in OpenHIM, ');
106+
107+
switch (error.status) {
108+
case 401:
109+
logger.error(`Failed to authenticate with OpenHIM, check your credentials`);
110+
break;
111+
case 404:
112+
logger.debug(
113+
'Mediator config not found in OpenHIM, This is expected on initial setup'
114+
);
115+
break;
116+
default:
117+
logger.error(`Failed to fetch mediator config: ${JSON.stringify(error)}`);
118+
break;
119+
}
113120
return null;
114121
}
115122
}
116123

117-
export async function getRegisterBuckets(): Promise<Bucket[]> {
124+
async function putMediatorConfig(mediatorUrn: string, mediatorConfig: MinioBucketsRegistry[]) {
125+
const openhimConfig = resolveOpenhimConfig(mediatorUrn);
126+
const { apiURL, username, password, trustSelfSigned } = openhimConfig;
127+
await axios.put(
128+
`${apiURL}/mediators/urn:mediator:climate-mediator/config`,
129+
{
130+
minio_buckets_registry: mediatorConfig,
131+
},
132+
{
133+
auth: { username, password },
134+
httpsAgent: new https.Agent({
135+
rejectUnauthorized: !trustSelfSigned,
136+
}),
137+
}
138+
);
139+
140+
try {
141+
logger.info('Successfully updated mediator config in OpenHIM');
142+
} catch (error) {
143+
const axiosError = error as AxiosError;
144+
switch (axiosError.status) {
145+
case 401:
146+
logger.error(`Failed to authenticate with OpenHIM, check your credentials`);
147+
break;
148+
default:
149+
logger.error(
150+
`Failed to update mediator config in OpenHIM: ${JSON.stringify(axiosError)}`
151+
);
152+
break;
153+
}
154+
}
155+
}
156+
157+
export async function getRegisteredBuckets(): Promise<Bucket[]> {
118158
if (runningMode !== 'testing') {
119159
logger.info('Fetching registered buckets from OpenHIM');
120160
const mediatorConfig = await getMediatorConfig();
121161

162+
if (!mediatorConfig) {
163+
return [];
164+
}
122165
//TODO: Handle errors, and undefined response.
123166
const buckets = mediatorConfig.config?.minio_buckets_registry as Bucket[];
124167
if (!buckets) {
@@ -133,13 +176,49 @@ export async function getRegisterBuckets(): Promise<Bucket[]> {
133176
}
134177

135178
export async function registerBucket(bucket: string, region?: string) {
136-
if (runningMode !== 'testing') {
179+
// If we are in testing mode, we don't need to have the registered buckets persisted
180+
if (runningMode === 'testing') {
181+
logger.debug('Running in testing mode, skipping bucket registration');
137182
return true;
138183
}
184+
185+
//get the mediator config from OpenHIM
139186
const mediatorConfig = await getMediatorConfig();
140-
const existingBuckets = mediatorConfig.config?.minio_buckets_registry;
141187

142-
if (!existingBuckets) {
143-
return [];
188+
//TODO: Change this to a debug log
189+
logger.debug(`Mediator config: ${JSON.stringify(mediatorConfig)}`);
190+
191+
//if the mediator config is not found, log the issue and return false
192+
if (mediatorConfig === null) {
193+
logger.error('Mediator config not found in OpenHIM, unable to register bucket');
194+
return false;
144195
}
196+
197+
const newBucket = {
198+
bucket,
199+
region: region || '',
200+
};
201+
202+
//get the existing buckets from the mediator config
203+
const existingConfig = mediatorConfig.config;
204+
205+
if (existingConfig === undefined) {
206+
logger.info('Mediator config does not have a config section, creating new config');
207+
mediatorConfig['config'] = {
208+
minio_buckets_registry: [newBucket],
209+
};
210+
} else {
211+
const existingBucket = existingConfig.minio_buckets_registry.find(
212+
(bucket) => bucket.bucket === newBucket.bucket
213+
);
214+
if (existingBucket) {
215+
logger.debug(`Bucket ${bucket} already exists in the config`);
216+
return false;
217+
}
218+
logger.info(`Adding bucket ${bucket} to OpenHIM config`);
219+
existingConfig.minio_buckets_registry.push(newBucket);
220+
await putMediatorConfig(mediatorConfig.urn, existingConfig.minio_buckets_registry);
221+
}
222+
223+
return true;
145224
}

src/routes/index.ts

+20-6
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@ import { getCsvHeaders } from '../utils/file-validators';
55
import logger from '../logger';
66
import fs from 'fs/promises';
77
import path from 'path';
8-
import e from 'express';
9-
import { uploadToMinio } from '../utils/minioClient';
8+
import {
9+
BucketDoesNotExistError,
10+
ensureBucketExists,
11+
uploadToMinio,
12+
} from '../utils/minioClient';
13+
import { registerBucket } from '../openhim/openhim';
1014

1115
// Constants
1216
const VALID_MIME_TYPES = ['text/csv', 'application/json'] as const;
@@ -77,7 +81,6 @@ const handleCsvFile = async (
7781
fileUrl,
7882
file.originalname,
7983
bucket,
80-
region,
8184
file.mimetype
8285
);
8386
await fs.unlink(fileUrl);
@@ -104,6 +107,7 @@ routes.post('/upload', upload.single('file'), async (req, res) => {
104107
const file = req.file;
105108
const bucket = req.query.bucket as string;
106109
const region = req.query.region as string;
110+
const createBucketIfNotExists = req.query.createBucketIfNotExists === 'true';
107111

108112
if (!file) {
109113
logger.error('No file uploaded');
@@ -115,21 +119,31 @@ routes.post('/upload', upload.single('file'), async (req, res) => {
115119
return res.status(400).json(createErrorResponse('BUCKET_MISSING', 'No bucket provided'));
116120
}
117121

122+
await ensureBucketExists(bucket, region, createBucketIfNotExists);
123+
118124
const response =
119125
file.mimetype === 'text/csv'
120126
? await handleCsvFile(file, bucket, region)
121127
: handleJsonFile(file);
122128

129+
createBucketIfNotExists && (await registerBucket(bucket, region));
130+
123131
const statusCode = response.status === 'success' ? 201 : 400;
124132
return res.status(statusCode).json(response);
125-
} catch (error) {
126-
logger.error('Error processing upload:', error);
133+
} catch (e) {
134+
logger.error('Error processing upload:', e);
135+
136+
if (e instanceof BucketDoesNotExistError) {
137+
const error = e as BucketDoesNotExistError;
138+
return res.status(400).json(createErrorResponse('BUCKET_DOES_NOT_EXIST', error.message));
139+
}
140+
127141
return res
128142
.status(500)
129143
.json(
130144
createErrorResponse(
131145
'INTERNAL_SERVER_ERROR',
132-
error instanceof Error ? error.message : 'Unknown error'
146+
e instanceof Error ? e.message : 'Unknown error'
133147
)
134148
);
135149
}

src/types/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from './openhim-api/mediator';

src/types/openhim-api/mediator.ts

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
export interface Mediator {
2+
_id: string;
3+
urn: string;
4+
version: string;
5+
name: string;
6+
description: string;
7+
endpoints: Endpoint[];
8+
defaultChannelConfig: DefaultChannelConfig[];
9+
configDefs: ConfigDef[];
10+
__v: number;
11+
_lastHeartbeat: Date;
12+
_uptime: number;
13+
_configModifiedTS: Date;
14+
config?: Config;
15+
}
16+
17+
interface Config {
18+
minio_buckets_registry: MinioBucketsRegistry[];
19+
}
20+
21+
export interface MinioBucketsRegistry {
22+
bucket: string;
23+
region?: string;
24+
}
25+
26+
interface ConfigDef {
27+
param: string;
28+
displayName: string;
29+
description: string;
30+
type: string;
31+
values: any[];
32+
template: Template[];
33+
array: boolean;
34+
_id: string;
35+
}
36+
37+
interface Template {
38+
param: string;
39+
displayName: string;
40+
type: string;
41+
optional?: boolean;
42+
}
43+
44+
interface DefaultChannelConfig {
45+
name: string;
46+
urlPattern: string;
47+
isAsynchronousProcess: boolean;
48+
methods: string[];
49+
type: string;
50+
allow: string[];
51+
whitelist: any[];
52+
authType: string;
53+
routes: Endpoint[];
54+
matchContentTypes: any[];
55+
properties: any[];
56+
txViewAcl: any[];
57+
txViewFullAcl: any[];
58+
txRerunAcl: any[];
59+
status: string;
60+
rewriteUrls: boolean;
61+
addAutoRewriteRules: boolean;
62+
autoRetryEnabled: boolean;
63+
autoRetryPeriodMinutes: number;
64+
_id: string;
65+
alerts: any[];
66+
rewriteUrlsConfig: any[];
67+
}
68+
69+
interface Endpoint {
70+
name: string;
71+
type: string;
72+
status: string;
73+
host: string;
74+
port: number;
75+
primary: boolean;
76+
forwardAuthHeader: boolean;
77+
_id: string;
78+
path?: string;
79+
}

0 commit comments

Comments
 (0)