Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(logging): Forward logs to an OTLP collector #5

Merged
merged 6 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
363 changes: 344 additions & 19 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 12 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ oci-spec = { version = "0.6.4", default-features = false, features = ["image", "
url = "2.3.1"
regex = "1.10.2"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["time", "env-filter"] }
reqwest = { version = "0.12.4", features = ["json", "stream"] }
tracing-subscriber = { version = "0.3.18", features = ["time", "env-filter", "fmt", "json"] }
reqwest = { version = "0.12.5", features = ["json", "stream"] }
askama = "0.12.1"
base64 = "0.22.1"
serde = { version = "1.0.202", features = ["derive"] }
serde_json = "1.0.117"
sha2 = "0.10.8"
base16ct = { version = "0.2.0", features = ["alloc"] }
urlencoding = "2.1.3"
anyhow = "1.0.86"

# wasm dependencies
time = { version = "0.3.36", features = ["wasm-bindgen"] }
Expand All @@ -39,10 +43,12 @@ wasm-bindgen = "0.2.92"
worker = {version = "0.3.0"}
tracing-web = "0.1.3"
futures = "0.3.30"
sha2 = "0.10.8"
base16ct = { version = "0.2.0", features = ["alloc"] }
urlencoding = "2.1.3"
anyhow = "1.0.86"

# OTLP dependencies
opentelemetry-proto = { version ="0.6.0", features = ["gen-tonic-messages", "logs"] }
tracing-core = "0.1.32"
prost = "0.12.6"
async-channel = "2.3.1"

[dependencies.web-sys]
version = "0.3.63"
Expand Down
8 changes: 8 additions & 0 deletions docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,11 @@ services:
- TAGLIST_PAGE_SIZE=100
- REGISTRY_SECURED=false
- CATALOG_ELEMENTS_LIMIT=1000

otlp-collector:
image: otel/opentelemetry-collector:0.103.0
restart: always
ports:
- 4317:4317
- 4318:4318
- 55679:55679
58 changes: 47 additions & 11 deletions src/cf.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::{bail, Result};
use askama::Template;
use opentelemetry_proto::tonic::logs::v1::LogRecord;
use std::{str::FromStr, sync::OnceLock};
use tracing_subscriber::fmt::time::UtcTime;
use tracing_subscriber::prelude::*;
Expand Down Expand Up @@ -38,8 +39,8 @@ fn start() {
console_error_panic_hook::set_once();
}

fn init(env: &Env) {
static INIT: OnceLock<bool> = OnceLock::new();
fn init(env: &Env) -> &'static Option<async_channel::Receiver<Vec<LogRecord>>> {
static INIT: OnceLock<Option<async_channel::Receiver<Vec<LogRecord>>>> = OnceLock::new();
INIT.get_or_init(|| {
let rust_log = match env.var("RUST_LOG") {
Ok(log) => log.to_string(),
Expand All @@ -48,22 +49,57 @@ fn init(env: &Env) {

// Setup tracing
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(false) // Only partially supported across browsers
.with_ansi(false)
.with_timer(UtcTime::rfc_3339())
.with_writer(MakeWebConsoleWriter::new())
.with_filter(EnvFilter::new(rust_log));
.with_filter(EnvFilter::new(&rust_log));

tracing_subscriber::registry().with(fmt_layer).init();
let registry = tracing_subscriber::registry().with(fmt_layer);
// OTLP exporter
let (otlp_layer, receiver) = if env.secret("OTLP_ENDPOINT").is_err() {
(None, None)
} else {
let (otlp_layer, receiver) = crate::otlp::OtlpLogLayer::new();
(
Some(otlp_layer.with_filter(EnvFilter::new(rust_log))),
Some(receiver),
)
};

registry.with(otlp_layer).init();
console_log!("Worker initialized");
true
});
receiver
})
}

/// Called for each request to the worker
#[tracing::instrument(skip(req, env, _ctx), fields(path = %req.path(), method = %req.method()))]
/// Entrypoint for the fetch event
#[event(fetch, respond_with_errors)]
async fn main(req: Request, env: Env, _ctx: Context) -> worker::Result<Response> {
init(&env);
async fn fetch(req: Request, env: Env, ctx: Context) -> worker::Result<Response> {
let receiver = init(&env);
let cf = req.cf().expect("valid cf").clone();
let otlp_endpoint = match env.secret("OTLP_ENDPOINT") {
Ok(endpoint) => endpoint.to_string(),
Err(_) => "".to_string(),
};
let otlp_auth = match env.secret("OTLP_AUTH") {
Ok(auth) => auth.to_string(),
Err(_) => "".to_string(),
};

let result = _fetch(req, env, ctx).await;

if let Some(receiver) = receiver {
crate::otlp::flush(receiver, otlp_endpoint, otlp_auth, &cf).await;
}
result
}

#[tracing::instrument(
name="fetch",
skip(req, env, _ctx),
fields(path = %req.path(), method = %req.method()))
]
async fn _fetch(req: Request, env: Env, _ctx: Context) -> worker::Result<Response> {
router().run(req, env).await
}

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

