diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js index 5ed3982ae..cd0dbeb95 100644 --- a/lib/queuePopulator/LogReader.js +++ b/lib/queuePopulator/LogReader.js @@ -148,11 +148,23 @@ class LogReader { }); this.logConsumer.readRecords({ limit: 1 }, (err, res) => { if (err) { - this.log.error('error while reading log', { - method: 'LogReader._initializeLogOffset', - error: err, - }); - return done(err); + // FIXME: getRaftLog metadata route returns 500 when + // its cache does not contain the queried raft + // session. For the sake of simplicity, in order to + // allow the populator to make progress, we choose to + // accept errors fetching the current offset during + // setup phase and fallback to starting from offset + // 1. It would be better to have metadata return + // special success statuses in such case. + this.log.warn( + 'error reading initial log offset, ' + + 'default to initial offset 1', { + method: 'LogReader._initializeLogOffset', + zkPath: pathToLogOffset, + logOffset: 1, + error: err, + }); + return done(null, 1); } const logOffset = res.info.cseq + 1; this.log.info('starting after latest log sequence', { diff --git a/tests/unit/lib/queuePopulator/LogReader.spec.js b/tests/unit/lib/queuePopulator/LogReader.spec.js index d93a637f6..d39be2830 100644 --- a/tests/unit/lib/queuePopulator/LogReader.spec.js +++ b/tests/unit/lib/queuePopulator/LogReader.spec.js @@ -2,7 +2,7 @@ const assert = require('assert'); const ZookeeperMock = require('zookeeper-mock'); -const { versioning } = require('arsenal'); +const { errors, versioning } = require('arsenal'); const { DbPrefixes } = versioning.VersioningConstants; const { Logger } = require('werelogs'); @@ -10,13 +10,21 @@ const { Logger } = require('werelogs'); const LogReader = require('../../../../lib/queuePopulator/LogReader'); class MockLogConsumer { + constructor(params) { + this.params = params || {}; + } + readRecords(params, cb) { process.nextTick(() => { - cb(null, { - info: { - cseq: 12345, - }, - }); + if (this.params.readRecordsError) { + cb(this.params.readRecordsError); + } else { + cb(null, { + info: { + cseq: 12345, + }, + }); + } }); } } @@ -78,4 +86,20 @@ describe('LogReader', () => { done(); }); }); + + it('should start from offset 1 on log consumer readRecords error', done => { + const errorLogReader = new LogReader({ + logId: 'test-log-reader', + zkClient: zkMock.createClient('localhost:2181'), + logConsumer: new MockLogConsumer({ + readRecordsError: errors.InternalError, + }), + logger: new Logger('test:ErrorLogReader'), + }); + errorLogReader.setup(err => { + assert.ifError(err); + assert.strictEqual(errorLogReader.logOffset, 1); + done(); + }); + }); });