Skip to content

Commit

Permalink
Test case for message publishing retries
Browse files Browse the repository at this point in the history
Plus bug fix for found issue.

Also expose ability to adjust number of failed messages to keep around.
  • Loading branch information
jmattsson committed Jun 1, 2023
1 parent 82b5bac commit 59763be
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 17 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ Past this the features are pick-and-choose.
- `--messages` - Instructs chariotd to watch for MQTT publish requests. The specifics are available in the [Message publishing](#message-publishing) section.
- `--default-message-concurrency` and `--message-concurrency` - Controls how many inflight messages should be allowed.
- `--default-message-retries` and `--message-retries` - Configures how many extra attempts will be made to publish a message.
- `--default-message-keep-failed` and `--message-keep-failed` - Adjusts the limit of how many failed messages are kept around.
- `--default-message-jam-timeout` and `--message-jam-timeout` - Sets the time before declaring a message queue jam.
- `--default-message-order` and `--message-order` - Sets the preferred upload order of messages.
- `--default-message-topic-prefix` and `--message-topic-prefix` - Modifies the topic in each message with the given prefix.
Expand Down Expand Up @@ -357,6 +358,8 @@ Each message publishing action directory has a setting for how many messages it
In the interest of reliability, chariotd may attempt to publish a message more than once if the first attempt is unsuccessful. This is controlled via `--default-message-retries=N` or `--message-retries=/path/to/msgs:N`, where `N` is the number of *retries* (i.e. one less than the total number of attempts). If no retries are desired, this may be set to zero. If a message fails after the retry limit has been reached, the message will be moved to the `failed` directory.
The number of messages kept around in the `failed` directory is limited, in order to avoid filling up the disk. By default only 100 failed messages are kept, but this can be adjusted with the `--default-message-keep-failed=N` and `--message-keep-failed=DIR:N` command-line options.
To prevent message publishing from stalling silently, each message publishing action directory has an associated "jam" detection time that can be configured. Whenever the publisher reaches the maximum concurrency this timer starts, and if none of the inflight messages get acknowledged before the timer expires, a queue jam is declared. This will trigger chariotd to exit in order to be relaunched with fresh state, in the hope that this will resolve the publishing blockage.
The queue jam timer is configured with `--default-message-jam-timeout=SEC` or `--message-jam-timeout=/path/to/msgs:SEC` where `SEC` is the number of seconds to allow before declaring a jam. The default is 300 (5 minutes), which should be an excessive but safe default.
Expand Down
4 changes: 4 additions & 0 deletions src/cmdline_opts.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ const cmdline = Getopt.create([
'how many times to re-attempt to publish a message (default 2)' ],
[ '', 'message-retries=DIR:N+',
'how many times to re-attempt to publish a message (default 2)' ],
[ '', 'default-message-keep-failed=N',
'number of failed messages to keep in the failed/ directory (default 100)' ],
[ '', 'message-keep-failed=DIR:N+',
'number of failed messages to keep in the failed/ directory (default 100)' ],
[ '', 'default-message-order=ORDER',
'preferred upload order of messages, for un-prioritised messages; one of "lexical", "reverse-lexical", "newest-first", "oldest-first" (default "lexical")' ],
[ '', 'message-order=DIR:ORDER+',
Expand Down
43 changes: 28 additions & 15 deletions src/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const { SecureTunnel } = require('./secure_tunnel.js');
const services = require('./services.js');
const { options } = require('./cmdline_opts.js');

// Number of failed messages to keep for a message directory when neither
// --default-message-keep-failed nor --message-keep-failed supplies a value.
const DEFAULT_FAILED_MESSAGE_KEEP_COUNT = 100;

// --- Helper functions --------------------------------------------------

Expand Down Expand Up @@ -140,6 +141,24 @@ function bestEffortExecWrite(fname, data) {
}


/**
 * Resolves the effective settings for a single directory from the parsed
 * command-line `options`.
 *
 * For every requested key, the value of the matching `default-<key>` option
 * is used when it is set, and is then replaced by any per-directory
 * `<key>` argument whose directory part equals `dir`. When several
 * per-directory arguments match, the last one wins. Keys with neither a
 * default nor a matching per-directory argument are left out of the result.
 *
 * @param {string} dir - Directory the settings are being resolved for.
 * @param {Iterable<string>} opt_keys - Option names (without the `default-` prefix).
 * @returns {Object} Map of option name to resolved value.
 */
function optionsForDirectory(dir, opt_keys) {
  const resolved = {};
  for (const name of opt_keys) {
    // Global default first, so a per-directory value can replace it below.
    const fallback = options[`default-${name}`];
    if (fallback != null) {
      resolved[name] = fallback;
    }

    // Per-directory arguments are split by splitArg into [directory, value].
    const overrides = options[name] || [];
    for (const entry of overrides) {
      const [ entryDir, entryValue ] = splitArg(entry);
      if (entryDir == dir) {
        resolved[name] = entryValue;
      }
    }
  }
  return resolved;
}


// -- Setup phase --------------------------------------------------------

var comms = null;
Expand Down Expand Up @@ -414,25 +433,14 @@ function createMessagePublisher(comm, dir) {
);
});
};
// Work out configuration for this message publisher
const opts = {};

