Skip to content

Commit

Permalink
Log task descriptions for each LeRobot dataset episode (#9028)
Browse files Browse the repository at this point in the history
### Related

This is based on #9027, which should be merged first

### What

This adds support for `DType::String` and logs the task description
for each episode to the viewer.
  • Loading branch information
oxkitsune authored Feb 14, 2025
1 parent a830fa0 commit dd7f1ef
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 12 deletions.
6 changes: 6 additions & 0 deletions crates/store/re_data_loader/src/lerobot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ impl LeRobotDataset {

Ok(Cow::Owned(contents))
}

/// Retrieve the task using the provided task index.
///
/// Looks the task up in the dataset's metadata; returns `None` when the
/// index is out of bounds for the metadata's task list.
pub fn task_by_index(&self, task: TaskIndex) -> Option<&LeRobotDatasetTask> {
    self.metadata.tasks.get(task.0)
}
}

/// Metadata for a `LeRobot` dataset.
Expand Down Expand Up @@ -365,6 +370,7 @@ pub enum DType {
Float32,
Float64,
Int64,
String,
}

/// Name metadata for a feature in the `LeRobot` dataset.
Expand Down
4 changes: 1 addition & 3 deletions crates/store/re_data_loader/src/load_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ pub fn load_from_path(
// NOTE: This channel must be unbounded since we serialize all operations when running on wasm.
tx: &Sender<LogMsg>,
) -> Result<(), DataLoaderError> {
use crate::DataLoaderSettings;

re_tracing::profile_function!(path.to_string_lossy());

if !path.exists() {
Expand All @@ -40,7 +38,7 @@ pub fn load_from_path(
// When loading a LeRobot dataset, avoid sending a `SetStoreInfo` message since the LeRobot
// loader handles this automatically.
let settings = if crate::lerobot::is_lerobot_dataset(path) {
&DataLoaderSettings {
&crate::DataLoaderSettings {
force_store_info: false,
..settings.clone()
}
Expand Down
66 changes: 58 additions & 8 deletions crates/store/re_data_loader/src/loader_lerobot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ use re_chunk::{
};

use re_log_types::{ApplicationId, StoreId};
use re_types::archetypes::{AssetVideo, EncodedImage, VideoFrameReference};
use re_types::archetypes::{AssetVideo, EncodedImage, TextDocument, VideoFrameReference};
use re_types::components::{Scalar, VideoTimestamp};
use re_types::{Archetype, Component, ComponentBatch};

use crate::lerobot::{is_lerobot_dataset, DType, EpisodeIndex, Feature, LeRobotDataset};
use crate::lerobot::{is_lerobot_dataset, DType, EpisodeIndex, Feature, LeRobotDataset, TaskIndex};
use crate::load_file::prepare_store_info;
use crate::{DataLoader, DataLoaderError, LoadedData};

Expand All @@ -39,7 +39,7 @@ impl DataLoader for LeRobotDatasetLoader {

fn load_from_path(
&self,
_settings: &crate::DataLoaderSettings,
settings: &crate::DataLoaderSettings,
filepath: std::path::PathBuf,
tx: Sender<LoadedData>,
) -> Result<(), DataLoaderError> {
Expand All @@ -49,6 +49,10 @@ impl DataLoader for LeRobotDatasetLoader {

let dataset = LeRobotDataset::load_from_directory(&filepath)
.map_err(|err| anyhow!("Loading LeRobot dataset failed: {err}"))?;
let application_id = settings
.application_id
.clone()
.unwrap_or(ApplicationId(format!("{filepath:?}")));

// NOTE(1): `spawn` is fine, this whole function is native-only.
// NOTE(2): this must spawned on a dedicated thread to avoid a deadlock!
Expand All @@ -64,7 +68,7 @@ impl DataLoader for LeRobotDatasetLoader {
dataset.path,
dataset.metadata.episodes.len(),
);
load_and_stream(&dataset, &tx);
load_and_stream(&dataset, &application_id, &tx);
}
})
.with_context(|| {
Expand All @@ -85,9 +89,13 @@ impl DataLoader for LeRobotDatasetLoader {
}
}

fn load_and_stream(dataset: &LeRobotDataset, tx: &Sender<crate::LoadedData>) {
fn load_and_stream(
dataset: &LeRobotDataset,
application_id: &ApplicationId,
tx: &Sender<crate::LoadedData>,
) {
// set up all recordings
let episodes = prepare_episode_chunks(dataset, tx);
let episodes = prepare_episode_chunks(dataset, application_id, tx);

for (episode, store_id) in &episodes {
// log episode data to its respective recording
Expand Down Expand Up @@ -119,9 +127,9 @@ fn load_and_stream(dataset: &LeRobotDataset, tx: &Sender<crate::LoadedData>) {
/// [`LogMsg`](`re_log_types::LogMsg`) for each episode.
fn prepare_episode_chunks(
dataset: &LeRobotDataset,
application_id: &ApplicationId,
tx: &Sender<crate::LoadedData>,
) -> Vec<(EpisodeIndex, StoreId)> {
let application_id = ApplicationId(format!("{:?}", dataset.path));
let mut store_ids = vec![];

for episode in &dataset.metadata.episodes {
Expand Down Expand Up @@ -191,8 +199,14 @@ fn load_episode(
time_column.clone(),
)?);
}

DType::Image => chunks.extend(load_episode_images(feature_key, &timeline, &data)?),
DType::Int64 | DType::Bool => {
DType::Int64 if feature_key == "task_index" => {
// special case int64 task_index columns
// this always refers to the task description in the dataset metadata.
chunks.extend(log_episode_task(dataset, &timeline, &data)?);
}
DType::Int64 | DType::Bool | DType::String => {
re_log::warn_once!(
"Loading LeRobot feature ({}) of dtype `{:?}` into Rerun is not yet implemented",
feature_key,
Expand All @@ -208,6 +222,42 @@ fn load_episode(
Ok(chunks)
}

/// Build a [`TextDocument`] chunk on the `"task"` entity from the `task_index`
/// column of `data`, resolving each index to its task description via the
/// dataset metadata.
///
/// Rows whose index is missing, negative, or not present in the metadata are
/// skipped; the timeline still advances for them so emitted rows stay aligned
/// with their original frame positions.
fn log_episode_task(
    dataset: &LeRobotDataset,
    timeline: &Timeline,
    data: &RecordBatch,
) -> Result<impl ExactSizeIterator<Item = Chunk>, DataLoaderError> {
    let task_indices = data
        .column_by_name("task_index")
        .and_then(|c| c.downcast_array_ref::<Int64Array>())
        .with_context(|| "Failed to get task_index field from dataset!")?;

    let mut builder = Chunk::builder("task".into());
    let mut next_row_id = RowId::new();
    let mut frame_time = TimeInt::ZERO;

    for maybe_index in task_indices {
        // Resolve the raw i64 index to a task description, dropping
        // anything that is null, negative, or unknown to the metadata.
        let resolved = maybe_index
            .and_then(|raw| usize::try_from(raw).ok())
            .and_then(|idx| dataset.task_by_index(TaskIndex(idx)));

        if let Some(task) = resolved {
            let mut timepoint = TimePoint::default();
            timepoint.insert(*timeline, frame_time);

            let doc = TextDocument::new(task.task.clone());
            builder = builder.with_archetype(next_row_id, timepoint, &doc);

            // Row ids only advance for rows that were actually emitted.
            next_row_id = next_row_id.next();
        }

        // The timeline advances for every input row, emitted or skipped.
        frame_time = frame_time.inc();
    }

    Ok(std::iter::once(builder.build()?))
}

fn load_episode_images(
observation: &str,
timeline: &Timeline,
Expand Down
2 changes: 1 addition & 1 deletion tests/python/release_checklist/check_lerobot_dataloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
This will load an entire LeRobot dataset -- simply make sure that it does 🙃
The LeRobot dataset loader works by creating a new _recording_ (⚠)️ for each episode in the dataset.
I.e., you should see a bunch of recordings below this readme (10, to be exact).
I.e., you should see a bunch of recordings below this readme (3, to be exact).
"""


Expand Down

0 comments on commit dd7f1ef

Please sign in to comment.