Skip to content

Commit

Permalink
QdrantDataSourceConfig, main read, shadow write (#2559)
Browse files Browse the repository at this point in the history
* QdrantDataSourceConfig, main read, shadow write

* No failure sequential shadow write

* Migration for data_sources core table
  • Loading branch information
spolu authored Nov 16, 2023
1 parent 6aace59 commit cb73699
Show file tree
Hide file tree
Showing 4 changed files with 288 additions and 59 deletions.
216 changes: 165 additions & 51 deletions core/src/data_sources/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use tokio::try_join;
use tokio_stream::{self as stream};
use uuid::Uuid;

use super::qdrant::{QdrantClients, QdrantCluster};
use super::qdrant::{QdrantClients, QdrantDataSourceConfig};

/// A filter to apply to the search query based on `tags`. All documents returned must have at least
/// one tag in `is_in` and none of the tags in `is_not`.
Expand Down Expand Up @@ -188,7 +188,7 @@ pub struct DataSourceConfig {
pub extras: Option<Value>,
pub splitter_id: SplitterID,
pub max_chunk_size: usize,
pub use_cache: bool,
pub qdrant_config: Option<QdrantDataSourceConfig>,
}

/// The `data_source_id` is the unique identifier that allows routing to the right data in SQL store
Expand Down Expand Up @@ -305,7 +305,7 @@ impl DataSource {
credentials: Credentials,
qdrant_clients: QdrantClients,
) -> Result<()> {
let qdrant_client = qdrant_clients.get(QdrantCluster::Main0);
let qdrant_client = qdrant_clients.main_client(&self.config.qdrant_config);

let mut embedder = provider(self.config.provider_id).embedder(self.config.model_id.clone());
embedder.initialize(credentials).await?;
Expand Down Expand Up @@ -425,8 +425,6 @@ impl DataSource {
document_id: String,
parents: Vec<String>,
) -> Result<()> {
let qdrant_client = qdrant_clients.get(QdrantCluster::Main0);

store
.update_data_source_document_parents(
&self.project,
Expand All @@ -440,7 +438,7 @@ impl DataSource {
hasher.update(document_id.as_bytes());
let document_id_hash = format!("{}", hasher.finalize().to_hex());

self.update_document_payload(qdrant_client, document_id_hash, "parents", parents)
self.update_document_payload(qdrant_clients, document_id_hash, "parents", parents)
.await?;
Ok(())
}
Expand All @@ -453,8 +451,6 @@ impl DataSource {
add_tags: Vec<String>,
remove_tags: Vec<String>,
) -> Result<Vec<String>> {
let qdrant_client = qdrant_clients.get(QdrantCluster::Main0);

let new_tags = store
.update_data_source_document_tags(
&self.project,
Expand All @@ -469,18 +465,20 @@ impl DataSource {
hasher.update(document_id.as_bytes());
let document_id_hash = format!("{}", hasher.finalize().to_hex());

self.update_document_payload(qdrant_client, document_id_hash, "tags", new_tags.clone())
self.update_document_payload(qdrant_clients, document_id_hash, "tags", new_tags.clone())
.await?;
Ok(new_tags)
}

async fn update_document_payload(
&self,
qdrant_client: Arc<QdrantClient>,
qdrant_clients: QdrantClients,
document_id_hash: String,
field_name: &str,
field_value: impl Into<Value>,
) -> Result<()> {
let qdrant_client = qdrant_clients.main_client(&self.config.qdrant_config);

let mut payload = Payload::new();
payload.insert(field_name, field_value.into());

Expand All @@ -499,6 +497,37 @@ impl DataSource {
})),
};

match qdrant_clients.shadow_write_client(&self.config.qdrant_config) {
Some(qdrant_client) => {
match qdrant_client
.set_payload(
self.qdrant_collection().to_string(),
&points_selector,
payload.clone(),
None,
)
.await
{
Ok(_) => {
utils::done(&format!(
"[SHADOW_WRITE_SUCCESS] Update payload: cluster={:?} collection={}",
qdrant_clients.shadow_write_cluster(&self.config.qdrant_config),
self.qdrant_collection(),
));
}
Err(e) => {
utils::error(&format!(
"[SHADOW_WRITE_FAIL] Update payload: cluster={:?} collection={} error={}",
qdrant_clients.shadow_write_cluster(&self.config.qdrant_config),
self.qdrant_collection(),
e
));
}
}
}
None => (),
}

qdrant_client
.set_payload(
self.qdrant_collection().to_string(),
Expand All @@ -507,6 +536,7 @@ impl DataSource {
None,
)
.await?;

Ok(())
}

Expand All @@ -523,7 +553,7 @@ impl DataSource {
text: &str,
preserve_system_tags: bool,
) -> Result<Document> {
let qdrant_client = qdrant_clients.get(QdrantCluster::Main0);
let qdrant_client = qdrant_clients.main_client(&self.config.qdrant_config);

// disallow preserve_system_tags=true if tags contains a string starting with the system tag prefix
// prevents having duplicate system tags or have users accidentally add system tags (from UI/API)
Expand Down Expand Up @@ -830,27 +860,52 @@ impl DataSource {
document.token_count = Some(document.chunks.len() * self.config.max_chunk_size);

let now = utils::now();

// Clean-up previous document chunks (vector search db).
let _ = qdrant_client
.delete_points(
self.qdrant_collection(),
&qdrant::Filter {
must_not: vec![],
should: vec![],
must: vec![qdrant::FieldCondition {
key: "document_id_hash".to_string(),
r#match: Some(qdrant::Match {
match_value: Some(qdrant::r#match::MatchValue::Keyword(
document_id_hash.clone(),
)),
}),
..Default::default()
let filter: PointsSelector = qdrant::Filter {
must_not: vec![],
should: vec![],
must: vec![qdrant::FieldCondition {
key: "document_id_hash".to_string(),
r#match: Some(qdrant::Match {
match_value: Some(qdrant::r#match::MatchValue::Keyword(
document_id_hash.clone(),
)),
}),
..Default::default()
}
.into()],
}
.into();

match qdrant_clients.shadow_write_client(&self.config.qdrant_config) {
Some(qdrant_client) => {
match qdrant_client
.delete_points(self.qdrant_collection(), &filter, None)
.await
{
Ok(_) => {
utils::done(&format!(
"[SHADOW_WRITE_SUCCESS] Delete points: cluster={:?} collection={}",
qdrant_clients.shadow_write_cluster(&self.config.qdrant_config),
self.qdrant_collection(),
));
}
Err(e) => {
utils::error(&format!(
"[SHADOW_WRITE_FAIL] Delete points: cluster={:?} collection={} error={}",
qdrant_clients.shadow_write_cluster(&self.config.qdrant_config),
self.qdrant_collection(),
e
));
}
.into()],
}
.into(),
None,
)
}
None => (),
}

qdrant_client
.delete_points(self.qdrant_collection(), &filter, None)
.await?;

utils::done(&format!(
Expand Down Expand Up @@ -915,7 +970,33 @@ impl DataSource {
let now = utils::now();
let chunk_len = chunk.len();

let _ = qdrant_client
match qdrant_clients.shadow_write_client(&self.config.qdrant_config) {
Some(qdrant_client) => {
match qdrant_client
.upsert_points(self.qdrant_collection(), chunk.clone(), None)
.await
{
Ok(_) => {
utils::done(&format!(
"[SHADOW_WRITE_SUCCESS] Upsert points: cluster={:?} collection={}",
qdrant_clients.shadow_write_cluster(&self.config.qdrant_config),
self.qdrant_collection(),
));
}
Err(e) => {
utils::error(&format!(
"[SHADOW_WRITE_FAIL] Upsert points: cluster={:?} collection={} error={}",
qdrant_clients.shadow_write_cluster(&self.config.qdrant_config),
self.qdrant_collection(),
e
));
}
}
}
None => (),
}

qdrant_client
.upsert_points(self.qdrant_collection(), chunk, None)
.await?;

Expand Down Expand Up @@ -953,7 +1034,7 @@ impl DataSource {
full_text: bool,
target_document_tokens: Option<usize>,
) -> Result<Vec<Document>> {
let qdrant_client = qdrant_clients.get(QdrantCluster::Main0);
let qdrant_client = qdrant_clients.main_client(&self.config.qdrant_config);

if top_k > DataSource::MAX_TOP_K_SEARCH {
return Err(anyhow!("top_k must be <= {}", DataSource::MAX_TOP_K_SEARCH));
Expand Down Expand Up @@ -1530,34 +1611,58 @@ impl DataSource {
qdrant_clients: QdrantClients,
document_id: &str,
) -> Result<()> {
let qdrant_client = qdrant_clients.get(QdrantCluster::Main0);
let qdrant_client = qdrant_clients.main_client(&self.config.qdrant_config);
let store = store.clone();

let mut hasher = blake3::Hasher::new();
hasher.update(document_id.as_bytes());
let document_id_hash = format!("{}", hasher.finalize().to_hex());

// Clean-up document chunks (vector search db).
let _ = qdrant_client
.delete_points(
self.qdrant_collection(),
&qdrant::Filter {
must_not: vec![],
should: vec![],
must: vec![qdrant::FieldCondition {
key: "document_id_hash".to_string(),
r#match: Some(qdrant::Match {
match_value: Some(qdrant::r#match::MatchValue::Keyword(
document_id_hash.clone(),
)),
}),
..Default::default()
let filter: PointsSelector = qdrant::Filter {
must_not: vec![],
should: vec![],
must: vec![qdrant::FieldCondition {
key: "document_id_hash".to_string(),
r#match: Some(qdrant::Match {
match_value: Some(qdrant::r#match::MatchValue::Keyword(
document_id_hash.clone(),
)),
}),
..Default::default()
}
.into()],
}
.into();

match qdrant_clients.shadow_write_client(&self.config.qdrant_config) {
Some(qdrant_client) => {
match qdrant_client
.delete_points(self.qdrant_collection(), &filter, None)
.await
{
Ok(_) => {
utils::done(&format!(
"[SHADOW_WRITE_SUCCESS] Delete points: cluster={:?} collection={}",
qdrant_clients.shadow_write_cluster(&self.config.qdrant_config),
self.qdrant_collection(),
));
}
Err(e) => {
utils::error(&format!(
"[SHADOW_WRITE_FAIL] Delete points: cluster={:?} collection={} error={}",
qdrant_clients.shadow_write_cluster(&self.config.qdrant_config),
self.qdrant_collection(),
e
));
}
.into()],
}
.into(),
None,
)
}
None => (),
}

qdrant_client
.delete_points(self.qdrant_collection(), &filter, None)
.await?;

// Delete document (SQL)
Expand All @@ -1573,7 +1678,16 @@ impl DataSource {
store: Box<dyn Store + Sync + Send>,
qdrant_clients: QdrantClients,
) -> Result<()> {
let qdrant_client = qdrant_clients.get(QdrantCluster::Main0);
if qdrant_clients
.shadow_write_cluster(&self.config.qdrant_config)
.is_some()
{
Err(anyhow!(
"Cannot delete data source with a shadow_write_cluster set"
))?;
}

let qdrant_client = qdrant_clients.main_client(&self.config.qdrant_config);
let store = store.clone();

// Delete collection (vector search db).
Expand Down
Loading

0 comments on commit cb73699

Please sign in to comment.