You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
The KubeMQ SDK for NodeJS enables Node JS/Typescript developers to seamlessly communicate with the KubeMQ server, implementing various communication patterns such as Events, EventStore, Commands, Queries, and Queues.
Prerequisites
Node.js (Ensure you have a recent version of Node.js installed)
TypeScript Compiler
KubeMQ server running locally or accessible over the network
Installation
The recommended way to use the SDK for Node in your project is to consume it from Node package manager.
npm install kubemq-js
Payload Details
Metadata: The metadata allows us to pass additional information with the event. Can be in any form that can be presented as a string, i.e., struct, JSON, XML and many more.
Body: The actual content of the event. Can be in any form that is serializable into a byte array, i.e., string, struct, JSON, XML, Collection, binary file and many more.
ClientID: Displayed in logs, tracing, and KubeMQ dashboard(When using Events Store, it must be unique).
Tags: Set of Key-value pair that help categorize the message
KubeMQ PubSub Client
For executing PubSub operation we have to create the instance of PubsubClient, its instance can be created with minimum two parameter address (KubeMQ server address) & clientId . With these two parameter plainText connections are established. The table below describes the Parameters available for establishing a connection.
PubSub Client Configuration
Name
Type
Description
Default Value
Mandatory
address
String
The address of the KubeMQ server.
None
Yes
clientId
String
The client ID used for authentication.
None
Yes
authToken
String
The authorization token for secure communication.
None
No
tls
boolean
Indicates if TLS (Transport Layer Security) is enabled.
None
No
tlsCertFile
String
The path to the TLS certificate file.
None
No (Yes if tls is true)
tlsKeyFile
String
The path to the TLS key file.
None
No (Yes if tls is true)
tlsCaCertFile
String
The path to the TLS CA cert file.
None
No (Yes if tls is true)
maxReceiveSize
int
The maximum size of the messages to receive (in bytes).
104857600 (100MB)
No
reconnectIntervalSeconds
int
The interval in seconds between reconnection attempts.
1
No
Pubsub Client connection establishment example code
The example below demonstrates to construct PubSubClient with ssl and other configurations:
constconfig: Config={address: 'localhost:50000',// KubeMQ gRPC endpoint addressclientId: 'your-client-id',// Connection clientIdauthToken: 'your-jwt-auth-token',// Optional JWT authorization tokentls: true,// Indicates if TLS is enabledtlsCertFile: 'path/to/tls-cert.pem',// Path to the TLS certificate filetlsKeyFile: 'path/to/tls-key.pem',// Path to the TLS key filetlsCaCertFile: 'path/to/tls-key.pem',// Path to the TLS key filemaxReceiveSize: 1024*1024*100,// Maximum size of the messages to receive (100MB)reconnectIntervalSeconds: 1// Interval in milliseconds between reconnect attempts (1 second)};
Ping To KubeMQ server
You can ping the server to check connection is established or not.
Tags associated with the event message as key-value pairs.
Empty Map
No
Note:-metadata or body or tags any one is required
Response: NONE
awaitpubsubClient.sendEventStoreMessage({id: '987',channel: 'events_store.single',body: Utils.stringToBytes('event store message'),});
PubSub SubscribeEvents Example:
Request: EventsSubscription Class Attributes
Name
Type
Description
Default Value
Mandatory
channel
String
The channel to subscribe to.
None
Yes
group
String
The group to subscribe with.
None
No
onReceiveEventCallback
Consumer
Callback function to be called when an event message is received.
None
Yes
onErrorCallback
Consumer
Callback function to be called when an error occurs.
None
No
Response: NONE
Callback: EventMessageReceived class details
Name
Type
Description
id
String
The unique identifier of the message.
fromClientId
String
The ID of the client that sent the message.
timestamp
long
The timestamp when the message was received, in seconds.
channel
String
The channel to which the message belongs.
metadata
String
The metadata associated with the message.
body
byte[]
The body of the message.
sequence
long
The sequence number of the message.
tags
Map<String, String>
The tags associated with the message.
asyncfunctionsubscribeToEvent(){//Subscribes to events from the specified channel and processes received events. consteventsSubscriptionRequest=newEventsSubscriptionRequest('events.A','');// Define the callback for receiving events eventsSubscriptionRequest.onReceiveEventCallback=(event: EventMessageReceived)=>{console.log('SubscriberA received event:',{id: event.id,fromClientId: event.fromClientId,timestamp: event.timestamp,channel: event.channel,metadata: event.metadata,body: event.body,tags: event.tags,});};// Define the callback for handling errors eventsSubscriptionRequest.onErrorCallback=(error: string)=>{console.error('SubscriberA error:',error);};pubsubClient.subscribeToEvents(eventsSubscriptionRequest).then(()=>{console.log('Subscription successful');}).catch((reason: any)=>{console.error('Subscription failed:',reason);});}
Callback function to be called when an event message is received.
None
Yes
onErrorCallback
Consumer
Callback function to be called when an error occurs.
None
No
Response: None
Callback: EventStoreMessageReceived class details
Name
Type
Description
id
String
The unique identifier of the message.
fromClientId
String
The ID of the client that sent the message.
timestamp
long
The timestamp when the message was received, in seconds.
channel
String
The channel to which the message belongs.
metadata
String
The metadata associated with the message.
body
byte[]
The body of the message.
sequence
long
The sequence number of the message.
tags
Map<String, String>
The tags associated with the message.
asyncfunctionsubscribeToEventStore(){//Subscribes to events store messages from the specified channel with a specific configuration. consteventsSubscriptionRequest=newEventsStoreSubscriptionRequest('events_store.A','');eventsSubscriptionRequest.eventsStoreType=EventStoreType.StartAtSequence;eventsSubscriptionRequest.eventsStoreSequenceValue=1;// Define the callback for receiving events eventsSubscriptionRequest.onReceiveEventCallback=(event: EventStoreMessageReceived)=>{console.log('SubscriberA received event:',{id: event.id,fromClientId: event.fromClientId,timestamp: event.timestamp,channel: event.channel,metadata: event.metadata,body: event.body,tags: event.tags,sequence: event.sequence,});};// Define the callback for handling errors eventsSubscriptionRequest.onErrorCallback=(error: string)=>{console.error('SubscriberA error:',error);};pubsubClient.subscribeToEvents(eventsSubscriptionRequest).then(()=>{console.log('Events Subscription successful');}).catch((reason: any)=>{console.error('Event Subscription failed:',reason);});}
KubeMQ Queues Operations
The examples below demonstrate the usage of KubeMQ Queues client. The examples include creating, deleting, listing channels, and sending/receiving queues messages.
Construct the Queues Client
For executing Queues operation we have to create the instance of QueuesClient, its instance can be created with minimum two parameter address (KubeMQ server address) & clientId . With these two parameter plainText connections are established. The table below describes the Parameters available for establishing a connection.
QueuesClient Configuration
Name
Type
Description
Default Value
Mandatory
address
String
The address of the KubeMQ server.
None
Yes
clientId
String
The client ID used for authentication.
None
Yes
authToken
String
The authorization token for secure communication.
None
No
tls
boolean
Indicates if TLS (Transport Layer Security) is enabled.
None
No
tlsCertFile
String
The path to the TLS certificate file.
None
No (Yes if tls is true)
tlsKeyFile
String
The path to the TLS key file.
None
No (Yes if tls is true)
tlsCaCertFile
String
The path to the TLS CA cert file.
None
No (Yes if tls is true)
maxReceiveSize
int
The maximum size of the messages to receive (in bytes).
104857600 (100MB)
No
reconnectIntervalSeconds
int
The interval in seconds between reconnection attempts.
1
No
Queues Client establishing a connection example code
The example below demonstrates to construct PubSubClient with ssl and other configurations:
constopts: Config={address: 'localhost:50000',// KubeMQ gRPC endpoint addressclientId: 'your-client-id',// Connection clientIdauthToken: 'your-jwt-auth-token',// Optional JWT authorization tokentls: true,// Indicates if TLS is enabledtlsCertFile: 'path/to/tls-cert.pem',// Path to the TLS certificate filetlsKeyFile: 'path/to/tls-key.pem',// Path to the TLS key filetlsCaCertFile: 'path/to/tls-ca-cert.pem',// Path to the TLS CA cert filemaxReceiveSize: 1024*1024*100,// The Maximum size of the messages to receive (100MB)reconnectIntervalSeconds: 1// Interval in milliseconds between reconnect attempts (1 second)};constqueuesClient=newQueuesClient(opts);
Ping To KubeMQ server
You can ping the server to check connection is established or not.
Indicates if messages should be auto-acknowledged.
false
No
visibilitySeconds
int
Add a visibility timeout feature for messages.
0
No
Response: QueuesMessagesPulledResponse Class Attributes
Name
Type
Description
id
String
The reference ID of the request.
messages
QueueMessageReceived[]
The list of received queue messages.
messagesReceived
number
Number of valid messages received.
messagesExpired
number
Number of messages expired.
isPeek
boolean
Indicates if the operation is a peek or pull.
error
String
The error message, if any error occurred.
isError
boolean
Indicates if there was an error.
visibilitySeconds
int
The visibility timeout for the message in seconds.
isAutoAcked
boolean
Indicates whether the message was auto-acknowledged.
Response: QueueMessageReceived class attributes
Here's the requested Markdown table for the QueueMessageReceived class:
Name
Type
Description
id
String
The unique identifier for the message.
channel
String
The channel from which the message was received.
metadata
String
Metadata associated with the message.
body
byte[]
The body of the message in byte array format.
fromClientId
String
The ID of the client that sent the message.
tags
Map<String, String>
Key-value pairs representing tags for the message.
timestamp
Instant
The timestamp when the message was created.
sequence
long
The sequence number of the message.
receiveCount
int
The number of times the message has been received.
isReRouted
boolean
Indicates whether the message was rerouted.
reRouteFromQueue
String
The name of the queue from which the message was rerouted.
expiredAt
Instant
The expiration time of the message, if applicable.
delayedTo
Instant
The time the message is delayed until, if applicable.
transactionId
String
The transaction ID associated with the message.
isTransactionCompleted
boolean
Indicates whether the transaction for the message is completed.
responseHandler
StreamObserver<QueuesDownstreamRequest>
The response handler for processing downstream requests.
receiverClientId
String
The ID of the client receiving the message.
visibilitySeconds
int
The visibility timeout for the message in seconds.
isAutoAcked
boolean
Indicates whether the message was auto-acknowledged.
Example
asyncfunctionmain(){constopts: Config={address: 'localhost:50000',clientId: 'kubeMQClientId-ts',};constqueuesClient=newQueuesClient(opts);// Receive with message visibility asyncfunctionreceiveWithVisibility(visibilitySeconds: number){console.log("\n============================== Receive with Visibility =============================\n");try{constpollRequest=newQueuesPollRequest({channel: 'visibility_channel',pollMaxMessages: 1,pollWaitTimeoutInSeconds: 10,visibilitySeconds: visibilitySeconds,autoAckMessages: false,});constpollResponse=awaitqueuesClient.receiveQueuesMessages(pollRequest);console.log("Received Message Response:",pollResponse);if(pollResponse.isError){console.log("Error: "+pollResponse.error);}else{pollResponse.messages.forEach(async(msg)=>{console.log(`Message ID: ${msg.id}, Message Body: ${Utils.bytesToString(msg.body)}`);try{awaitnewPromise(resolve=>setTimeout(resolve,1000));awaitmsg.ack();console.log("Acknowledged message");}catch(err){console.error("Error acknowledging message:",err);}});}}catch(error){console.error('Failed to receive queue messages:',error);}}// Test visibility expiration asyncfunctionreceiveWithVisibilityExpired(){console.log("\n============================== Receive with Visibility Expired =============================\n");awaitreceiveWithVisibility(2);}// Test visibility extension asyncfunctionreceiveWithVisibilityExtension(){console.log("\n============================== Receive with Visibility Extension =============================\n");try{constpollRequest=newQueuesPollRequest({channel: 'visibility_channel',pollMaxMessages: 1,pollWaitTimeoutInSeconds: 10,visibilitySeconds: 3,autoAckMessages: false,});constpollResponse=awaitqueuesClient.receiveQueuesMessages(pollRequest);console.log("Received Message Response:",pollResponse);if(pollResponse.isError){console.log("Error: "+pollResponse.error);}else{pollResponse.messages.forEach(async(msg)=>{console.log(`Message ID: ${msg.id}, Message Body: ${Utils.bytesToString(msg.body)}`);try{awaitnewPromise(resolve=>setTimeout(resolve,1000));awaitmsg.extendVisibilityTimer(3);awaitnewPromise(resolve=>setTimeout(resolve,2000));awaitmsg.ack();console.log("Acknowledged message after extending visibility");}catch(err){console.error("Error during visibility extension:",err);}});}}catch(error){console.error('Failed to receive queue messages:',error);}}awaitreceiveWithVisibilityExpired();awaitreceiveWithVisibilityExtension();}main();
This method allows you to receive messages from a specified Queue channel. You can configure the polling behavior, including the maximum number of messages to receive and the wait timeout. The response provides detailed information about the received messages and the transaction.
Message Handling Options:
Acknowledge (ack): Mark the message as processed and remove it from the queue.
Reject: Reject the message. It won't be requeued.
Requeue: Send the message back to the queue for later processing.
Choose the appropriate handling option based on your application's logic and requirements.
KubeMQ Command & Query Operations
Construct the CQClient
For executing command & query operation we have to create the instance of CQClient, its instance can be created with minimum two parameter address (KubeMQ server address) & clientId . With these two parameter plainText connections are established. The table below describes the Parameters available for establishing a connection.
CQClient Configuration
Name
Type
Description
Default Value
Mandatory
address
String
The address of the KubeMQ server.
None
Yes
clientId
String
The client ID used for authentication.
None
Yes
authToken
String
The authorization token for secure communication.
None
No
tls
boolean
Indicates if TLS (Transport Layer Security) is enabled.
None
No
tlsCertFile
String
The path to the TLS certificate file.
None
No (Yes if tls is true)
tlsKeyFile
String
The path to the TLS key file.
None
No (Yes if tls is true)
tlsCaCertFile
String
The path to the TLS CA cert file.
None
No (Yes if tls is true)
maxReceiveSize
int
The maximum size of the messages to receive (in bytes).
104857600 (100MB)
No
reconnectIntervalSeconds
int
The interval in seconds between reconnection attempts.
The example below demonstrates to construct CQClient with ssl and other configurations:
constconfig: Config={address: 'localhost:50000',// KubeMQ gRPC endpoint addressclientId: 'your-client-id',// Connection clientIdauthToken: 'your-jwt-auth-token',// Optional JWT authorization tokentls: true,// Indicates if TLS is enabledtlsCertFile: 'path/to/tls-cert.pem',// Path to the TLS certificate filetlsKeyFile: 'path/to/tls-key.pem',// Path to the TLS key filetlsCaCertFile: 'path/to/tls-ca-cert.pem',// Path to the TLS CA cert filemaxReceiveSize: 1024*1024*100,// Maximum size of the messages to receive (100MB)reconnectIntervalSeconds: 1,// Interval in milliseconds between reconnect attempts (1 second)};constcqClient=newCQClient(opts);
Ping To KubeMQ server
You can ping the server to check connection is established or not.
asyncfunctionsubscribeToCommands(channelName: string){//Subscribes to commands from the specified channel with a specific configuration. constcommandSubscriptionRequest=newCommandsSubscriptionRequest(channelName,'group1');// Define the callback for receiving commandMessage commandSubscriptionRequest.onReceiveEventCallback=(commandMessage: CommandMessageReceived)=>{console.log('SubscriberA received commandMessage:',{id: commandMessage.id,fromClientId: commandMessage.fromClientId,timestamp: commandMessage.timestamp,channel: commandMessage.channel,metadata: commandMessage.metadata,body: commandMessage.body,tags: commandMessage.tags,});};// Define the callback for handling errors commandSubscriptionRequest.onErrorCallback=(error: string)=>{console.error('SubscriberA error:',error);};cqClient.subscribeToCommands(commandSubscriptionRequest).then(()=>{console.log('Command Subscription successful');}).catch((reason: any)=>{console.error('Command Subscription failed:',reason);});}
Queries SubscribeToQueriesChannel Example:
Request: QueriesSubscriptionRequest Class Attributes
asyncfunctionsubscribeToQueries(channelName: string){//Subscribes to queries from the specified channel with a specific configuration. constcommandSubscriptionRequest=newCommandsSubscriptionRequest(channelName,'group1');// Define the callback for receiving queriesMessage commandSubscriptionRequest.onReceiveEventCallback=(commandMessage: CommandMessageReceived)=>{console.log('SubscriberA received event:',{id: commandMessage.id,fromClientId: commandMessage.fromClientId,timestamp: commandMessage.timestamp,channel: commandMessage.channel,metadata: commandMessage.metadata,body: commandMessage.body,tags: commandMessage.tags,});};// Define the callback for handling errors commandSubscriptionRequest.onErrorCallback=(error: string)=>{console.error('SubscriberA error:',error);};cqClient.subscribeToQueries(commandSubscriptionRequest).then(()=>{console.log('Queries Subscription successful');}).catch((reason: any)=>{console.error('Queries Subscription failed:',reason);});}