Skip to content

Commit

Permalink
Merge branch 'v6.6' into support_no_ftrace_syscall
Browse files Browse the repository at this point in the history
  • Loading branch information
yinjiping authored Jan 6, 2025
2 parents 75f9ee2 + d34c22c commit 246357b
Show file tree
Hide file tree
Showing 26 changed files with 323 additions and 120 deletions.
Binary file added agent/resources/test/common/gre-nokey.pcap
Binary file not shown.
74 changes: 66 additions & 8 deletions agent/src/common/decapsulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,11 @@ impl TunnelInfo {
ip_header_size: usize,
) -> usize {
// TCE GRE:Version 0、Version 1两种
if flags & GRE_FLAGS_VER_MASK > 1 || flags & GRE_FLAGS_KEY_MASK == 0 {
if flags & GRE_FLAGS_VER_MASK > 1 {
return 0;
}

let has_key = flags & GRE_FLAGS_KEY_MASK > 0;
let gre_header_size = GRE_HEADER_SIZE_DECAP + TunnelInfo::calc_gre_option_size(flags);
let mut gre_key_offset = GRE_KEY_OFFSET;
if flags & GRE_FLAGS_CSUM_MASK != 0 {
Expand All @@ -404,7 +405,9 @@ impl TunnelInfo {
if self.tier == 0 {
self.decapsulate_addr(l3_packet);
self.tunnel_type = TunnelType::TencentGre;
self.id = bytes::read_u32_be(&l3_packet[ip_header_size + gre_key_offset..]);
if has_key {
self.id = bytes::read_u32_be(&l3_packet[ip_header_size + gre_key_offset..]);
}
}
self.tier += 1;
let overlay_offset = gre_header_size + ip_header_size - ETH_HEADER_SIZE; // 伪造L2层信息
Expand All @@ -419,12 +422,23 @@ impl TunnelInfo {
} else {
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x86, 0xdd]
};
macs[4..6].copy_from_slice(
&l3_packet[ip_header_size + gre_key_offset + 2..ip_header_size + gre_key_offset + 4],
);
macs[10..12].copy_from_slice(
&l3_packet[ip_header_size + gre_key_offset..ip_header_size + gre_key_offset + 2],
);
if has_key {
// L2 Header
//
// 0 4 6 10 12 14
// +-----------------+------------+----------------+------------+--------------+
// | | 2-Byte Key | | 2-Byte Key | 0x800/0x86dd |
// +-----------------+------------+----------------+------------+--------------+
// | Dest MAC | Source MAC | EthType |
macs[4..6].copy_from_slice(
&l3_packet
[ip_header_size + gre_key_offset + 2..ip_header_size + gre_key_offset + 4],
);
macs[10..12].copy_from_slice(
&l3_packet[ip_header_size + gre_key_offset..ip_header_size + gre_key_offset + 2],
);
}

l3_packet[overlay_offset..overlay_offset + 14].copy_from_slice(&macs[..]);

overlay_offset
Expand Down Expand Up @@ -924,6 +938,50 @@ mod tests {
assert_eq!(actual, expected);
}

#[test]
fn test_decapsulate_tencent_gre_nokey() {
let bitmap = TunnelTypeBitmap::new(&vec![TunnelType::TencentGre, TunnelType::Vxlan]);
let expected = TunnelInfo {
src: Ipv4Addr::new(10, 184, 17, 156),
dst: Ipv4Addr::new(10, 128, 48, 25),
mac_src: 0x381fc0f2,
mac_dst: 0xdd09cd02,
id: 65877,
tunnel_type: TunnelType::Vxlan,
tier: 2,
is_ipv6: false,
};
let expected_overlay = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 69, 0, 0, 52, 237, 62, 64, 0, 63, 6, 184,
251, 10, 185, 192, 25, 10, 129, 192, 54,
];

let mut packets: Vec<Vec<u8>> =
Capture::load_pcap(Path::new(PCAP_PATH_PREFIX).join("gre-nokey.pcap"), None).into();
let packet = packets[0].as_mut_slice();

let mut actual = TunnelInfo::default();
let expected_offset = 82;
let mut actual_offset = 0;
for l2_len in vec![22, 14] {
let offset = actual.decapsulate(&mut packet[actual_offset..], l2_len, &bitmap);
if actual.tunnel_type == TunnelType::None {
break;
}
if actual_offset + offset > packet.len() {
break;
}
actual_offset += l2_len + offset;
}

let actual_overlay: [u8; 34] = packet[actual_offset..actual_offset + 34]
.try_into()
.unwrap();
assert_eq!(expected_overlay, actual_overlay);
assert_eq!(actual, expected);
assert_eq!(actual_offset, expected_offset);
}

#[test]
fn test_decapsulate_tencent_gre() {
let bitmap = TunnelTypeBitmap::new(&vec![TunnelType::TencentGre]);
Expand Down
3 changes: 3 additions & 0 deletions agent/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,8 @@ pub struct AfPacketTunning {
pub ring_blocks: usize,
pub packet_fanout_count: usize,
pub packet_fanout_mode: u32,
#[serde(rename = "interface_promisc_enabled")]
pub promisc: bool,
}

impl Default for AfPacketTunning {
Expand All @@ -638,6 +640,7 @@ impl Default for AfPacketTunning {
ring_blocks: 128,
packet_fanout_count: 1,
packet_fanout_mode: 0,
promisc: false,
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions agent/src/config/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2660,6 +2660,14 @@ impl ConfigHandler {
tunning.packet_fanout_mode = new_tunning.packet_fanout_mode;
restart_agent = !first_run;
}
if tunning.promisc != new_tunning.promisc {
info!(
"Update inputs.cbpf.af_packet.tunning.interface_promisc_enabled from {:?} to {:?}.",
tunning.promisc, new_tunning.promisc
);
tunning.promisc = new_tunning.promisc;
restart_agent = !first_run;
}
if tunning.ring_blocks != new_tunning.ring_blocks {
info!(
"Update inputs.cbpf.af_packet.tunning.ring_blocks from {:?} to {:?}.",
Expand Down
14 changes: 14 additions & 0 deletions agent/src/dispatcher/base_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,20 @@ impl BaseDispatcher {
e
);
}

if self.options.lock().unwrap().promisc {
let if_indices = tap_interfaces
.iter()
.map(|i| i.if_index as i32)
.collect::<Vec<i32>>();
if let Err(e) = self.engine.set_promisc(&if_indices) {
warn!(
"set_promisc failed with tap_interfaces count {}: {:?}",
if_indices.len(),
e
);
}
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions agent/src/dispatcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,8 @@ pub struct Options {
pub dpdk_ebpf_receiver: Option<Receiver<Box<packet::Packet<'static>>>>,
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fanout_enabled: bool,
#[cfg(any(target_os = "linux", target_os = "android"))]
pub promisc: bool,
}

impl Options {
Expand Down
34 changes: 17 additions & 17 deletions agent/src/dispatcher/recv_engine/af_packet/tpacket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ use std::os::unix::io::{AsRawFd, FromRawFd};
use std::process;

use libc::{
c_int, c_uint, c_void, getsockopt, mmap, munmap, off_t, poll, pollfd, setsockopt, size_t,
sockaddr, sockaddr_ll, socket, socklen_t, write, AF_PACKET, ETH_P_ALL, MAP_LOCKED,
MAP_NORESERVE, MAP_SHARED, POLLERR, POLLIN, PROT_READ, PROT_WRITE, SOL_PACKET, SOL_SOCKET,
SO_ATTACH_FILTER,
c_int, c_uint, c_void, getsockopt, mmap, munmap, off_t, packet_mreq, poll, pollfd, setsockopt,
size_t, sockaddr, sockaddr_ll, socket, socklen_t, write, AF_PACKET, ETH_P_ALL, MAP_LOCKED,
MAP_NORESERVE, MAP_SHARED, PACKET_ADD_MEMBERSHIP, PACKET_MR_PROMISC, POLLERR, POLLIN,
PROT_READ, PROT_WRITE, SOL_PACKET, SOL_SOCKET, SO_ATTACH_FILTER,
};
use log::{info, warn};
use public::error::*;
Expand Down Expand Up @@ -131,19 +131,19 @@ impl Tpacket {
Ok(())
}

// TODO: 这里看起来不需要,golang版本未涉及该配置,后续有需要再添加
#[allow(dead_code)]
fn set_promisc(&self) -> Result<()> {
// 设置混杂模式

//raw_socket.set_flag(IFF_PROMISC as u64)?;

// TODO:
//let mut mreq: packet_mreq = std::mem::zeroed();
//mreq.mr_ifindex = interface.index as i32;
//mreq.mr_type = PACKET_MR_PROMISC as u16;

//raw_socket.setsockopt(SOL_PACKET, PACKET_ADD_MEMBERSHIP, (&mreq as *const packet_mreq) as *const libc::c_void);
pub fn set_promisc(&self, if_indices: &Vec<i32>) -> Result<()> {
for i in if_indices {
let mreq = packet_mreq {
mr_type: PACKET_MR_PROMISC as u16,
mr_ifindex: *i,
mr_alen: 0,
mr_address: [0, 0, 0, 0, 0, 0, 0, 0],
};
let err = self.setsockopt(SOL_PACKET, PACKET_ADD_MEMBERSHIP, mreq);
if err.is_err() {
warn!("Ifindex {} set promisc error: {:?}", i, err);
}
}
Ok(())
}

Expand Down
8 changes: 8 additions & 0 deletions agent/src/dispatcher/recv_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ impl RecvEngine {
}
}

pub fn set_promisc(&mut self, if_index: &Vec<i32>) -> Result<()> {
match self {
#[cfg(any(target_os = "linux", target_os = "android"))]
Self::AfPacket(e) => e.set_promisc(if_index).map_err(|e| e.into()),
_ => return Ok(()),
}
}

pub fn get_counter_handle(&self) -> Arc<dyn stats::RefCountable> {
match self {
#[cfg(any(target_os = "linux", target_os = "android"))]
Expand Down
13 changes: 11 additions & 2 deletions agent/src/ebpf_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ use crate::policy::PolicyGetter;
use crate::rpc::get_timestamp;
use crate::utils::{process::ProcessListener, stats};

#[cfg(feature = "extended_observability")]
use public::queue::Error::Terminated;
use public::{
buffer::BatchedBox,
counter::{Countable, Counter, CounterType, CounterValue, OwnedCountable},
Expand Down Expand Up @@ -542,8 +544,15 @@ impl EbpfCollector {
data: slice::from_raw_parts_mut(ptr, sd.cap_len as usize),
raw: Some(ptr),
};
if let Err(e) = DPDK_SENDER.as_mut().unwrap().send(Box::new(packet)) {
warn!("meta packet send ebpf error: {:?}", e);
match DPDK_SENDER.as_mut().unwrap().send(Box::new(packet)) {
Err(Terminated(a, b)) => {
error!("dpdk init error: {:?}, deepflow-agent restart...", (a, b));
crate::utils::notify_exit(1);
}
Err(e) => {
warn!("meta packet send ebpf error: {:?}", e);
}
_ => {}
}
return;
}
Expand Down
12 changes: 8 additions & 4 deletions agent/src/flow_generator/protocol_logs/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1619,6 +1619,7 @@ impl<'a> Iterator for V1HeaderIterator<'a> {
}
const SEP: &'static str = "\r\n";
let mut end = 0;
let mut is_ascii = true;
loop {
// handle the case len is odd (such as "HTTP/1.0 200 OK\r\n" where encounter in istio),
if end == self.0.len() - 1
Expand All @@ -1639,7 +1640,7 @@ impl<'a> Iterator for V1HeaderIterator<'a> {
end -= 1;
break;
}
c if !c.is_ascii() => return None,
c if !c.is_ascii() => is_ascii = false,
_ => (),
}
// the length of SEP is 2 so step 2 is ok
Expand All @@ -1648,12 +1649,15 @@ impl<'a> Iterator for V1HeaderIterator<'a> {
if end == 0 {
None
} else {
let result = unsafe {
let result = if is_ascii {
// this is safe because all bytes are checked to be ascii
str::from_utf8_unchecked(&self.0[..end])
unsafe { Some(str::from_utf8_unchecked(&self.0[..end])) }
} else {
str::from_utf8(&self.0[..end]).ok()
};
self.0 = &self.0[end + 2..];
Some(result)

result
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions agent/src/trident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3571,6 +3571,8 @@ fn build_dispatchers(
dpdk_ebpf_receiver,
#[cfg(target_os = "linux")]
fanout_enabled,
#[cfg(any(target_os = "linux", target_os = "android"))]
promisc: user_config.inputs.cbpf.af_packet.tunning.promisc,
..Default::default()
})))
.bpf_options(bpf_options)
Expand Down
14 changes: 14 additions & 0 deletions cli/ctl/trisolaris_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type ParamData struct {
RpcIP string
RpcPort string
Type string
PlatformDataVersion uint64
GroupsVersion uint64
ProcessName string
PluginType string
PluginName string
}
Expand Down Expand Up @@ -233,6 +236,9 @@ func RegisterTrisolarisCommand() *cobra.Command {
trisolarisCmd.PersistentFlags().StringVarP(&paramData.Type, "type", "", "trident", "request type trdient/analyzer")
trisolarisCmd.PersistentFlags().StringVarP(&paramData.PluginType, "ptype", "", "wasm", "request plugin type")
trisolarisCmd.PersistentFlags().StringVarP(&paramData.PluginName, "pname", "", "", "request plugin name")
trisolarisCmd.PersistentFlags().Uint64VarP(&paramData.PlatformDataVersion, "pver", "", 0, "platform-data version")
trisolarisCmd.PersistentFlags().Uint64VarP(&paramData.GroupsVersion, "gver", "", 0, "groups version")
trisolarisCmd.PersistentFlags().StringVarP(&paramData.ProcessName, "procname", "", "", "request process name")
cmds := regiterCommand()
for _, handler := range cmds {
trisolarisCmd.AddCommand(handler)
Expand Down Expand Up @@ -269,12 +275,18 @@ func initCmd(cmd *cobra.Command, cmds []CmdExecute) {
switch paramData.Type {
case "trident":
name = paramData.Type
if paramData.ProcessName != "" {
name = paramData.ProcessName
}
groupID = paramData.GroupID
clusterID = paramData.ClusterID
teamID = paramData.TeamID
orgID = paramData.OrgID
case "analyzer":
name = paramData.Type
if paramData.ProcessName != "" {
name = paramData.ProcessName
}
orgID = paramData.OrgID
default:
fmt.Printf("type(%s) muste be in [trident, analyzer]", paramData.Type)
Expand All @@ -283,6 +295,8 @@ func initCmd(cmd *cobra.Command, cmds []CmdExecute) {
fmt.Printf("request trisolaris(%s), params(%+v)\n", conn.Target(), paramData)
c := trident.NewSynchronizerClient(conn)
reqData := &trident.SyncRequest{
VersionPlatformData: proto.Uint64(paramData.PlatformDataVersion),
VersionGroups: proto.Uint64(paramData.GroupsVersion),
CtrlIp: &paramData.CtrlIP,
CtrlMac: &paramData.CtrlMac,
VtapGroupIdRequest: &groupID,
Expand Down
4 changes: 4 additions & 0 deletions docs/CHANGELOG-6-6.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
#### New Feature

#### Bug Fix
* fix: agnet - eBPF Fix the kernel kick on CPU0 was not triggered [#8818](https://github.com/deepflowio/deepflow/pull/8818) by [yinjiping](https://github.com/yinjiping)
* fix: agent - eBPF Fix Crashes Caused by Packet Count Statistics [#8815](https://github.com/deepflowio/deepflow/pull/8815) by [yinjiping](https://github.com/yinjiping)
* fix: wan type cidr may not be tagged [#8811](https://github.com/deepflowio/deepflow/pull/8811) by [lzf575](https://github.com/lzf575)
* fix: the throttler cannot write all data to the queue at once [#8792](https://github.com/deepflowio/deepflow/pull/8792) by [lzf575](https://github.com/lzf575)
* fix: tracemap error [#8755](https://github.com/deepflowio/deepflow/pull/8755) by [xiaochaoren1](https://github.com/xiaochaoren1)
* fix: agent - eBPF Fix process event type size [#8753](https://github.com/deepflowio/deepflow/pull/8753) by [yinjiping](https://github.com/yinjiping)
Expand Down Expand Up @@ -43,6 +46,7 @@
* perf: modify materialized view local table without group by [#8576](https://github.com/deepflowio/deepflow/pull/8576) by [lzf575](https://github.com/lzf575)

#### NEW FEATURE
* feat: agent - eBPF DPDK User-Space Packet Statistics [#8807](https://github.com/deepflowio/deepflow/pull/8807) by [yinjiping](https://github.com/yinjiping)
* feat: Enable agent after first guard check [#8751](https://github.com/deepflowio/deepflow/pull/8751) by [rvql](https://github.com/rvql)
* feat: update vtap ignore fields [#8747](https://github.com/deepflowio/deepflow/pull/8747) by [askyrie](https://github.com/askyrie)
* feat: deprecated ipvlan [#8725](https://github.com/deepflowio/deepflow/pull/8725) by [askyrie](https://github.com/askyrie)
Expand Down
Loading

0 comments on commit 246357b

Please sign in to comment.