diff --git a/.env.example b/.env.example index cd533521985d..934c0f061413 100644 --- a/.env.example +++ b/.env.example @@ -125,13 +125,6 @@ OPENDAL_GDRIVE_ACCESS_TOKEN= OPENDAL_GDRIVE_REFRESH_TOKEN= OPENDAL_GDRIVE_CLIENT_ID= OPENDAL_GDRIVE_CLIENT_SECRET= -# libsql -OPENDAL_LIBSQL_ROOT=/tmp/opendal/ -OPENDAL_LIBSQL_CONNECTION_STRING=https://example.com/db -OPENDAL_LIBSQL_AUTH_TOKEN= -OPENDAL_LIBSQL_TABLE=t_opendal -OPENDAL_LIBSQL_KEY_FIELD=key -OPENDAL_LIBSQL_VALUE_FIELD=val # sqlite OPENDAL_SQLITE_CONNECTION_STRING=file:///tmp/opendal/test.db OPENDAL_SQLITE_TABLE=data diff --git a/bindings/java/Cargo.toml b/bindings/java/Cargo.toml index 77964b592f4f..817ddc18c29c 100644 --- a/bindings/java/Cargo.toml +++ b/bindings/java/Cargo.toml @@ -85,7 +85,6 @@ services-all = [ "services-gridfs", "services-sqlite", "services-azfile", - "services-libsql", "services-swift", "services-alluxio", "services-b2", @@ -128,7 +127,6 @@ services-hdfs = ["opendal/services-hdfs"] services-huggingface = ["opendal/services-huggingface"] services-ipfs = ["opendal/services-ipfs"] services-koofr = ["opendal/services-koofr"] -services-libsql = ["opendal/services-libsql"] services-memcached = ["opendal/services-memcached"] services-mini-moka = ["opendal/services-mini-moka"] services-moka = ["opendal/services-moka"] diff --git a/bindings/java/src/main/java/org/apache/opendal/ServiceConfig.java b/bindings/java/src/main/java/org/apache/opendal/ServiceConfig.java index 3f6ae11b4334..d609e3778cc2 100644 --- a/bindings/java/src/main/java/org/apache/opendal/ServiceConfig.java +++ b/bindings/java/src/main/java/org/apache/opendal/ServiceConfig.java @@ -1729,68 +1729,6 @@ public Map configMap() { } } - /** - * Configuration for service libsql. - */ - @Builder - @Data - @RequiredArgsConstructor(access = AccessLevel.PRIVATE) - class Libsql implements ServiceConfig { - /** - *

Connection string for libsql service.

