Skip to content

Commit

Permalink
Add update timestamp to shard model
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Aug 21, 2024
1 parent c59be63 commit e802e62
Show file tree
Hide file tree
Showing 23 changed files with 123 additions and 21 deletions.
7 changes: 3 additions & 4 deletions docs/internals/backward-compatibility.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Backward compatibility in Quickwit.

If you are reading this, chances are you want to make a change to one of the resources
of quickwit's meta/config:
of Quickwit's meta/config:

User edited:
- QuickwitConfig
Expand All @@ -19,7 +19,7 @@ Quickwit currently manages backward compatibility of all of these resources but
This document describes how to handle a change, how to test such a change,
and spot eventual regression.

# How do I update `{IndexMetadata, SplitMetadata, FileBackedIndex, SourceConfig, IndexConfig}`?
## How do I update `{IndexMetadata, SplitMetadata, FileBackedIndex, SourceConfig, IndexConfig}`?

There are two types of upgrades.

Expand All @@ -45,6 +45,7 @@ non-regression.

When introducing such a change:
- modify your model with the help of the attributes above.
- modify the example for the model by editing its `TestableForRegression` trait implementation.
- commit the 2 files that were updated by build.rs
- eyeball the diff on the `.expected.json` that failed, and send it with your PR.

Expand Down Expand Up @@ -121,5 +122,3 @@ most recent version.

The unit test will start making sense in future updates thanks to the update phase
described in the previous section.


