Skip to content

Commit

Permalink
Merge branch 'main' into simplified_load_plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Jan 15, 2024
2 parents 39f49d8 + 2086eb6 commit 522a10d
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 9 deletions.
File renamed without changes.
91 changes: 91 additions & 0 deletions .github/workflows/sync-lockfiles.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
name: Sync Cargo lockfile with Zenoh's dependants

on:
schedule:
- cron: "0 0 * * *" # At the end of every day
workflow_dispatch:

defaults:
run:
shell: bash

jobs:
sync:
name: Sync Cargo lockfile with ${{ matrix.dependant }}
runs-on: ubuntu-latest
strategy:
matrix:
dependant:
- zenoh-c
- zenoh-python
- zenoh-java
- zenoh-kotlin
- zenoh-plugin-dds
- zenoh-plugin-mqtt
- zenoh-plugin-ros1
- zenoh-plugin-ros2dds
- zenoh-plugin-webserver
- zenoh-backend-filesystem
- zenoh-backend-influxdb
- zenoh-backend-rocksdb
- zenoh-backend-s3
steps:
- name: Checkout ${{ matrix.dependant }}
uses: actions/checkout@v4
with:
token: ${{ secrets.BOT_TOKEN_WORKFLOW }}

- name: Install Rust toolchain
# NOTE: Showing the active Rust toolchain (defined by the rust-toolchain.toml file)
# will have the side effect of installing it; if it's not installed already.
run: rustup show

# NOTE: Not all Zenoh dependants have their Cargo manifest and lockfile
# at the repository's toplevel. The only exception being zenoh-kotlin and
# zenoh-java. Thus the need for this ugly workaround.
- name: Compute Cargo manifest path of ${{ matrix.dependant }}
id: manifest-path
run: |
if [[ "${{ matrix.dependant }}" =~ zenoh-(java|kotlin) ]]; then
echo "value=./zenoh-jni/Cargo.toml" >> $GITHUB_OUTPUT
else
echo "value=./Cargo.toml" >> $GITHUB_OUTPUT
fi
- name: Override ${{ matrix.dependant }} lockfile with Zenoh's
# NOTE: We assume that the lockfile resides in the same directory as the manifest.
run: |
MANIFEST_PATH=${{ steps.manifest-path.outputs.value }}
curl "https://raw.githubusercontent.com/eclipse-zenoh/zenoh/Cargo.lock" --output ${MANIFEST_PATH/toml/lock}
- name: Rectify lockfile
# NOTE: Checking the package for errors will rectify the Cargo.lock while preserving
# the dependency versions fetched from source.
run: cargo check --manifest-path ${{ steps.manifest-path.outputs.value }}

- name: Create/Update a pull request if the lockfile changed
id: cpr
# NOTE: If there is a pending PR, this action will simply update it with a forced push.
uses: peter-evans/create-pull-request@v5
with:
title: Sync lockfile with Zenoh's
body: Automated synchronization of the Cargo lockfile with Zenoh. This is done to ensure plugin ABI compatibility.
commit-message: "chore: Sync Cargo lockfile with Zenoh's"
committer: eclipse-zenoh-bot <[email protected]>
author: eclipse-zenoh-bot <[email protected]>
branch: eclipse-zenoh-bot/sync-lockfile
delete-branch: true
labels: dependencies
token: ${{ secrets.BOT_TOKEN_WORKFLOW }}

- name: Auto approve the pull request
if: ${{ steps.cpr.outputs.pull-request-operation == 'created' }}
run: gh pr review -R "eclipse-zenoh/${{ matrix.dependant }}" --approve "${{ steps.cpr.outputs.pull-request-number }}"
env:
GH_TOKEN: ${{ secrets.BOT_TOKEN_WORKFLOW }}