// Request handlers for the cloudflare worker
mod cf;
mod otlp;
// Helper for parsing and managing Python/OCI packages
mod package;
// PyOci client
Expand Down
210 changes: 210 additions & 0 deletions src/otlp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
use std::fmt::{self, Write};
use std::sync::RwLock;

use anyhow::Result;
use prost::Message;
use time::OffsetDateTime;
use tracing::Subscriber;
use tracing_core::Event;
use tracing_subscriber::{layer::Context, Layer};
use worker::{console_debug, console_error, console_log, Cf};

use tracing::field::{Field, Visit};

use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::common::v1::any_value;
use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};
use opentelemetry_proto::tonic::resource::v1::Resource;

use crate::USER_AGENT;

/// Convert a batch of log records into a byte array and content type
fn build_logs_export_body(logs: Vec<LogRecord>, cf: &Cf) -> ExportLogsServiceRequest {
let scope_logs = ScopeLogs {
scope: None,
log_records: logs,
schema_url: "".to_string(),
};

let region = cf.region().map(|region| AnyValue {
value: Some(any_value::Value::StringValue(region)),
});
let zone = Some(AnyValue {
value: Some(any_value::Value::StringValue(cf.colo())),
});

let resource_logs = ResourceLogs {
resource: Some(Resource {
attributes: vec![
KeyValue {
key: "service.name".to_string(),
value: Some(AnyValue {
value: Some(any_value::Value::StringValue("pyoci".to_string())),
}),
},
KeyValue {
key: "cloud.region".to_string(),
value: region,
},
KeyValue {
key: "cloud.availability_zone".to_string(),
value: zone,
},
],
dropped_attributes_count: 0,
}),
scope_logs: vec![scope_logs],
schema_url: "".to_string(),
};
ExportLogsServiceRequest {
resource_logs: vec![resource_logs],
}
}

pub struct OtlpLogLayer {
records: RwLock<Vec<LogRecord>>,
sender: async_channel::Sender<Vec<LogRecord>>,
}

// Public methods
impl OtlpLogLayer {
pub fn new() -> (Self, async_channel::Receiver<Vec<LogRecord>>) {
let (sender, receiver) = async_channel::bounded(10);
(
Self {
records: RwLock::new(vec![]),
sender,
},
receiver,
)
}
}

