Skip to content

Commit

Permalink
Merge pull request #23 from pagopa/add_support_for_native_event_hub_lib
Browse files Browse the repository at this point in the history
[#IOPLT-363] Add Support for native Event Hub Library
  • Loading branch information
AleDore authored Feb 21, 2024
2 parents e6f8109 + b58e2ab commit 11135c2
Show file tree
Hide file tree
Showing 9 changed files with 344 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
KafkaConfig,
} from "kafkajs";
import {
createEventHubService,
createKafkaService,
createPlainEventHubService,
} from "../../../../src/queue/eventhub/service";
import {
Expand Down Expand Up @@ -141,7 +141,7 @@ describe("EventHubService", () => {
it("Sending event to EventHub with error", async () => {
const message = getRandomKeyValueObject();
const result = await pipe(
createEventHubService("fake-connection"),
createKafkaService("fake-connection"),
TE.fromEither,
TE.chain((producer) => producer.produce([message])),
)();
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
},
"dependencies": {
"@azure/cosmos": "^4.0.0",
"@pagopa/fp-ts-kafkajs": "^0.3.0",
"@azure/event-hubs": "^5.11.3",
"@pagopa/fp-ts-kafkajs": "^1.0.1",
"@pagopa/winston-ts": "^2.2.0",
"@pagopa/ts-commons": "^12.4.1",
"@pagopa/data-indexer-commons": "^0.0.6",
Expand Down
21 changes: 13 additions & 8 deletions src/capturer/mongo/mongo.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import * as E from "fp-ts/Either";
import * as O from "fp-ts/Option";
import * as B from "fp-ts/boolean";
import { pipe } from "fp-ts/lib/function";
import {
Binary,
Expand Down Expand Up @@ -46,15 +45,21 @@ export const closeStream = <T = Document>(
opts: IOpts,
): E.Either<Error, void> =>
pipe(
opts?.timeout !== null,
B.fold(
() => E.right(void 0),
() =>
E.tryCatch(
() => setTimeout(() => stream.close(), opts?.timeout),
() => new Error(`Impossible to close the stream`),
opts?.timeout,
O.fromNullable,
O.chain((timeout) =>
pipe(
stream,
O.fromNullable,
O.map((nonNullableStream) =>
E.tryCatch(
() => setTimeout(() => nonNullableStream.close(), timeout),
() => new Error(`Impossible to close the stream`),
),
),
),
),
O.getOrElse(() => E.right(void 0)),
);

export const watchMongoCollection = <T = Document>(
Expand Down
77 changes: 77 additions & 0 deletions src/queue/__tests__/factory.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { pipe } from "fp-ts/lib/function";
import * as E from "fp-ts/lib/Either";
import * as S from "../eventhub/service";
import { QueueType, getInternalQueueService } from "../factory";

const spiedCreateKafkaService = jest.spyOn(S, "createKafkaService");
const spiedCreateNativeEventHubService = jest.spyOn(
S,
"createNativeEventHubService",
);

const aConnectionString =
"Endpoint=sb://foo.windows.net/;SharedAccessKeyName=foo;SharedAccessKey=SharedAccessKey=;EntityPath=foo";
const failTest = (msg: string) => {
throw Error(msg);
};
describe("getInternalQueueService", () => {
afterEach(() => {
jest.clearAllMocks();
})
it("should return Error if factory cannot get QueueService", () => {
spiedCreateNativeEventHubService.mockImplementationOnce(() =>
E.left(Error("Cannot reach EventHub")),
);
pipe(
getInternalQueueService(aConnectionString),
E.mapLeft((e) => {
expect(e).toBeDefined();
expect(e.message).toEqual("Cannot reach EventHub");
}),
E.map(() =>
failTest(
"Cannot instantiate queue service with wrong connection string",
),
),
);
});

it("should create native EventHub QueueService if no QueueType is provided", () => {
spiedCreateNativeEventHubService.mockImplementationOnce(() =>
E.right({} as any),
);
pipe(
getInternalQueueService(aConnectionString),
E.mapLeft(() => failTest(
"Should not fail",
)),
E.map(service =>{
expect(service).toBeDefined();
expect(spiedCreateNativeEventHubService).toHaveBeenCalled();
expect(spiedCreateNativeEventHubService).toHaveBeenCalledWith(aConnectionString);
}
),
);
});
it("should create Kafka QueueService if QueueType is defined as Kafka", () => {
spiedCreateNativeEventHubService.mockImplementationOnce(() =>
E.right({} as any),
);
spiedCreateKafkaService.mockImplementationOnce(() =>
E.right({} as any),
);
pipe(
getInternalQueueService(aConnectionString, QueueType.Kafka),
E.mapLeft(() => failTest(
"Should not fail",
)),
E.map(service =>{
expect(service).toBeDefined();
expect(spiedCreateNativeEventHubService).not.toHaveBeenCalled();
expect(spiedCreateKafkaService).toHaveBeenCalled();
expect(spiedCreateKafkaService).toHaveBeenCalledWith(aConnectionString);
}
),
);
});
});
10 changes: 5 additions & 5 deletions src/queue/eventhub/__test__/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import { KafkaProducerCompact } from "@pagopa/fp-ts-kafkajs/dist/lib/KafkaProducerCompact";
import * as E from "fp-ts/Either";
import * as TE from "fp-ts/TaskEither";
import { createEventHubService } from "../service";
import { createKafkaService } from "../service";
import * as EventHubUtils from "../utils";
import { IMessageType } from "./utils.test";

Expand All @@ -13,14 +13,14 @@ const connectionString = "your_connection_string";
const mockError = new Error("Failed to get event hub producer");
const mockProducer = {} as KafkaProducerCompact<IMessageType>;

describe("EventHubService", () => {
it("should create EventHubService", async () => {
describe("KafkaService", () => {
it("should create KafkaService", async () => {
getEventHubProducerSpy.mockImplementationOnce(() => E.right(mockProducer));
sendMessageEventHubSpy.mockImplementationOnce(
(_) => () => TE.right(void 0),
);

const result = createEventHubService(connectionString);
const result = createKafkaService(connectionString);

expect(getEventHubProducerSpy).toHaveBeenCalledWith(connectionString);
expect(result).toEqual(
Expand All @@ -31,7 +31,7 @@ describe("EventHubService", () => {
it("should return an error when getEventHubProducer fails", async () => {
getEventHubProducerSpy.mockImplementationOnce(() => E.left(mockError));

const result = createEventHubService(connectionString);
const result = createKafkaService(connectionString);

expect(getEventHubProducerSpy).toHaveBeenCalledWith(connectionString);
expect(result).toEqual(E.left(mockError));
Expand Down
21 changes: 14 additions & 7 deletions src/queue/eventhub/service.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
import { KafkaProducerCompact } from "@pagopa/fp-ts-kafkajs/dist/lib/KafkaProducerCompact";
import * as E from "fp-ts/Either";
import * as TE from "fp-ts/TaskEither";

import { pipe } from "fp-ts/lib/function";
import { IQueueService } from "../factory";
import {
fromSasPlain,
getEventHubProducer,
getNativeEventHubProducer,
sendMessageEventHub,
sendMessageNativeEventHub,
} from "./utils";

export type QueueProducer<T> = KafkaProducerCompact<T>;
export interface IQueueService {
readonly produce: <T>(
messages: ReadonlyArray<T>,
) => TE.TaskEither<Error, void>;
}

export const createEventHubService = (
export const createNativeEventHubService = (
connectionString: string,
): E.Either<Error, IQueueService> =>
pipe(
getNativeEventHubProducer(connectionString),
E.map((producer) => ({
produce: sendMessageNativeEventHub(producer),
})),
);

export const createKafkaService = (
connectionString: string,
): E.Either<Error, IQueueService> =>
pipe(
Expand Down
19 changes: 19 additions & 0 deletions src/queue/eventhub/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
import * as E from "fp-ts/Either";
import * as TE from "fp-ts/TaskEither";
import { constVoid, pipe } from "fp-ts/lib/function";
import { EventHubProducerClient } from "@azure/event-hubs";

export const getEventHubProducer = <T>(
connectionString: string,
Expand Down Expand Up @@ -65,3 +66,21 @@ export const sendMessageEventHub =
),
),
);

export const getNativeEventHubProducer = (
connectionString: string,
): E.Either<Error, EventHubProducerClient> =>
pipe(
AzureEventhubSasFromString.decode(connectionString),
E.map(() => new EventHubProducerClient(connectionString)),
E.mapLeft(() => new Error(`Error during decoding Event Hub SAS`)),
);

export const sendMessageNativeEventHub =
<T>(messagingClient: EventHubProducerClient) =>
(messages: ReadonlyArray<T>): TE.TaskEither<Error, void> =>
pipe(
messages.map((msg) => ({ body: msg })),
(msgEventData) =>
TE.tryCatch(() => messagingClient.sendBatch(msgEventData), E.toError),
);
45 changes: 45 additions & 0 deletions src/queue/factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import * as TE from "fp-ts/lib/TaskEither";
import * as E from "fp-ts/lib/Either";
import * as O from "fp-ts/lib/Option";
import { pipe } from "fp-ts/lib/function";
import {
createKafkaService,
createNativeEventHubService,
} from "./eventhub/service";

export interface IQueueService {
readonly produce: <T>(
messages: ReadonlyArray<T>,
) => TE.TaskEither<Error, void>;
}
export enum QueueType {
EventHub,
Kafka,
}

export const notSupportedError = "Queue type still not supported";

export const createInternalQueueService = (
type: QueueType,
connectionString: string,
): E.Either<Error, IQueueService> => {
switch (type) {
case QueueType.Kafka:
return createKafkaService(connectionString);
case QueueType.EventHub:
return createNativeEventHubService(connectionString);
default:
E.left(new Error(notSupportedError));
}
};

export const getInternalQueueService = (
connectionString: string,
queueType?: QueueType,
): E.Either<Error, IQueueService> =>
pipe(
queueType,
O.fromNullable,
O.map((type) => createInternalQueueService(type, connectionString)),
O.getOrElse(() => createNativeEventHubService(connectionString)),
);
Loading

0 comments on commit 11135c2

Please sign in to comment.