Skip to content

Commit

Permalink
fix untagged enum serialization
Browse files Browse the repository at this point in the history
fix verify_serialization feature
  • Loading branch information
sokra committed Nov 19, 2024
1 parent be4fe01 commit ffeced0
Showing 1 changed file with 34 additions and 21 deletions.
55 changes: 34 additions & 21 deletions turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ use crate::{
utils::chunked_vec::ChunkedVec,
};

const POT_CONFIG: pot::Config = pot::Config::new().compatibility(pot::Compatibility::V4);

fn pot_ser_symbol_map() -> pot::ser::SymbolMap {
pot::ser::SymbolMap::new().with_compatibility(pot::Compatibility::V4)
}

fn pot_de_symbol_list<'l>() -> pot::de::SymbolList<'l> {
pot::de::SymbolList::new()
}

const META_KEY_OPERATIONS: u32 = 0;
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
const META_KEY_SESSION_ID: u32 = 2;
Expand Down Expand Up @@ -113,7 +123,7 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
else {
return Ok(Vec::new());
};
let operations = pot::from_slice(operations.borrow())?;
let operations = POT_CONFIG.deserialize(operations.borrow())?;
Ok(operations)
}
get(&self.database).unwrap_or_default()
Expand Down Expand Up @@ -174,8 +184,8 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage

let mut task_type_bytes = Vec::new();
for (task_type, task_id) in updates {
let task_id = *task_id;
serialize_task_type(&task_type, &mut task_type_bytes)?;
let task_id: u32 = *task_id;
serialize_task_type(&task_type, &mut task_type_bytes, task_id)?;

batch
.put(
Expand Down Expand Up @@ -263,7 +273,7 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
let mut task_type_bytes = Vec::new();
for (task_type, task_id) in task_cache_updates.into_iter().flatten() {
let task_id = *task_id;
serialize_task_type(&task_type, &mut task_type_bytes)?;
serialize_task_type(&task_type, &mut task_type_bytes, task_id)?;

batch
.put(
Expand Down Expand Up @@ -346,7 +356,7 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
tx: &D::ReadTransaction<'_>,
task_type: &CachedTaskType,
) -> Result<Option<TaskId>> {
let task_type = pot::to_vec(task_type)?;
let task_type = POT_CONFIG.serialize(task_type)?;
let Some(bytes) = database.get(tx, KeySpace::ForwardTaskCache, &task_type)? else {
return Ok(None);
};
Expand Down Expand Up @@ -379,7 +389,7 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
else {
return Ok(None);
};
Ok(Some(pot::from_slice(bytes.borrow())?))
Ok(Some(POT_CONFIG.deserialize(bytes.borrow())?))
}
let result = self
.with_tx(tx, |tx| lookup(&self.database, tx, task_id))
Expand Down Expand Up @@ -412,7 +422,7 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
else {
return Ok(Vec::new());
};
let result: Vec<CachedDataItem> = pot::from_slice(bytes.borrow())?;
let result: Vec<CachedDataItem> = POT_CONFIG.deserialize(bytes.borrow())?;
Ok(result)
}
self.with_tx(tx, |tx| lookup(&self.database, tx, task_id, category))
Expand Down Expand Up @@ -471,8 +481,9 @@ where
{
let _span =
tracing::trace_span!("update operations", operations = operations.len()).entered();
let operations =
pot::to_vec(&operations).with_context(|| anyhow!("Unable to serialize operations"))?;
let operations = POT_CONFIG
.serialize(&operations)
.with_context(|| anyhow!("Unable to serialize operations"))?;
batch
.put(
KeySpace::Infra,
Expand All @@ -486,15 +497,17 @@ where

fn serialize_task_type(
task_type: &Arc<CachedTaskType>,
task_type_bytes: &mut Vec<u8>,
mut task_type_bytes: &mut Vec<u8>,
task_id: u32,
) -> Result<()> {
task_type_bytes.clear();
pot::to_writer(&**task_type, task_type_bytes)
.with_context(|| anyhow!("Unable to serialize task cache key {task_type:?}"))?;
POT_CONFIG
.serialize_into(&**task_type, &mut task_type_bytes)
.with_context(|| anyhow!("Unable to serialize task {task_id} cache key {task_type:?}"))?;
#[cfg(feature = "verify_serialization")]
{
let deserialize: Result<CachedTaskType, _> = serde_path_to_error::deserialize(
&mut pot::de::SymbolList::new().deserializer_for_slice(&*task_type_bytes)?,
&mut pot_de_symbol_list().deserializer_for_slice(&*task_type_bytes)?,
);
if let Err(err) = deserialize {
println!("Task type would not be deserializable {task_id}: {err:?}\n{task_type:#?}");
Expand Down Expand Up @@ -597,11 +610,12 @@ fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync>(
if let Some(old_data) =
database.get(&tx, key_space, IntKey::new(*task).as_ref())?
{
let old_data: Vec<CachedDataItem> = match pot::from_slice(old_data.borrow())
let old_data: Vec<CachedDataItem> = match POT_CONFIG
.deserialize(old_data.borrow())
{
Ok(d) => d,
Err(_) => serde_path_to_error::deserialize(
&mut pot::de::SymbolList::new()
&mut pot_de_symbol_list()
.deserializer_for_slice(old_data.borrow())?,
)
.with_context(|| {
Expand Down Expand Up @@ -651,14 +665,14 @@ fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync>(
}

fn serialize(task: TaskId, mut data: Vec<CachedDataItem>) -> Result<Vec<u8>> {
Ok(match pot::to_vec(&data) {
Ok(match POT_CONFIG.serialize(&data) {
#[cfg(not(feature = "verify_serialization"))]
Ok(value) => value,
_ => {
let mut error = Ok(());
data.retain(|item| {
let mut buf = Vec::<u8>::new();
let mut symbol_map = pot::ser::SymbolMap::new();
let mut symbol_map = pot_ser_symbol_map();
let mut serializer = symbol_map.serializer_for(&mut buf).unwrap();
if let Err(err) = serde_path_to_error::serialize(item, &mut serializer) {
if item.is_optional() {
Expand All @@ -675,9 +689,7 @@ fn serialize(task: TaskId, mut data: Vec<CachedDataItem>) -> Result<Vec<u8>> {
{
let deserialize: Result<CachedDataItem, _> =
serde_path_to_error::deserialize(
&mut pot::de::SymbolList::new()
.deserializer_for_slice(&buf)
.unwrap(),
&mut pot_de_symbol_list().deserializer_for_slice(&buf).unwrap(),
);
if let Err(err) = deserialize {
println!(
Expand All @@ -691,7 +703,8 @@ fn serialize(task: TaskId, mut data: Vec<CachedDataItem>) -> Result<Vec<u8>> {
});
error?;

pot::to_vec(&data)
POT_CONFIG
.serialize(&data)
.with_context(|| anyhow!("Unable to serialize data items for {task}: {data:#?}"))?
}
})
Expand Down

0 comments on commit ffeced0

Please sign in to comment.