Skip to content

Commit

Permalink
add vector log integratioin
Browse files Browse the repository at this point in the history
  • Loading branch information
taloric committed Apr 29, 2024
1 parent 0e369be commit d40df96
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 2 deletions.
2 changes: 2 additions & 0 deletions agent/crates/public/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub enum SendMessageType {
Profile = 13,
ProcEvents = 14,
AlarmEvent = 15,
ApplicationLog = 17,
}

impl fmt::Display for SendMessageType {
Expand All @@ -77,6 +78,7 @@ impl fmt::Display for SendMessageType {
Self::Profile => write!(f, "profile"),
Self::ProcEvents => write!(f, "proc_events"),
Self::AlarmEvent => write!(f, "alarm_event"),
Self::ApplicationLog => write!(f, "application_log"),
}
}
}
2 changes: 2 additions & 0 deletions agent/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ pub struct YamlConfig {
pub external_profile_integration_disabled: bool,
pub external_trace_integration_disabled: bool,
pub external_metric_integration_disabled: bool,
pub external_log_integration_disabled: bool,
#[serde(with = "humantime_serde")]
pub ntp_max_interval: Duration,
#[serde(with = "humantime_serde")]
Expand Down Expand Up @@ -982,6 +983,7 @@ impl Default for YamlConfig {
external_profile_integration_disabled: false,
external_trace_integration_disabled: false,
external_metric_integration_disabled: false,
external_log_integration_disabled: false,
ntp_max_interval: Duration::from_secs(300),
ntp_min_interval: Duration::from_secs(10),
l7_protocol_advanced_features: L7ProtocolAdvancedFeatures::default(),
Expand Down
49 changes: 49 additions & 0 deletions agent/src/integration_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,22 @@ async fn aggregate_with_catch_exception(
})
}

// for log capture from vector
#[derive(Debug, PartialEq)]
pub struct ApplicationLog(Vec<u8>);

impl Sendable for ApplicationLog {
fn encode(mut self, buf: &mut Vec<u8>) -> Result<usize, prost::EncodeError> {
let length = self.0.len();
buf.append(&mut self.0);
Ok(length)
}

fn message_type(&self) -> SendMessageType {
SendMessageType::ApplicationLog
}
}

