Skip to content

Commit

Permalink
Merge pull request #722 from swimos/fluvio_example
Browse files Browse the repository at this point in the history
Fluvio connector example
  • Loading branch information
SirCipher authored Nov 13, 2024
2 parents 4db7c0b + 9f90c19 commit 3d18c95
Show file tree
Hide file tree
Showing 12 changed files with 379 additions and 114 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ members = [
"example_apps/ripple",
"example_apps/stocks_simulated",
"example_apps/kafka_ingress_connector",
"example_apps/kafka_egress_connector"
"example_apps/kafka_egress_connector",
"example_apps/fluvio_ingress_connector"
]

[workspace.package]
Expand Down Expand Up @@ -85,6 +86,7 @@ swimos_introspection = { path = "server/swimos_introspection", version = "0.1.1"
swimos_server_app = { path = "server/swimos_server_app", version = "0.1.1" }
swimos_connector = { path = "server/swimos_connector", version = "0.1.1" }
swimos_connector_kafka = { path = "server/swimos_connector_kafka", version = "0.1.1" }
swimos_connector_fluvio = { path = "server/swimos_connector_fluvio", version = "0.1.1" }
swimos = { path = "swimos", version = "0.1.1" }
swimos_client = { path = "swimos_client", version = "0.1.1" }
swimos_downlink = { path = "swimos_downlink", version = "0.1.1" }
Expand Down
24 changes: 24 additions & 0 deletions example_apps/fluvio_ingress_connector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "fluvio_ingress_connector"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
homepage.workspace = true
publish = false

[dependencies]
swimos = { workspace = true, features = ["server", "agent"] }
swimos_client = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "fs"] }
example-util = { path = "../example_util" }
swimos_connector = { workspace = true }
swimos_connector_fluvio = { workspace = true, features = ["json"] }
swimos_recon = { workspace = true }
clap = { workspace = true, features = ["derive"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
serde_json = { workspace = true }
rand = { workspace = true }
fluvio = { workspace = true }
bytes = { workspace = true }
82 changes: 82 additions & 0 deletions example_apps/fluvio_ingress_connector/src/agent.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2015-2024 Swim Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use swimos::agent::agent_lifecycle::HandlerContext;
use swimos::agent::event_handler::{EventHandler, HandlerActionExt};
use swimos::agent::lanes::{MapLane, ValueLane};
use swimos::agent::{lifecycle, AgentLaneModel};

/// Sensor Agent model.
#[derive(AgentLaneModel)]
pub struct SensorAgent {
/// The latest temperature reading.
temperature: ValueLane<i64>,
/// The latest voltage reading.
/// Key: timestamp that the key was updated.
/// Value: voltage.
voltage: MapLane<i64, f64>,
}

/// Sensor Agent lifecycle.
#[derive(Clone)]
pub struct SensorLifecycle;

#[lifecycle(SensorAgent)]
impl SensorLifecycle {
#[on_start]
pub fn on_start(&self, context: HandlerContext<SensorAgent>) -> impl EventHandler<SensorAgent> {
context.get_agent_uri().and_then(move |uri| {
context.effect(move || {
println!("Starting agent at: {}", uri);
})
})
}

#[on_stop]
pub fn on_stop(&self, context: HandlerContext<SensorAgent>) -> impl EventHandler<SensorAgent> {
context.get_agent_uri().and_then(move |uri| {
context.effect(move || {
println!("Stopping agent at: {}", uri);
})
})
}

#[on_event(temperature)]
pub fn on_temperature(
&self,
context: HandlerContext<SensorAgent>,
value: &i64,
) -> impl EventHandler<SensorAgent> {
let n = *value;
context.effect(move || {
println!("Setting temperature to: {}", n);
})
}

#[on_update(voltage)]
pub fn on_update(
&self,
context: HandlerContext<SensorAgent>,
_map: &HashMap<i64, f64>,
timestamp: i64,
_prev: Option<f64>,
new_value: &f64,
) -> impl EventHandler<SensorAgent> + '_ {
let new_value = *new_value;
context.effect(move || {
println!("Setting voltage entry for {} to '{}'", timestamp, new_value);
})
}
}
26 changes: 26 additions & 0 deletions example_apps/fluvio_ingress_connector/src/fluvio_connector.recon
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
@fluvio {
topic: "sensors",
addr: "127.0.0.1:9003",
partition: 0,
offset: @End {
},
value_lanes: {
@ValueLaneSpec {
name: temperature,
selector: "$payload.temperature",
required: true
}
},
map_lanes: {
},
relays: {
@Value {
node: "/sensors/$key",
lane: "temperature",
payload: "$payload.temperature",
required: true
}
},
key_deserializer: @UInt32(@LittleEndian),
payload_deserializer: @Json
}
155 changes: 155 additions & 0 deletions example_apps/fluvio_ingress_connector/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright 2015-2024 Swim Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! An example demonstrating a Fluvio connector.
//!
//! Run the application using the following:
//! ```text
//! $ cargo run --bin fluvio_connector
//! ```
use clap::Parser;
use example_util::{example_filter, manage_handle};
use fluvio::RecordKey;
use rand::Rng;
use serde_json::json;
use std::collections::HashSet;
use std::{error::Error, str::FromStr, time::Duration};
use swimos::{
route::{RoutePattern, RouteUri},
server::{Server, ServerBuilder},
};
use swimos_connector::IngressConnectorModel;
use tokio::time::sleep;

mod agent;
mod params;

use crate::agent::{SensorAgent, SensorLifecycle};
use params::Params;
use swimos::agent::agent_model::AgentModel;
use swimos_connector_fluvio::{FluvioIngressConfiguration, FluvioIngressConnector};
use tracing::error;
use tracing_subscriber::filter::LevelFilter;

const FLUVIO_TOPIC: &str = "sensors";
const MAX_AGENTS: usize = 50;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let Params {
config,
enable_logging,
} = Params::parse();
if enable_logging {
setup_logging()?;
}

let connector_config = load_config(config).await?;

let route = RoutePattern::parse_str("/fluvio")?;

let connector_agent = IngressConnectorModel::for_fn(move || {
FluvioIngressConnector::for_config(connector_config.clone())
});
let sensor_agent = AgentModel::new(SensorAgent::default, SensorLifecycle.into_lifecycle());

let server = ServerBuilder::with_plane_name("Example Plane")
.set_bind_addr("127.0.0.1:8080".parse()?)
.add_route(route, connector_agent)
.add_route(RoutePattern::parse_str("/sensors/:id")?, sensor_agent)
.update_config(|config| {
config.agent_runtime.inactive_timeout = Duration::from_secs(5 * 60);
})
.build()
.await?;

let (task, handle) = server.run();

let uri = RouteUri::from_str("/fluvio")?;

let shutdown = async move {
if let Err(error) = handle.start_agent(uri).await {
error!(error = %error, "Failed to start connector agent.");
}
manage_handle(handle).await
};

let (_, task_result, producer_result) = tokio::join!(shutdown, task, run_fluvio());

producer_result?;
task_result?;
println!("Server stopped successfully.");
Ok(())
}

