This is a dependency which provides functionality to read a Zipkin Span list as an Opentracing implementation of metrics. Messages with metrics content are read from Kafka topics and can be serialized and de-serialized, abstracting the work that is common to every project of ours that needs this feature.
Exposing metrics to be processed by tools gives us knowledge about how our system works, the traffic it is supporting, where it has weaknesses or when the system definitely breaks. There are plenty of such tools, such as Zipkin UI, Prometheus, etc. They all provide a good set of views to monitor our service metrics, but we might need to customise something more appropriate for our tailored solutions.
The purpose here is to abstract the implementation that gathers the correlated spans into a linked list, which gives us the next span across the multiple services involved in the request, as well as the client span that called the next server, so we can gather durations and details about how the connection to a third party behaved.
At the moment, the implementation has focused on the HTTP and RPC metrics with the conventions in the next link. More work and testing are needed before relying on this idea.
This is part of working on an observability feature in conjunction with a Spring Boot project, which comes with Micrometer as its metrics provider and lets us push the metrics into a Kafka topic, to be consumed by tools that monitor the traffic following the Opentracing standard, such as Zipkin, Jaeger, OpenCensus, etc.
The convention for Opentracing can be found at:
As it says, it is just a convention, but we also need an implementation to obtain the information and deal with its content. In our Java world there is a well-known open-source project which works quite well with this convention and has evolved across its different versions,
and its GitHub page is:
From the Zipkin project we get the Span definition, as well as the serializer and deserializer together with the Span detector, for the case where the metrics payload is in any of the known message types (JSON_V1, JSON_V2, THRIFT, PROTO3).
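As a minimal sketch, assuming the zipkin2 codec classes are on the classpath (this library may wire the detection differently), the format detection and decoding look roughly like this:

import java.util.List;

import zipkin2.Span;
import zipkin2.codec.BytesDecoder;
import zipkin2.codec.SpanBytesDecoderDetector;

public class SpanDecodingSketch {

    // Detects whether the payload is JSON_V1, JSON_V2, THRIFT or PROTO3
    // and decodes it into a list of Zipkin spans.
    static List<Span> decode(byte[] listMessage) {
        BytesDecoder<Span> decoder = SpanBytesDecoderDetector.decoderForListMessage(listMessage);
        return decoder.decodeList(listMessage);
    }
}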
Putting more tech on the table, and given the nature of the project, Kafka is the transport channel which brings us the messages, and they first have to be handled somehow by the Zipkin deserializer.
The way Kafka hands a message over to be parsed into a format known by our application is through Serdes (SErializer/DESerializer); some of them are provided by Kafka and some are implemented by third parties, like the one we have implemented here.
One that stands out comes from the Spring project and reads those messages as JSON; if this is enough for you, I encourage you to use it rather than creating a new one.
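Assuming the Spring option referred to is the JsonSerde shipped with spring-kafka, a minimal sketch of its use would be (MetricEvent is a hypothetical payload class, only for illustration):

import org.springframework.kafka.support.serializer.JsonSerde;

public class JsonSerdeSketch {

    // Hypothetical payload class used only for this example.
    public static class MetricEvent {
        public String name;
        public double value;
    }

    public static void main(String[] args) {
        try (JsonSerde<MetricEvent> serde = new JsonSerde<>(MetricEvent.class)) {
            MetricEvent event = new MetricEvent();
            event.name = "http.server.requests";
            event.value = 42.0;

            // Round trip through the JSON serializer/deserializer pair.
            byte[] bytes = serde.serializer().serialize("app_metrics", event);
            MetricEvent read = serde.deserializer().deserialize("app_metrics", bytes);
        }
    }
}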
In our particular case, the Span does not meet the requirements to be converted into a JSON object, so we have implemented our own:
com.currofy.kafka.extensions.opentracing.serialization.OpentracingSerde
With the message read from the topic, and given a content-type of application/opentracing, it will deserialize the messages as NodeTrace, which is a bunch of Zipkin Spans plus the common traceId which all the spans should share.
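The actual definition of NodeTrace lives in this library; purely as an illustration of the idea (not the real source), it can be pictured as a trace identifier plus the Zipkin spans that share it:

import java.util.ArrayList;
import java.util.List;

import zipkin2.Span;

// Illustrative sketch only: the real NodeTrace in this library may expose a different API.
public class NodeTraceSketch {

    private final String traceId;                    // shared by every span in the trace
    private final List<Span> spans = new ArrayList<>();

    public NodeTraceSketch(String traceId) {
        this.traceId = traceId;
    }

    public String getTraceId() {
        return traceId;
    }

    public List<Span> getSpans() {
        return spans;
    }
}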
If we are working with plain Kafka we have to build the pipeline that works out the linked spans ourselves, but with Kafka Streams we are able to group information, transform it… essentially operate with the functionality given by that library. So, from now on we assume we are using KAFKA STREAMS.
A request is resolved (or should be resolved) in a certain amount of time (hopefully short). Once we start receiving the metrics from the different tasks involved in the request, we want to group them into a list of spans with the same traceId so that, when the events have been emitted, we have them all and can reconstruct the path that was followed. The work of producing the linked spans is done by an aggregator, which takes the spans and links them to follow that path:
com.currofy.kafka.extensions.group.aggregator.NodeTraceAggregator
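Conceptually this is a Kafka Streams Aggregator that folds every incoming NodeTrace into the running LinkedSpanList. A rough sketch of that contract (not the library source; the linking logic itself is more involved):

import org.apache.kafka.streams.kstream.Aggregator;

// NodeTrace and LinkedSpanList come from this library; their imports are
// omitted here because their packages are not shown in this document.
public class NodeTraceAggregatorSketch implements Aggregator<String, NodeTrace, LinkedSpanList> {

    @Override
    public LinkedSpanList apply(String traceId, NodeTrace trace, LinkedSpanList aggregate) {
        // Hypothetical accessors: each span carried by the NodeTrace is
        // linked into the accumulated list for this traceId.
        trace.getSpans().forEach(aggregate::add);
        return aggregate;
    }
}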
The result will be a LinkedSpanList containing all the LinkedSpans, and from it we can get the root or initial Span (the one without a parentId). A LinkedSpan does not currently contain all the information in the Zipkin Span, but only what is essential (this needs to be revisited: what is essential?) in order to deal with smaller objects in the new messages that go to Kafka topics.
This list of LinkedSpans can now be used as a java.util.stream.Stream, over which we can filter on the attributes of the object.
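For instance, using the accessors that appear in the streaming example further below (httpTraces(), getKind(), getId()), one could pick out only the client-side spans of a trace:

import java.util.stream.Collectors;

import zipkin2.Span;

// LinkedSpanList and LinkedSpan come from this library; imports omitted
// because their packages are not shown in this document.
class LinkedSpanFilterExample {

    // Collects the ids of the spans emitted by clients calling other services.
    static String clientSpanIds(LinkedSpanList trace) {
        return trace.httpTraces()
                .filter(span -> span.getKind().equals(Span.Kind.CLIENT))
                .map(LinkedSpan::getId)
                .collect(Collectors.joining(","));
    }
}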
This dependency is also a Spring Starter, which provides a MessageConverter for the Opentracing messages. So, importing this dependency together with the required one from spring-cloud-stream creates the StreamMessageConverter in the Spring context.
If it is used in a Spring project with Spring Cloud:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
<dependency>
<groupId>com.currofy.kafka.extensions</groupId>
<artifactId>kafka-extension</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
Note: At the moment this dependency is not uploaded to any repository, so in order to use it you have to clone it on your machine and build it so that it gets installed into your local repository.
We take for granted that you have a Kafka topic from which your application is reading the metrics. In our case we are naming it app_metrics:
The reference guide to configure the kafka-streams-binder within a Spring Cloud application can be found at:
but since we have some particular needs to bring the messages to our converters, our configuration properties would look something like this:
spring:
  cloud:
    stream:
      bindings:
        metrics_in:
          destination: app_metrics
          content-type: application/opentracing
          group: metrics
      kafka:
        streams:
          binder:
            brokers: localhost:9092
          bindings:
            metrics_in:
              consumer:
                application-id: metrics_in-1
                key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value-serde: com.currofy.kafka.extensions.opentracing.serialization.OpentracingSerde
Reading the configuration YAML file, we can see that the key in the topic (unused) is a string, and the value needs to be de-serialized with our implementation. Bear in mind the content-type, which is one of the keys that makes the messages get routed to our Serde.
From Spring Cloud we can enable the bindings with @EnableBinding on a configuration class. We might define a binding interface such as:
public interface MetricBinding {
String METRICS_IN = "metrics_in";
@Input(METRICS_IN)
KStream<String, NodeTrace> metrics_in();
}
with a configuration class such as:
@Configuration
@EnableBinding(MetricBinding.class)
public class BindingConfig {
}
This links the bindings with the topics given in our application.yaml file.
Once we have this burden of configuration in place, it is time to build what really matters: your logic. As an example of a method that reads, groups and converts those metrics, we might think of an implementation such as:
@Slf4j
@Service
class MetricConsumerService {
@StreamListener
public void metrics(@Input(MetricBinding.METRICS_IN) final KStream<String, NodeTrace> stream) {
stream.groupBy((key, value) -> value.getTraceId(), Grouped.with(Serdes.String(), new OpentracingSerde()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(20L)))
.aggregate(
LinkedSpanList::new,
new NodeTraceAggregator(),
Materialized.with(Serdes.String(), new LinkedSpanListSerde())
)
.toStream()
.foreach((key, v) -> {
log.info("{}", v.httpTraces()
.filter(s -> s.getKind().equals(Span.Kind.SERVER))
.map(LinkedSpan::getId).collect(Collectors.joining(",")));
}
);
}
}
I have written a first approach of this dependency which suits Spring Cloud 2.x; as Spring Cloud 3.x is being released, I know some of this functionality is going to be deprecated, such as the StreamMessageConverter. Also, I am using @StreamListener as an example, which Pivotal discourages in favour of functional programming with the Supplier, Function and Consumer interfaces.
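As a rough sketch of what that functional style could look like for the example above (assuming the Kafka Streams binder; the resulting binding, something like metrics-in-0, would have to be mapped to the topic in the configuration properties):

import java.time.Duration;
import java.util.function.Consumer;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.currofy.kafka.extensions.group.aggregator.NodeTraceAggregator;
import com.currofy.kafka.extensions.opentracing.serialization.OpentracingSerde;

import lombok.extern.slf4j.Slf4j;

// NodeTrace, LinkedSpanList and LinkedSpanListSerde also come from this
// library; their imports are omitted because their packages are not shown here.
@Slf4j
@Configuration
public class MetricConsumerConfiguration {

    // The bean name ("metrics") becomes part of the binding name
    // (metrics-in-0) that has to be mapped to the topic in the properties.
    @Bean
    public Consumer<KStream<String, NodeTrace>> metrics() {
        return stream -> stream
                .groupBy((key, value) -> value.getTraceId(),
                         Grouped.with(Serdes.String(), new OpentracingSerde()))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(20L)))
                .aggregate(
                        LinkedSpanList::new,
                        new NodeTraceAggregator(),
                        Materialized.with(Serdes.String(), new LinkedSpanListSerde()))
                .toStream()
                .foreach((windowedTraceId, trace) -> log.info("aggregated trace {}", windowedTraceId));
    }
}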