fn decode_otel_trace_data(
peer_addr: SocketAddr,
data: Vec<u8>,
Expand Down Expand Up @@ -595,6 +611,7 @@ async fn handler(
prometheus_sender: DebugSender<BoxedPrometheusExtra>,
telegraf_sender: DebugSender<TelegrafMetric>,
profile_sender: DebugSender<Profile>,
application_log_sender: DebugSender<ApplicationLog>,
exception_handler: ExceptionHandler,
compressed: bool,
counter: Arc<CompressedMetric>,
Expand All @@ -607,6 +624,7 @@ async fn handler(
external_profile_integration_disabled: bool,
external_trace_integration_disabled: bool,
external_metric_integration_disabled: bool,
external_log_integration_disabled: bool,
) -> Result<Response<Body>, GenericError> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") => {
Expand Down Expand Up @@ -785,6 +803,25 @@ async fn handler(

Ok(Response::builder().body(Body::empty()).unwrap())
}
// log integration
(&Method::POST, "/api/v1/log") => {
if external_log_integration_disabled {
return Ok(Response::builder().body(Body::empty()).unwrap());
}
let (part, body) = req.into_parts();
let whole_body = match aggregate_with_catch_exception(body, &exception_handler).await {
Ok(b) => b,
Err(e) => {
return Ok(e);
}
};
let log_data = decode_metric(whole_body, &part.headers)?;
if let Err(e) = application_log_sender.send(ApplicationLog(log_data)) {
warn!("log sender queue has terminated");
}

Ok(Response::builder().body(Body::empty()).unwrap())
}
// Return the 404 Not Found for other routes.
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
Expand Down Expand Up @@ -884,6 +921,7 @@ pub struct MetricServer {
prometheus_sender: DebugSender<BoxedPrometheusExtra>,
telegraf_sender: DebugSender<TelegrafMetric>,
profile_sender: DebugSender<Profile>,
application_log_sender: DebugSender<ApplicationLog>,
port: Arc<AtomicU16>,
exception_handler: ExceptionHandler,
server_shutdown_tx: Mutex<Option<mpsc::Sender<()>>>,
Expand All @@ -897,6 +935,7 @@ pub struct MetricServer {
external_profile_integration_disabled: bool,
external_trace_integration_disabled: bool,
external_metric_integration_disabled: bool,
external_log_integration_disabled: bool,
}

impl MetricServer {
Expand All @@ -908,6 +947,7 @@ impl MetricServer {
prometheus_sender: DebugSender<BoxedPrometheusExtra>,
telegraf_sender: DebugSender<TelegrafMetric>,
profile_sender: DebugSender<Profile>,
application_log_sender: DebugSender<ApplicationLog>,
port: u16,
exception_handler: ExceptionHandler,
compressed: bool,
Expand All @@ -919,6 +959,7 @@ impl MetricServer {
external_profile_integration_disabled: bool,
external_trace_integration_disabled: bool,
external_metric_integration_disabled: bool,
external_log_integration_disabled: bool,
) -> (Self, IntegrationCounter) {
let counter = IntegrationCounter::default();
(
Expand All @@ -932,6 +973,7 @@ impl MetricServer {
prometheus_sender,
telegraf_sender,
profile_sender,
application_log_sender,
port: Arc::new(AtomicU16::new(port)),
exception_handler,
server_shutdown_tx: Default::default(),
Expand All @@ -945,6 +987,7 @@ impl MetricServer {
external_profile_integration_disabled,
external_trace_integration_disabled,
external_metric_integration_disabled,
external_log_integration_disabled,
},
counter,
)
Expand Down Expand Up @@ -975,6 +1018,7 @@ impl MetricServer {
let prometheus_sender = self.prometheus_sender.clone();
let telegraf_sender = self.telegraf_sender.clone();
let profile_sender = self.profile_sender.clone();
let application_log_sender = self.application_log_sender.clone();
let port = self.port.clone();
let monitor_port = Arc::new(AtomicU16::new(port.load(Ordering::Acquire)));
let (mon_tx, mon_rx) = oneshot::channel();
Expand All @@ -990,6 +1034,7 @@ impl MetricServer {
let external_profile_integration_disabled = self.external_profile_integration_disabled;
let external_trace_integration_disabled = self.external_trace_integration_disabled;
let external_metric_integration_disabled = self.external_metric_integration_disabled;
let external_log_integration_disabled = self.external_log_integration_disabled;
let (tx, mut rx) = mpsc::channel(8);
self.runtime
.spawn(Self::alive_check(monitor_port.clone(), tx.clone(), mon_rx));
Expand Down Expand Up @@ -1044,6 +1089,7 @@ impl MetricServer {
let prometheus_sender = prometheus_sender.clone();
let telegraf_sender = telegraf_sender.clone();
let profile_sender = profile_sender.clone();
let application_log_sender = application_log_sender.clone();
let exception_handler_inner = exception_handler.clone();
let counter = counter.clone();
let compressed = compressed.clone();
Expand All @@ -1059,6 +1105,7 @@ impl MetricServer {
let prometheus_sender = prometheus_sender.clone();
let telegraf_sender = telegraf_sender.clone();
let profile_sender = profile_sender.clone();
let application_log_sender = application_log_sender.clone();
let exception_handler = exception_handler_inner.clone();
let peer_addr = conn.remote_addr();
let counter = counter.clone();
Expand All @@ -1080,6 +1127,7 @@ impl MetricServer {
prometheus_sender.clone(),
telegraf_sender.clone(),
profile_sender.clone(),
application_log_sender.clone(),
exception_handler.clone(),
compressed.load(Ordering::Relaxed),
counter.clone(),
Expand All @@ -1092,6 +1140,7 @@ impl MetricServer {
external_profile_integration_disabled,
external_trace_integration_disabled,
external_metric_integration_disabled,
external_log_integration_disabled,
)
}))
}
Expand Down
35 changes: 33 additions & 2 deletions agent/src/trident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ use crate::{
},
handler::{NpbBuilder, PacketHandlerBuilder},
integration_collector::{
BoxedPrometheusExtra, MetricServer, OpenTelemetry, OpenTelemetryCompressed, Profile,
TelegrafMetric,
ApplicationLog, BoxedPrometheusExtra, MetricServer, OpenTelemetry, OpenTelemetryCompressed,
Profile, TelegrafMetric,
},
metric::document::BoxedDocument,
monitor::Monitor,
Expand Down Expand Up @@ -1424,6 +1424,7 @@ pub struct AgentComponents {
pub packet_sequence_uniform_output: DebugSender<BoxedPacketSequenceBlock>, // Enterprise Edition Feature: packet-sequence
pub packet_sequence_uniform_sender: UniformSenderThread<BoxedPacketSequenceBlock>, // Enterprise Edition Feature: packet-sequence
pub proc_event_uniform_sender: UniformSenderThread<BoxedProcEvents>,
pub application_log_uniform_sender: UniformSenderThread<ApplicationLog>,
pub exception_handler: ExceptionHandler,
pub proto_log_sender: DebugSender<BoxAppProtoLogsData>,
pub pcap_batch_sender: DebugSender<BoxedPcapBatch>,
Expand Down Expand Up @@ -2194,6 +2195,27 @@ impl AgentComponents {
exception_handler.clone(),
true,
);
let application_log_queue_name = "1-application-log-to-sender";
let (application_log_sender, application_log_receiver, counter) = queue::bounded_with_debug(
yaml_config.external_metrics_sender_queue_size,
application_log_queue_name,
&queue_debugger,
);
stats_collector.register_countable(
&QueueStats {
module: application_log_queue_name,
..Default::default()
},
Countable::Owned(Box::new(counter)),
);
let application_log_uniform_sender = UniformSenderThread::new(
application_log_queue_name,
Arc::new(application_log_receiver),
config_handler.sender(),
stats_collector.clone(),
exception_handler.clone(),
true,
);

let ebpf_dispatcher_id = dispatcher_components.len();
#[cfg(any(target_os = "linux", target_os = "android"))]
Expand Down Expand Up @@ -2432,6 +2454,7 @@ impl AgentComponents {
prometheus_sender,
telegraf_sender,
profile_sender,
application_log_sender,
candidate_config.metric_server.port,
exception_handler.clone(),
candidate_config.metric_server.compressed,
Expand All @@ -2449,6 +2472,9 @@ impl AgentComponents {
candidate_config
.yaml_config
.external_metric_integration_disabled,
candidate_config
.yaml_config
.external_log_integration_disabled,
);

stats_collector.register_countable(
Expand Down Expand Up @@ -2502,6 +2528,7 @@ impl AgentComponents {
telegraf_uniform_sender,
profile_uniform_sender,
proc_event_uniform_sender,
application_log_uniform_sender,
tap_mode: candidate_config.tap_mode,
packet_sequence_uniform_output, // Enterprise Edition Feature: packet-sequence
packet_sequence_uniform_sender, // Enterprise Edition Feature: packet-sequence
Expand Down Expand Up @@ -2592,6 +2619,7 @@ impl AgentComponents {
self.telegraf_uniform_sender.start();
self.profile_uniform_sender.start();
self.proc_event_uniform_sender.start();
self.application_log_uniform_sender.start();
if self.config.metric_server.enabled {
self.metrics_server_component.start();
}
Expand Down Expand Up @@ -2662,6 +2690,9 @@ impl AgentComponents {
if let Some(h) = self.pcap_batch_uniform_sender.notify_stop() {
join_handles.push(h);
}
if let Some(h) = self.application_log_uniform_sender.notify_stop() {
join_handles.push(h);
}
// Enterprise Edition Feature: packet-sequence
if let Some(h) = self.packet_sequence_uniform_sender.notify_stop() {
join_handles.push(h);
Expand Down

0 comments on commit d40df96

Please sign in to comment.