CDP4ServicesMessaging

samatrhea edited this page Aug 21, 2024 · 2 revisions

Overview

The CDP4ServicesMessaging library provides convenient access to the functionality of the AMQP 0-9-1 protocol, including managed sessions. It depends on the RabbitMQ .NET client, which is maintained by the RabbitMQ team at VMware.

Current state

  • Support for Direct and Broadcast messages.
  • Session reuse per thread, using a client-level disposable pattern.
  • IObservable listener.
  • Pushing messages in a 'fire and forget' manner.
  • Background service to offload message publishing.
  • Auto-disposal of sessions when pushing messages.
  • Routing for broadcast messages.

For every exchange type, message acknowledgement is set to automatic: the broker treats a message as acknowledged as soon as it has been sent to the consumer, so it is never redelivered from the same queue. Not exposing the acknowledgement mode is a deliberate design choice.

Issues

Open Issues

Purpose

Beyond exchanging messages between services over a network, CDP4ServicesMessaging provides a ThingMessageProducer and a ThingMessageConsumer. These two services are designed specifically to send and receive ThingsChangedMessages, each of which carries a collection of things that have been created, updated or deleted.

The ThingMessageProducer and the ThingMessageConsumer depend on an implementation of the ICdp4MessageSerializer.

Basic usage

Sending a message

/// <summary>
/// Broadcast a message about the changed <see cref="Thing"/> instances
/// </summary>
/// <param name="changedThings">The collection of changed <see cref="Thing"/> instances.</param>
/// <param name="thingMessageProducer">The <see cref="IThingMessageProducer"/></param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/></param>
/// <returns>An asynchronous task representing the operation.</returns>
public async Task Send(IEnumerable<Thing> changedThings, [FromServices] IThingMessageProducer thingMessageProducer, CancellationToken cancellationToken = default)
{
    var message = new ThingChangedMessage()
    {
        ChangedThings = changedThings.ToList()
    };

    await thingMessageProducer.Push(message, cancellationToken);
}

/// <summary>
/// Queue a message about the changed <see cref="Thing"/> instances so that it is broadcast later
/// </summary>
/// <param name="changedThings">The collection of changed <see cref="Thing"/> instances.</param>
/// <param name="thingMessageProducer">The <see cref="IBackgroundThingsMessageProducer"/></param>
/// <returns>An asynchronous task representing the operation.</returns>
public async Task Send(IEnumerable<Thing> changedThings, [FromServices] IBackgroundThingsMessageProducer thingMessageProducer)
{
    var message = new ThingChangedMessage()
    {
        ChangedThings = changedThings.ToList()
    };

    // The message is only queued here; the background service publishes it later
    await thingMessageProducer.EnqueueAsync(message);
}

Listening for messages

/// <summary>
/// Adds an observable that yields messages of type <see cref="string"/> whenever any are available
/// </summary>
/// <returns>An asynchronous task representing the operation.</returns>
public async Task AddListener([FromServices] IMessageClientService messageQueueClient)
{
    IObservable<string> observable = await messageQueueClient.Listen<string>("QueueName", ExchangeType.Direct);

    observable.Subscribe(x => Console.WriteLine(x));
}

When using the following method

Task<IObservable<TMessage>> Listen<TMessage>(string queueName, ExchangeType exchangeType = ExchangeType.Default, CancellationToken cancellationToken = default) where TMessage : class;

as in the example above, the implementation takes care of opening or reusing a connection and a channel, ensuring that the proper exchange is declared for the provided queueName and exchangeType, and de-serializing the messages.
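
The observable returned by Listen can also be consumed by implementing IObserver<TMessage> directly, and the IDisposable returned by Subscribe can be used to stop receiving messages. The following is a minimal sketch of that pattern, not the library's implementation: InMemoryMessageSource is a hypothetical stand-in for the observable that Listen<string> returns, so that the example runs without a broker, and CollectingObserver and ListenSketch are illustrative names as well.

```CSharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the IObservable<string> returned by Listen<string>.
// It exists only so that this sketch runs without a broker.
public sealed class InMemoryMessageSource : IObservable<string>
{
    private readonly List<IObserver<string>> observers = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)
    {
        this.observers.Add(observer);
        return new Subscription(() => this.observers.Remove(observer));
    }

    // Simulates a message arriving on the queue
    public void Publish(string message)
    {
        foreach (var observer in this.observers.ToArray())
        {
            observer.OnNext(message);
        }
    }

    private sealed class Subscription : IDisposable
    {
        private readonly Action unsubscribe;

        public Subscription(Action unsubscribe)
        {
            this.unsubscribe = unsubscribe;
        }

        public void Dispose()
        {
            this.unsubscribe();
        }
    }
}

// Observer that records every message it receives
public sealed class CollectingObserver : IObserver<string>
{
    public List<string> Received { get; } = new List<string>();

    public void OnNext(string value) => this.Received.Add(value);

    public void OnError(Exception error) => Console.Error.WriteLine(error.Message);

    public void OnCompleted() => Console.WriteLine("No more messages.");
}

public static class ListenSketch
{
    public static IReadOnlyList<string> Run()
    {
        var source = new InMemoryMessageSource();
        var observer = new CollectingObserver();

        // Subscribe returns an IDisposable that ends the subscription
        IDisposable subscription = source.Subscribe(observer);
        source.Publish("first");

        // After disposal the observer no longer receives messages
        subscription.Dispose();
        source.Publish("second");

        return observer.Received;
    }
}
```

With a real IMessageClientService, the observable obtained from Listen would take the place of InMemoryMessageSource, and only the observer and the handling of the subscription would remain.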

Below is an example of using the other method to receive messages

Task<IDisposable> AddListener(string queueName, EventHandler<BasicDeliverEventArgs> onReceive, ExchangeType exchangeType = ExchangeType.Default, CancellationToken cancellationToken = default);

This overload does the same as the previous one, except that it does not de-serialize the message. The whole message envelope is therefore available for inspection or custom de-serialization.

```CSharp
/// <summary>
/// Adds a listener and assigns an event handler that is called whenever a message is available
/// </summary>
/// <returns>An asynchronous task representing the operation.</returns>
public async Task AddListener([FromServices] IMessageClientService messageQueueClient)
{
    // Keep the returned IDisposable: disposing it stops the listener
    IDisposable channel = await messageQueueClient.AddListener("QueueName", (_, envelope) => this.MessageReceived(envelope));
}

private void MessageReceived(BasicDeliverEventArgs envelope)
{
    string message = Encoding.UTF8.GetString(envelope.Body.ToArray()); // Convert the body from a byte array to a string
    Console.WriteLine($"Received message: {message}");
}
```

Note that when the channel initialized in the above example is disposed, the connection and the channel are closed and disposed, and the event handler is no longer called.
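
The custom de-serialization that the event-handler overload allows can be sketched as follows. The sketch assumes that the publisher sent UTF-8 encoded JSON, which this page does not state, and that the message body is exposed as a ReadOnlyMemory<byte>, as the ToArray() call in the example above suggests. StatusMessage and EnvelopeBody are hypothetical names introduced for illustration only.

```CSharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical payload type, used only for illustration
public sealed class StatusMessage
{
    public string Source { get; set; } = string.Empty;

    public int ChangeCount { get; set; }
}

public static class EnvelopeBody
{
    // De-serializes a UTF-8 JSON body into a StatusMessage.
    // Assumption: the body contains JSON whose property names match the type.
    public static StatusMessage Parse(ReadOnlyMemory<byte> body)
    {
        return JsonSerializer.Deserialize<StatusMessage>(body.Span);
    }
}

public static class EnvelopeBodySketch
{
    public static StatusMessage Run()
    {
        // Simulates the raw bytes carried by a message envelope
        byte[] raw = Encoding.UTF8.GetBytes("{\"Source\":\"engine\",\"ChangeCount\":3}");

        return EnvelopeBody.Parse(raw);
    }
}
```

Inside an event handler such as MessageReceived, the call would be EnvelopeBody.Parse(envelope.Body), leaving the rest of the envelope available for inspection.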