async fn run_fluvio() -> Result<(), Box<dyn Error + Send + Sync>> {
let producer = fluvio::producer(FLUVIO_TOPIC).await?;
let mut agent_ids = HashSet::new();

loop {
let (agent_id, payload) = {
let len = agent_ids.len();
let mut rng = rand::thread_rng();

let agent_id = if len == MAX_AGENTS {
rng.gen_range(0..len)
} else {
let id = len + 1;
agent_ids.insert(id);
id
};

let payload = json! {
{
"temperature": rng.gen_range(10..100),
"voltage": rng.gen_range::<f64, _>(0.0..12.0)
}
};

(agent_id, serde_json::to_vec(&payload)?)
};

producer
.send(RecordKey::from((agent_id as u32).to_le_bytes()), payload)
.await?;
producer.flush().await?;

sleep(Duration::from_millis(500)).await;
}
}

const CONNECTOR_CONFIG: &str = include_str!("fluvio_connector.recon");

async fn load_config(
path: Option<String>,
) -> Result<FluvioIngressConfiguration, Box<dyn Error + Send + Sync>> {
let content: String;
let recon = if let Some(path) = path {
content = tokio::fs::read_to_string(path).await?;
&content
} else {
CONNECTOR_CONFIG
};
FluvioIngressConfiguration::from_str(recon)
}

pub fn setup_logging() -> Result<(), Box<dyn Error + Send + Sync>> {
let filter = example_filter()?
.add_directive(LevelFilter::INFO.into())
.add_directive("swimos_connector_fluvio=trace".parse()?)
.add_directive("swimos_connector=info".parse()?);
tracing_subscriber::fmt().with_env_filter(filter).init();
Ok(())
}
26 changes: 26 additions & 0 deletions example_apps/fluvio_ingress_connector/src/params.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2015-2024 Swim Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use clap::Parser;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
pub struct Params {
/// Specify a Recon configuration file for the connector.
#[arg(long)]
pub config: Option<String>,
#[arg(long, default_value = "false")]
/// Specify that logging should be enabled.
pub enable_logging: bool,
}
3 changes: 0 additions & 3 deletions runtime/swimos_runtime/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,9 +537,6 @@ pub enum AgentExecError {
/// Initializing the agent failed.
#[error("Failed to initialize agent: {0}")]
FailedInit(#[from] AgentInitError),
/// Initialization completed but no lanes were registered.
#[error("The agent did not register any lanes.")]
NoInitialLanes,
/// The runtime loop of the agent failed.
#[error("The agent task failed: {0}")]
FailedTask(#[from] AgentTaskError),
Expand Down
23 changes: 10 additions & 13 deletions runtime/swimos_runtime/src/agent/task/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,19 +185,16 @@ impl<Store: AgentPersistence + Send + Sync> AgentInitTask<Store> {
result?;

let Initialization { reporting, .. } = initialization;
if endpoints.lane_endpoints.is_empty() {
Err(AgentExecError::NoInitialLanes)
} else {
Ok((
InitialEndpoints::new(
reporting,
request_stream.into_inner(),
endpoints,
ext_link_state,
),
store,
))
}

Ok((
InitialEndpoints::new(
reporting,
request_stream.into_inner(),
endpoints,
ext_link_state,
),
store,
))
}
}

Expand Down
8 changes: 1 addition & 7 deletions runtime/swimos_runtime/src/agent/task/init/tests/no_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::agent::{
AgentRuntimeRequest, Endpoints, InitialEndpoints, LaneEndpoint, LaneRuntimeSpec,
StoreRuntimeSpec,
},
AgentExecError, Io, LinkRequest,
Io, LinkRequest,
};

type RawMapOperation = MapOperation<Bytes, BytesMut>;
Expand Down Expand Up @@ -183,12 +183,6 @@ impl TestInit for SingleLaneInit {
}
}

#[tokio::test]
async fn no_lanes() {
let (result, _) = run_test(NoLanesInit, StoreDisabled).await;
assert!(matches!(result, Err(AgentExecError::NoInitialLanes)));
}

#[tokio::test]
async fn single_lane() {
for config in CONFIGS {
Expand Down
Loading

0 comments on commit 3d18c95

Please sign in to comment.