unaussprechlich/ts-messaging

ts-messaging (Status: Research Prototype)

An opinionated framework for type consistency in message stacks with a focus on developer experience.

The documentation can be found here.

Minimal Example

Create a connection to the Confluent Schema Registry, with Avro as the schema provider.

import { Confluent } from '@ts-messaging/registry-confluent';
import { Avro } from '@ts-messaging/schema-avro';

const confluentSchemaRegistry = new Confluent({
    clientConfig: {
        baseUrl: 'http://localhost:8081',
    },
    schemaProviders: [new Avro()],
    autoRegisterSchemas: true,
})

Create an Avro schema with decorators.

import { Avro } from '@ts-messaging/schema-avro';

@Avro.Record({
    name: 'sampleRecord',
    namespace: 'com.mycorp.mynamespace',
    doc: 'Sample schema to help you get started.',
})
export class TestValue {
    @Avro.Int({
        doc: 'The int type is a 32-bit signed integer.',
    })
    my_field1: number;

    // If the schema is used in an endpoint, all constructor arguments must be optional.
    constructor(my_field1?: number) {
        this.my_field1 = my_field1;
    }
}
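
For orientation, the decorated class above describes an Avro record. Below is a sketch of the schema one would expect it to correspond to, written as a plain TypeScript object. It assumes the decorators map one-to-one onto standard Avro record attributes; the schema that ts-messaging actually generates may differ, and `sampleRecordSchema` is a name chosen only for this illustration.

```typescript
// Assumed Avro equivalent of the decorated TestValue class.
// Hand-written for illustration; not generated by ts-messaging.
const sampleRecordSchema = {
    type: 'record',
    name: 'sampleRecord',
    namespace: 'com.mycorp.mynamespace',
    doc: 'Sample schema to help you get started.',
    fields: [
        {
            name: 'my_field1',
            type: 'int',
            doc: 'The int type is a 32-bit signed integer.',
        },
    ],
};

// Avro schemas are exchanged as JSON, so print the JSON form.
console.log(JSON.stringify(sampleRecordSchema, null, 2));
```

Comparing this JSON with what ends up in the registry is a quick way to check that the decorators produce the schema you intended.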

Define the TestController, then set up a Kafka client that registers it.

@Kafka.Controller()
class TestController {
    @Kafka.Endpoint('test')
    async onMessage(
        @Kafka.Key() key: { id: string },
        @Kafka.Value() value: TestValue,
        @Kafka.Metadata() meta: KafkaMetadata
    ) {
        console.log('[MyEndpoint] Message offset=' + meta.offset);
    }
}

// The controller must be defined (or imported) before the client references it.
const client = new Kafka({
    broker: { brokers: ['localhost:9092'] },
    consumer: { groupId: 'minimal-kafka-example' },
    autoRegisterTopics: true,
    registry: confluentSchemaRegistry,
    controllers: [TestController],
});

Send a message to the topic.

await client.produce({
    topic: 'test',
    data: {
        key: { id: '::1' },
        // my_field1 is an Avro int, so the value must be a number.
        value: new TestValue(42),
    },
});
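
The schema declares `my_field1` as an Avro `int`, a 32-bit signed integer, while a TypeScript `number` can hold fractions and much larger values. Below is a minimal sketch of a guard that could run before producing a message. `isAvroInt` and the two constants are hypothetical helpers for illustration and are not part of ts-messaging.

```typescript
// Hypothetical helper, not part of ts-messaging.
// Bounds of the Avro `int` type (32-bit signed integer).
const AVRO_INT_MIN = -(2 ** 31);
const AVRO_INT_MAX = 2 ** 31 - 1;

// Returns true only for whole numbers inside the 32-bit signed range.
function isAvroInt(value: number): boolean {
    return (
        Number.isInteger(value) &&
        value >= AVRO_INT_MIN &&
        value <= AVRO_INT_MAX
    );
}

console.log(isAvroInt(42));
console.log(isAvroInt(2 ** 31));
console.log(isAvroInt(1.5));
```

Here `42` passes, while `2 ** 31` (one above the maximum) and `1.5` (not a whole number) are rejected.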

Setup

  • Install pnpm
  • Run pnpm install
  • Run pnpm build::all
  • Run pnpm install again to resolve local dependencies

Appendix: Install Confluent Platform

#Create the namespace to use.
kubectl create namespace confluent

#Set this namespace to default for your Kubernetes context.
kubectl config set-context --current --namespace confluent

#Add the Confluent for Kubernetes Helm repository.
helm repo add confluentinc https://packages.confluent.io/helm
helm repo update

#Install Confluent for Kubernetes.
helm upgrade --install confluent-operator confluentinc/confluent-for-kubernetes

#Install all Confluent Platform components.
kubectl apply -f https://raw.githubusercontent.com/confluentinc/confluent-kubernetes-examples/master/quickstart-deploy/confluent-platform-singlenode.yaml

#Install a sample producer app and topic.
kubectl apply -f https://raw.githubusercontent.com/confluentinc/confluent-kubernetes-examples/master/quickstart-deploy/producer-app-data-singlenode.yaml

#Check that everything is deployed:
kubectl get pods

Expose with port forwarding:

#Set up port forwarding to Control Center web UI from local machine to localhost:9021
kubectl port-forward controlcenter-0 9021:9021

To expose Kafka and the Schema Registry through a load balancer on localhost, apply the following files:

  • Kafka on localhost:9092 with /.kubernetes/service-kafka-localhost.yml
  • Schema Registry on localhost:8081 with /.kubernetes/service-schemaregistry-localhost.yml