- name: Enable auto merge for the pull request
if: steps.cpr.outputs.pull-request-operation == 'created'
run: gh pr merge -R "eclipse-zenoh/${{ matrix.dependant }}" --merge --auto "${{ steps.cpr.outputs.pull-request-number }}"
env:
GH_TOKEN: ${{ secrets.BOT_TOKEN_WORKFLOW }}
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion examples/examples/z_sub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ impl Stats {
}
impl Drop for Stats {
fn drop(&mut self) {
let elapsed = self.global_start.unwrap().elapsed().as_secs_f64();
let Some(global_start) = self.global_start else {
return;
};
let elapsed = global_start.elapsed().as_secs_f64();
let total = self.round_size * self.finished_rounds + self.round_count;
let throughtput = total as f64 / elapsed;
println!("Received {total} messages over {elapsed:.2}s: {throughtput}msg/s");
Expand Down
4 changes: 3 additions & 1 deletion gen_zenoh_deb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ if [ -z "$1" -o -z "$2" ]; then
exit 1
fi

VERSION=`echo $1`
# NOTE: cargo-deb v2.0.0 and later will add a "-1" suffix to the version for
# compliance with Debian's packaging standard.
VERSION="$1-1"
ARCH=$2

PACKAGE_NAME="zenoh_${VERSION}_${ARCH}"
Expand Down
18 changes: 13 additions & 5 deletions plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::collections::{HashMap, HashSet};
use std::str;
use zenoh::key_expr::{KeyExpr, OwnedKeyExpr};
use zenoh::prelude::r#async::*;
use zenoh::query::QueryConsolidation;
use zenoh::time::Timestamp;
use zenoh::Session;

Expand Down Expand Up @@ -88,7 +87,10 @@ impl Aligner {
let checksum = other.checksum;
let timestamp = other.timestamp;
let (missing_content, no_content_err) = self.get_missing_content(&other, from).await;
log::trace!("[ALIGNER] Missing content is {:?}", missing_content);
log::debug!(
"[ALIGNER] Missing {} entries; query corresponding samples",
missing_content.len()
);

// If missing content is not identified, it showcases some problem
// The problem will be addressed in the future rounds, hence will not count as processed
Expand All @@ -98,11 +100,12 @@ impl Aligner {
.await;

// Missing data might be empty since some samples in digest might be outdated
log::trace!("[ALIGNER] Missing data is {:?}", missing_data);
log::debug!("[ALIGNER] Received {} queried samples", missing_data.len());
log::trace!("[ALIGNER] Received queried samples: {missing_data:?}");

for (key, (ts, value)) in missing_data {
let sample = Sample::new(key, value).with_timestamp(ts);
log::debug!("[ALIGNER] Adding sample {:?} to storage", sample);
log::debug!("[ALIGNER] Adding {:?} to storage", sample);
self.tx_sample.send_async(sample).await.unwrap_or_else(|e| {
log::error!("[ALIGNER] Error adding sample to storage: {}", e)
});
Expand Down Expand Up @@ -140,6 +143,7 @@ impl Aligner {
}

async fn get_missing_content(&self, other: &Digest, from: &str) -> (Vec<LogEntry>, bool) {
log::debug!("[ALIGNER] Get missing content from {from} ...");
// get my digest
let this = &self.snapshotter.get_digest().await;

Expand All @@ -152,6 +156,9 @@ impl Aligner {

let ((cold_data, no_cold_err), (warm_data, no_warm_err), (hot_data, no_hot_err)) =
futures::join!(cold_alignment, warm_alignment, hot_alignment);
log::debug!("[ALIGNER] Missing content from {from} in Cold era: {cold_data:?}");
log::debug!("[ALIGNER] Missing content from {from} in Warm era: {warm_data:?}");
log::debug!("[ALIGNER] Missing content from {from} in Hot era: {hot_data:?}");
(
[cold_data, warm_data, hot_data].concat(),
no_cold_err && no_warm_err && no_hot_err,
Expand Down Expand Up @@ -313,7 +320,7 @@ impl Aligner {
match self
.session
.get(&selector)
.consolidation(QueryConsolidation::AUTO)
.consolidation(zenoh::query::ConsolidationMode::None)
.accept_replies(zenoh::query::ReplyKeyExpr::Any)
.res()
.await
Expand Down Expand Up @@ -345,6 +352,7 @@ impl Aligner {
no_err = false;
}
};
log::trace!("[ALIGNER] On Query '{selector}' received: {return_val:?} (no_err:{no_err})");
(return_val, no_err)
}
}

0 comments on commit 522a10d

Please sign in to comment.