- */ - public final String connectionString; - /** - *

Authentication token for libsql service.

- */ - public final String authToken; - /** - *

Table name for libsql service.

- */ - public final String table; - /** - *

Key field name for libsql service.

- */ - public final String keyField; - /** - *

Value field name for libsql service.

- */ - public final String valueField; - /** - *

Root for libsql service.

- */ - public final String root; - - @Override - public String scheme() { - return "libsql"; - } - - @Override - public Map configMap() { - final HashMap map = new HashMap<>(); - if (connectionString != null) { - map.put("connection_string", connectionString); - } - if (authToken != null) { - map.put("auth_token", authToken); - } - if (table != null) { - map.put("table", table); - } - if (keyField != null) { - map.put("key_field", keyField); - } - if (valueField != null) { - map.put("value_field", valueField); - } - if (root != null) { - map.put("root", root); - } - return map; - } - } - /** * Configuration for service memcached. */ diff --git a/bindings/nodejs/Cargo.toml b/bindings/nodejs/Cargo.toml index f38dff010218..46557e33b1bc 100644 --- a/bindings/nodejs/Cargo.toml +++ b/bindings/nodejs/Cargo.toml @@ -83,7 +83,6 @@ services-all = [ "services-mongodb", "services-gridfs", "services-sqlite", - "services-libsql", "services-alluxio", "services-b2", "services-seafile", @@ -126,7 +125,6 @@ services-hdfs = ["opendal/services-hdfs"] services-huggingface = ["opendal/services-huggingface"] services-ipfs = ["opendal/services-ipfs"] services-koofr = ["opendal/services-koofr"] -services-libsql = ["opendal/services-libsql"] services-memcached = ["opendal/services-memcached"] services-mini-moka = ["opendal/services-mini-moka"] services-moka = ["opendal/services-moka"] diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index de284468214f..3f952868b23b 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -86,7 +86,6 @@ services-all = [ "services-mongodb", "services-gridfs", "services-sqlite", - "services-libsql", "services-alluxio", "services-b2", "services-seafile", @@ -129,7 +128,6 @@ services-hdfs = ["opendal/services-hdfs"] services-huggingface = ["opendal/services-huggingface"] services-ipfs = ["opendal/services-ipfs"] services-koofr = ["opendal/services-koofr"] -services-libsql = ["opendal/services-libsql"] services-memcached = ["opendal/services-memcached"] services-mini-moka = ["opendal/services-mini-moka"] services-moka = ["opendal/services-moka"] diff --git a/bindings/python/python/opendal/__base.pyi b/bindings/python/python/opendal/__base.pyi index 6096af429bf7..680420f68507 100644 --- a/bindings/python/python/opendal/__base.pyi +++ b/bindings/python/python/opendal/__base.pyi @@ -420,20 +420,6 @@ class _Base: branch: str = ..., ) -> None: ... - @overload - def __init__( - self, - scheme: Literal["libsql"], - /, - *, - connection_string: str = ..., - auth_token: str = ..., - table: str = ..., - key_field: str = ..., - value_field: str = ..., - root: str = ..., - ) -> None: ... - @overload def __init__( self, @@ -842,4 +828,4 @@ class _Base: @overload - def __init__(self, scheme: str, /, **kwargs: str) -> None: ... \ No newline at end of file + def __init__(self, scheme: str, /, **kwargs: str) -> None: ... diff --git a/core/Cargo.lock b/core/Cargo.lock index 92cd1a03ca90..5a5372ba9862 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -3618,18 +3618,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "hrana-client-proto" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f16b4e41e289da3fd60e64f245246a97e78fab7b3788c6d8147b3ae7d9f5e533" -dependencies = [ - "anyhow", - "base64 0.21.7", - "serde", - "serde_json", -] - [[package]] name = "html5ever" version = "0.27.0" @@ -5189,7 +5177,6 @@ dependencies = [ "hdfs-native", "hdrs", "hmac", - "hrana-client-proto", "http 1.2.0", "libtest-mimic", "log", diff --git a/core/Cargo.toml b/core/Cargo.toml index 669cc949cce4..2ed43eaf6932 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -56,16 +56,16 @@ default = ["reqwest/rustls-tls", "executors-tokio", "services-memory"] # # You should never enable this feature unless you are developing opendal. tests = [ - "dep:rand", - "dep:sha2", - "dep:dotenvy", - "layers-blocking", - "services-azblob", - "services-fs", - "services-http", - "services-memory", - "internal-tokio-rt", - "services-s3", + "dep:rand", + "dep:sha2", + "dep:dotenvy", + "layers-blocking", + "services-azblob", + "services-fs", + "services-http", + "services-memory", + "internal-tokio-rt", + "services-s3", ] # Enable path cache. @@ -109,20 +109,20 @@ services-aliyun-drive = [] services-alluxio = [] services-atomicserver = ["dep:atomic_lib"] services-azblob = [ - "dep:sha2", - "dep:reqsign", - "reqsign?/services-azblob", - "reqsign?/reqwest_request", + "dep:sha2", + "dep:reqsign", + "reqsign?/services-azblob", + "reqsign?/reqwest_request", ] services-azdls = [ - "dep:reqsign", - "reqsign?/services-azblob", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-azblob", + "reqsign?/reqwest_request", ] services-azfile = [ - "dep:reqsign", - "reqsign?/services-azblob", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-azblob", + "reqsign?/reqwest_request", ] services-b2 = [] services-cacache = ["dep:cacache"] @@ -130,9 +130,9 @@ services-chainsafe = [] services-cloudflare-kv = [] services-compfs = ["dep:compio"] services-cos = [ - "dep:reqsign", - "reqsign?/services-tencent", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-tencent", + "reqsign?/reqwest_request", ] services-d1 = [] services-dashmap = ["dep:dashmap"] @@ -143,9 +143,9 @@ services-foundationdb = ["dep:foundationdb"] services-fs = ["tokio/fs", "internal-tokio-rt"] services-ftp = ["dep:suppaftp", "dep:bb8", "dep:async-tls"] services-gcs = [ - "dep:reqsign", - "reqsign?/services-google", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-google", + "reqsign?/reqwest_request", ] services-gdrive = ["internal-path-cache"] services-ghac = [] @@ -160,7 +160,6 @@ services-ipfs = ["dep:prost"] services-ipmfs = [] services-koofr = [] services-lakefs = [] -services-libsql = ["dep:hrana-client-proto"] services-memcached = ["dep:bb8"] services-memory = [] services-mini-moka = ["dep:mini-moka"] @@ -170,15 +169,15 @@ services-monoiofs = ["dep:monoio", "dep:flume"] services-mysql = ["dep:sqlx", "sqlx?/mysql"] services-nebula-graph = ["dep:rust-nebula", "dep:bb8", "dep:snowflaked"] services-obs = [ - "dep:reqsign", - "reqsign?/services-huaweicloud", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-huaweicloud", + "reqsign?/reqwest_request", ] services-onedrive = [] services-oss = [ - "dep:reqsign", - "reqsign?/services-aliyun", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-aliyun", + "reqsign?/reqwest_request", ] services-pcloud = [] services-persy = ["dep:persy", "internal-tokio-rt"] @@ -188,11 +187,11 @@ services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp"] services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"] services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"] services-s3 = [ - "dep:reqsign", - "reqsign?/services-aws", - "reqsign?/reqwest_request", - "dep:crc32c", - "dep:crc64fast-nvme", + "dep:reqsign", + "reqsign?/services-aws", + "reqsign?/reqwest_request", + "dep:crc32c", + "dep:crc64fast-nvme", ] services-seafile = [] services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:bb8"] @@ -238,12 +237,12 @@ backon = { version = "1.2", features = ["tokio-sleep"] } base64 = "0.22" bytes = "1.6" chrono = { version = "0.4.28", default-features = false, features = [ - "clock", - "std", + "clock", + "std", ] } futures = { version = "0.3", default-features = false, features = [ - "std", - "async-await", + "std", + "async-await", ] } http = "1.1" log = "0.4" @@ -253,7 +252,7 @@ once_cell = "1" percent-encoding = "2" quick-xml = { version = "0.36", features = ["serialize", "overlapped-lists"] } reqwest = { version = "0.12.2", features = [ - "stream", + "stream", ], default-features = false } serde = { version = "1", features = ["derive"] } serde_json = "1" @@ -273,7 +272,7 @@ prost = { version = "0.13", optional = true } sha1 = { version = "0.10.6", optional = true } sha2 = { version = "0.10", optional = true } sqlx = { version = "0.8.0", features = [ - "runtime-tokio-rustls", + "runtime-tokio-rustls", ], optional = true } # For http based services. @@ -286,8 +285,8 @@ ouroboros = { version = "0.18.4", optional = true } atomic_lib = { version = "0.39.0", optional = true } # for services-cacache cacache = { version = "13.0", default-features = false, features = [ - "tokio-runtime", - "mmap", + "tokio-runtime", + "mmap", ], optional = true } # for services-dashmap dashmap = { version = "6", optional = true } @@ -295,15 +294,13 @@ dashmap = { version = "6", optional = true } etcd-client = { version = "0.14", optional = true, features = ["tls"] } # for services-foundationdb foundationdb = { version = "0.9.0", features = [ - "embedded-fdb-include", - "fdb-7_3", + "embedded-fdb-include", + "fdb-7_3", ], optional = true } # for services-hdfs hdrs = { version = "0.3.2", optional = true, features = ["async_file"] } # for services-upyun hmac = { version = "0.12.1", optional = true } -# for services-libsql -hrana-client-proto = { version = "0.2.1", optional = true } # for services-mini-moka mini-moka = { version = "0.10", optional = true } # for services-moka @@ -315,8 +312,8 @@ mongodb-internal-macros = { version = ">=3,<3.2.0", optional = true } # for services-sftp openssh = { version = "0.11.0", optional = true } openssh-sftp-client = { version = "0.15.2", optional = true, features = [ - "openssh", - "tracing", + "openssh", + "tracing", ] } # for services-persy persy = { version = "1.4.6", optional = true } @@ -324,9 +321,9 @@ persy = { version = "1.4.6", optional = true } redb = { version = "2", optional = true } # for services-redis redis = { version = "0.27", features = [ - "cluster-async", - "tokio-comp", - "connection-manager", + "cluster-async", + "tokio-comp", + "connection-manager", ], optional = true } # for services-rocksdb rocksdb = { version = "0.21.0", default-features = false, optional = true } @@ -334,9 +331,9 @@ rocksdb = { version = "0.21.0", default-features = false, optional = true } sled = { version = "0.34.7", optional = true } # for services-ftp suppaftp = { version = "6.0.3", default-features = false, features = [ - "async-secure", - "rustls", - "async-rustls", + "async-secure", + "rustls", + "async-rustls", ], optional = true } # for services-tikv tikv-client = { version = "0.3.0", optional = true, default-features = false } @@ -346,24 +343,24 @@ hdfs-native = { version = "0.10", optional = true } surrealdb = { version = "2", optional = true, features = ["protocol-http"] } # for services-compfs compio = { version = "0.12.0", optional = true, features = [ - "runtime", - "bytes", - "polling", - "dispatcher", + "runtime", + "bytes", + "polling", + "dispatcher", ] } # for services-s3 crc32c = { version = "0.6.6", optional = true } -crc64fast-nvme = {version = "1.1.1", optional = true} +crc64fast-nvme = { version = "1.1.1", optional = true } # for services-nebula-graph rust-nebula = { version = "^0.0.2", optional = true, features = ["graph"] } snowflaked = { version = "1", optional = true, features = ["sync"] } # for services-monoiofs flume = { version = "0.11", optional = true } monoio = { version = "0.2.4", optional = true, features = [ - "sync", - "mkdirat", - "unlinkat", - "renameat", + "sync", + "mkdirat", + "unlinkat", + "renameat", ] } # Layers @@ -402,7 +399,7 @@ fastrace = { version = "0.7", features = ["enable"] } fastrace-jaeger = "0.7" libtest-mimic = "0.8" opentelemetry = { version = "0.27", default-features = false, features = [ - "trace", + "trace", ] } opentelemetry-otlp = "0.27" opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] } @@ -413,6 +410,6 @@ size = "0.4" tokio = { version = "1.27", features = ["fs", "macros", "rt-multi-thread"] } tracing-opentelemetry = "0.28.0" tracing-subscriber = { version = "0.3", features = [ - "env-filter", - "tracing-log", + "env-filter", + "tracing-log", ] } diff --git a/core/fuzz/Cargo.toml b/core/fuzz/Cargo.toml index e2b8173e6f32..2600c0ba4536 100644 --- a/core/fuzz/Cargo.toml +++ b/core/fuzz/Cargo.toml @@ -49,7 +49,6 @@ services-http = ["opendal/services-http"] services-huggingface = ["opendal/services-huggingface"] services-ipfs = ["opendal/services-ipfs"] services-ipmfs = ["opendal/services-ipmfs"] -services-libsql = ["opendal/services-libsql"] services-memcached = ["opendal/services-memcached"] services-memory = ["opendal/services-memory"] services-mini-moka = ["opendal/services-mini-moka"] @@ -80,9 +79,9 @@ services-webhdfs = ["opendal/services-webhdfs"] [dependencies] arbitrary = { version = "1.3.0", features = ["derive"] } libfuzzer-sys = "0.4" -opendal = { path = "..", features = ["tests"] } log = { version = "0.4.22" } logforth = { version = "0.21.0", default-features = false } +opendal = { path = "..", features = ["tests"] } uuid = { version = "1", features = ["v4"] } [[bin]] diff --git a/core/src/services/libsql/backend.rs b/core/src/services/libsql/backend.rs deleted file mode 100644 index 1867d3bbbfaf..000000000000 --- a/core/src/services/libsql/backend.rs +++ /dev/null @@ -1,394 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::fmt::Debug; -use std::str; - -use bytes::Buf; -use bytes::Bytes; -use hrana_client_proto::pipeline::ClientMsg; -use hrana_client_proto::pipeline::Response; -use hrana_client_proto::pipeline::ServerMsg; -use hrana_client_proto::pipeline::StreamExecuteReq; -use hrana_client_proto::pipeline::StreamExecuteResult; -use hrana_client_proto::pipeline::StreamRequest; -use hrana_client_proto::pipeline::StreamResponse; -use hrana_client_proto::pipeline::StreamResponseError; -use hrana_client_proto::pipeline::StreamResponseOk; -use hrana_client_proto::Error as PipelineError; -use hrana_client_proto::Stmt; -use hrana_client_proto::StmtResult; -use hrana_client_proto::Value; -use http::Request; -use http::Uri; - -use super::error::parse_error; -use crate::raw::adapters::kv; -use crate::raw::*; -use crate::services::LibsqlConfig; -use crate::*; - -impl Configurator for LibsqlConfig { - type Builder = LibsqlBuilder; - fn into_builder(self) -> Self::Builder { - LibsqlBuilder { config: self } - } -} - -#[doc = include_str!("docs.md")] -#[derive(Default)] -pub struct LibsqlBuilder { - config: LibsqlConfig, -} - -impl Debug for LibsqlBuilder { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut d = f.debug_struct("LibsqlBuilder"); - - d.field("config", &self.config); - d.finish() - } -} - -impl LibsqlBuilder { - /// Set the connection_string of the libsql service. - /// - /// This connection string is used to connect to the libsql service. There are url based formats: - /// - /// ## Url - /// - /// This format resembles the url format of the libsql client. - /// - /// for a remote database connection: - /// - /// - `http://example.com/db` - /// - `https://example.com/db` - /// - `libsql://example.com/db` - pub fn connection_string(mut self, v: &str) -> Self { - if !v.is_empty() { - self.config.connection_string = Some(v.to_string()); - } - self - } - - /// set the authentication token for libsql service. - /// - /// default: no authentication token - pub fn auth_token(mut self, auth_token: &str) -> Self { - if !auth_token.is_empty() { - self.config.auth_token = Some(auth_token.to_owned()); - } - self - } - - /// set the working directory, all operations will be performed under it. - /// - /// default: "/" - pub fn root(mut self, root: &str) -> Self { - self.config.root = if root.is_empty() { - None - } else { - Some(root.to_string()) - }; - - self - } - - /// Set the table name of the libsql service to read/write. - pub fn table(mut self, table: &str) -> Self { - if !table.is_empty() { - self.config.table = Some(table.to_string()); - } - self - } - - /// Set the key field name of the libsql service to read/write. - /// - /// Default to `key` if not specified. - pub fn key_field(mut self, key_field: &str) -> Self { - if !key_field.is_empty() { - self.config.key_field = Some(key_field.to_string()); - } - self - } - - /// Set the value field name of the libsql service to read/write. - /// - /// Default to `value` if not specified. - pub fn value_field(mut self, value_field: &str) -> Self { - if !value_field.is_empty() { - self.config.value_field = Some(value_field.to_string()); - } - self - } -} - -impl Builder for LibsqlBuilder { - const SCHEME: Scheme = Scheme::Libsql; - type Config = LibsqlConfig; - - fn build(self) -> Result { - let conn = self.get_connection_string()?; - - let table = match self.config.table.clone() { - Some(v) => v, - None => { - return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty") - .with_context("service", Scheme::Libsql)) - } - }; - let key_field = match self.config.key_field.clone() { - Some(v) => v, - None => "key".to_string(), - }; - let value_field = match self.config.value_field.clone() { - Some(v) => v, - None => "value".to_string(), - }; - let root = normalize_root( - self.config - .root - .clone() - .unwrap_or_else(|| "/".to_string()) - .as_str(), - ); - - let client = HttpClient::new().map_err(|err| { - err.with_operation("Builder::build") - .with_context("service", Scheme::Libsql) - })?; - - Ok(LibsqlBackend::new(Adapter { - client, - connection_string: conn, - auth_token: self.config.auth_token.clone(), - table, - key_field, - value_field, - }) - .with_normalized_root(root)) - } -} - -impl LibsqlBuilder { - fn get_connection_string(&self) -> Result { - let connection_string = - self.config.connection_string.clone().ok_or_else(|| { - Error::new(ErrorKind::ConfigInvalid, "connection_string is empty") - })?; - - let ep_url = connection_string - .replace("libsql://", "https://") - .parse::() - .map_err(|e| { - Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid") - .with_context("service", Scheme::Libsql) - .with_context("connection_string", connection_string) - .set_source(e) - })?; - - match ep_url.scheme_str() { - None => Ok(format!("https://{ep_url}/")), - Some("http") | Some("https") => Ok(ep_url.to_string()), - Some(s) => Err( - Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme") - .with_context("service", Scheme::Libsql) - .with_context("scheme", s), - ), - } - } -} - -/// Backend for libsql service -pub type LibsqlBackend = kv::Backend; - -#[derive(Clone)] -pub struct Adapter { - client: HttpClient, - connection_string: String, - auth_token: Option, - - table: String, - key_field: String, - value_field: String, -} - -impl Debug for Adapter { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut ds = f.debug_struct("LibsqlAdapter"); - ds.field("connection_string", &self.connection_string) - .field("table", &self.table) - .field("key_field", &self.key_field) - .field("value_field", &self.value_field); - - if self.auth_token.is_some() { - ds.field("auth_token", &""); - } - - ds.finish() - } -} - -impl Adapter { - async fn execute(&self, sql: String, args: Vec) -> Result { - let url = format!("{}v2/pipeline", self.connection_string); - - let mut req = Request::post(&url); - - if let Some(auth_token) = self.auth_token.clone() { - req = req.header("Authorization", format!("Bearer {}", auth_token)); - } - - let msg = ClientMsg { - baton: None, - requests: vec![StreamRequest::Execute(StreamExecuteReq { - stmt: Stmt { - sql, - args, - named_args: vec![], - want_rows: true, - }, - })], - }; - let body = serde_json::to_string(&msg).map_err(|err| { - Error::new(ErrorKind::Unexpected, "failed to serialize request") - .with_context("service", Scheme::Libsql) - .set_source(err) - })?; - - let req = req - .body(Buffer::from(Bytes::from(body))) - .map_err(new_request_build_error)?; - - let resp = self.client.send(req).await?; - - if resp.status() != http::StatusCode::OK { - return Err(parse_error(resp)); - } - - let bs = resp.into_body(); - - let resp: ServerMsg = serde_json::from_reader(bs.reader()).map_err(|e| { - Error::new(ErrorKind::Unexpected, "deserialize json from response").set_source(e) - })?; - - if resp.results.is_empty() { - return Err(Error::new( - ErrorKind::Unexpected, - "Unexpected empty response from server", - )); - } - - if resp.results.len() > 1 { - return Err(Error::new( - ErrorKind::Unexpected, - "Unexpected multiple response from server", - )); - } - - Ok(resp) - } -} - -impl kv::Adapter for Adapter { - type Scanner = (); - - fn info(&self) -> kv::Info { - kv::Info::new( - Scheme::Libsql, - &self.table, - Capability { - read: true, - write: true, - delete: true, - shared: true, - ..Default::default() - }, - ) - } - - async fn get(&self, path: &str) -> Result> { - let query = format!( - "SELECT {} FROM {} WHERE `{}` = ? LIMIT 1", - self.value_field, self.table, self.key_field - ); - let mut resp = self.execute(query, vec![Value::from(path)]).await?; - - match resp.results.swap_remove(0) { - Response::Ok(StreamResponseOk { - response: - StreamResponse::Execute(StreamExecuteResult { - result: StmtResult { cols: _, rows, .. }, - }), - }) => { - if rows.is_empty() || rows[0].is_empty() { - Ok(None) - } else { - let val = &rows[0][0]; - match val { - Value::Null => Ok(None), - Value::Blob { value } => Ok(Some(Buffer::from(value.to_vec()))), - _ => Err(Error::new(ErrorKind::Unexpected, "invalid value type")), - } - } - } - Response::Ok(_) => Err(Error::new( - ErrorKind::Unexpected, - "Unexpected response from server", - )), - Response::Error(StreamResponseError { - error: PipelineError { message }, - }) => Err(Error::new( - ErrorKind::Unexpected, - format!("get failed: {}", message).as_str(), - )), - } - } - - async fn set(&self, path: &str, value: Buffer) -> Result<()> { - let query = format!( - "INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES (?, ?)", - self.table, self.key_field, self.value_field - ); - let mut resp = self - .execute(query, vec![Value::from(path), Value::from(value.to_vec())]) - .await?; - match resp.results.swap_remove(0) { - Response::Ok(_) => Ok(()), - Response::Error(StreamResponseError { - error: PipelineError { message }, - }) => Err(Error::new( - ErrorKind::Unexpected, - format!("set failed: {}", message).as_str(), - )), - } - } - - async fn delete(&self, path: &str) -> Result<()> { - let query = format!("DELETE FROM {} WHERE `{}` = ?", self.table, self.key_field); - let mut resp = self.execute(query, vec![Value::from(path)]).await?; - match resp.results.swap_remove(0) { - Response::Ok(_) => Ok(()), - Response::Error(StreamResponseError { - error: PipelineError { message }, - }) => Err(Error::new( - ErrorKind::Unexpected, - format!("delete failed: {}", message).as_str(), - )), - } - } -} diff --git a/core/src/services/libsql/config.rs b/core/src/services/libsql/config.rs deleted file mode 100644 index a8c953e6976e..000000000000 --- a/core/src/services/libsql/config.rs +++ /dev/null @@ -1,59 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::fmt::Debug; -use std::fmt::Formatter; - -use serde::Deserialize; -use serde::Serialize; - -/// Config for Libsql services support. -#[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)] -#[serde(default)] -#[non_exhaustive] -pub struct LibsqlConfig { - /// Connection string for libsql service. - pub connection_string: Option, - /// Authentication token for libsql service. - pub auth_token: Option, - - /// Table name for libsql service. - pub table: Option, - /// Key field name for libsql service. - pub key_field: Option, - /// Value field name for libsql service. - pub value_field: Option, - /// Root for libsql service. - pub root: Option, -} - -impl Debug for LibsqlConfig { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let mut ds = f.debug_struct("LibsqlConfig"); - ds.field("connection_string", &self.connection_string) - .field("table", &self.table) - .field("key_field", &self.key_field) - .field("value_field", &self.value_field) - .field("root", &self.root); - - if self.auth_token.is_some() { - ds.field("auth_token", &""); - } - - ds.finish() - } -} diff --git a/core/src/services/libsql/docs.md b/core/src/services/libsql/docs.md deleted file mode 100644 index 61ad5b3cf595..000000000000 --- a/core/src/services/libsql/docs.md +++ /dev/null @@ -1,49 +0,0 @@ -## Capabilities - -This service can be used to: - -- [x] stat -- [x] read -- [x] write -- [x] create_dir -- [x] delete -- [ ] copy -- [ ] rename -- [ ] ~~list~~ -- [ ] ~~presign~~ -- [ ] blocking - -## Configuration - -- `root`: Set the working directory of `OpenDAL` -- `connection_string`: Set the connection string for libsql server -- `auth_token`: Set the authentication token for libsql server -- `table`: Set the table of libsql -- `key_field`: Set the key field of libsql -- `value_field`: Set the value field of libsql - -## Example - -### Via Builder - -```rust,no_run -use anyhow::Result; -use opendal::services::Libsql; -use opendal::Operator; - -#[tokio::main] -async fn main() -> Result<()> { - let mut builder = Libsql::default() - .root("/") - .connection_string("https://example.com/db") - .auth_token("secret") - .table("your_table") - // key field type in the table should be compatible with Rust's &str like text - .key_field("key") - // value field type in the table should be compatible with Rust's Vec like bytea - .value_field("value"); - - let op = Operator::new(builder)?.finish(); - Ok(()) -} -``` diff --git a/core/src/services/libsql/error.rs b/core/src/services/libsql/error.rs deleted file mode 100644 index 54af90a53702..000000000000 --- a/core/src/services/libsql/error.rs +++ /dev/null @@ -1,58 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use http::Response; -use http::StatusCode; - -use crate::raw::*; -use crate::*; - -/// Parse error response into Error. -pub(super) fn parse_error(resp: Response) -> Error { - let (parts, body) = resp.into_parts(); - let bs = body.to_bytes(); - - let (kind, retryable) = match parts.status { - StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), - StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false), - StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => { - (ErrorKind::ConditionNotMatch, false) - } - StatusCode::INTERNAL_SERVER_ERROR - | StatusCode::BAD_GATEWAY - | StatusCode::SERVICE_UNAVAILABLE - | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true), - _ => (ErrorKind::Unexpected, false), - }; - - let mut message = String::from_utf8_lossy(&bs).into_owned(); - - // If there is no body here, fill with http response code. - if message.is_empty() { - message = format!("Error response code: {}", parts.status); - } - - let mut err = Error::new(kind, &message); - - err = with_error_response_context(err, parts); - - if retryable { - err = err.set_temporary(); - } - - err -} diff --git a/core/src/services/libsql/mod.rs b/core/src/services/libsql/mod.rs deleted file mode 100644 index 7e859d5fdb49..000000000000 --- a/core/src/services/libsql/mod.rs +++ /dev/null @@ -1,27 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#[cfg(feature = "services-libsql")] -mod error; - -#[cfg(feature = "services-libsql")] -mod backend; -#[cfg(feature = "services-libsql")] -pub use backend::LibsqlBuilder as Libsql; - -mod config; -pub use config::LibsqlConfig; diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 0437dff4a759..a84bb2097984 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -121,9 +121,6 @@ pub use koofr::*; mod lakefs; pub use lakefs::*; -mod libsql; -pub use libsql::*; - mod memcached; pub use memcached::*; diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 4393cd5e0206..7c393649b4ed 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -230,8 +230,6 @@ impl Operator { Scheme::Ipmfs => Self::from_iter::(iter)?.finish(), #[cfg(feature = "services-icloud")] Scheme::Icloud => Self::from_iter::(iter)?.finish(), - #[cfg(feature = "services-libsql")] - Scheme::Libsql => Self::from_iter::(iter)?.finish(), #[cfg(feature = "services-memcached")] Scheme::Memcached => Self::from_iter::(iter)?.finish(), #[cfg(feature = "services-memory")] diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index c0da5219b829..ad34feddde08 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -123,8 +123,6 @@ pub enum Scheme { Redis, /// [postgresql][crate::services::Postgresql]: Postgresql services Postgresql, - /// [libsql][crate::services::Libsql]: Libsql services - Libsql, /// [mysql][crate::services::Mysql]: Mysql services Mysql, /// [sqlite][crate::services::Sqlite]: Sqlite services @@ -247,8 +245,6 @@ impl Scheme { Scheme::Ipmfs, #[cfg(feature = "services-icloud")] Scheme::Icloud, - #[cfg(feature = "services-libsql")] - Scheme::Libsql, #[cfg(feature = "services-memcached")] Scheme::Memcached, #[cfg(feature = "services-memory")] @@ -375,7 +371,6 @@ impl FromStr for Scheme { "ipmfs" => Ok(Scheme::Ipmfs), "icloud" => Ok(Scheme::Icloud), "koofr" => Ok(Scheme::Koofr), - "libsql" => Ok(Scheme::Libsql), "memcached" => Ok(Scheme::Memcached), "memory" => Ok(Scheme::Memory), "mysql" => Ok(Scheme::Mysql), @@ -446,7 +441,6 @@ impl From for &'static str { Scheme::Ipmfs => "ipmfs", Scheme::Icloud => "icloud", Scheme::Koofr => "koofr", - Scheme::Libsql => "libsql", Scheme::Memcached => "memcached", Scheme::Memory => "memory", Scheme::MiniMoka => "mini_moka",