Skip to content

Commit

Permalink
Fix EMFILE too many open files, added maxConcurrency option (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
nodkz authored Nov 15, 2024
1 parent a7b6f2b commit 9986234
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## next

- Added `maxConcurrency` option to limit concurrent FS operations, preventing "too many open files" errors (#8)
- Fixed Node.js warnings such as "Warning: Closing file descriptor # on garbage collection", which is deprecated in Node.js 22 and will result in an error being thrown in the future

## 0.1.4 (2024-10-30)
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ await repo.dispose();

- `gitdir`: string - path to the git repo
- `options` – optional settings:
- `maxConcurrency` – limit the number of file system operations (default: 50)
- `cruftPacks` – defines how [cruft packs](https://git-scm.com/docs/cruft-packs) are processed:
- `'include'` or `true` (default) - process all packs
- `'exclude'` or `false` - exclude cruft packs from processing
Expand Down
36 changes: 20 additions & 16 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ import { createPackedObjectIndex } from './packed-object-index.js';
import { createFilesMethods } from './files-methods.js';
import { createCommitMethods } from './commits.js';
import { createStatMethod } from './stat.js';
import { promiseAllThreaded } from './utils/threads.js';
import { GitReaderOptions, NormalizedGitReaderOptions, CruftPackMode } from './types';

export * from './types.js';
export * from './parse-object.js';
export { isGitDir, resolveGitDir };

export async function createGitReader(gitdir: string, options?: GitReaderOptions) {
export async function createGitReader(gitdir: string, options?: Partial<GitReaderOptions>) {
const startInitTime = Date.now();
const normalizedOptions = normalizeOptions(options);
const resolvedGitDir = await resolveGitDir(gitdir);
const [refIndex, looseObjectIndex, packedObjectIndex] = await Promise.all([
createRefIndex(resolvedGitDir),
createLooseObjectIndex(resolvedGitDir),
createRefIndex(resolvedGitDir, normalizedOptions),
createLooseObjectIndex(resolvedGitDir, normalizedOptions),
createPackedObjectIndex(resolvedGitDir, normalizedOptions)
]);
const { readObjectHeaderByHash, readObjectByHash, readObjectHeaderByOid, readObjectByOid } =
Expand All @@ -38,27 +39,30 @@ export async function createGitReader(gitdir: string, options?: GitReaderOptions
async dispose() {
await Promise.all([looseObjectIndex.dispose(), packedObjectIndex.dispose()]);
},
stat: createStatMethod({
gitdir: resolvedGitDir,
refIndex,
looseObjectIndex,
packedObjectIndex
}),
stat: createStatMethod(
resolvedGitDir,
{ refIndex, looseObjectIndex, packedObjectIndex },
normalizedOptions
),

initTime: Date.now() - startInitTime
};
}

function normalizeOptions(options?: GitReaderOptions): NormalizedGitReaderOptions {
if (!options || options.cruftPacks === undefined) {
return { cruftPacks: 'include' };
}
function normalizeOptions(options?: Partial<GitReaderOptions>): NormalizedGitReaderOptions {
const { cruftPacks = true, maxConcurrency } = options || {};
const maxConcurrencyNormalized = Number.isFinite(maxConcurrency)
? (maxConcurrency as number)
: 50;

return {
maxConcurrency: maxConcurrencyNormalized,
performConcurrent: (queue, action) =>
promiseAllThreaded(maxConcurrencyNormalized, queue, action),
cruftPacks:
typeof options.cruftPacks === 'string'
? validateCruftPackMode(options.cruftPacks)
: options.cruftPacks // expands true/false aliases
typeof cruftPacks === 'string'
? validateCruftPackMode(cruftPacks)
: cruftPacks // expands true/false aliases
? 'include'
: 'exclude'
};
Expand Down
24 changes: 13 additions & 11 deletions src/loose-object-index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
GitObject,
InternalGitObjectContent,
InternalGitObjectHeader,
NormalizedGitReaderOptions,
ObjectsTypeStat,
PackedObjectType
} from './types.js';
Expand All @@ -14,20 +15,21 @@ import { createObjectsTypeStat, objectsStatFromTypes } from './utils/stat.js';
type LooseObjectMap = Map<string, string>;
type LooseObjectMapEntry = [oid: string, relpath: string];

async function createLooseObjectMap(gitdir: string): Promise<LooseObjectMap> {
async function createLooseObjectMap(
gitdir: string,
{ performConcurrent }: NormalizedGitReaderOptions
): Promise<LooseObjectMap> {
const objectsPath = pathJoin(gitdir, 'objects');
const looseDirs = (await fsPromises.readdir(objectsPath)).filter((p) =>
/^[0-9a-f]{2}$/.test(p)
);

const objectDirs = await Promise.all(
looseDirs.map((dir) =>
fsPromises
.readdir(pathJoin(objectsPath, dir))
.then((files) =>
files.map((file): LooseObjectMapEntry => [dir + file, `objects/${dir}/${file}`])
)
)
const objectDirs = await performConcurrent(looseDirs, (dir) =>
fsPromises
.readdir(pathJoin(objectsPath, dir))
.then((files) =>
files.map((file): LooseObjectMapEntry => [dir + file, `objects/${dir}/${file}`])
)
);

return new Map(objectDirs.flat().sort(([a], [b]) => (a < b ? -1 : 1)));
Expand Down Expand Up @@ -77,8 +79,8 @@ function parseLooseObject(buffer: Buffer): InternalGitObjectContent {
};
}

export async function createLooseObjectIndex(gitdir: string) {
const looseObjectMap = await createLooseObjectMap(gitdir);
export async function createLooseObjectIndex(gitdir: string, options: NormalizedGitReaderOptions) {
const looseObjectMap = await createLooseObjectMap(gitdir, options);
const { fanoutTable, binaryNames, names } = indexObjectNames([...looseObjectMap.keys()]);

const getOidFromHash = (hash: Buffer) => {
Expand Down
8 changes: 3 additions & 5 deletions src/packed-object-index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const PACKDIR = 'objects/pack';
*/
export async function createPackedObjectIndex(
gitdir: string,
{ cruftPacks }: NormalizedGitReaderOptions
{ cruftPacks, performConcurrent }: NormalizedGitReaderOptions
) {
function readObjectHeaderByHash(
hash: Buffer,
Expand Down Expand Up @@ -75,10 +75,8 @@ export async function createPackedObjectIndex(
: !cruftPackFilenames.includes(filename);
});

const packFiles = await Promise.all(
packFilenames.map((filename) =>
readPackFile(gitdir, `${PACKDIR}/${filename}`, readObjectHeaderByHash, readObjectByHash)
)
const packFiles = await performConcurrent(packFilenames, async (filename) =>
readPackFile(gitdir, `${PACKDIR}/${filename}`, readObjectHeaderByHash, readObjectByHash)
);

return {
Expand Down
14 changes: 9 additions & 5 deletions src/resolve-ref.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { promises as fsPromises, existsSync } from 'fs';
import { join as pathJoin, basename, sep as pathSep } from 'path';
import { scanFs } from '@discoveryjs/scan-fs';
import { NormalizedGitReaderOptions } from './types.js';

type Ref = {
name: string;
Expand Down Expand Up @@ -49,7 +50,10 @@ function isOid(value: unknown) {
return typeof value === 'string' && value.length === 40 && /^[0-9a-f]{40}$/.test(value);
}

export async function createRefIndex(gitdir: string) {
export async function createRefIndex(
gitdir: string,
{ performConcurrent }: NormalizedGitReaderOptions
) {
const refResolver = await createRefResolver(gitdir);

// expand a ref into a full form
Expand Down Expand Up @@ -136,8 +140,8 @@ export async function createRefIndex(gitdir: string) {
let cachedRefsWithOid = listRefsWithOidCache.get(prefix);

if (cachedRefsWithOid === undefined) {
const oids = await Promise.all(
cachedRefs.map((name) => refResolver.resolveOid(prefix + name))
const oids = await performConcurrent(cachedRefs, (name) =>
refResolver.resolveOid(prefix + name)
);

cachedRefsWithOid = cachedRefs.map((name, index) => ({
Expand Down Expand Up @@ -210,8 +214,8 @@ export async function createRefIndex(gitdir: string) {

async stat() {
const remotes = listRemotes();
const branchesByRemote = await Promise.all(
remotes.map((remote) => listRemoteBranches(remote))
const branchesByRemote = await performConcurrent(remotes, (remote) =>
listRemoteBranches(remote)
);

return {
Expand Down
21 changes: 11 additions & 10 deletions src/stat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@ import { sumObjectsStat } from './utils/stat.js';
import { createRefIndex } from './resolve-ref.js';
import { createLooseObjectIndex } from './loose-object-index.js';
import { createPackedObjectIndex } from './packed-object-index.js';
import { NormalizedGitReaderOptions } from './types.js';

export function createStatMethod({
gitdir,
refIndex,
looseObjectIndex,
packedObjectIndex
}: {
gitdir: string;
type CreateStatMethodInput = {
refIndex: Awaited<ReturnType<typeof createRefIndex>>;
looseObjectIndex: Awaited<ReturnType<typeof createLooseObjectIndex>>;
packedObjectIndex: Awaited<ReturnType<typeof createPackedObjectIndex>>;
}) {
};

export function createStatMethod(
gitdir: string,
{ refIndex, looseObjectIndex, packedObjectIndex }: CreateStatMethodInput,
{ performConcurrent }: NormalizedGitReaderOptions
) {
return async function () {
const [refs, looseObjects, packedObjects, { files }] = await Promise.all([
refIndex.stat(),
Expand All @@ -25,8 +26,8 @@ export function createStatMethod({
scanFs(gitdir)
]);

const fileStats = await Promise.all(
files.map((file) => fsPromises.stat(path.join(gitdir, file.path)))
const fileStats = await performConcurrent(files, (file) =>
fsPromises.stat(path.join(gitdir, file.path))
);

const objectsTypes = looseObjects.objects.types.map((entry) => ({ ...entry }));
Expand Down
13 changes: 12 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,20 @@ export interface GitReaderOptions {
*
* @default 'include'
*/
cruftPacks?: CruftPackMode | boolean;
cruftPacks: CruftPackMode | boolean;

/**
* Maximum number of concurrent file system operations.
* @default 50
*/
maxConcurrency: number;
}

export interface NormalizedGitReaderOptions {
cruftPacks: CruftPackMode;
maxConcurrency: number;
performConcurrent: <T, R>(
queue: T[],
action: (item: T, itemIdx: number) => Promise<R>
) => Promise<R[]>;
}
47 changes: 47 additions & 0 deletions src/utils/threads.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Run async tasks in queue with a maximum number of threads.
* Works like Promise.all, but with a maximum number of threads.
* - The order of the results is guaranteed to be the same as the order of the input queue.
* - If any task fails, the whole queue is rejected.
* - If the queue is empty, the result is an empty array.
* - If the queue has only one task, the result is an array with one element.
*
* @example
* // Before
* const packFiles = await Promise.all(
* packFilenames.map((filename) =>
* readPackFile(gitdir, `${PACKDIR}/${filename}`, readObjectHeaderByHash, readObjectByHash)
* )
* );
*
* // After
* const packFiles = await promiseAllThreaded(50, packFilenames, async (filename) =>
* readPackFile(gitdir, `${PACKDIR}/${filename}`, readObjectHeaderByHash, readObjectByHash)
* );
*/
export async function promiseAllThreaded<T, R>(
maxThreadCount: number,
queue: T[],
asyncFn: (task: T, taskIdx: number) => Promise<R>
): Promise<R[]> {
const result = Array(queue.length);
let taskProcessed = 0;
let queueSnapshot = [...queue];
const thread = async () => {
while (taskProcessed < queueSnapshot.length) {
const taskIdx = taskProcessed++;
const task = queueSnapshot[taskIdx];
result[taskIdx] = await asyncFn(task, taskIdx);
}
};

await Promise.all(
Array.from({ length: Math.min(maxThreadCount, queueSnapshot.length) }, () => thread())
).catch((err) => {
// remove all pending tasks
queueSnapshot = [];
throw err;
});

return result;
}
29 changes: 29 additions & 0 deletions test/utils.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,38 @@
import assert from 'assert';
import { readEncodedOffset, BufferCursor } from '../src/utils/buffer.js';
import { promiseAllThreaded } from '../src/utils/threads.js';

it('readEncodedOffset', () => {
const buffer = Buffer.from([142, 254, 254, 254, 254, 254, 254, 127]);
const cursor = new BufferCursor(buffer);

assert.strictEqual(readEncodedOffset(cursor), Number.MAX_SAFE_INTEGER);
});

it('promiseAllThreaded', async () => {
const maxThreadCount = 2;
const queue = [1, 2, 3, 4, 5];
const asyncFn = async (task: number) => task * 2;

const result = await promiseAllThreaded(maxThreadCount, queue, asyncFn);

assert.deepStrictEqual(result, [2, 4, 6, 8, 10]);
});

it('promiseAllThreaded with error', async () => {
const maxThreadCount = 2;
const queue = [1, 2, 3, 4, 5];
const asyncFn = async (task: number) => {
if (task === 3) {
throw new Error('Task failed');
}
return task * 2;
};

try {
await promiseAllThreaded(maxThreadCount, queue, asyncFn);
assert.fail('Expected an error');
} catch (err) {
assert.strictEqual(err.message, 'Task failed');
}
});

0 comments on commit 9986234

Please sign in to comment.