Skip to content

:shipit: konsume seamlessly integrates message queues like RabbitMQ and Kafka with other services, automating data-driven actions.

License

Notifications You must be signed in to change notification settings

bugrakocabay/konsume

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

konsume logo

konsume is a powerful and flexible tool designed to consume messages from various message queues like RabbitMQ and Kafka and perform data-driven actions like HTTP requests and insertions into databases based on predefined configurations.

CI Go Version Go Report Coverage

Table of Contents

Overview

TLDR; If you want to consume messages from message queues such as RabbitMQ or Kafka and perform HTTP requests or insert data to a database based on configurations, konsume is for you.

konsume is a tool that easily connects message queues like RabbitMQ and Kafka with web services, automating data-driven actions. It bridges complex messaging systems and web APIs or databases. Its flexible setup, including various retry options and customizable request formats, suits a range of uses, from basic data transfers to intricate processing tasks.

konsume diagram

Features

  • Message Consumption: Efficiently consumes messages from specified queues.
  • Dynamic HTTP Requests: Sends HTTP requests based on message content and predefined configurations.
  • Database Insertions: Inserts data into databases based on message content and predefined configurations.
  • Retry Strategies: Supports fixed, exponential, and random retry strategies for handling request failures.
  • Request Body Templating: Dynamically constructs request bodies using templates with values extracted from incoming messages.
  • Custom HTTP Headers: Allows setting custom HTTP headers for outgoing requests.
  • Configurable via YAML and JSON: Easy configuration using a YAML or JSON file for defining queues, routes, and behaviors.
  • Monitoring: Provides a Prometheus endpoint for monitoring metrics.

Installation

Easiest way to install konsume is to run via Docker.

docker run -d --name konsume -v /path/to/config.yaml:/config/config.yaml bugrakocabay/konsume:latest

Alternatively, you can download the latest binary from the releases page and run it on your machine.

Usage

konsume depends on a YAML or JSON configuration file for defining queues, routes, and behaviors. By default, konsume looks for a file named config.yaml in the /config directory. Alternatively, you can specify the environment variable KONSUME_CONFIG_PATH to point to your configuration file.


πŸ“œ Checkout the CONFIGURATION.md for detailed information about the configuration parameters.
πŸ’‘ You can also find detailed usage examples in the examples directory.

A simple usage for konsume to send HTTP requests with RabbitMQ:

providers:
  - name: 'rabbit-queue'
    type: 'rabbitmq'
    amqp-config:
      host: 'localhost'
      port: 5672
      username: 'user'
      password: 'password'
queues:
  - name: 'queue-for-rabbit'
    provider: 'rabbit-queue'
    routes:
      - name: 'ServiceA_Queue2'
        type: 'REST'
        method: 'POST'
        url: 'https://someurl.com'

A simple usage for konsume to insert into a database with Kafka:

providers:
  - name: 'kafka-queue'
    type: 'kafka'
    kafka-config:
      brokers:
        - 'localhost:9092'
      topic: 'your_topic_name'
      group: 'group1'
databases:
  - name: "sql-database"
    type: "postgresql"
    connection-string: "postgres://postgres:password@host:5432/dbname"
queues:
  - name: "queue-for-kafka"
    provider: "kafka-queue"
    database-routes:
      - name: "sql-database-route"
        provider: "sql-database"
        table: "some_table"
        mapping:
          userName: "name"
          some-key: "value"

FAQ

Why konsume?

Think of konsume as your handy tool for making message queues and other services work together like best buddies. It's like having a super-efficient assistant who takes messages from RabbitMQ or Kafka and knows exactly when and how to insert data into databases or ping your web services, whether they speak REST or GraphQL. And guess what? If something doesn't go right the first time, konsume keeps trying until it works, thanks to its smart retry strategies. So, whether you're just moving data around or setting up some cool automated workflows, konsume is your go-to for making things simple and reliable.

What message queues does konsume support? Currently konsume supports RabbitMQ, Kafka and ActiveMQ. But it is designed to be easily extensible to support other message queues.
What databases does konsume support? Currently konsume only supports Postgres. But it is designed to be easily extensible to support other databases.
How can I dynamically insert values from consumed messages into the request body? konsume allows dynamically inserting values from consumed messages into the request body using placeholders. You can use the {{key}} syntax to insert values from consumed messages into the request body. For example, if you have a message like this:
{
	"name": "John",
	"email": "[email protected]"
}

You can use the {{name}} and {{email}} placeholders in the request body to insert the values from the consumed message into the request body.

routes:
  - name: 'test-route'
    method: 'POST'
    type: 'REST'
    headers:
      Content-Type: 'application/json'
    body:
      userName: '{{name}}'
      eMail: '{{email}}'
    url: 'http://someurl.com'