4 changes: 4 additions & 0 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1924,6 +1924,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: None,
publish_token: None,
update_timestamp: 1724158996,
}],
}],
};
Expand Down Expand Up @@ -2054,6 +2055,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: None,
publish_token: None,
update_timestamp: 1724158996,
}],
}],
};
Expand Down Expand Up @@ -2342,6 +2344,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::Beginning),
publish_token: None,
update_timestamp: 1724158996,
}),
}],
};
Expand Down Expand Up @@ -2495,6 +2498,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::Beginning),
publish_token: None,
update_timestamp: 1724158996,
}),
}],
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,10 +789,12 @@ impl IngestController {
shard_id: Some(shard_id),
leader_id: leader_id.to_string(),
follower_id: follower_id_opt.as_ref().map(ToString::to_string),
shard_state: ShardState::Open as i32,
doc_mapping_uid: Some(doc_mapping_uid),
// TODO: these fields are not used by init_shard()
shard_state: ShardState::Open as i32,
publish_position_inclusive: Some(Position::Beginning),
publish_token: None,
update_timestamp: 0,
};
let init_shard_subrequest = InitShardSubrequest {
subrequest_id: subrequest_id as u32,
Expand Down Expand Up @@ -2136,6 +2138,7 @@ mod tests {
doc_mapping_uid: subrequest.doc_mapping_uid,
publish_position_inclusive: Some(Position::Beginning),
publish_token: None,
update_timestamp: 1724158996,
};
let response = OpenShardsResponse {
subresponses: vec![OpenShardSubresponse {
Expand Down
9 changes: 9 additions & 0 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::offset(10u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
}],
};
Ok(response)
Expand All @@ -752,6 +753,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::offset(11u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
}],
};
Ok(response)
Expand All @@ -776,6 +778,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::offset(11u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
Shard {
leader_id: "test-ingester-0".to_string(),
Expand All @@ -787,6 +790,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::offset(12u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
],
};
Expand Down Expand Up @@ -1078,6 +1082,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::eof(11u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
Shard {
leader_id: "test-ingester-0".to_string(),
Expand All @@ -1089,6 +1094,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::Beginning.as_eof()),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
],
};
Expand Down Expand Up @@ -1221,6 +1227,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::offset(11u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
Shard {
leader_id: "test-ingester-0".to_string(),
Expand All @@ -1232,6 +1239,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::eof(22u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
],
};
Expand Down Expand Up @@ -1575,6 +1583,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::Beginning),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
}],
};
Ok(response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ pub mod shared_state_for_tests {
doc_mapping_uid: sub_req.doc_mapping_uid,
publish_position_inclusive: Some(position),
shard_state: ShardState::Open as i32,
update_timestamp: 1724158996,
}),
}
})
Expand Down Expand Up @@ -233,6 +234,7 @@ pub mod shared_state_for_tests {
doc_mapping_uid: None,
publish_position_inclusive: Some(position),
shard_state: ShardState::Open as i32,
update_timestamp: 1724158996,
}
})
.collect();
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1687,6 +1687,7 @@ mod tests {
doc_mapping_uid: Some(doc_mapping_uid),
publish_position_inclusive: None,
publish_token: None,
update_timestamp: 1724158996,
};
let init_shards_request = InitShardsRequest {
subrequests: vec![InitShardSubrequest {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Reverts the migration that adds the `update_timestamp` column to `shards`.
-- The column name must match the one created by the corresponding up migration,
-- and PostgreSQL expects the clause order `DROP COLUMN IF EXISTS <name>`.
ALTER TABLE shards
DROP COLUMN IF EXISTS update_timestamp;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Adds the `update_timestamp` column to `shards`. Rows that already exist when
-- this migration runs receive the constant default below.
ALTER TABLE shards
-- We prefer a fixed value here because it makes tests simpler.
-- Very few users use the shard API in versions <0.9 anyway.
ADD COLUMN IF NOT EXISTS update_timestamp TIMESTAMP NOT NULL DEFAULT '2024-01-01 00:00:00+00';
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl quickwit_config::TestableForRegression for FileBackedIndex {
follower_id: Some("follower-ingester".to_string()),
doc_mapping_uid: Some(DocMappingUid::for_test(1)),
publish_position_inclusive: Some(Position::Beginning),
update_timestamp: 1724240908,
..Default::default()
};
let shards = Shards::from_shards_vec(index_uid.clone(), source_id.clone(), vec![shard]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use quickwit_proto::metastore::{
OpenShardSubrequest, OpenShardSubresponse,
};
use quickwit_proto::types::{queue_id, IndexUid, Position, PublishToken, ShardId, SourceId};
use time::OffsetDateTime;
use tracing::{info, warn};

use crate::checkpoint::{PartitionId, SourceCheckpoint, SourceCheckpointDelta};
Expand Down Expand Up @@ -132,6 +133,7 @@ impl Shards {
doc_mapping_uid: subrequest.doc_mapping_uid,
publish_position_inclusive: Some(Position::Beginning),
publish_token: subrequest.publish_token.clone(),
update_timestamp: OffsetDateTime::now_utc().unix_timestamp(),
};
mutation_occurred = true;
entry.insert(shard.clone());
Expand Down Expand Up @@ -288,6 +290,7 @@ impl Shards {
shard.shard_state = ShardState::Closed as i32;
}
shard.publish_position_inclusive = Some(publish_position_inclusive);
shard.update_timestamp = OffsetDateTime::now_utc().unix_timestamp();
}
Ok(MutationOccurred::Yes(()))
}
Expand Down
38 changes: 30 additions & 8 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ async fn try_apply_delta_v2(
shards
SET
publish_position_inclusive = new_positions.position,
shard_state = CASE WHEN new_positions.position LIKE '~%' THEN 'closed' ELSE shards.shard_state END
shard_state = CASE WHEN new_positions.position LIKE '~%' THEN 'closed' ELSE shards.shard_state END,
update_timestamp = CURRENT_TIMESTAMP AT TIME ZONE 'UTC'
FROM
UNNEST($3, $4)
AS new_positions(shard_id, position)
Expand Down Expand Up @@ -1794,16 +1795,37 @@ mod tests {
const INSERT_SHARD_QUERY: &str = include_str!("queries/shards/insert.sql");

for shard in shards {
assert_eq!(&shard.source_id, source_id);
assert_eq!(shard.index_uid(), index_uid);
// explicit destructuring to ensure new fields are properly handled
let Shard {
doc_mapping_uid,
follower_id,
index_uid,
leader_id,
publish_position_inclusive,
publish_token,
shard_id,
shard_state,
source_id,
update_timestamp,
} = shard;
let shard_state_name = ShardState::from_i32(shard_state)
.unwrap()
.as_json_str_name();
let update_timestamp = OffsetDateTime::from_unix_timestamp(update_timestamp)
.expect("Bad timestamp format");
sqlx::query(INSERT_SHARD_QUERY)
.bind(index_uid)
.bind(source_id)
.bind(shard.shard_id())
.bind(shard.shard_state().as_json_str_name())
.bind(&shard.leader_id)
.bind(&shard.follower_id)
.bind(shard.doc_mapping_uid)
.bind(&shard.publish_position_inclusive().to_string())
.bind(&shard.publish_token)
.bind(shard_id.unwrap())
.bind(shard_state_name)
.bind(leader_id)
.bind(follower_id)
.bind(doc_mapping_uid)
.bind(publish_position_inclusive.unwrap().to_string())
.bind(publish_token)
.bind(update_timestamp)
.execute(&self.connection_pool)
.await
.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/postgres/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ pub(super) struct PgShard {
pub doc_mapping_uid: DocMappingUid,
pub publish_position_inclusive: String,
pub publish_token: Option<String>,
pub update_timestamp: sqlx::types::time::PrimitiveDateTime,
}

impl From<PgShard> for Shard {
Expand All @@ -277,6 +278,7 @@ impl From<PgShard> for Shard {
doc_mapping_uid: Some(pg_shard.doc_mapping_uid),
publish_position_inclusive: Some(pg_shard.publish_position_inclusive.into()),
publish_token: pg_shard.publish_token,
update_timestamp: pg_shard.update_timestamp.assume_utc().unix_timestamp(),
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
INSERT INTO shards(index_uid, source_id, shard_id, shard_state, leader_id, follower_id, doc_mapping_uid, publish_position_inclusive, publish_token)
VALUES ($1, $2, $3, CAST($4 AS SHARD_STATE), $5, $6, $7, $8, $9)
INSERT INTO shards(index_uid, source_id, shard_id, shard_state, leader_id, follower_id, doc_mapping_uid, publish_position_inclusive, publish_token, update_timestamp)
VALUES ($1, $2, $3, CAST($4 AS SHARD_STATE), $5, $6, $7, $8, $9, $10)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
INSERT INTO shards(index_uid, source_id, shard_id, leader_id, follower_id, doc_mapping_uid, publish_token)
VALUES ($1, $2, $3, $4, $5, $6, $7)
INSERT INTO shards(index_uid, source_id, shard_id, leader_id, follower_id, doc_mapping_uid, publish_token, update_timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $7, CURRENT_TIMESTAMP AT TIME ZONE 'UTC')
ON CONFLICT
DO NOTHING
RETURNING
Expand Down
Loading

0 comments on commit e802e62

Please sign in to comment.