Quickstart
Make sure that Node.js (version >= 16) is installed on your operating system.
A locally running Kafka cluster is also required. To set one up, you can follow the instructions in the Confluent for Kubernetes Quickstart.
Install Dependencies
To use Kafka, Avro schemas, and the Confluent Schema Registry in your Node.js application, add the following dependencies to your package.json.
{
  "dependencies": {
    "@ts-messaging/client-kafka": "^1.0.0",
    "@ts-messaging/schema-avro": "^1.0.0",
    "@ts-messaging/registry-confluent": "^1.0.0"
  }
}
Configure TypeScript
The framework relies heavily on the decorator and decorator-metadata features of the TypeScript compiler, so both options must be enabled in your tsconfig.json.
{
  "compilerOptions": {
    "experimentalDecorators": true,
    "emitDecoratorMetadata": true
  }
}
WARNING
Some third-party build tools, such as esbuild, do not natively support the emitDecoratorMetadata option and must be explicitly extended and configured to do so.
Create a Schema
Creating a schema is as straightforward as defining a decorated TypeScript class. The framework automatically extracts the schema, builds a validator, and registers the schema with the schema registry. All necessary decorators are available within the @Avro namespace.
@Avro.Record()
class User {
  @Avro.String()
  name: string;

  @Avro.Int()
  age: number;
}
Compiled: Raw Schema
This is the raw schema that the framework generates and attaches to the class as a static meta-property using reflection.
{
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": "int"
    }
  ]
}
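To make the extraction step more concrete, here is a minimal sketch of how per-property decorators could collect field metadata and assemble a record schema like the one above. It is not the framework's implementation: fieldRegistry, field, and buildRecordSchema are hypothetical names, only the two primitive types from the example are covered, and the decorators are invoked as plain functions so the snippet runs without any compiler flags.

```typescript
// Hypothetical sketch; the framework's real decorators and storage may differ.
type AvroPrimitive = 'string' | 'int';

interface AvroField {
  name: string;
  type: AvroPrimitive;
}

interface AvroRecordSchema {
  type: 'record';
  name: string;
  fields: AvroField[];
}

// Field metadata collected per class constructor.
const fieldRegistry = new Map<Function, AvroField[]>();

// Property decorator factory in the legacy (experimentalDecorators) shape:
// it receives the class prototype and the property name.
function field(type: AvroPrimitive) {
  return (target: object, propertyKey: string): void => {
    const ctor = target.constructor;
    const fields = fieldRegistry.get(ctor) ?? [];
    fields.push({ name: propertyKey, type });
    fieldRegistry.set(ctor, fields);
  };
}

// Assembles an Avro record schema from the collected field metadata.
function buildRecordSchema(ctor: Function): AvroRecordSchema {
  return {
    type: 'record',
    name: ctor.name,
    fields: fieldRegistry.get(ctor) ?? [],
  };
}

class User {
  name = '';
  age = 0;
}

// Equivalent to placing @field('string') above `name` and @field('int') above `age`.
field('string')(User.prototype, 'name');
field('int')(User.prototype, 'age');

const userSchema = buildRecordSchema(User);
```

Calling JSON.stringify(userSchema, null, 2) on the result yields a record schema with the same shape as the raw schema shown above.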
Create a Controller
A controller is a class that contains one or more endpoints. Each endpoint is a method decorated with the @Kafka.Endpoint decorator. If no ConsumerConfig is provided, the framework uses the default consumer.
@Kafka.Controller()
class UserController {
  //...
}
Define an Endpoint
The framework automatically registers the endpoint with the consumer defined through the controller. The endpoint is invoked only if the message matches the expected schema, so the message data has already been validated by the time the method runs. The @Kafka.Value() decorator defines the schema and the parameter position of the message value. Additional parameter decorators are @Kafka.Key() and @Kafka.Meta().
@Kafka.Controller()
class UserController {
  @Kafka.Endpoint("user.create")
  async createUser(@Kafka.Payload() user: User) {
    // ...
  }
}
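The "validate first, then invoke" behaviour described above can be pictured with the following minimal sketch. It does not show the framework's internals: the Endpoint interface, isUserMessage, and dispatch are hypothetical names, and the hand-written type guard stands in for the validator that the framework derives from the schema.

```typescript
// Hypothetical sketch of validate-then-dispatch; not the framework's internals.
interface Endpoint<T> {
  channel: string;
  validate: (value: unknown) => value is T;
  handler: (value: T) => Promise<void>;
}

interface UserMessage {
  name: string;
  age: number;
}

// Type guard mirroring the User schema: a string name and a 32-bit integer age.
function isUserMessage(value: unknown): value is UserMessage {
  if (typeof value !== 'object' || value === null) return false;
  const { name, age } = value as { name?: unknown; age?: unknown };
  return (
    typeof name === 'string' &&
    typeof age === 'number' &&
    Number.isInteger(age) &&
    age >= -2147483648 &&
    age <= 2147483647
  );
}

// Resolves to true if the handler was invoked, false if the message was skipped.
async function dispatch<T>(
  endpoint: Endpoint<T>,
  channel: string,
  value: unknown,
): Promise<boolean> {
  if (channel !== endpoint.channel) return false;
  if (!endpoint.validate(value)) return false;
  await endpoint.handler(value);
  return true;
}

const createdUsers: UserMessage[] = [];

const createUserEndpoint: Endpoint<UserMessage> = {
  channel: 'user.create',
  validate: isUserMessage,
  handler: async (user) => {
    createdUsers.push(user);
  },
};
```

With this shape, the handler body never needs its own defensive checks: a payload such as { name: 'John Doe', age: 'unknown' } is rejected before the handler is called.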
Create a Registry
The registry is the single source of truth inside the framework and is responsible for registering and retrieving schemas. It supports multiple schema providers, so different schemas can be used for the same channel.
const registry = new Confluent({
  clientConfig: {
    baseUrl: 'http://localhost:8081',
  },
  schemaProviders: [new Avro()],
});
Create a Client
The client is the entry point for the framework. It is responsible for creating the Kafka consumer and producer, registering the controllers, and starting the application. To be initialized, the client requires a BrokerConfig, a ConsumerConfig, a Registry, and a list of Controllers.
const client = new Kafka({
  broker: {
    brokers: ['localhost:9092']
  },
  consumer: {
    groupId: 'user-service'
  },
  registry: new Confluent({
    clientConfig: {
      baseUrl: 'http://localhost:8081',
    },
    schemaProviders: [new Avro()],
  }),
  controllers: [UserController],
});
Initialize and start the Application
The client is initialized asynchronously with the init() method. The init() method creates the Kafka consumer and producer, registers the controllers, subscribes to the channels, and starts the application. It returns a promise that resolves once the application is ready.
await client.init();
await client.connect();
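Because both calls return promises, a failure during startup surfaces as a rejection. The following minimal sketch wraps the sequence in a helper that reports success or failure. StartableClient and startClient are hypothetical names; the interface models only the two calls shown above and is not a type exported by the framework.

```typescript
// Hypothetical sketch: StartableClient only models the two calls shown above.
interface StartableClient {
  init(): Promise<void>;
  connect(): Promise<void>;
}

// Runs the startup sequence in order and reports whether it succeeded.
async function startClient(client: StartableClient): Promise<boolean> {
  try {
    await client.init();
    await client.connect();
    return true;
  } catch (error) {
    // A rejected init() or connect() ends up here instead of crashing the process.
    console.error('Client startup failed:', error);
    return false;
  }
}
```

A service entry point could call startClient(client) and exit with a non-zero status when it resolves to false.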
Send a Message
The client's default producer exposes a produce() method that can be used to send messages to a channel. The produce() method automatically serializes the message value and key if a schema has been provided. It returns a promise that resolves once the message has been sent.
await client.broker.defaultProducer.produce({
  channel: 'user.create',
  payload: new User({
    name: 'John Doe',
    age: 42,
  }),
});
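For background on what serialization against a Confluent schema registry usually involves: Confluent's own serializers frame each message as a magic byte (0), a 4-byte big-endian schema ID, and then the Avro-encoded body. Whether produce() emits exactly this layout is an assumption here, not something this guide confirms, and frameMessage and unframeMessage are hypothetical helper names used only for illustration.

```typescript
// Hypothetical helpers illustrating the Confluent wire format. The framework's
// own serializer is assumed, not confirmed, to follow this layout.
const MAGIC_BYTE = 0;
const HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

// Prefixes an already Avro-encoded body with the magic byte and schema ID.
function frameMessage(schemaId: number, avroBody: Uint8Array): Uint8Array {
  const framed = new Uint8Array(HEADER_LENGTH + avroBody.length);
  const view = new DataView(framed.buffer);
  view.setUint8(0, MAGIC_BYTE);
  view.setUint32(1, schemaId, false); // false = big-endian
  framed.set(avroBody, HEADER_LENGTH);
  return framed;
}

// Splits a framed message back into its schema ID and Avro-encoded body.
function unframeMessage(framed: Uint8Array): { schemaId: number; avroBody: Uint8Array } {
  if (framed.length < HEADER_LENGTH || framed[0] !== MAGIC_BYTE) {
    throw new Error('Not a Confluent-framed message');
  }
  const view = new DataView(framed.buffer, framed.byteOffset, framed.byteLength);
  return {
    schemaId: view.getUint32(1, false),
    avroBody: framed.subarray(HEADER_LENGTH),
  };
}
```

The schema ID in the header is what lets a consumer look up the writer's schema in the registry before decoding the body.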