Skip to content

Commit c957b41

Browse files
author
Sajad Masjoodi
committed
Fix: answer the _write function call back, immediately. and remove the call backs mechanism for delayed answering.
1 parent 2aa5d9c commit c957b41

File tree

1 file changed

+10
-24
lines changed

1 file changed

+10
-24
lines changed

lib/sender.js

+10-24
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class FluentSender {
3737
if (this._eventMode === 'Message') {
3838
this._sendQueue = []; // queue for items waiting for being sent.
3939
this._flushInterval = 0;
40-
this._messageQueueSizeLimit = options.messageQueueSizeLimit || 0;
40+
this._messageQueueSizeLimit = options.messageQueueSizeLimit || 1000;
4141
} else {
4242
this._sendQueue = new Map();
4343
this._flushInterval = options.flushInterval || 100;
@@ -110,10 +110,11 @@ class FluentSender {
110110
return;
111111
}
112112

113-
this._push(tag, timestamp, data, callback);
113+
this._push(tag, timestamp, data);
114114
this._connect(() => {
115115
this._flushSendQueue();
116116
});
117+
callback()
117118
}
118119

119120
end(label, data, callback) {
@@ -182,11 +183,10 @@ class FluentSender {
182183
return msgpack.encode([time, data], { codec: codec });
183184
}
184185

185-
_push(tag, time, data, callback) {
186+
_push(tag, time, data) {
186187
if (this._eventMode === 'Message') {
187188
// Message mode
188189
const item = this._makePacketItem(tag, time, data);
189-
item.callback = callback;
190190
if (this._messageQueueSizeLimit && this._sendQueue.length === this._messageQueueSizeLimit) {
191191
this._sendQueue.shift();
192192
}
@@ -199,13 +199,10 @@ class FluentSender {
199199
const eventEntryData = this._sendQueue.get(tag);
200200
eventEntryData.eventEntries.push(eventEntry);
201201
eventEntryData.size += eventEntry.length;
202-
if (callback) eventEntryData.callbacks.push(callback);
203202
} else {
204-
const callbacks = callback ? [callback] : [];
205203
this._sendQueue.set(tag, {
206204
eventEntries: [eventEntry],
207205
size: eventEntry.length,
208-
callbacks: callbacks
209206
});
210207
}
211208
}
@@ -292,7 +289,7 @@ class FluentSender {
292289
this._socket && this._socket.destroy();
293290
this._socket = null;
294291
this._status = null;
295-
this._connecting = false;
292+
setTimeout(()=>{this._connecting = false},this.reconnectInterval)
296293
}
297294

298295
_handshake(callback) {
@@ -375,7 +372,7 @@ class FluentSender {
375372
// nothing written;
376373
return;
377374
}
378-
this._doWrite(item.packet, item.options, timeoutId, [item.callback]);
375+
this._doWrite(item.packet, item.options, timeoutId);
379376
} else {
380377
if (this._sendQueue.size === 0) {
381378
this._flushingSendQueue = false;
@@ -398,11 +395,11 @@ class FluentSender {
398395
eventEntryDataSize: eventEntryData.size
399396
};
400397
const packet = msgpack.encode([tag, entries, options], { codec: codec });
401-
this._doWrite(packet, options, timeoutId, eventEntryData.callbacks);
398+
this._doWrite(packet, options, timeoutId);
402399
}
403400
}
404401

405-
_doWrite(packet, options, timeoutId, callbacks) {
402+
_doWrite(packet, options, timeoutId) {
406403
const sendPacketSize = (options && options.eventEntryDataSize) || this._sendQueueSize;
407404
this._socket.write(packet, () => {
408405
if (this.requireAckResponse) {
@@ -414,13 +411,7 @@ class FluentSender {
414411
'ack in response and chunk id in sent data are different',
415412
{ ack: response.ack, chunk: options.chunk }
416413
);
417-
callbacks.forEach((callback) => {
418-
this._handleEvent('error', error, callback);
419-
});
420-
} else { // no error on ack
421-
callbacks.forEach((callback) => {
422-
callback && callback();
423-
});
414+
this._handleEvent('error', error);
424415
}
425416
this._sendQueueSize -= sendPacketSize;
426417
process.nextTick(() => {
@@ -429,15 +420,10 @@ class FluentSender {
429420
});
430421
timeoutId = setTimeout(() => {
431422
const error = new FluentLoggerError.ResponseTimeout('ack response timeout');
432-
callbacks.forEach((callback) => {
433-
this._handleEvent('error', error, callback);
434-
});
423+
this._handleEvent('error', error);
435424
}, this.ackResponseTimeout);
436425
} else {
437426
this._sendQueueSize -= sendPacketSize;
438-
callbacks.forEach((callback) => {
439-
callback && callback();
440-
});
441427
process.nextTick(() => {
442428
this._waitToWrite();
443429
});

0 commit comments

Comments
 (0)