Skip to content

Commit

Permalink
Added doc for kafka pubsub Avro schema registry support
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick Assuied <[email protected]>
  • Loading branch information
passuied committed Jan 10, 2024
1 parent 4822030 commit 8c51263
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ spec:
value: "2.0.0"
- name: direction
value: "input, output"
- name: schemaRegistryURL # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry URL.
value: http://localhost:8081
- name: schemaRegistryAPIKey # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry API Key.
value: XYAXXAZ
- name: schemaRegistryAPISecret # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret.
value: "ABCDEFGMEADFF"
- name: schemaCachingEnabled # Optional. When using Schema Registry Avro serialization/deserialization. Enables caching for schemas.
value: true
- name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available.
value: 5m
```
## Spec metadata fields
Expand All @@ -75,6 +85,11 @@ spec:
| `version` | N | Input/Output | Kafka cluster version. Defaults to 2.0.0. Please note that this needs to be mandatorily set to `1.0.0` for EventHubs with Kafka. | `"1.0.0"` |
| `direction` | N | Input/Output | The direction of the binding. | `"input"`, `"output"`, `"input, output"` |
| `oidcExtensions` | N | Input/Output | String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token | `{"cluster":"kafka","poolid":"kafkapool"}` |
| `schemaRegistryURL` | N | Required when using Schema Registry Avro serialization/deserialization. The Schema Registry URL. | `http://localhost:8081` |
| `schemaRegistryAPIKey` | N | When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Key. | `XYAXXAZ` |
| `schemaRegistryAPISecret` | N | When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret. | `ABCDEFGMEADFF` |
| `schemaCachingEnabled` | N | When using Schema Registry Avro serialization/deserialization. Enables caching for schemas. Default is `true` | `true` |
| `schemaLatestVersionCacheTTL` | N | When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available. Default is 5 min | `5m` |

#### Note
The metadata `version` must be set to `1.0.0` when using Azure EventHubs with Kafka.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ spec:
value: 2.0.0
- name: disableTls # Optional. Disable TLS. This is not safe for production!! You should read the `Mutual TLS` section for how to use TLS.
value: "true"
- name: schemaRegistryURL # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry URL.
value: http://localhost:8081
- name: schemaRegistryAPIKey # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry API Key.
value: XYAXXAZ
- name: schemaRegistryAPISecret # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret.
value: "ABCDEFGMEADFF"
- name: schemaCachingEnabled # Optional. When using Schema Registry Avro serialization/deserialization. Enables caching for schemas.
value: true
- name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available.
value: 5m

```

> For details on using `secretKeyRef`, see the guide on [how to reference secrets in components]({{< ref component-secrets.md >}}).
Expand Down Expand Up @@ -81,6 +92,11 @@ spec:
| oidcClientSecret | N | The OAuth2 client secret that has been provisioned in the identity provider: Required when `authType` is set to `oidc` | `"KeFg23!"` |
| oidcScopes | N | Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Recommended when `authType` is set to `oidc`. Defaults to `"openid"` | `"openid,kafka-prod"` |
| oidcExtensions | N | Input/Output | String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token | `{"cluster":"kafka","poolid":"kafkapool"}` |
| schemaRegistryURL | N | Required when using Schema Registry Avro serialization/deserialization. The Schema Registry URL. | `http://localhost:8081` |
| schemaRegistryAPIKey | N | When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Key. | `XYAXXAZ` |
| schemaRegistryAPISecret | N | When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret. | `ABCDEFGMEADFF` |
| schemaCachingEnabled | N | When using Schema Registry Avro serialization/deserialization. Enables caching for schemas. Default is `true` | `true` |
| schemaLatestVersionCacheTTL | N | When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available. Default is 5 min | `5m` |

The `secretKeyRef` above is referencing a [kubernetes secrets store]({{< ref kubernetes-secret-store.md >}}) to access the tls information. Visit [here]({{< ref setup-secret-store.md >}}) to learn more about how to configure a secret store component.

Expand Down Expand Up @@ -348,6 +364,100 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correla
}'
```

## Avro Schema Registry serialization/deserialization
You can configure publishers/subscribers to publish/consume data encoded using [Avro binary serialization](https://avro.apache.org/docs/), leveraging an [Apache Schema Registry](https://developer.confluent.io/courses/apache-kafka/schema-registry/) (e.g. [Confluent Schema Registry](https://developer.confluent.io/courses/apache-kafka/schema-registry/), [Apicurio](https://www.apicur.io/registry/))

### Configuration
- The kafka pubsub component metadata must at least have the schema registry URL defined, and API key/secret if applicable.
- Only message value serialization/deserialization is supported at the moment.
- Cloud events are not supported. Therefore, the `rawPayload=true` metadata must be passed.
- Schema subjects are automatically derived from topic names, using standard naming convention:
- e.g. given topic `my-topic`, the schema subject will be `my-topic-value`
- When interacting with the message payload within the service, it will be in JSON format. The payload will be transparently serialized/deserialized within the Dapr component.
- Date/Datetime fields must be passed as their [Epoch Unix timestamp](https://en.wikipedia.org/wiki/Unix_time) equivalent (rather than typical Iso8601)
- E.g.
- `2024-01-10T04:36:05.986Z` should be passed as `1704861365986`. Number of milliseconds since Jan 1st, 1970
- `2024-01-10` should be passed as `19732`. Number of days since Jan 1st, 1970

### Publishing Avro messages
- In order to indicate to the kafka pubsub component that the message should be using avro serialization, the `valueSchemaType` metadata must be set to `Avro`.

{{< tabs curl "Python SDK">}}

{{% codetab %}}
```bash
curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/my-topic?metadata.rawPayload=true&metadata.valueSchemaType=Avro -H "Content-Type: application/json" -d '{"order_number": "345", "created_date": 1704861365986}'
```
{{% /codetab %}}

{{% codetab %}}
```python
from dapr.clients import DaprClient
with DaprClient() as d:
req_data = {
'order_number': '345',
'created_date': 1704861365986
}
# Create a typed message with content type and body
resp = d.publish_event(
pubsub_name='pubsub',
topic_name='my-topic',
data=json.dumps(req_data),
publish_metadata={'rawPayload': 'true', 'valueSchemaType': 'Avro'}
)
# Print the request
print(req_data, flush=True)
```
{{% /codetab %}}

{{< /tabs >}}


### Subscribing to Avro topics
- In order to indicate to the kafka pubsub component that the message should be deserialized using avro, the `valueSchemaType` metadata must be set to `Avro` in the subscription metadata.

{{< tabs "Python (FastAPI)" >}}

{{% codetab %}}

```python
from dapr.ext.fastapi import DaprApp
from fastapi import APIRouter, Body, Response, status
import json
import sys
app = FastAPI()
dapr_app = DaprApp(app)
router = APIRouter()
@router.get('/dapr/subscribe')
def subscribe():
subscriptions = [{'pubsubname': 'pubsub',
'topic': 'my-topic',
'route': 'my_topic_subscriber',
'metadata': {
'rawPayload': 'true',
'valueSchemaType': 'Avro',
} }]
return subscriptions
@router.post('/my_topic_subscriber')
def my_topic_subscriber(event_data=Body()):
print(event_data, flush=True)
return Response(status_code=status.HTTP_200_OK)
```

app.include_router(router)

{{% /codetab %}}

{{< /tabs >}}



## Create a Kafka instance

{{< tabs "Self-Hosted" "Kubernetes">}}
Expand Down

0 comments on commit 8c51263

Please sign in to comment.