Skip to content

Commit

Permalink
metric reader consumes exporter
Browse files Browse the repository at this point in the history
Signed-off-by: inge4pres <[email protected]>
  • Loading branch information
inge4pres committed Sep 27, 2024
1 parent d5f7bc4 commit 8f1deaf
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 9 deletions.
1 change: 1 addition & 0 deletions src/metrics/meter.zig
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub const MeterProvider = struct {
/// Delete the meter provider and free up the memory allocated for it.
/// as well as its child objects: Meters and MetricReaders.
pub fn shutdown(self: *Self) void {
// TODO call shutdown on all readers.
var meters = self.meters.valueIterator();
while (meters.next()) |m| {
m.deinit();
Expand Down
114 changes: 105 additions & 9 deletions src/metrics/reader.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const std = @import("std");
const protobuf = @import("protobuf");
const ManagedString = protobuf.ManagedString;
const pbcommon = @import("../opentelemetry/proto/common/v1.pb.zig");
const pbresource = @import("../opentelemetry/proto/resource/v1.pb.zig");
const pbmetrics = @import("../opentelemetry/proto/metrics/v1.pb.zig");
const pbutils = @import("../pbutils.zig");
const instr = @import("instrument.zig");
Expand Down Expand Up @@ -29,34 +30,62 @@ pub const MetricReader = struct {
aggregation: *const fn (Kind) view.Aggregation = view.DefaultAggregationFor,
// Signal that shutdown has been called.
hasShutDown: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
// TODO add exporter
// exporter: MetricExporter,
// Exporter is the destination of the metrics data.
// FIXME
// the default metric exporter should be the
exporter: MetricExporter = MetricExporter.new(noopExporter),

const Self = @This();

pub fn collect(self: Self) !void {
if (self.hasShutDown.load(.acquire)) {
// Shutdown has already been called so this is a no-op.
// When shutdown has already been called, collect is a no-op.
return;
}
var metricsData = pbmetrics.MetricsData{ .resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(self.allocator) };
defer metricsData.deinit();

if (self.meterProvider) |mp| {
// Collect the data from the meter provider.
// Collect the data from each meter provider.
var mpIter = mp.meters.valueIterator();
while (mpIter.next()) |meter| {
// Create a resourceMetric for each Meter.
var rm = pbmetrics.ResourceMetrics{
.resource = pbresource.Resource{ .attributes = if (meter.attributes) |a| a.values else std.ArrayList(pbcommon.KeyValue).init(self.allocator) },
.scope_metrics = std.ArrayList(pbmetrics.ScopeMetrics).init(self.allocator),
};
// We only use a single ScopeMetric for each ResourceMetric.
var sm = pbmetrics.ScopeMetrics{
.metrics = std.ArrayList(pbmetrics.Metric).init(self.allocator),
};
var instrIter = meter.instruments.valueIterator();
while (instrIter.next()) |i| {
const metric = try toMetric(self.allocator, i.*);
// FIXME: only call deinit() after populating the exporter output.
defer metric.deinit();
if (toMetric(self.allocator, i.*)) |metric| {
try sm.metrics.append(metric);
} else |err| {
std.debug.print("MetricReader collect: failed conversion to proto Metric: {?}\n", .{err});
}

// const metric = toMetric(self.allocator, i.*) catch |e| std.debug.print("MetricReader collect: failed conversion to proto Metric: {?}", .{e});
// try sm.metrics.append(metric);
}
try rm.scope_metrics.append(sm);
try metricsData.resource_metrics.append(rm);
}
} else {
// No meter provider to collect from.
return MetricReadError.CollectFailedOnMissingMeterProvider;
}
switch (self.exporter.exportBatch(metricsData)) {
ExportResult.Success => return,
ExportResult.Failure => return MetricReadError.ExportFailed,
}
}

pub fn shutdown(self: *Self) void {
self.collect() catch |e| {
std.debug.print("MetricReader shutdown: error while collecting metrics: {?}\n", .{e});
};
self.hasShutDown.store(true, .release);
}

Expand Down Expand Up @@ -134,7 +163,10 @@ test "metric reader shutdown prevents collect() to execute" {
test "metric reader collects data from meter provider" {
var mp = try MeterProvider.init(std.testing.allocator);
defer mp.shutdown();
var reader = MetricReader{ .allocator = std.testing.allocator };
var reader = MetricReader{
.allocator = std.testing.allocator,
.exporter = MetricExporter.new(noopExporter),
};
defer reader.shutdown();

try mp.addReader(&reader);
Expand All @@ -155,6 +187,70 @@ test "metric reader collects data from meter provider" {
try reader.collect();
}

pub const ExportResult = enum {
Success,
Failure,
};

pub const ExportFn = fn (pbmetrics.MetricsData) MetricReadError!void;

pub const MetricExporter = struct {
exporter: *const fn (pbmetrics.ExportMetricsServiceRequest) MetricReadError!void,
const Self = @This();
exporter: *const ExportFn,

pub fn new(exporter: *const ExportFn) Self {
return Self{
.exporter = exporter,
};
}

pub fn exportBatch(self: Self, metrics: pbmetrics.MetricsData) ExportResult {
self.exporter(metrics) catch |e| {
std.debug.print("MetricExporter exportBatch failed: {?}\n", .{e});
return ExportResult.Failure;
};
return ExportResult.Success;
}
};

// test harness to build a noop exporter.
fn noopExporter(_: pbmetrics.MetricsData) MetricReadError!void {
return;
}
// mocked metric exporter to assert metrics data are read once exported.
fn mockExporter(metrics: pbmetrics.MetricsData) MetricReadError!void {
if (metrics.resource_metrics.items.len != 1) {
return MetricReadError.ExportFailed;
} // only one resource metrics is expected in this mock
}

test "build no-op metric exporter" {
const exporter: *const ExportFn = noopExporter;
var me = MetricExporter.new(exporter);

const metrics = pbmetrics.MetricsData{
.resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator),
};
defer metrics.deinit();
const result = me.exportBatch(metrics);
try std.testing.expectEqual(ExportResult.Success, result);
}

test "exported metrics by calling metric reader" {
var mp = try MeterProvider.init(std.testing.allocator);
defer mp.shutdown();
const me = MetricExporter.new(mockExporter);

var reader = MetricReader{ .allocator = std.testing.allocator, .exporter = me };
defer reader.shutdown();

try mp.addReader(&reader);

const m = try mp.getMeter(.{ .name = "my-meter" });

// only 1 metric should be in metrics data when we use the mock exporter
var counter = try m.createCounter(u32, .{ .name = "my-counter" });
try counter.add(1, null);

try reader.collect();
}

0 comments on commit 8f1deaf

Please sign in to comment.