
Implement an RDS kvbackend #4404

Open
1 of 2 tasks
WenyXu opened this issue Jul 22, 2024 · 15 comments
Labels
C-feature Category Features

Comments

@WenyXu
Member

WenyXu commented Jul 22, 2024

What problem does the new feature solve?

Currently, all metadata is stored in etcd. If we implement an RDS key-value backend, we can allow users to deploy a GreptimeDB cluster without relying on etcd.

What does the feature do?

Implement a kvbackend using an RDS (e.g., MySQL, Postgres).

Implementation challenges

It's a good idea to start by implementing an RDS-based KvBackend:

pub trait KvBackend: TxnService
where
    Self::Error: ErrorExt,
{
    fn name(&self) -> &str;

    fn as_any(&self) -> &dyn Any;

    async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error>;

    async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error>;

    async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error>;

    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error>;

    async fn delete_range(
        &self,
        req: DeleteRangeRequest,
    ) -> Result<DeleteRangeResponse, Self::Error>;

    async fn batch_delete(
        &self,
        req: BatchDeleteRequest,
    ) -> Result<BatchDeleteResponse, Self::Error>;

    // The following methods are implemented in terms of the methods above,
    // providing a higher-level interface to simplify usage.
    async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
        let req = RangeRequest::new().with_key(key.to_vec());
        let mut resp = self.range(req).await?;
        Ok(if resp.kvs.is_empty() {
            None
        } else {
            Some(resp.kvs.remove(0))
        })
    }

    /// CAS: Compares the value at the key with the given value, and if they are
    /// equal, puts the new value at the key.
    async fn compare_and_put(
        &self,
        req: CompareAndPutRequest,
    ) -> Result<CompareAndPutResponse, Self::Error> {
        let CompareAndPutRequest { key, expect, value } = req;
        let txn = if expect.is_empty() {
            Txn::put_if_not_exists(key, value)
        } else {
            Txn::compare_and_put(key, expect, value)
        };
        let txn_res = self.txn(txn).await?;
        let success = txn_res.succeeded;
        // The response is guaranteed to have at most one element.
        let op_res = txn_res.responses.into_iter().next();
        let prev_kv = match op_res {
            Some(TxnOpResponse::ResponsePut(res)) => res.prev_kv,
            Some(TxnOpResponse::ResponseGet(res)) => res.kvs.into_iter().next(),
            Some(TxnOpResponse::ResponseDelete(res)) => res.prev_kvs.into_iter().next(),
            None => None,
        };
        Ok(CompareAndPutResponse { success, prev_kv })
    }

    /// Puts a value at a key. If `if_not_exists` is `true`, the operation
    /// ensures the key does not exist before applying the PUT operation.
    /// Otherwise, it simply applies the PUT operation without checking for
    /// the key's existence.
    async fn put_conditionally(
        &self,
        key: Vec<u8>,
        value: Vec<u8>,
        if_not_exists: bool,
    ) -> Result<bool, Self::Error> {
        let success = if if_not_exists {
            let req = CompareAndPutRequest::new()
                .with_key(key)
                .with_expect(vec![])
                .with_value(value);
            let res = self.compare_and_put(req).await?;
            res.success
        } else {
            let req = PutRequest::new().with_key(key).with_value(value);
            self.put(req).await?;
            true
        };
        Ok(success)
    }

    /// Checks whether the key exists, without returning the value.
    /// If the value is large, this method is more efficient than `get`.
    async fn exists(&self, key: &[u8]) -> Result<bool, Self::Error> {
        let req = RangeRequest::new().with_key(key.to_vec()).with_keys_only();
        let resp = self.range(req).await?;
        Ok(!resp.kvs.is_empty())
    }

    /// Returns the previous key-value pair if `prev_kv` is `true`.
    async fn delete(&self, key: &[u8], prev_kv: bool) -> Result<Option<KeyValue>, Self::Error> {
        let mut req = DeleteRangeRequest::new().with_key(key.to_vec());
        if prev_kv {
            req = req.with_prev_kv();
        }
        let resp = self.delete_range(req).await?;
        if prev_kv {
            Ok(resp.prev_kvs.into_iter().next())
        } else {
            Ok(None)
        }
    }
}

@WenyXu WenyXu added the C-feature Category Features label Jul 22, 2024
@killme2008
Contributor

@WenyXu This needs more detailed info for developers, e.g., the key components or code segments involved.

@lyang24
Collaborator

lyang24 commented Jul 22, 2024

This issue is very interesting. May I work on it? And may I use this client: https://crates.io/crates/aws-sdk-rds?

@WenyXu
Member Author

WenyXu commented Jul 22, 2024

This issue is very interesting. May I work on it?

Of course! Would you like to briefly describe your idea or plan?

And may I use this client: https://crates.io/crates/aws-sdk-rds?

I think it's better not to rely on a particular RDS client.

@lyang24
Collaborator

lyang24 commented Jul 22, 2024

This issue is very interesting. May I work on it?

Of course! Would you like to briefly describe your idea or plan?

My idea is quite naive; let's use a Postgres impl as an example:
I want to take connection parameters from the MetasrvOptions and implement the KvBackend interface by translating the calls into Postgres CRUD SQL queries backed by a Postgres table.
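A minimal sketch of that translation, assuming the tokio-postgres crate and a single made-up greptime_metakv table (table name, connection string, and sample keys here are illustrative, not the final schema):

use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    let (client, conn) =
        tokio_postgres::connect("host=localhost user=postgres dbname=metadata", NoTls).await?;
    // The connection object drives the actual I/O; run it on its own task.
    tokio::spawn(async move {
        if let Err(e) = conn.await {
            eprintln!("connection error: {e}");
        }
    });

    // One row per key-value pair.
    client
        .batch_execute(
            "CREATE TABLE IF NOT EXISTS greptime_metakv (k bytea PRIMARY KEY, v bytea)",
        )
        .await?;

    // `put` translates to an upsert.
    let (key, value) = (b"__table/1".to_vec(), b"meta".to_vec());
    client
        .execute(
            "INSERT INTO greptime_metakv (k, v) VALUES ($1, $2) \
             ON CONFLICT (k) DO UPDATE SET v = EXCLUDED.v",
            &[&key, &value],
        )
        .await?;

    // A prefix `range` translates to a bounded, ordered scan; the upper bound
    // is the prefix with its last byte incremented ('/' + 1 == '0').
    let rows = client
        .query(
            "SELECT k, v FROM greptime_metakv WHERE k >= $1 AND k < $2 ORDER BY k",
            &[&b"__table/".to_vec(), &b"__table0".to_vec()],
        )
        .await?;
    for row in &rows {
        let k: Vec<u8> = row.get(0);
        let v: Vec<u8> = row.get(1);
        println!("{} => {} bytes", String::from_utf8_lossy(&k), v.len());
    }
    Ok(())
}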

@WenyXu
Member Author

WenyXu commented Jul 22, 2024

My idea is quite naive; let's use a Postgres impl as an example: I want to take connection parameters from the MetasrvOptions and implement the KvBackend interface by translating the calls into Postgres CRUD SQL queries backed by a Postgres table.

Cool. BTW, this feature could be an epic; you may need to create multiple PRs to implement it and a tracking issue to track them. 👀

@lyang24
Collaborator

lyang24 commented Jul 23, 2024

I realized we have to implement the DistLock trait as well. I wonder how often lock and unlock get called? I have two options for implementing locking on an RDS; which one do you prefer? (A sketch of a possible acquisition query follows the two layouts.)

normalized (a single table)

key string, value bytes, lock owner string, lock expire timestamp

denormalized (separate tables)

kv table

key string, value bytes

lock table

key string, lock owner string, lock expire timestamp
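For the separate lock table, a hedged sketch of the acquire step (hypothetical lock_table columns and a fixed 10-second lease, using tokio-postgres): the upsert takes the lock when the key is free, and steals it only when the previous holder's lease has expired.

// Succeeds (one row affected) iff the lock is free or its lease has expired.
const ACQUIRE_LOCK: &str = "\
    INSERT INTO lock_table (key, lock_owner, lock_expire)
    VALUES ($1, $2, now() + interval '10 seconds')
    ON CONFLICT (key) DO UPDATE
    SET lock_owner = EXCLUDED.lock_owner,
        lock_expire = EXCLUDED.lock_expire
    WHERE lock_table.lock_expire < now()";

async fn try_lock(
    client: &tokio_postgres::Client,
    key: &str,
    owner: &str,
) -> Result<bool, tokio_postgres::Error> {
    // `execute` returns the affected-row count: 1 if we acquired (or stole)
    // the lock, 0 if another owner still holds an unexpired lease.
    Ok(client.execute(ACQUIRE_LOCK, &[&key, &owner]).await? == 1)
}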

@fengjiachun
Collaborator

Currently, DistLock is not being used in practice. I think we can hold off on implementing it for now. What do you think? @WenyXu

@WenyXu
Member Author

WenyXu commented Jul 23, 2024

Currently, DistLock is not being used in practice. I think we can hold off on implementing it for now. What do you think? @WenyXu

Yes, maybe we can implement an RDS kvbackend first.

@lyang24
Collaborator

lyang24 commented Jul 24, 2024

Currently, DistLock is not being used in practice. I think we can hold off on implementing it for now. What do you think? @WenyXu

Yes, maybe we can implement an RDS kvbackend first.

Thank you for confirming. I wrote some early skeleton code in #4421. If this is the approach you wanted to see, I can continue; if not, let me know and I can adjust.

@WenyXu
Member Author

WenyXu commented Jul 25, 2024

Thank you for confirming. I wrote some early skeleton code in #4421. If this is the approach you wanted to see, I can continue; if not, let me know and I can adjust.

LGTM, let's keep going 🚀

@lyang24
Collaborator

lyang24 commented Jul 29, 2024

Hey @WenyXu, I have a Postgres impl ready. Do you know what would be a good place to add integration tests? Also, I wonder, do we have benchmarks for the metasrv? I would like to modify them to include Postgres as well. (#4421)

@WenyXu
Member Author

WenyXu commented Jul 29, 2024

Hey @WenyXu, I have a Postgres impl ready. Do you know what would be a good place to add integration tests?

We can set up a Postgres service in GitHub Actions. BTW, for the kv backend's integration tests, we have some shared test suites to ensure that all implementations have the same behavior:

async fn test_put() {
    if let Some(kv_backend) = build_kv_backend().await {
        let prefix = b"put/";
        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
        test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
        unprepare_kv(&kv_backend, prefix).await;
    }
}
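For context, build_kv_backend returns an Option so the whole test silently skips when no backend is configured. A hypothetical Postgres-flavored version (the env var name and PgStore constructor below are assumptions for illustration, not the actual code):

// Hypothetical: only build the Postgres-backed store when CI exports an
// endpoint, so the shared suite skips on machines without Postgres.
async fn build_kv_backend() -> Option<PgStore> {
    let endpoint = std::env::var("GT_POSTGRES_ENDPOINTS").ok()?;
    PgStore::with_url(&endpoint).await.ok()
}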

Also, I wonder, do we have benchmarks for the metasrv? I would like to modify them to include Postgres as well. (#4421)

We don't have benchmarks for the metasrv for now.

@lyang24
Collaborator

lyang24 commented Aug 1, 2024

Hey, the Postgres KvBackend is very close to complete (#4421). I just have two small questions:

  1. The \0 in the kv encodings in tests: for example, we concatenate '\0' when the prefix is not null, and this causes the Rust Postgres driver to error out. Is it OK for me to process the key/value by removing the 0 bytes in the vec? Or is this uncommon, and should I refactor the test instead?

  2. For range delete, if key is b"\0" and range_end is also b"\0", does that mean delete everything from the table?

@WenyXu
Member Author

WenyXu commented Aug 1, 2024

Is it OK for me to process the key/value by removing the 0 bytes in the vec?

Yes, I agree with you. For now, we can carefully remove the trailing \0. However, in the future, we'd be better off refactoring RangeRequest to something like

struct RangeRequest {
    key: Option<Vec<u8>>,
    range_end: Option<Vec<u8>>,
    ...
}

or

enum RangeRequest {
    KeyRange(...),
    SingleKey(...),
}

cc @fengjiachun

  2. For range delete, if key is b"\0" and range_end is also b"\0", does that mean delete everything from the table?

Yes, we are following etcd's behavior: https://etcd.io/docs/v3.3/learning/api/#key-value-pair
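For reference, a small illustrative helper (not from the PR) showing how those etcd range conventions could map onto SQL predicates:

/// Illustrative only: map an etcd-style (key, range_end) pair to a SQL
/// predicate, per https://etcd.io/docs/v3.3/learning/api/#key-value-pair.
fn range_predicate(key: &[u8], range_end: &[u8]) -> (&'static str, Vec<Vec<u8>>) {
    match (key, range_end) {
        // key == range_end == "\0": the whole keyspace.
        (b"\0", b"\0") => ("TRUE", vec![]),
        // Empty range_end: a single-key lookup.
        (k, b"") => ("k = $1", vec![k.to_vec()]),
        // range_end == "\0": all keys greater than or equal to `key`.
        (k, b"\0") => ("k >= $1", vec![k.to_vec()]),
        // Otherwise: the half-open interval [key, range_end).
        (k, end) => ("k >= $1 AND k < $2", vec![k.to_vec(), end.to_vec()]),
    }
}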

@lyang24
Collaborator

lyang24 commented Aug 2, 2024

Yes, I agree with you. For now, we can carefully remove the trailing \0. However, in the future, we'd be better off refactoring RangeRequest [...]

Yes, we are following etcd's behavior: https://etcd.io/docs/v3.3/learning/api/#key-value-pair

Thank you, I fixed the second case on range delete. I do agree with refactoring RangeRequest. However, I think the real problem I am facing is the test condition when the test is called with an actual prefix: it produces a key like b"range/2\0", and when I inject that into the SQL query it looks like WHERE k >= 'range/2' in string form, but the byte array contains a 0, which throws off the Postgres driver.
Refactoring is great but probably won't help this case, so I implemented trimming of the trailing nulls to bypass it. All tests passed locally; the PR is ready for feedback :)
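A rough sketch of that trimming workaround (the helper name is made up; this assumes a trailing \0 is only ever an etcd-style range marker, never part of a real key):

fn trim_trailing_nuls(mut bound: Vec<u8>) -> Vec<u8> {
    // Postgres rejects NUL bytes in text parameters, so drop the trailing
    // etcd-style `\0` markers before binding the bound as a query parameter.
    while bound.last() == Some(&0) {
        bound.pop();
    }
    bound
}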
