Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Backoff-based error policy #314

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ snafu = { version = "0.6.8", features = ["futures"] }
either = "1.6.0"
# Some configuration tweaking require reqwest atm
reqwest = { version = "0.10.7", default-features = false, features = ["json", "gzip", "stream"] }
backoff = "0.2.1"

[[example]]
name = "configmapgen_controller"
Expand Down
13 changes: 5 additions & 8 deletions examples/configmapgen_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,6 @@ async fn reconcile(generator: ConfigMapGenerator, ctx: Context<Data>) -> Result<
})
}

/// The controller triggers this on reconcile errors
fn error_policy(_error: &Error, _ctx: Context<Data>) -> ReconcilerAction {
ReconcilerAction {
requeue_after: Some(Duration::from_secs(1)),
}
}

// Data we want access to in error/reconcile calls
struct Data {
client: Client,
Expand All @@ -120,7 +113,11 @@ async fn main() -> Result<()> {

Controller::new(cmgs, ListParams::default())
.owns(cms, ListParams::default())
.run(reconcile, error_policy, Context::new(Data { client }))
.run(
reconcile,
|| backoff::backoff::Constant::new(Duration::from_secs(1)),
clux marked this conversation as resolved.
Show resolved Hide resolved
Context::new(Data { client }),
)
.for_each(|res| async move {
match res {
Ok(o) => info!("reconciled {:?}", o),
Expand Down
1 change: 1 addition & 0 deletions kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pin-project = "0.4.23"
tokio = { version = "0.2.21", features = ["time"] }
snafu = { version = "0.6.8", features = ["futures"] }
dashmap = "3.11.10"
backoff = "0.2.1"

[features]
default = ["native-tls"]
Expand Down
180 changes: 145 additions & 35 deletions kube-runtime/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use crate::{
reflector::{
reflector,
store::{Store, Writer},
ErasedResource, ObjectRef,
ObjectRef, RuntimeResource,
},
scheduler::{self, scheduler, ScheduleRequest},
utils::{try_flatten_applied, try_flatten_touched, trystream_try_via},
watcher::{self, watcher},
};
use backoff::backoff::Backoff;
use derivative::Derivative;
use futures::{
channel, future,
Expand All @@ -17,17 +18,23 @@ use futures::{
use kube::api::{Api, ListParams, Meta};
use serde::de::DeserializeOwned;
use snafu::{futures::TryStreamExt as SnafuTryStreamExt, Backtrace, OptionExt, ResultExt, Snafu};
use std::{sync::Arc, time::Duration};
use std::{collections::HashMap, fmt::Debug, marker::PhantomData, sync::Arc, time::Duration};
use stream::BoxStream;
use tokio::time::Instant;

#[derive(Snafu, Debug)]
pub enum Error<ReconcilerErr: std::error::Error + 'static, QueueErr: std::error::Error + 'static> {
#[derive(Snafu, Derivative)]
#[derivative(Debug(bound = ""))]
pub enum Error<
K: RuntimeResource,
ReconcilerErr: std::error::Error + 'static,
QueueErr: std::error::Error + 'static,
> {
ObjectNotFound {
obj_ref: ObjectRef<ErasedResource>,
obj_ref: ObjectRef<K>,
backtrace: Backtrace,
},
ReconcilerFailed {
obj_ref: ObjectRef<K>,
source: ReconcilerErr,
backtrace: Backtrace,
},
Expand Down Expand Up @@ -92,6 +99,75 @@ where
})
}

/// A policy for when to retry reconciliation, after an error has occurred.
pub trait ErrorPolicy {
type Err;
type K: RuntimeResource;
type Ctx;

/// Notifies that the state for an object should be removed, for example if it is reconciled successfully.
clux marked this conversation as resolved.
Show resolved Hide resolved
fn reset_object(&mut self, obj_ref: &ObjectRef<Self::K>, ctx: Context<Self::Ctx>);
/// Queries for when to next retry after an error.
fn on_error(
&mut self,
obj_ref: ObjectRef<Self::K>,
error: &Self::Err,
ctx: Context<Self::Ctx>,
) -> ReconcilerAction;
}

/// Retries errors based on a `Backoff` policy.
///
/// A separate backoff tracker is used for each object, and it is
/// reset whenever the object is reconciled successfully.
#[derive(Debug)]
pub struct BackoffErrorPolicy<MkBackoff, B, K: RuntimeResource, Err, Ctx> {
make_backoff: MkBackoff,
clux marked this conversation as resolved.
Show resolved Hide resolved
backoffs: HashMap<ObjectRef<K>, B>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhh. One back off per object. Interesting. I thought we would have one global one. This makes sense though. Probably smarter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I /think/ we should encourage backoff-per-object by default. There are cases where global backoffs make sense (gating access to some central resource for example), but you probably want to gate those by specific error cases..

_err: PhantomData<Err>,
_ctx: PhantomData<Ctx>,
}

impl<MkBackoff: FnMut() -> B, B: Backoff, K: RuntimeResource, Err, Ctx>
BackoffErrorPolicy<MkBackoff, B, K, Err, Ctx>
{
fn new(make_backoff: MkBackoff) -> Self {
BackoffErrorPolicy {
make_backoff,
backoffs: HashMap::new(),
_err: PhantomData,
_ctx: PhantomData,
}
}
}

impl<MkBackoff: FnMut() -> B, B: Backoff, K: RuntimeResource, Err, Ctx> ErrorPolicy
for BackoffErrorPolicy<MkBackoff, B, K, Err, Ctx>
{
type Err = Err;
type K = K;
type Ctx = Ctx;

fn reset_object(&mut self, obj_ref: &ObjectRef<Self::K>, _ctx: Context<Self::Ctx>) {
self.backoffs.remove(obj_ref);
clux marked this conversation as resolved.
Show resolved Hide resolved
}

fn on_error(
&mut self,
obj_ref: ObjectRef<Self::K>,
_error: &Self::Err,
_ctx: Context<Self::Ctx>,
) -> ReconcilerAction {
let obj_backoff = self
.backoffs
.entry(obj_ref)
.or_insert_with(&mut self.make_backoff);
ReconcilerAction {
requeue_after: obj_backoff.next_backoff(),
}
}
}

/// A context data type that's passed through to the controllers callbacks
///
/// Context<T> gets passed to both the `reconciler` and the `error_policy` callbacks.
Expand Down Expand Up @@ -133,11 +209,11 @@ impl<T> Context<T> {
/// (such as triggering from arbitrary `Stream`s), at the cost of some more verbosity.
pub fn applier<K, QueueStream, ReconcilerFut, T>(
mut reconciler: impl FnMut(K, Context<T>) -> ReconcilerFut,
mut error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> ReconcilerAction,
mut error_policy: impl ErrorPolicy<K = K, Err = ReconcilerFut::Error, Ctx = T>,
context: Context<T>,
store: Store<K>,
queue: QueueStream,
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, QueueStream::Error>>>
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<K, ReconcilerFut::Error, QueueStream::Error>>>
where
K: Clone + Meta + 'static,
ReconcilerFut: TryFuture<Ok = ReconcilerAction>,
Expand Down Expand Up @@ -175,22 +251,51 @@ where
})
// then reconcile every object
.and_then(move |(obj_ref, obj)| {
reconciler(obj, context.clone()) // TODO: add a context argument to the reconcile
reconciler(obj, context.clone())
.into_future() // TryFuture -> impl Future
.map(|result| (obj_ref, result)) // turn into pair and ok wrap
.map(Ok) // (this lets us deal with errors from reconciler below)
.map(|result| match result {
Ok(action) => Ok((obj_ref, action)),
clux marked this conversation as resolved.
Show resolved Hide resolved
Err(err) => Err(err).context(ReconcilerFailed { obj_ref }),
})
})
// finally, for each completed reconcile call:
.and_then(move |(obj_ref, reconciler_result)| {
let ReconcilerAction { requeue_after } = match &reconciler_result {
Ok(action) => action.clone(), // do what user told us
Err(err) => error_policy(err, err_context.clone()), // reconciler fn call failed
.then(move |reconciler_result| {
let (obj_ref, action, error) = match reconciler_result {
// tell the error policy about the success (to reset backoff timers, for example)
Ok((obj_ref, action)) => {
error_policy.reset_object(&obj_ref, err_context.clone());
(obj_ref.clone(), action, None)
clux marked this conversation as resolved.
Show resolved Hide resolved
}
// reconciler fn call failed
Err(Error::ReconcilerFailed {
obj_ref,
source,
backtrace,
}) => (
obj_ref.clone(),
error_policy.on_error(obj_ref.clone(), &source, err_context.clone()),
Some(Error::ReconcilerFailed {
obj_ref,
source,
backtrace,
}),
),
// object was deleted, fake a "success" to the error policy, so that it can clean up any bookkeeping and avoid leaking memory
Err(Error::ObjectNotFound { obj_ref, backtrace }) => {
error_policy.reset_object(&obj_ref, err_context.clone());
(
obj_ref.clone(),
ReconcilerAction { requeue_after: None },
Some(Error::ObjectNotFound { obj_ref, backtrace }),
)
}
// Upstream or internal error, propagate
Err(_) => return future::Either::Left(future::ready(reconciler_result)),
};
// we should always requeue at some point in case of network errors ^
let mut scheduler_tx = scheduler_tx.clone();
async move {
future::Either::Right(async move {
// Transmit the requeue request to the scheduler (picked up again at top)
if let Some(delay) = requeue_after {
if let Some(delay) = action.requeue_after {
scheduler_tx
.send(ScheduleRequest {
message: obj_ref.clone(),
Expand All @@ -199,11 +304,11 @@ where
.await
.expect("Message could not be sent to scheduler_rx");
}
// NB: no else clause ^ because we don't allow not requeuing atm.
reconciler_result
.map(|action| (obj_ref, action))
.context(ReconcilerFailed)
}
match error {
None => Ok((obj_ref, action)),
Some(err) => Err(err),
}
})
})
}

Expand Down Expand Up @@ -245,12 +350,6 @@ where
/// requeue_after: Some(Duration::from_secs(300)),
/// })
/// }
/// /// an error handler that will be called when the reconciler fails
/// fn error_policy(_error: &Error, _ctx: Context<()>) -> ReconcilerAction {
/// ReconcilerAction {
/// requeue_after: Some(Duration::from_secs(60)),
/// }
/// }
///
/// /// something to drive the controller
/// #[tokio::main]
Expand All @@ -261,7 +360,7 @@ where
/// let cms = Api::<ConfigMap>::all(client.clone());
/// Controller::new(cmgs, ListParams::default())
/// .owns(cms, ListParams::default())
/// .run(reconcile, error_policy, context)
/// .run(reconcile, || backoff::backoff::Constant::new(Duration::from_secs(60)), context)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could have a .backoff builder on Controller here for readability?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The downside there is that we either need to introduce a new generic or box the Backoff.

Copy link
Member

@clux clux Sep 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, does it get awkward if we try to do something like:

Controller::(..)
    .error(|| backoff::Constant::new(Duration::from_secs(60)))
    .run(reconcile, context)

I guess I wanted to have a default like what you have there without having to be explicit about the lambda.
With maybe https://docs.rs/backoff/0.2.1/backoff/type.ExponentialBackoff.html as the default if omitted.

/// .for_each(|res| async move {
/// match res {
/// Ok(o) => println!("reconciled {:?}", o),
Expand Down Expand Up @@ -346,18 +445,25 @@ where
/// This creates a stream from all builder calls and starts an applier with
/// a specified `reconciler` and `error_policy` callbacks. Each of these will be called
/// with a configurable `Context`.
pub fn run<ReconcilerFut, T>(
pub fn run<ReconcilerFut, T, B>(
self,
reconciler: impl FnMut(K, Context<T>) -> ReconcilerFut,
error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> ReconcilerAction,
make_backoff: impl Fn() -> B,
context: Context<T>,
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, watcher::Error>>>
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<K, ReconcilerFut::Error, watcher::Error>>>
where
K: Clone + Meta + 'static,
ReconcilerFut: TryFuture<Ok = ReconcilerAction>,
ReconcilerFut::Error: std::error::Error + 'static,
B: Backoff,
{
applier(reconciler, error_policy, context, self.reader, self.selector)
applier(
reconciler,
BackoffErrorPolicy::new(make_backoff),
context,
self.reader,
self.selector,
)
}
}

Expand All @@ -367,6 +473,7 @@ mod tests {
use crate::Controller;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::Api;
use snafu::Snafu;

fn assert_send<T: Send>(x: T) -> T {
x
Expand All @@ -378,13 +485,16 @@ mod tests {
)
}

#[derive(Snafu, Debug)]
enum NoError {}

// not #[test] because we don't want to actually run it, we just want to assert that it typechecks
#[allow(dead_code, unused_must_use)]
fn test_controller_should_be_send() {
assert_send(
Controller::new(mock_type::<Api<ConfigMap>>(), Default::default()).run(
|_, _| async { Ok(mock_type::<ReconcilerAction>()) },
|_: &std::io::Error, _| mock_type::<ReconcilerAction>(),
|_, _| async { Ok::<_, NoError>(mock_type::<ReconcilerAction>()) },
|| backoff::backoff::Zero {},
Context::new(()),
),
);
Expand Down