/// Flush all messages from the queue
/// In normal operation this will be called after every request and should only every consume 1
/// message
pub async fn flush(
receiver: &async_channel::Receiver<Vec<LogRecord>>,
otlp_endpoint: String,
otlp_auth: String,
cf: &Cf,
) {
let client = reqwest::Client::builder()
.user_agent(USER_AGENT)
.build()
.unwrap();
loop {
// Wait for messages from the OtlpLogLayer
// These are send at the end of every request
let log_records = match receiver.recv().await {
Ok(request) => request,
// Channel is empty and closed, so we're done here
Err(_) => break,
};
let body = build_logs_export_body(log_records, cf).encode_to_vec();
let mut url = url::Url::parse(&otlp_endpoint).unwrap();
url.path_segments_mut().unwrap().extend(&["v1", "logs"]);
// send to OTLP Collector
match client
.post(url)
.header("Content-Type", "application/x-protobuf")
.header("Authorization", &otlp_auth)
.body(body)
.send()
.await
{
Ok(response) => {
if !response.status().is_success() {
console_error!("Failed to send logs to OTLP: {:?}", response);
} else {
console_debug!("Logs sent to OTLP: {:?}", response);
};
}
Err(err) => {
console_error!("Failed to send logs to OTLP: {:?}", err);
}
};
// If the channel is empty, we're done
// New messages will be handled by the next request
if receiver.is_empty() {
break;
}
}
}

// Private methods
impl OtlpLogLayer {
fn flush(&self) -> Result<()> {
let records: Vec<LogRecord> = self.records.write().unwrap().drain(..).collect();
console_log!("Sending {} log records to OTLP", records.len());
self.sender.try_send(records)?;
Ok(())
}
}

impl<S: Subscriber> Layer<S> for OtlpLogLayer {
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
let level = event.metadata().level();
let mut visitor = LogVisitor::default();
event.record(&mut visitor);

let Some(time_ns) = time_unix_ns() else {
return;
};

let log_record = LogRecord {
time_unix_nano: time_ns,
observed_time_unix_nano: time_ns,
severity_number: severity_number(level),
severity_text: level.to_string().to_uppercase(),
body: Some(AnyValue {
value: Some(any_value::Value::StringValue(visitor.string)),
}),
attributes: vec![],
dropped_attributes_count: 0,
trace_id: vec![],
span_id: vec![],
flags: 0,
};

self.records.write().unwrap().push(log_record);
}

fn on_close(&self, _id: tracing_core::span::Id, _ctx: Context<'_, S>) {
if let Err(err) = self.flush() {
console_error!("Failed to flush log records: {:?}", err);
}
}
}

fn time_unix_ns() -> Option<u64> {
match OffsetDateTime::now_utc().unix_timestamp_nanos().try_into() {
Ok(value) => Some(value),
Err(_) => {
console_error!("SystemTime out of range for conversion to u64!");
None
}
}
}

fn severity_number(level: &tracing::Level) -> i32 {
match *level {
tracing::Level::ERROR => 17,
tracing::Level::WARN => 13,
tracing::Level::INFO => 9,
tracing::Level::DEBUG => 5,
tracing::Level::TRACE => 1,
}
}

#[derive(Default)]
pub struct LogVisitor {
string: String,
}

impl Visit for LogVisitor {
fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
write!(self.string, "{}={:?} ", field.name(), value).unwrap();
}
}
10 changes: 3 additions & 7 deletions src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,10 @@ impl HttpTransport {
let request = request.build().expect("valid request");
tracing::debug!("Request: {:#?}", request);
let method = request.method().as_str().to_string();
let url = request.url().to_owned();
let url = request.url().to_owned().to_string();
let response = self.client.execute(request).await.expect("valid response");
tracing::info!(
"[{method}] {status} {url}",
method = method,
status = response.status(),
url = url
);
let status: u16 = response.status().into();
tracing::info!(method, status, url);
tracing::debug!("Response Headers: {:#?}", response.headers());
Ok(response)
}
Expand Down
2 changes: 0 additions & 2 deletions wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ route = { pattern = "pyoci.allexveldman.nl", custom_domain = true }
RUST_LOG = "info"

[build]
# command = "cargo install -q worker-build && worker-build --release"
command = "just build --release"

[env.dev]
# build = { command = "cargo install -q worker-build && worker-build --dev" }
build = { command = "just build --dev" }

# The rate limiting API is in open beta.
Expand Down