How can I dynamically map the values inside a message into columns/fields of a database? In order to dynamically map the values inside a message into columns/fields of a database, you can use the mapping section in the database route configuration. You can define the mapping between the fields of the message and the columns of the database table. For example, if you have a message like this:
{
	"name": "John",
	"email": "[email protected]"
}

You can use the mapping section to map the name field of the message to the user_name column of the database table and the email field of the message to the user_email column of the database table.

queues:
  - name: "queue-for-kafka"
    provider: "kafka-queue"
    database-routes:
      - name: "sql-database-route"
        provider: "sql-database"
        table: "some_table"
        mapping:
          name: "user_name"
          email: "user_email"
Is GraphQL supported? Yes! konsume supports GraphQL. You can use the graphql type for routes and define the GraphQL query or mutation in the body section of the route. Under body section, you can use the query or mutation key to define your GraphQL query or mutation. Also konsume allows dynamically inserting values from consumed messages into the GraphQL body using placeholders.
routes:
  - name: 'test-route'
    method: 'POST'
    type: 'graphql'
    headers:
      Content-Type: 'application/json'
    body:
      mutation: |
        mutation {
          addUser(name: {{name1}}, email: {{email1}}) {
            id
            name
            email
          }
        }
    url: 'http://someurl:4000/graphql'
How does the retry mechanism work? konsume supports three different retry strategies: fixed, expo, and random. You can define the retry strategy in the retry section of the queue configuration. If you want to enable retrying, you should set the enabled flag to true. You can also define the maximum amount of times that retrying will be triggered using the max-retries key. The interval key defines the amount of time between retries. The threshold-status key defines the minimum HTTP status code to trigger retry mechanism, any status code above or equal this will trigger retrying. If you don't define the threshold-status key, it will default to 500.
queues:
  - name: 'queue-for-rabbit'
    provider: 'rabbit-queue'
    retry:
      enabled: true
      strategy: 'fixed'
      max-retries: 5
      interval: 5s
      threshold-status: 500
    routes:
      - name: 'ServiceA_Queue2'
        type: 'REST'
        method: 'POST'
        url: 'https://someurl.com'
How to see the metrics? konsume provides a Prometheus endpoint for monitoring metrics. You can see the metrics at /metrics by default. Here you will find a list of metrics that Prometheus can scrape by default.
Also, konsume provides custom metrics for the following events:
- konsume_messages_consumed_total: Total number of messages consumed.
- konsume_http_requests_made_total: Total number of HTTP requests made.
- konsume_http_requests_succeeded_total: Total number of HTTP requests succeeded.
- konsume_http_requests_failed_total: Total number of HTTP requests failed.
What are some common troubleshooting steps if konsume is not working as expected?
  1. Enable Debug Mode and Examine Logs: Look at the logs for any error messages or warnings. If you've enabled debug mode, this will provide more detailed information.
  2. Check Configuration: Ensure your config.yaml is correctly set up for your message queues and routes. Verify all parameters, especially URLs, queue names, and credentials.
  3. Validate Queue Connectivity: Make sure Konsume can connect to the message queues. Check network configurations, access permissions, and queue settings.
  4. Test HTTP Endpoints: Ensure the endpoints for your HTTP requests are reachable and responding as expected. You can test them independently with tools like Postman or cURL.
  5. Review Message Formats: Confirm that the messages in your queues are in the expected format, especially if you're using templating features.
  6. Monitor Resource Usage: Sometimes issues arise due to resource constraints. Check CPU, memory, and network usage.
  7. Update Konsume: Ensure you're using the latest version of Konsume, as updates might fix known issues.
  8. Seek Community Help: If you're still stuck, consider asking for help in issues or discussions.

Contributing

🌟 Your contributions are welcome!

Whether you're looking to fix bugs, add new features, or improve documentation, your help is greatly appreciated. Contributing to konsume is not only a great way to enhance this tool, but it's also an excellent opportunity to get involved with a community of like-minded individuals.

Here's how you can contribute:

Report Issues: Found a bug or have a suggestion? Open an issue and let us know! Submit Pull Requests: Have a fix or a new feature? Submit a pull request! Feedback and Ideas: Share your thoughts and ideas on how we can improve konsume. Documentation: Help us improve or translate the documentation. Before contributing, please read our Contributing Guidelines for more information on how to get started.

πŸ’‘ No contribution is too small – whether it's fixing typos, improving code readability, or updating documentation, all contributions are valuable and appreciated!

Join us in making konsume better for everyone! πŸš€

About

:shipit: konsume seamlessly integrates message queues like RabbitMQ and Kafka with other services, automating data-driven actions.

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages