Skip to content

Commit

Permalink
[#IOPLT-385] Add support for Managed Identity on Event Hub client con…
Browse files Browse the repository at this point in the history
…nection
  • Loading branch information
AleDore committed Mar 19, 2024
1 parent cc52cf6 commit dada0f2
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 54 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
]
},
"dependencies": {
"@azure/identity": "^4.0.1",
"@azure/cosmos": "^4.0.0",
"@azure/event-hubs": "^5.11.3",
"@pagopa/fp-ts-kafkajs": "^1.0.1",
Expand Down
70 changes: 39 additions & 31 deletions src/queue/__tests__/factory.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { pipe } from "fp-ts/lib/function";
import * as E from "fp-ts/lib/Either";
import * as S from "../eventhub/service";
import { QueueType, getInternalQueueService } from "../factory";
import { QueueType, createInternalQueueService } from "../factory";

const spiedCreateKafkaService = jest.spyOn(S, "createKafkaService");
const spiedCreateNativeEventHubService = jest.spyOn(
Expand All @@ -14,16 +14,20 @@ const aConnectionString =
const failTest = (msg: string) => {
throw Error(msg);
};
describe("getInternalQueueService", () => {
afterEach(() => {
describe("createInternalQueueService", () => {
afterEach(() => {
jest.clearAllMocks();
})
});
it("should return Error if factory cannot get QueueService", () => {
spiedCreateNativeEventHubService.mockImplementationOnce(() =>
E.left(Error("Cannot reach EventHub")),
);
pipe(
getInternalQueueService(aConnectionString),
createInternalQueueService({
queueType: QueueType.EventHub,
useManagedIdentity: false,
connectionString: aConnectionString,
}),
E.mapLeft((e) => {
expect(e).toBeDefined();
expect(e.message).toEqual("Cannot reach EventHub");
Expand All @@ -35,43 +39,47 @@ afterEach(() => {
),
);
});

it("should create native EventHub QueueService if no QueueType is provided", () => {
it("should create native EventHub QueueService with connection string", () => {
spiedCreateNativeEventHubService.mockImplementationOnce(() =>
E.right({} as any),
);
pipe(
getInternalQueueService(aConnectionString),
E.mapLeft(() => failTest(
"Should not fail",
)),
E.map(service =>{
expect(service).toBeDefined();
expect(spiedCreateNativeEventHubService).toHaveBeenCalled();
expect(spiedCreateNativeEventHubService).toHaveBeenCalledWith(aConnectionString);
}
),
createInternalQueueService({
queueType: QueueType.EventHub,
useManagedIdentity: false,
connectionString: aConnectionString,
}),
E.mapLeft(() => failTest("Should not fail")),
E.map((service) => {
expect(service).toBeDefined();
expect(spiedCreateNativeEventHubService).toHaveBeenCalled();
expect(spiedCreateNativeEventHubService).toHaveBeenCalledWith({
queueType: QueueType.EventHub,
useManagedIdentity: false,
connectionString: aConnectionString,
});
}),
);
});
it("should create Kafka QueueService if QueueType is defined as Kafka", () => {
spiedCreateNativeEventHubService.mockImplementationOnce(() =>
E.right({} as any),
);
spiedCreateKafkaService.mockImplementationOnce(() =>
E.right({} as any),
);
spiedCreateKafkaService.mockImplementationOnce(() => E.right({} as any));
pipe(
getInternalQueueService(aConnectionString, QueueType.Kafka),
E.mapLeft(() => failTest(
"Should not fail",
)),
E.map(service =>{
expect(service).toBeDefined();
expect(spiedCreateNativeEventHubService).not.toHaveBeenCalled();
expect(spiedCreateKafkaService).toHaveBeenCalled();
expect(spiedCreateKafkaService).toHaveBeenCalledWith(aConnectionString);
}
),
createInternalQueueService({
queueType: QueueType.Kafka,
connectionString: aConnectionString,
}),
E.mapLeft(() => failTest("Should not fail")),
E.map((service) => {
expect(service).toBeDefined();
expect(spiedCreateNativeEventHubService).not.toHaveBeenCalled();
expect(spiedCreateKafkaService).toHaveBeenCalled();
expect(spiedCreateKafkaService).toHaveBeenCalledWith(
aConnectionString
);
}),
);
});
});
43 changes: 40 additions & 3 deletions src/queue/eventhub/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,59 @@ import { KafkaProducerCompact } from "@pagopa/fp-ts-kafkajs/dist/lib/KafkaProduc
import * as E from "fp-ts/Either";

import { pipe } from "fp-ts/lib/function";
import { IQueueService } from "../factory";
import { errorsToReadableMessages } from "@pagopa/ts-commons/lib/reporters";
import {
EvhAuthQueueParams,
EvhPasswordLessQueueParams,
EvhQueueParams,
IQueueService,
} from "../factory";
import {
fromSasPlain,
getEventHubProducer,
getNativeEventHubProducer,
getPasswordLessNativeEventHubProducer,
sendMessageEventHub,
sendMessageNativeEventHub,
} from "./utils";

export type QueueProducer<T> = KafkaProducerCompact<T>;

export const createNativeEventHubService = (
connectionString: string,
params: EvhQueueParams,
): E.Either<Error, IQueueService> =>
pipe(
getNativeEventHubProducer(connectionString),
params,
EvhPasswordLessQueueParams.decode,
E.mapLeft((errs) =>
Error(
`Cannot decode Event Hub passwordless params|ERROR=${errorsToReadableMessages(
errs,
)}`,
),
),
E.chain((passwordLessParams) =>
getPasswordLessNativeEventHubProducer(
passwordLessParams.hostName,
passwordLessParams.topicName,
),
),
E.orElse(() =>
pipe(
params,
EvhAuthQueueParams.decode,
E.mapLeft((errs) =>
Error(
`Cannot decode Event Hub plain connection params|ERROR=${errorsToReadableMessages(
errs,
)}`,
),
),
E.chain((connectionParams) =>
getNativeEventHubProducer(connectionParams.connectionString),
),
),
),
E.map((producer) => ({
produce: sendMessageNativeEventHub(producer),
})),
Expand Down
12 changes: 12 additions & 0 deletions src/queue/eventhub/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import * as E from "fp-ts/Either";
import * as TE from "fp-ts/TaskEither";
import { constVoid, pipe } from "fp-ts/lib/function";
import { EventHubProducerClient } from "@azure/event-hubs";
import { DefaultAzureCredential } from "@azure/identity";

export const getEventHubProducer = <T>(
connectionString: string,
Expand Down Expand Up @@ -76,6 +77,17 @@ export const getNativeEventHubProducer = (
E.mapLeft(() => new Error(`Error during decoding Event Hub SAS`)),
);

export const getPasswordLessNativeEventHubProducer = (
hostName: string,
topicName: string,
): E.Either<Error, EventHubProducerClient> =>
pipe(
new DefaultAzureCredential(),
(credentials) =>
E.right(new EventHubProducerClient(hostName, topicName, credentials)),
E.mapLeft(() => new Error(`Error during decoding Event Hub SAS`)),
);

export const sendMessageNativeEventHub =
<T>(messagingClient: EventHubProducerClient) =>
(messages: ReadonlyArray<T>): TE.TaskEither<Error, void> =>
Expand Down
59 changes: 41 additions & 18 deletions src/queue/factory.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import * as TE from "fp-ts/lib/TaskEither";
import * as E from "fp-ts/lib/Either";
import * as O from "fp-ts/lib/Option";
import { pipe } from "fp-ts/lib/function";
import * as t from "io-ts";
import {
createKafkaService,
createNativeEventHubService,
Expand All @@ -17,29 +16,53 @@ export enum QueueType {
Kafka,
}

export const BaseQueueParams = t.type({
connectionString: t.string,
});
export type BaseQueueParams = t.TypeOf<typeof BaseQueueParams>;

export const EvhPasswordLessQueueParams = t.type({
hostName: t.string,
queueType: t.literal(QueueType.EventHub),
topicName: t.string,
useManagedIdentity: t.literal(true),
});
export type EvhPasswordLessQueueParams = t.TypeOf<
typeof EvhPasswordLessQueueParams
>;
export const EvhAuthQueueParams = t.intersection([
BaseQueueParams,
t.type({
queueType: t.literal(QueueType.EventHub),
useManagedIdentity: t.literal(false),
}),
]);
export type EvhAuthQueueParams = t.TypeOf<typeof EvhAuthQueueParams>;

export const KafkaQueueParams = t.intersection([
BaseQueueParams,
t.type({
queueType: t.literal(QueueType.Kafka),
}),
]);

export type KafkaQueueParams = t.TypeOf<typeof KafkaQueueParams>;

export type EvhQueueParams = EvhAuthQueueParams | EvhPasswordLessQueueParams;

export type QueueParams = EvhQueueParams | KafkaQueueParams;

export const notSupportedError = "Queue type still not supported";

export const createInternalQueueService = (
type: QueueType,
connectionString: string,
queueParams: QueueParams,
): E.Either<Error, IQueueService> => {
switch (type) {
switch (queueParams.queueType) {
case QueueType.Kafka:
return createKafkaService(connectionString);
return createKafkaService(queueParams.connectionString);
case QueueType.EventHub:
return createNativeEventHubService(connectionString);
return createNativeEventHubService(queueParams);
default:
E.left(new Error(notSupportedError));
}
};

export const getInternalQueueService = (
connectionString: string,
queueType?: QueueType,
): E.Either<Error, IQueueService> =>
pipe(
queueType,
O.fromNullable,
O.map((type) => createInternalQueueService(type, connectionString)),
O.getOrElse(() => createNativeEventHubService(connectionString)),
);
Loading

0 comments on commit dada0f2

Please sign in to comment.