Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding initial fix for bug issue #1967 #2267

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/blob/generated/ExpressMiddlewareFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,15 @@ export default class ExpressMiddlewareFactory extends MiddlewareFactory {
handlers,
this.logger
);
return (req: Request, res: Response, next: NextFunction) => {
return (req: Request, res: Response, next: NextFunction) => {
const request = new ExpressRequestAdapter(req);
const response = new ExpressResponseAdapter(res);
let newContext = new Context(res.locals, this.contextPath, request, response);
handlerMiddlewareFactory.createHandlerMiddleware()(
new Context(res.locals, this.contextPath, request, response),
newContext,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason change this?

next
);
res.on("close", () => handlers.blobHandler.cleanUpBlob(newContext));
};
}

Expand Down
1 change: 1 addition & 0 deletions src/blob/generated/handlers/IBlobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import Context from "../Context";

export default interface IBlobHandler {
download(options: Models.BlobDownloadOptionalParams, context: Context): Promise<Models.BlobDownloadResponse>;
cleanUpBlob(context: Context): Promise<void>;
getProperties(options: Models.BlobGetPropertiesOptionalParams, context: Context): Promise<Models.BlobGetPropertiesResponse>;
delete(options: Models.BlobDeleteMethodOptionalParams, context: Context): Promise<Models.BlobDeleteResponse>;
undelete(options: Models.BlobUndeleteOptionalParams, context: Context): Promise<Models.BlobUndeleteResponse>;
Expand Down
9 changes: 9 additions & 0 deletions src/blob/handlers/BlobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ export default class BlobHandler extends BaseHandler implements IBlobHandler {
}
}

/**
* Clean up file handles for blob to prevent leak.
*
* @memberof BlobHandler
*/
public async cleanUpBlob(context: Context): Promise<void> {
this.extentStore.cleanStreams(context.contextId);
}

/**
* Get blob properties.
*
Expand Down
33 changes: 33 additions & 0 deletions src/common/persistence/FSExtentStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
fdatasync,
mkdir,
open,
ReadStream,
stat,
unlink
} from "fs";
Expand Down Expand Up @@ -304,6 +305,8 @@ export default class FSExtentStore implements IExtentStore {
return this.appendQueue.operate(op, contextId);
}

private streams: Map<string, ReadStream[]> = new Map<string, ReadStream[]>();

/**
* Read data from persistency layer according to the given IExtentChunk.
*
Expand Down Expand Up @@ -353,12 +356,42 @@ export default class FSExtentStore implements IExtentStore {
contextId
);
});

if (contextId != null) {
let existingStreams = this.streams.get(contextId);
if (existingStreams == null) {
let newStreamsArray: ReadStream[] = [stream];
this.streams.set(contextId, newStreamsArray);
}
else {
existingStreams.push(stream);
this.streams.set(contextId, existingStreams);
}
}

resolve(stream);
});

return this.readQueue.operate(op, contextId);
}

public async cleanStreams(contextId?: string): Promise<void> {
this.logger.verbose(
"FSExtentStore:cleanStreams() Response object closed unexpectedly, cleaning up after streams",
contextId
);

if (contextId != null) {
let streamsToCleanup = this.streams.get(contextId);

if (streamsToCleanup != null) {
for (const stream of streamsToCleanup) {
stream.destroy();
}
}
}
}

/**
* Merge several extent chunks to a ReadableStream according to the offset and count.
*
Expand Down
5 changes: 5 additions & 0 deletions src/common/persistence/IExtentStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ export default interface IExtentStore extends IDataStore, ICleaner {
contextId?: string
): Promise<NodeJS.ReadableStream>;

/**
* Clean up file handles from an extent when a response object is closed
*/
cleanStreams(contextId?: string): Promise<void>;

/**
* Merge several extent chunks to a ReadableStream according to the offset and count.
*
Expand Down