Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling shutdown action again #72

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion lib/kcl/kcl_manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ KCLManager.VERSION2 = Symbol("version2");
* @param {object} kclManagerInput - Object containing the recordprocessor and the version of the record processor.
* @param {file} inputFile - A file to read action messages from.
* @param {file} outputFile - A file to write action messages to.
* @param {file} errorfile - A file to write error messages to.
* @param {file} errorFile - A file to write error messages to.
*/
function KCLManager(kclManagerInput, inputFile, outputFile, errorFile) {
this._version = kclManagerInput.version;
Expand All @@ -53,6 +53,7 @@ KCLManager.prototype._onAction = function(action) {
switch (actionType) {
case 'initialize':
case 'processRecords':
case 'shutdown':
Copy link

@micah-jaffe micah-jaffe Oct 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case should actually be shutdownRequested and ditto for below.

The most recent release of the KCL for NodeJS migrated to mirror the KCL 2.x for Java, in which the API that the user implements changes from KCL 1.x. One such change renamed shutdown to shutdownRequested to differentiate from other types of shutdown, such as leaseLost or shardEnded. See https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html.

It looks like that was missed here, but is included elsewhere in the release migration. See the new sample app for example: https://github.com/awslabs/amazon-kinesis-client-nodejs/blob/master/samples/basic_sample/consumer/sample_kcl_app.js#L68

Thanks for submitting this update.

case 'leaseLost':
case 'shardEnded':
this._onRecordProcessorAction(action);
Expand Down Expand Up @@ -98,6 +99,10 @@ KCLManager.prototype._onRecordProcessorAction = function(action) {
recordProcessorFuncInput.checkpointer = checkpointer;
recordProcessorFunc = recordProcessor.processRecords;
break;
case 'shutdown':
recordProcessorFuncInput.checkpointer = checkpointer;
recordProcessorFunc = recordProcessor.shutdown;
break;
case 'leaseLost':
if (this._version === KCLManager.VERSION1) {
recordProcessorFuncInput.reason = 'ZOMBIE';
Expand Down
4 changes: 2 additions & 2 deletions lib/kcl/kcl_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ var KCLManager = require('./kcl_manager');
* @param {object} recordProcessor - A record processor to use for processing a shard.
* @param {file} inputFile - A file to read action messages from. Defaults to STDIN.
* @param {file} outputFile - A file to write action messages to. Defaults to STDOUT.
* @param {file} errorfile - A file to write error messages to. Defaults to STDERR.
* @param {file} errorFile - A file to write error messages to. Defaults to STDERR.
*/
function KCLProcess(recordProcessor, inputFile, outputFile, errorFile) {
var allMethodsPresent = typeof recordProcessor.initialize === 'function' &&
Expand All @@ -84,7 +84,7 @@ function KCLProcess(recordProcessor, inputFile, outputFile, errorFile) {
version: version
};

var kclManager = new KCLManager(kclManagerInput, inputFile, outputFile, errorFile, version);
var kclManager = new KCLManager(kclManagerInput, inputFile, outputFile, errorFile);

return {
// For testing only.
Expand Down