Skip to content
This repository has been archived by the owner on Jul 20, 2023. It is now read-only.

Commit

Permalink
serialize split completed events (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
apreethi13 authored May 6, 2021
1 parent 3e449fb commit 4ff95e6
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public EventStreamEventListener(KafkaProducer<String, Object> kafkaProducer)
public void queryCreated(QueryCreatedEvent queryCreatedEvent)
{
QueryCreatedEventV1 created = QueryCreatedEventV1.newBuilder()
.setQueryType(queryCreatedEvent.toString().split("\\[")[0])
.setQuery(queryCreatedEvent.getMetadata().getQuery())
.setQueryID(queryCreatedEvent.getMetadata().getQueryId())
.setPrinciple(queryCreatedEvent.getContext().getPrincipal().toString())
Expand All @@ -65,6 +66,7 @@ public void queryCreated(QueryCreatedEvent queryCreatedEvent)
public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
{
QueryCompletedEventV1 completed = QueryCompletedEventV1.newBuilder()
.setQueryType(queryCompletedEvent.toString().split("\\[")[0])
.setQuery(queryCompletedEvent.getMetadata().getQuery())
.setQueryID(queryCompletedEvent.getMetadata().getQueryId())
.setQueryStartTime(queryCompletedEvent.getCreateTime().toString())
Expand All @@ -86,10 +88,21 @@ public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
@Override
public void splitCompleted(SplitCompletedEvent splitCompletedEvent)
{
kafkaProducer.send(
new ProducerRecord<>(TOPIC_PRESTO_EVENT,
splitCompletedEvent.getQueryId(),
splitCompletedEvent.toString()));
SplitCompletedEventV1 split = SplitCompletedEventV1.newBuilder()
.setQueryType(splitCompletedEvent.toString().split("\\[")[0])
.setQueryID(splitCompletedEvent.getQueryId().toString())
.setQueryStartTime(splitCompletedEvent.getCreateTime().toString())
.setQueryEndTime(splitCompletedEvent.getEndTime().toString())
.build();
try {
kafkaProducer.send(
new ProducerRecord<>(TOPIC_PRESTO_EVENT,
splitCompletedEvent.toString(),
split.toString()));
}
catch (Exception e) {
log.error(e);
}
log.debug("Sent splitCompleted event. query id %s", splitCompletedEvent.getQueryId());
}
}
1 change: 1 addition & 0 deletions src/main/resources/avro/querycompleted-v1.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"namespace": "io.trino.plugin.eventstream",
"name": "QueryCompletedEventV1",
"fields": [
{ "name": "QueryType", "type": "string", "doc": "Query Run Type" },
{ "name": "Query", "type": "string", "doc": "Query Statement" },
{ "name": "QueryID", "type": "string", "doc": "Query ID" },
{ "name": "QueryStartTime", "type": "string", "doc": "Query Created Time"},
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/avro/querycreated-v1.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"namespace": "io.trino.plugin.eventstream",
"name": "QueryCreatedEventV1",
"fields": [
{ "name": "QueryType", "type": "string", "doc": "Query Run Type" },
{ "name": "Query", "type": "string", "doc": "Query Statement" },
{ "name": "QueryID", "type": "string", "doc": "Query ID" },
{ "name": "Principle", "type": ["null" ,"string"], "doc": "Principle",
Expand Down
11 changes: 11 additions & 0 deletions src/main/resources/avro/splitcompleted-v1.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"type": "record",
"namespace": "io.trino.plugin.eventstream",
"name": "SplitCompletedEventV1",
"fields": [
{ "name": "QueryType", "type": "string", "doc": "Query Run Type" },
{ "name": "QueryID", "type": "string", "doc": "Query ID" },
{ "name": "QueryStartTime", "type": "string", "doc": "Query Created Time"},
{ "name": "QueryEndTime", "type": "string", "doc": "Query End Time"}
]
}

0 comments on commit 4ff95e6

Please sign in to comment.