Skip to content

Commit

Permalink
fix: improve fetcher usage (#209)
Browse files Browse the repository at this point in the history
# What ❔

1. Log timestamp in ISO8601 format.
2. Internal retries for `DataFetcherService`.

## Why ❔

1. To have timestamps properly processed so logs are in the correct
order.
2. Otherwise in case of an error the whole batch is retried which is
unnecessary.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [X] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [X] Tests for the changes have been added / updated.
  • Loading branch information
vasyl-ivanchuk authored Mar 25, 2024
1 parent ebfbf5a commit ae76723
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 20 deletions.
8 changes: 5 additions & 3 deletions packages/api/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import { format, transports, Logform } from "winston";
export const getLogger = (environment: string, logLevel: string): LoggerService => {
let defaultLogLevel = "debug";
const loggerFormatters: Logform.Format[] = [
format.timestamp({
format: "DD/MM/YYYY HH:mm:ss.SSS",
}),
environment === "production"
? format.timestamp()
: format.timestamp({
format: "DD/MM/YYYY HH:mm:ss.SSS",
}),
format.ms(),
utilities.format.nestLike("API", {}),
];
Expand Down
8 changes: 5 additions & 3 deletions packages/data-fetcher/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ const { NODE_ENV, LOG_LEVEL } = process.env;

let defaultLogLevel = "debug";
const loggerFormatters: Logform.Format[] = [
format.timestamp({
format: "DD/MM/YYYY HH:mm:ss.SSS",
}),
NODE_ENV === "production"
? format.timestamp()
: format.timestamp({
format: "DD/MM/YYYY HH:mm:ss.SSS",
}),
format.ms(),
utilities.format.nestLike("DataFetcher", {}),
];
Expand Down
2 changes: 1 addition & 1 deletion packages/worker/src/config.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe("config", () => {
},
dataFetcher: {
url: "http://localhost:3040",
requestTimeout: 120_000,
requestTimeout: 60_000,
},
blocks: {
waitForBlocksInterval: 1000,
Expand Down
2 changes: 1 addition & 1 deletion packages/worker/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export default () => {
},
dataFetcher: {
url: DATA_FETCHER_URL || "http://localhost:3040",
requestTimeout: parseInt(DATA_FETCHER_REQUEST_TIMEOUT, 10) || 120_000,
requestTimeout: parseInt(DATA_FETCHER_REQUEST_TIMEOUT, 10) || 60_000,
},
blocks: {
waitForBlocksInterval: parseInt(WAIT_FOR_BLOCKS_INTERVAL, 10) || 1000,
Expand Down
41 changes: 33 additions & 8 deletions packages/worker/src/dataFetcher/dataFetcher.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import { HttpService } from "@nestjs/axios";
import { ConfigService } from "@nestjs/config";
import { AxiosResponse, AxiosError } from "axios";
import * as rxjs from "rxjs";
import { setTimeout } from "node:timers/promises";
import { DataFetcherService } from "./dataFetcher.service";

jest.mock("timers/promises", () => ({
jest.mock("node:timers/promises", () => ({
setTimeout: jest.fn().mockResolvedValue(null),
}));

Expand Down Expand Up @@ -48,6 +49,7 @@ describe("DataFetcherService", () => {

describe("getBlockData", () => {
let pipeMock = jest.fn();
const blockData = { block: { number: 1 } };

beforeEach(() => {
pipeMock = jest.fn();
Expand All @@ -58,7 +60,6 @@ describe("DataFetcherService", () => {
});

it("returns block data", async () => {
const blockData = { block: { number: 1 } };
pipeMock.mockReturnValueOnce(
new rxjs.Observable((subscriber) => {
subscriber.next({
Expand All @@ -75,25 +76,49 @@ describe("DataFetcherService", () => {
expect(returnedBlockData).toEqual(blockData);
});

it("throws an error if the request fails with populated response details", async () => {
it("retries the request if the request fails with populated response details", async () => {
const error = new AxiosError("server error", "500", null, null, {
status: 500,
data: "error data",
} as AxiosResponse<string, any>);
pipeMock.mockImplementation((callback) => {
pipeMock.mockImplementationOnce((callback) => {
callback(error);
});

await expect(dataFetcherService.getBlockData(1)).rejects.toThrowError(error);
pipeMock.mockReturnValueOnce(
new rxjs.Observable((subscriber) => {
subscriber.next({
data: [blockData],
});
})
);

const returnedBlockData = await dataFetcherService.getBlockData(1);
expect(httpServiceMock.get).toBeCalledTimes(2);
expect(setTimeout).toBeCalledTimes(1);
expect(setTimeout).toBeCalledWith(1000);
expect(returnedBlockData).toEqual(blockData);
});

it("throws an error if the request fails without populated response details", async () => {
it("retries the request if the request fails without populated response details", async () => {
const error = new AxiosError("server error", "500");
pipeMock.mockImplementation((callback) => {
pipeMock.mockImplementationOnce((callback) => {
callback(error);
});

await expect(dataFetcherService.getBlockData(1)).rejects.toThrowError(error);
pipeMock.mockReturnValueOnce(
new rxjs.Observable((subscriber) => {
subscriber.next({
data: [blockData],
});
})
);

const returnedBlockData = await dataFetcherService.getBlockData(1);
expect(httpServiceMock.get).toBeCalledTimes(2);
expect(setTimeout).toBeCalledTimes(1);
expect(setTimeout).toBeCalledWith(1000);
expect(returnedBlockData).toEqual(blockData);
});
});
});
17 changes: 16 additions & 1 deletion packages/worker/src/dataFetcher/dataFetcher.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import { ConfigService } from "@nestjs/config";
import { catchError, firstValueFrom } from "rxjs";
import { AxiosError } from "axios";
import { BlockData } from "./types";
import { setTimeout } from "node:timers/promises";

const DATA_FETCHER_RETRY_TIMEOUT = 1000;

@Injectable()
export class DataFetcherService {
Expand All @@ -18,10 +21,22 @@ export class DataFetcherService {
}

public async getBlockData(blockNumber: number): Promise<BlockData> {
const blocksData = await this.getBlocksData(blockNumber, blockNumber);
const blocksData = await this.getBlocksDataRetryable(blockNumber, blockNumber);
return blocksData[0];
}

private async getBlocksDataRetryable(from: number, to: number): Promise<BlockData[]> {
try {
return await this.getBlocksData(from, to);
} catch {
this.logger.debug({
message: `Retrying to fetch data for blocks: [${from}, ${to}]`,
});
await setTimeout(DATA_FETCHER_RETRY_TIMEOUT);
return this.getBlocksDataRetryable(from, to);
}
}

private async getBlocksData(from: number, to: number): Promise<BlockData[]> {
const queryString = new URLSearchParams({
from: from.toString(),
Expand Down
8 changes: 5 additions & 3 deletions packages/worker/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ const { NODE_ENV, LOG_LEVEL } = process.env;

let defaultLogLevel = "debug";
const loggerFormatters: Logform.Format[] = [
format.timestamp({
format: "DD/MM/YYYY HH:mm:ss.SSS",
}),
NODE_ENV === "production"
? format.timestamp()
: format.timestamp({
format: "DD/MM/YYYY HH:mm:ss.SSS",
}),
format.ms(),
utilities.format.nestLike("Worker", {}),
];
Expand Down

0 comments on commit ae76723

Please sign in to comment.