diff --git a/package.json b/package.json index 6f5d84e..9cbffad 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "qryn-client", - "version": "1.0.1", + "version": "1.0.2", "description": "A client library for interacting with qryn, a high-performance observability backend.", "main": "src/index.js", "scripts": { diff --git a/src/types/index.js b/src/types/index.js index 352dfd1..95f7d6a 100644 --- a/src/types/index.js +++ b/src/types/index.js @@ -1,7 +1,25 @@ const QrynError = require("./qrynError"); const QrynResponse = require("./qrynResponse"); +class NetworkError extends QrynError { + constructor(message, options = {}) { + super(message, options); + this.name = 'NetworkError'; + this.statusCode = options.statusCode; + } +} + +class ValidationError extends QrynError { + constructor(message, options = {}) { + super(message, options); + this.name = 'ValidationError'; + this.field = options.field; + } +} + module.exports = { + NetworkError, + ValidationError, QrynError, QrynResponse } \ No newline at end of file diff --git a/src/utils/collector.js b/src/utils/collector.js index fc96360..20e3339 100644 --- a/src/utils/collector.js +++ b/src/utils/collector.js @@ -1,10 +1,12 @@ -const { QrynError } = require('../types'); +const { QrynError, NetworkError, ValidationError } = require('../types'); const { Stream, Metric } = require('../models'); +const EventEmitter = require('events'); /** * Collector class for collecting and pushing streams and metrics to Qryn. + * @extends EventEmitter */ -class Collector { +class Collector extends EventEmitter { /** * Create a Collector instance. * @param {Object} qrynClient - The Qryn client instance. @@ -12,17 +14,27 @@ class Collector { * @param {number} [options.maxBulkSize=1000] - The maximum bulk size for pushing data. * @param {number} [options.maxTimeout=5000] - The maximum timeout for pushing data. * @param {string} options.orgId - The organization ID. + * @param {number} [options.retryAttempts=3] - The number of retry attempts for failed pushes. + * @param {number} [options.retryDelay=1000] - The delay between retry attempts in milliseconds. */ constructor(qrynClient, options = {}) { + super(); this.qrynClient = qrynClient; this.maxBulkSize = options.maxBulkSize || 1000; this.maxTimeout = options.maxTimeout || 5000; this.orgId = options.orgId; + this.retryAttempts = options.retryAttempts || 3; + this.retryDelay = options.retryDelay || 1000; this.streams = []; this.metrics = []; this.timeoutId = null; } - get options(){ + + /** + * Get the current options for the collector. + * @returns {Object} The current options. + */ + get options() { return { orgId: this.orgId } @@ -31,11 +43,13 @@ class Collector { /** * Add a stream to the collector. * @param {Stream} stream - The stream instance to add. - * @throws {QrynError} If the stream is not a valid Stream instance. + * @throws {ValidationError} If the stream is not a valid Stream instance. */ addStream(stream) { if (!(stream instanceof Stream)) { - throw new QrynError('Invalid stream instance'); + const error = new ValidationError('Invalid stream instance'); + this.emit('error', error); + throw error; } this.streams.push(stream); this.checkBulkSize(); @@ -44,11 +58,13 @@ class Collector { /** * Add a metric to the collector. * @param {Metric} metric - The metric instance to add. - * @throws {QrynError} If the metric is not a valid Metric instance. + * @throws {ValidationError} If the metric is not a valid Metric instance. */ addMetric(metric) { if (!(metric instanceof Metric)) { - throw new QrynError('Invalid metric instance'); + const error = new ValidationError('Invalid metric instance'); + this.emit('error', error); + throw error; } this.metrics.push(metric); this.checkBulkSize(); @@ -84,13 +100,38 @@ class Collector { */ async pushBulk() { clearTimeout(this.timeoutId); - if (this.streams.length > 0) { - await this.qrynClient.loki.push(this.streams, this.options); - this.streams = []; - } - if (this.metrics.length > 0) { - await this.qrynClient.prom.push(this.metrics, this.options); - this.metrics = []; + await this.retryOperation(async () => { + if (this.streams.length > 0) { + await this.qrynClient.loki.push(this.streams, this.options); + this.streams = []; + } + if (this.metrics.length > 0) { + await this.qrynClient.prom.push(this.metrics, this.options); + this.metrics = []; + } + }); + } + + /** + * Retry an operation with exponential backoff. + * @private + * @async + * @param {Function} operation - The operation to retry. + * @throws {QrynError} If all retry attempts fail. + */ + async retryOperation(operation) { + for (let attempt = 1; attempt <= this.retryAttempts; attempt++) { + try { + await operation(); + return; + } catch (error) { + if (attempt === this.retryAttempts) { + const qrynError = new QrynError('Failed to push data after multiple attempts', { cause: error }); + this.emit('error', qrynError); + throw qrynError; + } + await new Promise(resolve => setTimeout(resolve, this.retryDelay * Math.pow(2, attempt - 1))); + } } } }