Skip to content

Commit

Permalink
feat: add ProxyLB struct for load balance
Browse files Browse the repository at this point in the history
  • Loading branch information
zhu327 committed Nov 10, 2024
1 parent f91cb87 commit aefa229
Showing 1 changed file with 90 additions and 72 deletions.
162 changes: 90 additions & 72 deletions src/proxy/lb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,93 +20,43 @@ use crate::config::{

use super::discovery::HybridDiscovery;

pub enum SelectionLB {
RoundRobin(LB<RoundRobin>),
Random(LB<Random>),
Fnv(LB<FVNHash>),
Ketama(LB<KetamaHashing>),
pub struct ProxyLB {
upstream: Upstream,
lb: SelectionLB,
}

impl From<Upstream> for SelectionLB {
impl From<Upstream> for ProxyLB {
fn from(value: Upstream) -> Self {
match value.r#type {
SelectionType::RoundRobin => SelectionLB::RoundRobin(LB::new_from_upstream(value)),
SelectionType::Random => SelectionLB::Random(LB::new_from_upstream(value)),
SelectionType::Fnv => SelectionLB::Fnv(LB::new_from_upstream(value)),
SelectionType::Ketama => SelectionLB::Ketama(LB::new_from_upstream(value)),
ProxyLB {
upstream: value.clone(),
lb: SelectionLB::from(value),
}
}
}

impl SelectionLB {
pub fn take_background_service(&mut self) -> Option<Box<dyn Service + 'static>> {
match self {
SelectionLB::RoundRobin(ref mut lb) => lb.service.take(),
SelectionLB::Random(ref mut lb) => lb.service.take(),
SelectionLB::Fnv(ref mut lb) => lb.service.take(),
SelectionLB::Ketama(ref mut lb) => lb.service.take(),
}
}
impl ProxyLB {
pub fn select_backend<'a>(&'a self, session: &'a mut Session) -> Option<Backend> {
let key = self.request_selector_key(session);

pub fn select_backend(&self, key: String) -> Option<Backend> {
match self {
match &self.lb {
SelectionLB::RoundRobin(lb) => lb.upstreams.select(key.as_bytes(), 256),
SelectionLB::Random(lb) => lb.upstreams.select(key.as_bytes(), 256),
SelectionLB::Fnv(lb) => lb.upstreams.select(key.as_bytes(), 256),
SelectionLB::Ketama(lb) => lb.upstreams.select(key.as_bytes(), 256),
}
}
}

pub struct LB<BS: BackendSelection> {
pub upstreams: Arc<LoadBalancer<BS>>,
pub service: Option<Box<dyn Service + 'static>>,
}

impl<BS> LB<BS>
where
BS: BackendSelection + Send + Sync + 'static,
BS::Iter: BackendIter,
{
pub fn new_from_upstream(upstream: Upstream) -> Self {
let discovery: HybridDiscovery = upstream.clone().into();
let mut upstreams = LoadBalancer::<BS>::from_backends(Backends::new(Box::new(discovery)));

if let Some(check) = upstream.checks.clone() {
let health_check: Box<(dyn HealthCheckTrait + std::marker::Send + Sync + 'static)> =
check.clone().into();
upstreams.set_health_check(health_check);

let mut health_check_frequency = Duration::from_secs(1);
if let Some(healthy) = check.active.healthy {
health_check_frequency = Duration::from_secs(healthy.interval as u64);
}
upstreams.health_check_frequency = Some(health_check_frequency);
}

let background: pingora::services::background::GenBackgroundService<LoadBalancer<BS>> =
background_service("health check", upstreams);
let upstreams = background.task();

LB {
upstreams,
service: Some(Box::new(background)),
}
}
}

impl Upstream {
pub fn request_selector<'a>(&'a self, session: &'a mut Session) -> String {
match self.hash_on {
fn request_selector_key<'a>(&'a self, session: &'a mut Session) -> String {
match self.upstream.hash_on {
UpstreamHashOn::VARS => {
if self.key.as_str().starts_with("arg_") {
if let Some(name) = self.key.as_str().strip_prefix("arg_") {
if self.upstream.key.as_str().starts_with("arg_") {
if let Some(name) = self.upstream.key.as_str().strip_prefix("arg_") {
return get_query_value(session.req_header(), name)
.map_or("".to_string(), |q| q.to_string());
}
}

match self.key.as_str() {
match self.upstream.key.as_str() {
"uri" => session.req_header().uri.path().to_string(),
"request_uri" => session
.req_header()
Expand All @@ -131,20 +81,88 @@ impl Upstream {
_ => "".to_string(),
}
}
UpstreamHashOn::HEAD => get_req_header_value(session.req_header(), self.key.as_str())
.map_or("".to_string(), |s| s.to_string()),
UpstreamHashOn::COOKIE => get_cookie_value(session.req_header(), self.key.as_str())
.map_or("".to_string(), |s| s.to_string()),
UpstreamHashOn::HEAD => {
get_req_header_value(session.req_header(), self.upstream.key.as_str())
.map_or("".to_string(), |s| s.to_string())
}
UpstreamHashOn::COOKIE => {
get_cookie_value(session.req_header(), self.upstream.key.as_str())
.map_or("".to_string(), |s| s.to_string())
}
}
}

pub fn upstream_host_rewrite(&self, upstream_request: &mut RequestHeader) {
if self.pass_host == UpstreamPassHost::REWRITE {
if let Some(host) = self.upstream_host.clone() {
if self.upstream.pass_host == UpstreamPassHost::REWRITE {
if let Some(host) = self.upstream.upstream_host.clone() {
upstream_request.insert_header("Host", host).unwrap();
}
}
}

pub fn take_background_service(&mut self) -> Option<Box<dyn Service + 'static>> {
match self.lb {
SelectionLB::RoundRobin(ref mut lb) => lb.service.take(),
SelectionLB::Random(ref mut lb) => lb.service.take(),
SelectionLB::Fnv(ref mut lb) => lb.service.take(),
SelectionLB::Ketama(ref mut lb) => lb.service.take(),
}
}
}

enum SelectionLB {
RoundRobin(LB<RoundRobin>),
Random(LB<Random>),
Fnv(LB<FVNHash>),
Ketama(LB<KetamaHashing>),
}

impl From<Upstream> for SelectionLB {
fn from(value: Upstream) -> Self {
match value.r#type {
SelectionType::RoundRobin => SelectionLB::RoundRobin(LB::new_from_upstream(value)),
SelectionType::Random => SelectionLB::Random(LB::new_from_upstream(value)),
SelectionType::Fnv => SelectionLB::Fnv(LB::new_from_upstream(value)),
SelectionType::Ketama => SelectionLB::Ketama(LB::new_from_upstream(value)),
}
}
}

struct LB<BS: BackendSelection> {
upstreams: Arc<LoadBalancer<BS>>,
service: Option<Box<dyn Service + 'static>>,
}

impl<BS> LB<BS>
where
BS: BackendSelection + Send + Sync + 'static,
BS::Iter: BackendIter,
{
fn new_from_upstream(upstream: Upstream) -> Self {
let discovery: HybridDiscovery = upstream.clone().into();
let mut upstreams = LoadBalancer::<BS>::from_backends(Backends::new(Box::new(discovery)));

if let Some(check) = upstream.checks.clone() {
let health_check: Box<(dyn HealthCheckTrait + std::marker::Send + Sync + 'static)> =
check.clone().into();
upstreams.set_health_check(health_check);

let mut health_check_frequency = Duration::from_secs(1);
if let Some(healthy) = check.active.healthy {
health_check_frequency = Duration::from_secs(healthy.interval as u64);
}
upstreams.health_check_frequency = Some(health_check_frequency);
}

let background: pingora::services::background::GenBackgroundService<LoadBalancer<BS>> =
background_service("health check", upstreams);
let upstreams = background.task();

LB {
upstreams,
service: Some(Box::new(background)),
}
}
}

impl From<HealthCheck> for Box<(dyn HealthCheckTrait + std::marker::Send + Sync + 'static)> {
Expand Down

0 comments on commit aefa229

Please sign in to comment.