const opt_keys = [
'message-concurrency', 'message-retries', 'message-order',
'message-jam-timeout', 'message-topic-prefix', 'message-topic-suffix',
'letterhead-file', 'letterhead-generator'
];
for (const key of opt_keys) {
// Start with default, if set
const def = 'default-'+key;
if (options[def] != null)
opts[key] = options[def];
// Override with per-dir, if set
for (const arg of (options[key] || [])) {
const [ optdir, val ] = splitArg(arg);
if (optdir == dir)
opts[key] = val;
}
}
const opts = optionsForDirectory(dir, opt_keys);

const on_jam = () => {
console.error(
`Error: Message queue on ${dir} has jammed. Exiting for clean retry.`);
Expand Down Expand Up @@ -469,9 +477,14 @@ else {
for (const message_dir of (options.messages || [])) {
console.info(`Establishing messages watcher on ${message_dir}`);
message_publisher = createMessagePublisher(comms, message_dir);
const keep_failed_key = 'message-keep-failed';
const opts = optionsForDirectory(message_dir, [ keep_failed_key ]);
if (opts[keep_failed_key] == null)
opts[keep_failed_key] = DEFAULT_FAILED_MESSAGE_KEEP_COUNT;
msgwatch = new DirWatch(
message_dir,
(dir, fname) => message_publisher.add(dir, fname)
(dir, fname) => message_publisher.add(dir, fname),
{ max_failed: opts['message-keep-failed'] }
);
msgwatch.rescan();
}
Expand Down
5 changes: 3 additions & 2 deletions src/message_publisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ class MessagePublisher {
}
catch(e) {
console.warn(`Error loading letterhead/message: ${e}`);
this._q.complete(item);
item.promise.reject(e);
return;
}
Expand All @@ -176,10 +177,10 @@ class MessagePublisher {
.catch(e => {
this._q.complete(item);
item.retries = (item.retries || 0) + 1;
if (item.retries < this._max_retries)
if (item.retries <= this._max_retries)
this._q.add(item); // Not uploaded, try again
else
item.promise.reject(e);
item.promise.reject('retries exhausted');
})
}

Expand Down
18 changes: 18 additions & 0 deletions test/messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,22 @@ async function checkAgainstReference(title, dir, cfg) {
}).then(() => console.log('Ok'));
}

/**
 * Checks that a message whose publish attempt always fails is tried exactly
 * `n` + 1 times before the promise returned by `MessagePublisher#add` is
 * rejected.
 *
 * The test adapter's `publish` is replaced with a stub that counts calls and
 * always rejects. If the add promise unexpectedly resolves, the process
 * exits with status 1.
 *
 * @param {string} dir - Message directory, relative to this file's directory.
 * @param {number} n - Value supplied as the 'message-retries' setting.
 * @returns {Promise<void>} Settles once the attempt count has been asserted.
 */
async function checkRetries(dir, n) {
  process.stdout.write(`Checking: Retries (${n})...`);
  const msgDir = `${__dirname}/${dir}`;

  const adapter = new TestAdapter();
  let publishCalls = 0;
  adapter.publish = () => {
    publishCalls += 1;
    return Promise.reject();
  };

  // The jam callback must never fire during this check.
  const failOnJam = () => assert(false);
  const publisher = new MessagePublisher(
    { 'message-retries': n }, adapter, failOnJam);

  const pending = publisher.add(msgDir, 'msg-a');
  setImmediate(() => adapter.connect());

  try {
    await pending;
  }
  catch {
    // Rejection is the expected outcome: initial attempt plus n retries.
    assert.equal(publishCalls, n + 1);
    console.log('Ok');
    return;
  }
  // Publishing can never succeed with the stubbed adapter.
  process.exit(1);
}

(async () => {
await check(
Expand Down Expand Up @@ -228,4 +244,6 @@ async function checkAgainstReference(title, dir, cfg) {
[ 0 ]
);

await checkRetries('msg-test-2', 4);
await checkRetries('msg-test-2', 0);
})();

0 comments on commit 59763be

Please sign in to comment.