From e5a777e5fa25463c1aa317602159b9d3d395b5d9 Mon Sep 17 00:00:00 2001 From: dl239 Date: Sun, 8 Oct 2023 19:08:23 +0800 Subject: [PATCH 1/8] feat: support truncate table statement --- cases/plan/cmd.yaml | 16 ++++++++++++++++ hybridse/include/node/node_enum.h | 1 + hybridse/src/node/sql_node.cc | 1 + hybridse/src/planv2/ast_node_converter.cc | 10 ++++++++++ 4 files changed, 28 insertions(+) diff --git a/cases/plan/cmd.yaml b/cases/plan/cmd.yaml index 3ca7d89ba6f..0d91ac8a1a8 100644 --- a/cases/plan/cmd.yaml +++ b/cases/plan/cmd.yaml @@ -649,6 +649,22 @@ cases: +-cmd_type: drop function +-if_exists: true +-args: [func1] + - id: truncate_stmt + desc: truncate + sql: TRUNCATE TABLE t1; + expect: + node_tree_str: | + +-node[CMD] + +-cmd_type: truncate table + +-args: [t1] + - id: truncate_stmt_db + desc: truncate + sql: TRUNCATE TABLE db1.t1; + expect: + node_tree_str: | + +-node[CMD] + +-cmd_type: truncate table + +-args: [db1, t1] - id: exit_stmt desc: exit statement sql: EXIT; diff --git a/hybridse/include/node/node_enum.h b/hybridse/include/node/node_enum.h index 4fc914799d0..888ca9eda56 100644 --- a/hybridse/include/node/node_enum.h +++ b/hybridse/include/node/node_enum.h @@ -283,6 +283,7 @@ enum CmdType { kCmdShowFunctions, kCmdDropFunction, kCmdShowJobLog, + kCmdTruncate, kCmdFake, // not a real cmd, for testing purpose only kLastCmd = kCmdFake, }; diff --git a/hybridse/src/node/sql_node.cc b/hybridse/src/node/sql_node.cc index bc5aec55cf9..0efc2039106 100644 --- a/hybridse/src/node/sql_node.cc +++ b/hybridse/src/node/sql_node.cc @@ -75,6 +75,7 @@ static absl::flat_hash_map CreateCmdTypeNamesMap() { {CmdType::kCmdDropFunction, "drop function"}, {CmdType::kCmdShowFunctions, "show functions"}, {CmdType::kCmdShowJobLog, "show joblog"}, + {CmdType::kCmdTruncate, "truncate table"}, }; for (auto kind = 0; kind < CmdType::kLastCmd; ++kind) { DCHECK(map.find(static_cast(kind)) != map.end()); diff --git a/hybridse/src/planv2/ast_node_converter.cc b/hybridse/src/planv2/ast_node_converter.cc index 19bb0ccfc6c..305d19c0f1d 100644 --- a/hybridse/src/planv2/ast_node_converter.cc +++ b/hybridse/src/planv2/ast_node_converter.cc @@ -611,6 +611,16 @@ base::Status ConvertStatement(const zetasql::ASTStatement* statement, node::Node *output = node; break; } + case zetasql::AST_TRUNCATE_STATEMENT: { + const zetasql::ASTTruncateStatement* truncate_statement = + statement->GetAsOrNull(); + std::vector names; + CHECK_STATUS(AstPathExpressionToStringList(truncate_statement->target_path(), names)); + auto node = + dynamic_cast(node_manager->MakeCmdNode(node::CmdType::kCmdTruncate, names)); + *output = node; + break; + } case zetasql::AST_DROP_FUNCTION_STATEMENT: { const zetasql::ASTDropFunctionStatement* drop_fun_statement = statement->GetAsOrNull(); From 58384faac780e66ac9ef8bea573434c92ed46aa0 Mon Sep 17 00:00:00 2001 From: dl239 Date: Mon, 9 Oct 2023 16:06:18 +0800 Subject: [PATCH 2/8] feat: add interface --- src/base/status.h | 3 +- src/client/ns_client.cc | 13 +++++++ src/client/ns_client.h | 2 + src/client/tablet_client.cc | 14 +++++++ src/client/tablet_client.h | 2 + src/nameserver/name_server_impl.cc | 62 ++++++++++++++++++++++++++++++ src/nameserver/name_server_impl.h | 4 ++ src/proto/name_server.proto | 11 ++++++ src/proto/tablet.proto | 11 ++++++ src/sdk/sql_cluster_router.cc | 35 +++++++++++------ src/tablet/tablet_impl.cc | 8 ++++ src/tablet/tablet_impl.h | 3 ++ 12 files changed, 155 insertions(+), 13 deletions(-) diff --git a/src/base/status.h b/src/base/status.h index 4a4eb867724..ef26dc717bc 100644 --- a/src/base/status.h +++ b/src/base/status.h @@ -127,7 +127,8 @@ enum ReturnCode { kCheckParameterFailed = 331, kCreateProcedureFailedOnTablet = 332, kCreateFunctionFailedOnTablet = 333, - kOPAlreadyExists = 317, + kOPAlreadyExists = 334, + kOffsetMismatch = 335, kReplicaClusterAliasDuplicate = 400, kConnectRelicaClusterZkFailed = 401, kNotSameReplicaName = 402, diff --git a/src/client/ns_client.cc b/src/client/ns_client.cc index eb37aa2719f..8f3b8aeea2c 100644 --- a/src/client/ns_client.cc +++ b/src/client/ns_client.cc @@ -297,6 +297,19 @@ bool NsClient::DropTable(const std::string& db, const std::string& name, std::st return false; } +base::Status NsClient::TruncateTable(const std::string& db, const std::string& name) { + ::openmldb::nameserver::TruncateTableRequest request; + request.set_name(name); + request.set_db(db); + ::openmldb::nameserver::TruncateTableResponse response; + bool ok = client_.SendRequest(&::openmldb::nameserver::NameServer_Stub::TruncateTable, &request, &response, + FLAGS_request_timeout_ms, 1); + if (ok && response.code() == 0) { + return {}; + } + return {response.code(), response.msg()}; +} + bool NsClient::SyncTable(const std::string& name, const std::string& cluster_alias, uint32_t pid, std::string& msg) { ::openmldb::nameserver::SyncTableRequest request; request.set_name(name); diff --git a/src/client/ns_client.h b/src/client/ns_client.h index f069ccce2d3..d5cfefe6c9e 100644 --- a/src/client/ns_client.h +++ b/src/client/ns_client.h @@ -112,6 +112,8 @@ class NsClient : public Client { bool DropTable(const std::string& db, const std::string& name, std::string& msg); // NOLINT + base::Status TruncateTable(const std::string& db, const std::string& name); + bool SyncTable(const std::string& name, const std::string& cluster_alias, uint32_t pid, std::string& msg); // NOLINT diff --git a/src/client/tablet_client.cc b/src/client/tablet_client.cc index 9357b23e29a..fdfdb705bbb 100644 --- a/src/client/tablet_client.cc +++ b/src/client/tablet_client.cc @@ -153,6 +153,20 @@ bool TabletClient::SQLBatchRequestQuery(const std::string& db, const std::string return true; } +base::Status TabletClient::TruncateTable(uint32_t tid, uint32_t pid) { + ::openmldb::api::TruncateTableRequest request; + ::openmldb::api::TruncateTableResponse response; + request.set_tid(tid); + request.set_pid(pid); + if (!client_.SendRequest(&::openmldb::api::TabletServer_Stub::TruncateTable, &request, &response, + FLAGS_request_timeout_ms, 1)) { + return {base::ReturnCode::kRPCError, "send request failed!"}; + } else if (response.code() == 0) { + return {}; + } + return {response.code(), response.msg()}; +} + base::Status TabletClient::CreateTable(const ::openmldb::api::TableMeta& table_meta) { ::openmldb::api::CreateTableRequest request; ::openmldb::api::TableMeta* table_meta_ptr = request.mutable_table_meta(); diff --git a/src/client/tablet_client.h b/src/client/tablet_client.h index f955040a157..9da5b6f97c9 100644 --- a/src/client/tablet_client.h +++ b/src/client/tablet_client.h @@ -56,6 +56,8 @@ class TabletClient : public Client { base::Status CreateTable(const ::openmldb::api::TableMeta& table_meta); + base::Status TruncateTable(uint32_t tid, uint32_t pid); + bool UpdateTableMetaForAddField(uint32_t tid, const std::vector& cols, const openmldb::common::VersionPair& pair, std::string& msg); // NOLINT diff --git a/src/nameserver/name_server_impl.cc b/src/nameserver/name_server_impl.cc index 862ee42d320..51f9d9213d4 100644 --- a/src/nameserver/name_server_impl.cc +++ b/src/nameserver/name_server_impl.cc @@ -3713,6 +3713,47 @@ void NameServerImpl::CreateTable(RpcController* controller, const CreateTableReq } } +void NameServerImpl::TruncateTable(RpcController* controller, const TruncateTableRequest* request, + TruncateTableResponse* response, Closure* done) { + brpc::ClosureGuard done_guard(done); + const std::string& db = request->db(); + const std::string& name = request->name(); + std::shared_ptr<::openmldb::nameserver::TableInfo> table_info; + std::lock_guard lock(mu_); + { + std::lock_guard lock(mu_); + if (!GetTableInfoUnlock(request->name(), request->db(), &table_info)) { + PDLOG(WARNING, "table[%s] does not exist in db [%s]", name.c_str(), db.c_str()); + response->set_code(::openmldb::base::ReturnCode::kTableIsNotExist); + response->set_msg("table does not exist"); + return; + } + if (IsExistActiveOp(db, name)) { + PDLOG(WARNING, "there is active op. db [%s] name [%s]", db.c_str(), name.c_str()); + response->set_code(::openmldb::base::ReturnCode::kOPAlreadyExists); + response->set_msg("there is active op"); + return; + } + } + for (const auto& partition : table_info->table_partition()) { + uint32_t offset = 0; + for (const auto& partition_meta : partition.partition_meta()) { + if (partition_meta.offset() != offset) { + if (offset == 0) { + offset = partition_meta.offset(); + } else { + PDLOG(WARNING, "table[%s] partition [%d] offset mismatch", name.c_str(), partition.pid()); + response->set_code(::openmldb::base::ReturnCode::kOffsetMismatch); + response->set_msg("partition offset mismatch"); + return; + } + } + } + } + response->set_code(::openmldb::base::ReturnCode::kOk); + response->set_msg("ok"); +} + bool NameServerImpl::SaveTableInfo(std::shared_ptr table_info) { std::string table_value; table_info->SerializeToString(&table_value); @@ -10568,5 +10609,26 @@ bool NameServerImpl::IsExistActiveOp(const std::string& db, const std::string& n return false; } +bool NameServerImpl::IsExistActiveOp(const std::string& db, const std::string& name) { + for (const auto& op_list : task_vec_) { + if (op_list.empty()) { + continue; + } + for (const auto& op_data : op_list) { + if (!db.empty() && op_data->op_info_.db() != db) { + continue; + } + if (!name.empty() && op_data->op_info_.name() != name) { + continue; + } + if (op_data->op_info_.task_status() == api::TaskStatus::kInited || + op_data->op_info_.task_status() == api::TaskStatus::kDoing) { + return true; + } + } + } + return false; +} + } // namespace nameserver } // namespace openmldb diff --git a/src/nameserver/name_server_impl.h b/src/nameserver/name_server_impl.h index 4bfe84ad5f4..be3eed2be8d 100644 --- a/src/nameserver/name_server_impl.h +++ b/src/nameserver/name_server_impl.h @@ -160,6 +160,9 @@ class NameServerImpl : public NameServer { void DropTable(RpcController* controller, const DropTableRequest* request, GeneralResponse* response, Closure* done); + void TruncateTable(RpcController* controller, const TruncateTableRequest* request, + TruncateTableResponse* response, Closure* done); + void AddTableField(RpcController* controller, const AddTableFieldRequest* request, GeneralResponse* response, Closure* done); @@ -683,6 +686,7 @@ class NameServerImpl : public NameServer { bool IsExistDataBase(const std::string& db); bool IsExistActiveOp(const std::string& db, const std::string& name, api::OPType op_type); + bool IsExistActiveOp(const std::string& db, const std::string& name); private: std::mutex mu_; diff --git a/src/proto/name_server.proto b/src/proto/name_server.proto index b0eb526d8e7..18f3043c599 100755 --- a/src/proto/name_server.proto +++ b/src/proto/name_server.proto @@ -121,6 +121,16 @@ message DropTableRequest { optional string db = 4 [default = ""]; } +message TruncateTableRequest { + optional string name = 1; + optional string db = 2; +} + +message TruncateTableResponse { + optional int32 code = 1; + optional string msg = 2; +} + message LoadTableRequest { optional string name = 1; optional string endpoint = 2; @@ -529,6 +539,7 @@ message DeploySQLResponse { service NameServer { rpc CreateTable(CreateTableRequest) returns (GeneralResponse); rpc DropTable(DropTableRequest) returns (GeneralResponse); + rpc TruncateTable(TruncateTableRequest) returns (TruncateTableResponse); rpc ShowTablet(ShowTabletRequest) returns (ShowTabletResponse); rpc ShowTable(ShowTableRequest) returns (ShowTableResponse); rpc MakeSnapshotNS(MakeSnapshotNSRequest) returns (GeneralResponse); diff --git a/src/proto/tablet.proto b/src/proto/tablet.proto index 2944794b0d9..5139cf0bafe 100755 --- a/src/proto/tablet.proto +++ b/src/proto/tablet.proto @@ -363,6 +363,16 @@ message DropTableResponse { optional string msg = 2; } +message TruncateTableRequest { + optional int32 tid = 1; + optional int32 pid = 2; +} + +message TruncateTableResponse { + optional int32 code = 1; + optional string msg = 2; +} + message GetTableSchemaRequest { optional int32 tid = 1; optional int32 pid = 2; @@ -905,6 +915,7 @@ service TabletServer { rpc CreateTable(CreateTableRequest) returns (CreateTableResponse); rpc LoadTable(LoadTableRequest) returns (GeneralResponse); rpc DropTable(DropTableRequest) returns (DropTableResponse); + rpc TruncateTable(TruncateTableRequest) returns (TruncateTableResponse); rpc GetTableStatus(GetTableStatusRequest) returns (GetTableStatusResponse); rpc GetTableSchema(GetTableSchemaRequest) returns (GetTableSchemaResponse); rpc GetTableFollower(GetTableFollowerRequest) returns (GetTableFollowerResponse); diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index 296cd3d5755..56a3daf1e93 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -1932,13 +1932,9 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h *status = {}; std::string db_name = db; std::string table_name; - if (cmd_node->GetArgs().size() == 2) { - db_name = cmd_node->GetArgs()[0]; - table_name = cmd_node->GetArgs()[1]; - } else if (cmd_node->GetArgs().size() == 1) { - table_name = cmd_node->GetArgs()[0]; - } else { - *status = {StatusCode::kCmdError, "Invalid Cmd Args size"}; + if (!ParseNamesFromArgs(db, cmd_node->GetArgs(), &db_name, &table_name).IsOK()) { + *status = {StatusCode::kCmdError, msg}; + return {}; } if (!CheckAnswerIfInteractive("table", table_name)) { return {}; @@ -1948,6 +1944,23 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h } return {}; } + case hybridse::node::kCmdTruncate: { + *status = {}; + std::string db_name; + std::string table_name; + if (!ParseNamesFromArgs(db, cmd_node->GetArgs(), &db_name, &table_name).IsOK()) { + *status = {StatusCode::kCmdError, msg}; + return {}; + } + if (!CheckAnswerIfInteractive("truncate", table_name)) { + return {}; + } + auto base_status = ns_ptr->TruncateTable(db_name, table_name); + if (!base_status.OK()) { + *status = {StatusCode::kCmdError, base_status.GetMsg()}; + } + return {}; + } case hybridse::node::kCmdDropIndex: { std::string db_name = db; std::string table_name; @@ -2989,18 +3002,16 @@ ::hybridse::sdk::Status SQLClusterRouter::SetVariable(hybridse::node::SetPlanNod } ::hybridse::sdk::Status SQLClusterRouter::ParseNamesFromArgs(const std::string& db, - const std::vector& args, std::string* db_name, - std::string* sp_name) { + const std::vector& args, std::string* db_name, std::string* target_name) { if (args.size() == 1) { - // only sp name, no db_name if (db.empty()) { return {StatusCode::kCmdError, "Please enter database first"}; } *db_name = db; - *sp_name = args[0]; + *target_name = args[0]; } else if (args.size() == 2) { *db_name = args[0]; - *sp_name = args[1]; + *target_name = args[1]; } else { return {StatusCode::kCmdError, "Invalid args"}; } diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index f30f1f8b74b..26a4d4b2bf6 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -3458,6 +3458,14 @@ void TabletImpl::CreateTable(RpcController* controller, const ::openmldb::api::C response->set_msg("ok"); } +void TabletImpl::TruncateTable(RpcController* controller, const ::openmldb::api::TruncateTableRequest* request, + ::openmldb::api::TruncateTableResponse* response, Closure* done) { + brpc::ClosureGuard done_guard(done); + + response->set_code(::openmldb::base::ReturnCode::kOk); + response->set_msg("ok"); +} + void TabletImpl::ExecuteGc(RpcController* controller, const ::openmldb::api::ExecuteGcRequest* request, ::openmldb::api::GeneralResponse* response, Closure* done) { brpc::ClosureGuard done_guard(done); diff --git a/src/tablet/tablet_impl.h b/src/tablet/tablet_impl.h index d48f192ae26..8b6b92c57a8 100644 --- a/src/tablet/tablet_impl.h +++ b/src/tablet/tablet_impl.h @@ -109,6 +109,9 @@ class TabletImpl : public ::openmldb::api::TabletServer { void DropTable(RpcController* controller, const ::openmldb::api::DropTableRequest* request, ::openmldb::api::DropTableResponse* response, Closure* done); + void TruncateTable(RpcController* controller, const ::openmldb::api::TruncateTableRequest* request, + ::openmldb::api::TruncateTableResponse* response, Closure* done); + void Refresh(RpcController* controller, const ::openmldb::api::RefreshRequest* request, ::openmldb::api::GeneralResponse* response, Closure* done); From 50a80936ab79e2e818b4aa4dacfaf4bfacb886b1 Mon Sep 17 00:00:00 2001 From: dl239 Date: Tue, 17 Oct 2023 10:30:14 +0800 Subject: [PATCH 3/8] feat: truncate mem table --- src/base/status.h | 2 + src/catalog/tablet_catalog.cc | 2 +- src/nameserver/name_server_impl.cc | 24 ++++++++- src/storage/mem_table_snapshot.cc | 34 ++++++++++++ src/storage/mem_table_snapshot.h | 2 + src/tablet/tablet_impl.cc | 86 ++++++++++++++++++++++++++++-- 6 files changed, 144 insertions(+), 6 deletions(-) diff --git a/src/base/status.h b/src/base/status.h index ef26dc717bc..d608276f02d 100644 --- a/src/base/status.h +++ b/src/base/status.h @@ -129,6 +129,8 @@ enum ReturnCode { kCreateFunctionFailedOnTablet = 333, kOPAlreadyExists = 334, kOffsetMismatch = 335, + kGetTabletFailed = 336, + kTruncateTableFailed = 337, kReplicaClusterAliasDuplicate = 400, kConnectRelicaClusterZkFailed = 401, kNotSameReplicaName = 402, diff --git a/src/catalog/tablet_catalog.cc b/src/catalog/tablet_catalog.cc index a9e74ff7061..af9576cad79 100644 --- a/src/catalog/tablet_catalog.cc +++ b/src/catalog/tablet_catalog.cc @@ -213,7 +213,7 @@ void TabletTableHandler::AddTable(std::shared_ptr<::openmldb::storage::Table> ta do { old_tables = std::atomic_load_explicit(&tables_, std::memory_order_acquire); new_tables = std::make_shared(*old_tables); - new_tables->emplace(table->GetPid(), table); + new_tables->insert_or_assign(table->GetPid(), table); } while (!atomic_compare_exchange_weak(&tables_, &old_tables, new_tables)); } diff --git a/src/nameserver/name_server_impl.cc b/src/nameserver/name_server_impl.cc index 51f9d9213d4..92a79860433 100644 --- a/src/nameserver/name_server_impl.cc +++ b/src/nameserver/name_server_impl.cc @@ -3719,7 +3719,6 @@ void NameServerImpl::TruncateTable(RpcController* controller, const TruncateTabl const std::string& db = request->db(); const std::string& name = request->name(); std::shared_ptr<::openmldb::nameserver::TableInfo> table_info; - std::lock_guard lock(mu_); { std::lock_guard lock(mu_); if (!GetTableInfoUnlock(request->name(), request->db(), &table_info)) { @@ -3735,6 +3734,7 @@ void NameServerImpl::TruncateTable(RpcController* controller, const TruncateTabl return; } } + uint32_t tid = table_info->tid(); for (const auto& partition : table_info->table_partition()) { uint32_t offset = 0; for (const auto& partition_meta : partition.partition_meta()) { @@ -3750,6 +3750,28 @@ void NameServerImpl::TruncateTable(RpcController* controller, const TruncateTabl } } } + for (const auto& partition : table_info->table_partition()) { + uint32_t pid = partition.pid(); + for (const auto& partition_meta : partition.partition_meta()) { + const auto& endpoint = partition_meta.endpoint(); + auto tablet_ptr = GetTablet(endpoint); + if (!tablet_ptr) { + PDLOG(WARNING, "endpoint[%s] can not find client", endpoint.c_str()); + response->set_code(::openmldb::base::ReturnCode::kGetTabletFailed); + response->set_msg("fail to get client, endpint " + endpoint); + return; + } + auto status = tablet_ptr->client_->TruncateTable(tid, pid); + if (!status.OK()) { + PDLOG(WARNING, "truncate failed, tid[%u] pid[%u] endpoint[%s] msg [%s]", + tid, pid, endpoint.c_str(), status.GetMsg().c_str()); + response->set_code(::openmldb::base::ReturnCode::kTruncateTableFailed); + response->set_msg(status.GetMsg()); + return; + } + } + } + PDLOG(INFO, "truncate success, db[%u] name[%u]", db.c_str(), name.c_str()); response->set_code(::openmldb::base::ReturnCode::kOk); response->set_msg("ok"); } diff --git a/src/storage/mem_table_snapshot.cc b/src/storage/mem_table_snapshot.cc index 3eaacd3e2ac..2b085df59be 100644 --- a/src/storage/mem_table_snapshot.cc +++ b/src/storage/mem_table_snapshot.cc @@ -1041,5 +1041,39 @@ ::openmldb::base::Status MemTableSnapshot::ExtractIndexData(const std::shared_pt return status; } +int MemTableSnapshot::Truncate(uint64_t offset, uint64_t term) { + if (making_snapshot_.load(std::memory_order_acquire)) { + PDLOG(INFO, "snapshot is doing now!"); + return -1; + } + if (offset < offset_) { + PDLOG(WARNING, "end_offset %lu less than offset_ %lu, do nothing", offset, offset_); + return -1; + } + making_snapshot_.store(true, std::memory_order_release); + absl::Cleanup clean = [this] { + this->making_snapshot_.store(false, std::memory_order_release); + this->delete_collector_.Clear(); + }; + MemSnapshotMeta snapshot_meta(GenSnapshotName(), snapshot_path_, FLAGS_snapshot_compression); + snapshot_meta.term = term; + snapshot_meta.count = 0; + snapshot_meta.offset = offset; + auto wh = ::openmldb::log::CreateWriteHandle(FLAGS_snapshot_compression, + snapshot_meta.snapshot_name, snapshot_meta.tmp_file_path); + if (!wh) { + PDLOG(WARNING, "fail to create file %s", snapshot_meta.tmp_file_path.c_str()); + return -1; + } + wh->EndLog(); + wh.reset(); + auto status = WriteSnapshot(snapshot_meta); + if (!status.OK()) { + PDLOG(WARNING, "write snapshot failed. tid %u pid %u msg is %s ", tid_, pid_, status.GetMsg().c_str()); + return -1; + } + return 0; +} + } // namespace storage } // namespace openmldb diff --git a/src/storage/mem_table_snapshot.h b/src/storage/mem_table_snapshot.h index caad8fd182c..54da3a8c8c4 100644 --- a/src/storage/mem_table_snapshot.h +++ b/src/storage/mem_table_snapshot.h @@ -192,6 +192,8 @@ class MemTableSnapshot : public Snapshot { int CheckDeleteAndUpdate(std::shared_ptr table, ::openmldb::api::LogEntry* new_entry); + int Truncate(uint64_t offset, uint64_t term); + private: // load single snapshot to table void RecoverSingleSnapshot(const std::string& path, std::shared_ptr
table, std::atomic* g_succ_cnt, diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index 26a4d4b2bf6..ec1e87ab01c 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -3461,7 +3461,87 @@ void TabletImpl::CreateTable(RpcController* controller, const ::openmldb::api::C void TabletImpl::TruncateTable(RpcController* controller, const ::openmldb::api::TruncateTableRequest* request, ::openmldb::api::TruncateTableResponse* response, Closure* done) { brpc::ClosureGuard done_guard(done); + uint32_t tid = request->tid(); + uint32_t pid = request->pid(); + std::shared_ptr
table; + std::shared_ptr snapshot; + std::shared_ptr replicator; + { + std::lock_guard spin_lock(spin_mutex_); + table = GetTableUnLock(tid, pid); + if (!table) { + DEBUGLOG("table does not exist. tid %u pid %u", tid, pid); + response->set_code(::openmldb::base::ReturnCode::kTableIsNotExist); + response->set_msg("table not found"); + return; + } + snapshot = GetSnapshotUnLock(tid, pid); + if (!snapshot) { + PDLOG(WARNING, "snapshot does not exist. tid[%u] pid[%u]", tid, pid); + response->set_code(::openmldb::base::ReturnCode::kSnapshotIsNotExist); + response->set_msg("snapshot not found"); + return; + } + replicator = GetReplicatorUnLock(tid, pid); + if (!replicator) { + PDLOG(WARNING, "replicator does not exist. tid[%u] pid[%u]", tid, pid); + response->set_code(::openmldb::base::ReturnCode::kReplicatorIsNotExist); + response->set_msg("replicator not found"); + return; + } + } + if (replicator->GetOffset() == 0) { + PDLOG(INFO, "table is empty, truncate success. tid[%u] pid[%u]", tid, pid); + response->set_code(::openmldb::base::ReturnCode::kOk); + response->set_msg("ok"); + return; + } + if (table->GetTableStat() == ::openmldb::storage::kMakingSnapshot) { + PDLOG(WARNING, "making snapshot task is running now. tid[%u] pid[%u]", tid, pid); + response->set_code(::openmldb::base::ReturnCode::kTableStatusIsKmakingsnapshot); + response->set_msg("table status is kMakingSnapshot"); + return; + } else if (table->GetTableStat() == ::openmldb::storage::kLoading) { + PDLOG(WARNING, "table is loading now. tid[%u] pid[%u]", tid, pid); + response->set_code(::openmldb::base::ReturnCode::kTableIsLoading); + response->set_msg("table is loading data"); + return; + } + auto table_meta = table->GetTableMeta(); + std::shared_ptr
new_table; + if (table->GetStorageMode() == openmldb::common::kMemory) { + new_table = std::make_shared(*table_meta); + } else { + std::string db_root_path; + if (!ChooseDBRootPath(tid, pid, table_meta->storage_mode(), db_root_path)) { + PDLOG(WARNING, "fail to get table db root path"); + response->set_msg("fail to get table db root path"); + return; + } + std::string table_db_path = GetDBPath(db_root_path, tid, pid); + new_table = std::make_shared(*table_meta, table_db_path); + } + if (!new_table->Init()) { + PDLOG(WARNING, "fail to init table. tid %u, pid %u", table_meta->tid(), table_meta->pid()); + response->set_msg("fail to init table"); + return; + } + new_table->SetTableStat(::openmldb::storage::kNormal); + { + std::lock_guard spin_lock(spin_mutex_); + tables_[tid].insert_or_assign(pid, new_table); + } + auto mem_snapshot = std::dynamic_pointer_cast(snapshot); + mem_snapshot->Truncate(replicator->GetOffset(), replicator->GetLeaderTerm()); + if (table_meta->mode() == ::openmldb::api::TableMode::kTableLeader) { + if (catalog_->AddTable(*table_meta, new_table)) { + LOG(INFO) << "add table " << table_meta->name() << " to catalog with db " << table_meta->db(); + } else { + LOG(WARNING) << "fail to add table " << table_meta->name() << " to catalog with db " << table_meta->db(); + } + } + PDLOG(INFO, "truncate table success. tid[%u] pid[%u]", tid, pid); response->set_code(::openmldb::base::ReturnCode::kOk); response->set_msg("ok"); } @@ -3875,13 +3955,11 @@ int TabletImpl::CreateTableInternal(const ::openmldb::api::TableMeta* table_meta return -1; } std::string table_db_path = GetDBPath(db_root_path, tid, pid); - Table* table_ptr; if (table_meta->storage_mode() == openmldb::common::kMemory) { - table_ptr = new MemTable(*table_meta); + table = std::make_shared(*table_meta); } else { - table_ptr = new DiskTable(*table_meta, table_db_path); + table = std::make_shared(*table_meta, table_db_path); } - table.reset(table_ptr); if (!table->Init()) { PDLOG(WARNING, "fail to init table. tid %u, pid %u", table_meta->tid(), table_meta->pid()); From 9f8ec7fbb55601ab5be9ea24943807946e6252d7 Mon Sep 17 00:00:00 2001 From: dl239 Date: Tue, 17 Oct 2023 15:55:01 +0800 Subject: [PATCH 4/8] feat: truncate disk table --- src/storage/disk_table.cc | 42 +++++++++++++++++++++++++++++++ src/storage/disk_table.h | 2 ++ src/tablet/tablet_impl.cc | 53 +++++++++++++++++++-------------------- 3 files changed, 70 insertions(+), 27 deletions(-) diff --git a/src/storage/disk_table.cc b/src/storage/disk_table.cc index 8f508bac6c5..a19917503d8 100644 --- a/src/storage/disk_table.cc +++ b/src/storage/disk_table.cc @@ -363,6 +363,48 @@ bool DiskTable::Get(uint32_t idx, const std::string& pk, uint64_t ts, std::strin bool DiskTable::Get(const std::string& pk, uint64_t ts, std::string& value) { return Get(0, pk, ts, value); } +base::Status DiskTable::Truncate() { + const rocksdb::Snapshot* snapshot = db_->GetSnapshot(); + absl::Cleanup release_snapshot = [this, snapshot] { this->db_->ReleaseSnapshot(snapshot); }; + rocksdb::ReadOptions ro = rocksdb::ReadOptions(); + ro.snapshot = snapshot; + ro.prefix_same_as_start = true; + ro.pin_data = true; + rocksdb::WriteBatch batch; + for (const auto& inner_index : *(table_index_.GetAllInnerIndex())) { + uint32_t idx = inner_index->GetId(); + std::unique_ptr it(db_->NewIterator(ro, cf_hs_[idx + 1])); + it->SeekToFirst(); + if (it->Valid()) { + std::string start_key(it->key().data(), it->key().size()); + it->SeekToLast(); + if (it->Valid()) { + rocksdb::Slice cur_pk; + uint64_t ts = 0; + uint32_t ts_idx = 0; + const auto& indexs = inner_index->GetIndex(); + std::string end_key; + if (indexs.size() > 1) { + ParseKeyAndTs(true, it->key(), &cur_pk, &ts, &ts_idx); + end_key = CombineKeyTs(cur_pk, 0, ts_idx); + } else { + ParseKeyAndTs(false, it->key(), &cur_pk, &ts, &ts_idx); + end_key = CombineKeyTs(cur_pk, 0); + } + PDLOG(INFO, "delete range. start key %s end key %s inner idx %u tid %u pid %u", + start_key.c_str(), end_key.c_str(), idx, id_, pid_); + batch.DeleteRange(cf_hs_[idx + 1], rocksdb::Slice(start_key), rocksdb::Slice(end_key)); + } + } + } + rocksdb::Status s = db_->Write(write_opts_, &batch); + if (!s.ok()) { + PDLOG(WARNING, "delete failed, tid %u pid %u msg %s", id_, pid_, s.ToString().c_str()); + return {-1, s.ToString()}; + } + return {}; +} + void DiskTable::SchedGc() { GcHead(); UpdateTTL(); diff --git a/src/storage/disk_table.h b/src/storage/disk_table.h index 20f25f9a7ae..913833a7a0f 100644 --- a/src/storage/disk_table.h +++ b/src/storage/disk_table.h @@ -181,6 +181,8 @@ class DiskTable : public Table { bool Delete(const ::openmldb::api::LogEntry& entry) override; + base::Status Truncate(); + uint64_t GetExpireTime(const TTLSt& ttl_st) override; uint64_t GetRecordCnt() override { diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index ec1e87ab01c..4db555a1f81 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -3507,40 +3507,39 @@ void TabletImpl::TruncateTable(RpcController* controller, const ::openmldb::api: response->set_msg("table is loading data"); return; } - auto table_meta = table->GetTableMeta(); - std::shared_ptr
new_table; if (table->GetStorageMode() == openmldb::common::kMemory) { + auto table_meta = table->GetTableMeta(); + std::shared_ptr
new_table; new_table = std::make_shared(*table_meta); + if (!new_table->Init()) { + PDLOG(WARNING, "fail to init table. tid %u, pid %u", table_meta->tid(), table_meta->pid()); + response->set_msg("fail to init table"); + return; + } + new_table->SetTableStat(::openmldb::storage::kNormal); + { + std::lock_guard spin_lock(spin_mutex_); + tables_[tid].insert_or_assign(pid, new_table); + } + auto mem_snapshot = std::dynamic_pointer_cast(snapshot); + mem_snapshot->Truncate(replicator->GetOffset(), replicator->GetLeaderTerm()); + if (table_meta->mode() == ::openmldb::api::TableMode::kTableLeader) { + if (catalog_->AddTable(*table_meta, new_table)) { + LOG(INFO) << "add table " << table_meta->name() << " to catalog with db " << table_meta->db(); + } else { + LOG(WARNING) << "fail to add table " << table_meta->name() + << " to catalog with db " << table_meta->db(); + } + } } else { - std::string db_root_path; - if (!ChooseDBRootPath(tid, pid, table_meta->storage_mode(), db_root_path)) { - PDLOG(WARNING, "fail to get table db root path"); - response->set_msg("fail to get table db root path"); + auto disk_table = std::dynamic_pointer_cast(table); + if (auto status = disk_table->Truncate(); !status.OK()) { + response->set_code(::openmldb::base::ReturnCode::kTruncateTableFailed); + response->set_msg(status.GetMsg()); return; } - std::string table_db_path = GetDBPath(db_root_path, tid, pid); - new_table = std::make_shared(*table_meta, table_db_path); } - if (!new_table->Init()) { - PDLOG(WARNING, "fail to init table. tid %u, pid %u", table_meta->tid(), table_meta->pid()); - response->set_msg("fail to init table"); - return; - } - new_table->SetTableStat(::openmldb::storage::kNormal); - { - std::lock_guard spin_lock(spin_mutex_); - tables_[tid].insert_or_assign(pid, new_table); - } - auto mem_snapshot = std::dynamic_pointer_cast(snapshot); - mem_snapshot->Truncate(replicator->GetOffset(), replicator->GetLeaderTerm()); - if (table_meta->mode() == ::openmldb::api::TableMode::kTableLeader) { - if (catalog_->AddTable(*table_meta, new_table)) { - LOG(INFO) << "add table " << table_meta->name() << " to catalog with db " << table_meta->db(); - } else { - LOG(WARNING) << "fail to add table " << table_meta->name() << " to catalog with db " << table_meta->db(); - } - } PDLOG(INFO, "truncate table success. tid[%u] pid[%u]", tid, pid); response->set_code(::openmldb::base::ReturnCode::kOk); response->set_msg("ok"); From b25b165aaf2e659554b6dcca13754643a3a18674 Mon Sep 17 00:00:00 2001 From: dl239 Date: Tue, 17 Oct 2023 17:51:46 +0800 Subject: [PATCH 5/8] feat: truncate aggrate table --- src/base/status.h | 1 + src/cmd/sql_cmd_test.cc | 41 ++++++++++++++++++++ src/storage/aggregator.h | 2 + src/tablet/tablet_impl.cc | 68 +++++++++++++++++++++------------- src/tablet/tablet_impl.h | 2 + src/tablet/tablet_impl_test.cc | 36 ++++++++++++++++++ 6 files changed, 124 insertions(+), 26 deletions(-) diff --git a/src/base/status.h b/src/base/status.h index d608276f02d..a6854e287b6 100644 --- a/src/base/status.h +++ b/src/base/status.h @@ -93,6 +93,7 @@ enum ReturnCode { kExceedMaxMemory = 160, kInvalidArgs = 161, kCheckIndexFailed = 162, + kCatalogUpdateFailed = 163, kNameserverIsNotLeader = 300, kAutoFailoverIsEnabled = 301, kEndpointIsNotExist = 302, diff --git a/src/cmd/sql_cmd_test.cc b/src/cmd/sql_cmd_test.cc index 1896ac7c674..4926fe2744c 100644 --- a/src/cmd/sql_cmd_test.cc +++ b/src/cmd/sql_cmd_test.cc @@ -1039,6 +1039,47 @@ TEST_P(DBSDKTest, DeployWithBias) { ASSERT_TRUE(cs->GetNsClient()->DropDatabase(db, msg)); } +TEST_P(DBSDKTest, Truncate) { + auto cli = GetParam(); + sr = cli->sr; + std::string db_name = "test2"; + std::string table_name = "test1"; + std::string ddl = "create table test1 (c1 string, c2 int, c3 bigint, INDEX(KEY=c1, ts=c3));"; + ProcessSQLs(sr, { + "set @@execute_mode = 'online'", + absl::StrCat("create database ", db_name, ";"), + absl::StrCat("use ", db_name, ";"), + ddl, + }); + hybridse::sdk::Status status; + sr->ExecuteSQL(absl::StrCat("truncate table ", table_name, ";"), &status); + ASSERT_TRUE(status.IsOK()) << status.ToString(); + auto res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, ";"), &status); + ASSERT_EQ(res->Size(), 0); + for (int i = 0; i < 10; i++) { + std::string key = absl::StrCat("key", i); + for (int j = 0; j < 10; j++) { + uint64_t ts = 1000 + j; + sr->ExecuteSQL(absl::StrCat("insert into ", table_name, " values ('", key, "', 11, ", ts, ");"), &status); + } + } + + res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, ";"), &status); + ASSERT_EQ(res->Size(), 100); + sr->ExecuteSQL(absl::StrCat("truncate table ", table_name, ";"), &status); + ASSERT_TRUE(status.IsOK()) << status.ToString(); + res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, ";"), &status); + ASSERT_EQ(res->Size(), 0); + sr->ExecuteSQL(absl::StrCat("insert into ", table_name, " values ('aa', 11, 100);"), &status); + res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, ";"), &status); + ASSERT_EQ(res->Size(), 1); + ProcessSQLs(sr, { + absl::StrCat("use ", db_name, ";"), + absl::StrCat("drop table ", table_name), + absl::StrCat("drop database ", db_name), + }); +} + TEST_P(DBSDKTest, DeletetRange) { auto cli = GetParam(); sr = cli->sr; diff --git a/src/storage/aggregator.h b/src/storage/aggregator.h index f007ffc18e4..efcc607a32b 100644 --- a/src/storage/aggregator.h +++ b/src/storage/aggregator.h @@ -156,6 +156,8 @@ class Aggregator { // set the filter column info that not initialized in constructor bool SetFilter(absl::string_view filter_col); + std::shared_ptr
GetAggTable() { return aggr_table_; } + protected: codec::Schema base_table_schema_; codec::Schema aggr_table_schema_; diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index 4db555a1f81..b9e8c90e0ed 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -3463,6 +3463,38 @@ void TabletImpl::TruncateTable(RpcController* controller, const ::openmldb::api: brpc::ClosureGuard done_guard(done); uint32_t tid = request->tid(); uint32_t pid = request->pid(); + if (auto status = TruncateTableInternal(tid, pid); !status.OK()) { + base::SetResponseStatus(status, response); + return; + } + auto aggrs = GetAggregators(tid, pid); + if (aggrs) { + for (const auto& aggr : *aggrs) { + auto agg_table = aggr->GetAggTable(); + if (!agg_table) { + PDLOG(WARNING, "aggrate table does not exist. tid[%u] pid[%u] index pos[%u]", + tid, pid, aggr->GetIndexPos()); + response->set_code(::openmldb::base::ReturnCode::kTableIsNotExist); + response->set_msg("aggrate table does not exist"); + return; + } + uint32_t agg_tid = agg_table->GetId(); + uint32_t agg_pid = agg_table->GetPid(); + if (auto status = TruncateTableInternal(agg_tid, agg_pid); !status.OK()) { + PDLOG(WARNING, "truncate aggrate table failed. tid[%u] pid[%u] index pos[%u]", + agg_tid, agg_pid, aggr->GetIndexPos()); + base::SetResponseStatus(status, response); + return; + } + PDLOG(INFO, "truncate aggrate table success. tid[%u] pid[%u] index pos[%u]", + agg_tid, agg_pid, aggr->GetIndexPos()); + } + } + response->set_code(::openmldb::base::ReturnCode::kOk); + response->set_msg("ok"); +} + +base::Status TabletImpl::TruncateTableInternal(uint32_t tid, uint32_t pid) { std::shared_ptr
table; std::shared_ptr snapshot; std::shared_ptr replicator; @@ -3471,41 +3503,29 @@ void TabletImpl::TruncateTable(RpcController* controller, const ::openmldb::api: table = GetTableUnLock(tid, pid); if (!table) { DEBUGLOG("table does not exist. tid %u pid %u", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kTableIsNotExist); - response->set_msg("table not found"); - return; + return {::openmldb::base::ReturnCode::kTableIsNotExist, "table not found"}; } snapshot = GetSnapshotUnLock(tid, pid); if (!snapshot) { PDLOG(WARNING, "snapshot does not exist. tid[%u] pid[%u]", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kSnapshotIsNotExist); - response->set_msg("snapshot not found"); - return; + return {::openmldb::base::ReturnCode::kSnapshotIsNotExist, "snapshot not found"}; } replicator = GetReplicatorUnLock(tid, pid); if (!replicator) { PDLOG(WARNING, "replicator does not exist. tid[%u] pid[%u]", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kReplicatorIsNotExist); - response->set_msg("replicator not found"); - return; + return {::openmldb::base::ReturnCode::kReplicatorIsNotExist, "replicator not found"}; } } if (replicator->GetOffset() == 0) { PDLOG(INFO, "table is empty, truncate success. tid[%u] pid[%u]", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kOk); - response->set_msg("ok"); - return; + return {}; } if (table->GetTableStat() == ::openmldb::storage::kMakingSnapshot) { PDLOG(WARNING, "making snapshot task is running now. tid[%u] pid[%u]", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kTableStatusIsKmakingsnapshot); - response->set_msg("table status is kMakingSnapshot"); - return; + return {::openmldb::base::ReturnCode::kTableStatusIsKmakingsnapshot, "table status is kMakingSnapshot"}; } else if (table->GetTableStat() == ::openmldb::storage::kLoading) { PDLOG(WARNING, "table is loading now. tid[%u] pid[%u]", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kTableIsLoading); - response->set_msg("table is loading data"); - return; + return {::openmldb::base::ReturnCode::kTableIsLoading, "table is loading data"}; } if (table->GetStorageMode() == openmldb::common::kMemory) { auto table_meta = table->GetTableMeta(); @@ -3513,8 +3533,7 @@ void TabletImpl::TruncateTable(RpcController* controller, const ::openmldb::api: new_table = std::make_shared(*table_meta); if (!new_table->Init()) { PDLOG(WARNING, "fail to init table. tid %u, pid %u", table_meta->tid(), table_meta->pid()); - response->set_msg("fail to init table"); - return; + return {::openmldb::base::ReturnCode::kTableMetaIsIllegal, "fail to init table"}; } new_table->SetTableStat(::openmldb::storage::kNormal); { @@ -3529,20 +3548,17 @@ void TabletImpl::TruncateTable(RpcController* controller, const ::openmldb::api: } else { LOG(WARNING) << "fail to add table " << table_meta->name() << " to catalog with db " << table_meta->db(); + return {::openmldb::base::ReturnCode::kCatalogUpdateFailed, "fail to update catalog"}; } } } else { auto disk_table = std::dynamic_pointer_cast(table); if (auto status = disk_table->Truncate(); !status.OK()) { - response->set_code(::openmldb::base::ReturnCode::kTruncateTableFailed); - response->set_msg(status.GetMsg()); - return; + return {::openmldb::base::ReturnCode::kTruncateTableFailed, status.GetMsg()}; } } - PDLOG(INFO, "truncate table success. tid[%u] pid[%u]", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kOk); - response->set_msg("ok"); + return {}; } void TabletImpl::ExecuteGc(RpcController* controller, const ::openmldb::api::ExecuteGcRequest* request, diff --git a/src/tablet/tablet_impl.h b/src/tablet/tablet_impl.h index 8b6b92c57a8..fb337d5edb5 100644 --- a/src/tablet/tablet_impl.h +++ b/src/tablet/tablet_impl.h @@ -335,6 +335,8 @@ class TabletImpl : public ::openmldb::api::TabletServer { uint32_t partition_num, uint64_t last_time, std::shared_ptr<::openmldb::api::TaskInfo> task); + base::Status TruncateTableInternal(uint32_t tid, uint32_t pid); + void ExtractIndexDataInternal(std::shared_ptr<::openmldb::storage::Table> table, std::shared_ptr<::openmldb::storage::MemTableSnapshot> memtable_snapshot, const std::vector<::openmldb::common::ColumnKey>& column_key, diff --git a/src/tablet/tablet_impl_test.cc b/src/tablet/tablet_impl_test.cc index da5cc626bf0..70aa915e205 100644 --- a/src/tablet/tablet_impl_test.cc +++ b/src/tablet/tablet_impl_test.cc @@ -1416,6 +1416,42 @@ TEST_P(TabletImplTest, ScanWithLatestN) { ASSERT_FALSE(kv_it.Valid()); } +TEST_P(TabletImplTest, Truncate) { + ::openmldb::common::StorageMode storage_mode = GetParam(); + TabletImpl tablet; + uint32_t id = counter++; + tablet.Init(""); + ASSERT_EQ(0, CreateDefaultTable("db0", "t0", id, 1, 0, 0, kAbsoluteTime, storage_mode, &tablet)); + MockClosure closure; + for (int ts = 100; ts < 200; ts++) { + ::openmldb::api::PutRequest prequest; + PackDefaultDimension("test1", &prequest); + prequest.set_time(ts); + prequest.set_value(::openmldb::test::EncodeKV("test1", "test" + std::to_string(ts))); + prequest.set_tid(id); + prequest.set_pid(1); + ::openmldb::api::PutResponse presponse; + tablet.Put(NULL, &prequest, &presponse, &closure); + ASSERT_EQ(0, presponse.code()); + } + ::openmldb::api::TraverseRequest sr; + sr.set_tid(id); + sr.set_pid(1); + sr.set_limit(1000); + auto srp = std::make_shared<::openmldb::api::TraverseResponse>(); + tablet.Traverse(NULL, &sr, srp.get(), &closure); + ASSERT_EQ(0, srp->code()); + ASSERT_EQ(100, (signed)srp->count()); + ::openmldb::api::TruncateTableRequest tr; + tr.set_tid(id); + tr.set_pid(1); + auto trp = std::make_shared<::openmldb::api::TruncateTableResponse>(); + tablet.TruncateTable(NULL, &tr, trp.get(), &closure); + ASSERT_EQ(0, trp->code()); + tablet.Traverse(NULL, &sr, srp.get(), &closure); + ASSERT_EQ(0, srp->code()); + ASSERT_EQ(0, (signed)srp->count()); +} TEST_P(TabletImplTest, Traverse) { ::openmldb::common::StorageMode storage_mode = GetParam(); From d33d41676ca9f8646baa00b2625d2df3e804f640 Mon Sep 17 00:00:00 2001 From: dl239 Date: Tue, 17 Oct 2023 17:55:56 +0800 Subject: [PATCH 6/8] fix: fix log --- src/nameserver/name_server_impl.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/nameserver/name_server_impl.cc b/src/nameserver/name_server_impl.cc index 92a79860433..5b3e0276ff2 100644 --- a/src/nameserver/name_server_impl.cc +++ b/src/nameserver/name_server_impl.cc @@ -3771,7 +3771,7 @@ void NameServerImpl::TruncateTable(RpcController* controller, const TruncateTabl } } } - PDLOG(INFO, "truncate success, db[%u] name[%u]", db.c_str(), name.c_str()); + PDLOG(INFO, "truncate success, db[%s] name[%s]", db.c_str(), name.c_str()); response->set_code(::openmldb::base::ReturnCode::kOk); response->set_msg("ok"); } From 24760848a2259dac25b403a6a7c5010737157fdb Mon Sep 17 00:00:00 2001 From: dl239 Date: Wed, 18 Oct 2023 09:53:05 +0800 Subject: [PATCH 7/8] feat: makesanpshot after truncate disk table --- src/tablet/tablet_impl.cc | 11 ++++++----- src/tablet/tablet_impl.h | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index b9e8c90e0ed..3f6db57c149 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -2481,7 +2481,7 @@ void TabletImpl::SetExpire(RpcController* controller, const ::openmldb::api::Set } void TabletImpl::MakeSnapshotInternal(uint32_t tid, uint32_t pid, uint64_t end_offset, - std::shared_ptr<::openmldb::api::TaskInfo> task) { + std::shared_ptr<::openmldb::api::TaskInfo> task, bool is_force) { PDLOG(INFO, "MakeSnapshotInternal begin, tid[%u] pid[%u]", tid, pid); std::shared_ptr
table; std::shared_ptr snapshot; @@ -2524,7 +2524,7 @@ void TabletImpl::MakeSnapshotInternal(uint32_t tid, uint32_t pid, uint64_t end_o uint64_t cur_offset = replicator->GetOffset(); uint64_t snapshot_offset = snapshot->GetOffset(); int ret = 0; - if (cur_offset < snapshot_offset + FLAGS_make_snapshot_threshold_offset && end_offset == 0) { + if (!is_force && cur_offset < snapshot_offset + FLAGS_make_snapshot_threshold_offset && end_offset == 0) { PDLOG(INFO, "offset can't reach the threshold. tid[%u] pid[%u] " "cur_offset[%lu], snapshot_offset[%lu] end_offset[%lu]", @@ -2597,7 +2597,7 @@ void TabletImpl::MakeSnapshot(RpcController* controller, const ::openmldb::api:: break; } } - snapshot_pool_.AddTask(boost::bind(&TabletImpl::MakeSnapshotInternal, this, tid, pid, offset, task_ptr)); + snapshot_pool_.AddTask(boost::bind(&TabletImpl::MakeSnapshotInternal, this, tid, pid, offset, task_ptr, false)); response->set_code(::openmldb::base::ReturnCode::kOk); response->set_msg("ok"); return; @@ -2631,7 +2631,7 @@ void TabletImpl::SchedMakeSnapshot() { } for (auto iter = table_set.begin(); iter != table_set.end(); ++iter) { PDLOG(INFO, "start make snapshot tid[%u] pid[%u]", iter->first, iter->second); - MakeSnapshotInternal(iter->first, iter->second, 0, std::shared_ptr<::openmldb::api::TaskInfo>()); + MakeSnapshotInternal(iter->first, iter->second, 0, std::shared_ptr<::openmldb::api::TaskInfo>(), false); } // delay task one hour later avoid execute more than one time snapshot_pool_.DelayTask(FLAGS_make_snapshot_check_interval + 60 * 60 * 1000, @@ -3532,7 +3532,7 @@ base::Status TabletImpl::TruncateTableInternal(uint32_t tid, uint32_t pid) { std::shared_ptr
new_table; new_table = std::make_shared(*table_meta); if (!new_table->Init()) { - PDLOG(WARNING, "fail to init table. tid %u, pid %u", table_meta->tid(), table_meta->pid()); + PDLOG(WARNING, "fail to init table. tid %u, pid %u", tid, pid); return {::openmldb::base::ReturnCode::kTableMetaIsIllegal, "fail to init table"}; } new_table->SetTableStat(::openmldb::storage::kNormal); @@ -3556,6 +3556,7 @@ base::Status TabletImpl::TruncateTableInternal(uint32_t tid, uint32_t pid) { if (auto status = disk_table->Truncate(); !status.OK()) { return {::openmldb::base::ReturnCode::kTruncateTableFailed, status.GetMsg()}; } + snapshot_pool_.AddTask(boost::bind(&TabletImpl::MakeSnapshotInternal, this, tid, pid, 0, nullptr, true)); } PDLOG(INFO, "truncate table success. tid[%u] pid[%u]", tid, pid); return {}; diff --git a/src/tablet/tablet_impl.h b/src/tablet/tablet_impl.h index fb337d5edb5..80ffd7e1526 100644 --- a/src/tablet/tablet_impl.h +++ b/src/tablet/tablet_impl.h @@ -316,7 +316,7 @@ class TabletImpl : public ::openmldb::api::TabletServer { int CreateTableInternal(const ::openmldb::api::TableMeta* table_meta, std::string& msg); // NOLINT void MakeSnapshotInternal(uint32_t tid, uint32_t pid, uint64_t end_offset, - std::shared_ptr<::openmldb::api::TaskInfo> task); + std::shared_ptr<::openmldb::api::TaskInfo> task, bool is_force); void SendSnapshotInternal(const std::string& endpoint, uint32_t tid, uint32_t pid, uint32_t remote_tid, std::shared_ptr<::openmldb::api::TaskInfo> task); From 089e781c3d509ef8d0a4ce8aa7ded89d10ccc52d Mon Sep 17 00:00:00 2001 From: dl239 Date: Fri, 20 Oct 2023 11:52:40 +0800 Subject: [PATCH 8/8] docs: add doc --- .../sql/ddl/TRUNCATE_TABLE_STATEMENT.md | 16 ++ docs/en/reference/sql/ddl/index.rst | 1 + .../ddl/TRUNCATE_TABLE_STATEMENT.md | 16 ++ docs/zh/openmldb_sql/ddl/index.rst | 1 + src/sdk/interactive.h | 144 ++++++++++++++++++ src/sdk/sql_cluster_router.cc | 60 ++------ src/sdk/sql_cluster_router.h | 3 +- 7 files changed, 191 insertions(+), 50 deletions(-) create mode 100644 docs/en/reference/sql/ddl/TRUNCATE_TABLE_STATEMENT.md create mode 100644 docs/zh/openmldb_sql/ddl/TRUNCATE_TABLE_STATEMENT.md create mode 100644 src/sdk/interactive.h diff --git a/docs/en/reference/sql/ddl/TRUNCATE_TABLE_STATEMENT.md b/docs/en/reference/sql/ddl/TRUNCATE_TABLE_STATEMENT.md new file mode 100644 index 00000000000..3bd9360d920 --- /dev/null +++ b/docs/en/reference/sql/ddl/TRUNCATE_TABLE_STATEMENT.md @@ -0,0 +1,16 @@ +# TRUNCATE TABLE + +``` +TRUNCATE TABLE table_name +``` + +`TRUNCATE TABLE` statement is used to clear the specified table. + +## Example: clear t1 + +```sql +TRUNCATE TABLE t1; +-- Truncate table t1? yes/no +-- yes +-- SUCCEED +``` \ No newline at end of file diff --git a/docs/en/reference/sql/ddl/index.rst b/docs/en/reference/sql/ddl/index.rst index dbc94cc1f3d..bff9db48fb0 100644 --- a/docs/en/reference/sql/ddl/index.rst +++ b/docs/en/reference/sql/ddl/index.rst @@ -24,3 +24,4 @@ Data Definition Statement (DDL) SHOW_FUNCTIONS DROP_FUNCTION SHOW_CREATE_TABLE_STATEMENT + TRUNCATE_TABLE_STATEMENT diff --git a/docs/zh/openmldb_sql/ddl/TRUNCATE_TABLE_STATEMENT.md b/docs/zh/openmldb_sql/ddl/TRUNCATE_TABLE_STATEMENT.md new file mode 100644 index 00000000000..8ffb623f26f --- /dev/null +++ b/docs/zh/openmldb_sql/ddl/TRUNCATE_TABLE_STATEMENT.md @@ -0,0 +1,16 @@ +# TRUNCATE TABLE + +``` +TRUNCATE TABLE table_name +``` + +`TRUNCATE TABLE`语句用清空指定的表。 + +## Example: 清空t1表 + +```sql +TRUNCATE TABLE t1; +-- Truncate table t1? yes/no +-- yes +-- SUCCEED +``` \ No newline at end of file diff --git a/docs/zh/openmldb_sql/ddl/index.rst b/docs/zh/openmldb_sql/ddl/index.rst index efd36734261..9e420def154 100644 --- a/docs/zh/openmldb_sql/ddl/index.rst +++ b/docs/zh/openmldb_sql/ddl/index.rst @@ -24,3 +24,4 @@ SHOW_FUNCTIONS DROP_FUNCTION SHOW_CREATE_TABLE_STATEMENT + TRUNCATE_TABLE_STATEMENT \ No newline at end of file diff --git a/src/sdk/interactive.h b/src/sdk/interactive.h new file mode 100644 index 00000000000..c4480da9bc7 --- /dev/null +++ b/src/sdk/interactive.h @@ -0,0 +1,144 @@ +/* + * Copyright 2021 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef SRC_SDK_INTERACTIVE_H_ +#define SRC_SDK_INTERACTIVE_H_ + +#include +#include + +#include "base/status.h" + +namespace openmldb { +namespace sdk { + +inline const std::string DROP_TABLE_MSG = + "DROP TABLE is a dangerous operation. Once deleted, it is very difficult to recover. \n" + "You may also note that: \n" + "- If a snapshot of a partition is being generated while dropping a table, " + "the partition will not be deleted successfully.\n" + "- By default, the deleted data is moved to the folder `recycle`.\n" + "Please refer to this link for more details: " + base::NOTICE_URL; + +inline const std::string DROP_DEPLOYMENT_MSG = + "- DROP DEPLOYMENT will not delete the index that is created automatically.\n" + "- DROP DEPLOYMENT will not delete data in the pre-aggregation table in the long window setting."; + +inline const std::string DROP_INDEX_MSG = + "DROP INDEX is a dangerous operation. Once deleted, it is very difficult to recover.\n" + "You may also note that: \n" + "- You have to wait for 2 garbage collection intervals (gc_interval) to create the same index.\n" + "- The index will not be deleted immediately, " + "it remains until after 2 garbage collection intervals.\n" + "Please refer to the doc for more details: " + base::NOTICE_URL; + +inline const std::string DROP_FUNCTION_MSG = + "This will lead to execution failure or system crash " + "if any active deployment is using the function."; + +enum class CmdType { + kDrop = 1, + kTruncate = 2, +}; + +enum class TargetType { + kTable = 1, + kDeployment = 2, + kIndex = 3, + kFunction = 4, + kProcedure = 5, +}; + +class InteractiveValidator { + public: + InteractiveValidator() = default; + explicit InteractiveValidator(bool interactive) : interactive_(interactive) {} + + bool Interactive() { return interactive_; } + void SetInteractive(bool interactive) { interactive_ = interactive; } + + bool Check(CmdType cmd_type, TargetType target, const std::string& name) { + if (!interactive_) { + return true; + } + std::string msg; + if (cmd_type == CmdType::kDrop) { + switch (target) { + case TargetType::kTable: + msg = DROP_TABLE_MSG; + break; + case TargetType::kDeployment: + msg = DROP_DEPLOYMENT_MSG; + break; + case TargetType::kIndex: + msg = DROP_INDEX_MSG; + break; + case TargetType::kFunction: + msg = DROP_FUNCTION_MSG; + break; + default: + break; + } + } + if (!msg.empty()) { + printf("%s\n", msg.c_str()); + } + std::string cmd_str = CmdType2Str(cmd_type); + std::string target_str = TargetType2Str(target); + printf("%s %s %s? yes/no\n", cmd_str.c_str(), target_str.c_str(), name.c_str()); + std::string input; + std::cin >> input; + std::transform(input.begin(), input.end(), input.begin(), ::tolower); + if (input != "yes") { + printf("'%s %s' cmd is canceled!\n", cmd_str.c_str(), name.c_str()); + return false; + } + return true; + } + + private: + std::string CmdType2Str(CmdType type) { + if (type == CmdType::kDrop) { + return "Drop"; + } else { + return "Truncate"; + } + } + + std::string TargetType2Str(TargetType type) { + switch (type) { + case TargetType::kTable: + return "table"; + case TargetType::kDeployment: + return "deployment"; + case TargetType::kIndex: + return "index"; + case TargetType::kFunction: + return "function"; + default: + return ""; + } + return ""; + } + + private: + bool interactive_ = false; +}; + +} // namespace sdk +} // namespace openmldb + +#endif // SRC_SDK_INTERACTIVE_H_ diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index a12f0c914d6..9212962df25 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -210,7 +210,7 @@ class BatchQueryFutureImpl : public QueryFuture { SQLClusterRouter::SQLClusterRouter(const SQLRouterOptions& options) : options_(std::make_shared(options)), is_cluster_mode_(true), - interactive_(false), + interactive_validator_(), cluster_sdk_(nullptr), mu_(), rand_(::baidu::common::timer::now_time()) {} @@ -218,7 +218,7 @@ SQLClusterRouter::SQLClusterRouter(const SQLRouterOptions& options) SQLClusterRouter::SQLClusterRouter(const StandaloneOptions& options) : options_(std::make_shared(options)), is_cluster_mode_(false), - interactive_(false), + interactive_validator_(), cluster_sdk_(nullptr), mu_(), rand_(::baidu::common::timer::now_time()) {} @@ -226,7 +226,7 @@ SQLClusterRouter::SQLClusterRouter(const StandaloneOptions& options) SQLClusterRouter::SQLClusterRouter(DBSDK* sdk) : options_(), is_cluster_mode_(sdk->IsClusterMode()), - interactive_(false), + interactive_validator_(), cluster_sdk_(sdk), mu_(), rand_(::baidu::common::timer::now_time()) { @@ -1752,7 +1752,7 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h } case hybridse::node::kCmdDropFunction: { std::string name = cmd_node->GetArgs()[0]; - if (!CheckAnswerIfInteractive("function", name)) { + if (!interactive_validator_.Check(CmdType::kDrop, TargetType::kFunction, name)) { return {}; } auto base_status = ns_ptr->DropFunction(name, cmd_node->IsIfExists()); @@ -1808,7 +1808,7 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h return {}; } std::string sp_name = cmd_node->GetArgs()[0]; - if (!CheckAnswerIfInteractive("procedure", sp_name)) { + if (!interactive_validator_.Check(CmdType::kDrop, TargetType::kProcedure, sp_name)) { return {}; } if (ns_ptr->DropProcedure(db, sp_name, msg)) { @@ -1878,7 +1878,7 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h *status = {StatusCode::kCmdError, sp ? "not a deployment" : "deployment not found"}; return {}; } - if (!CheckAnswerIfInteractive("deployment", deploy_name)) { + if (!interactive_validator_.Check(CmdType::kDrop, TargetType::kDeployment, deploy_name)) { return {}; } if (ns_ptr->DropProcedure(db_name, deploy_name, msg)) { @@ -1959,7 +1959,7 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h *status = {StatusCode::kCmdError, msg}; return {}; } - if (!CheckAnswerIfInteractive("table", table_name)) { + if (!interactive_validator_.Check(CmdType::kDrop, TargetType::kTable, table_name)) { return {}; } if (DropTable(db_name, table_name, cmd_node->IsIfExists(), status)) { @@ -1975,7 +1975,7 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h *status = {StatusCode::kCmdError, msg}; return {}; } - if (!CheckAnswerIfInteractive("truncate", table_name)) { + if (!interactive_validator_.Check(CmdType::kTruncate, TargetType::kTable, table_name)) { return {}; } auto base_status = ns_ptr->TruncateTable(db_name, table_name); @@ -1999,7 +1999,7 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h *status = {StatusCode::kCmdError, "Invalid Cmd Args size"}; return {}; } - if (!CheckAnswerIfInteractive("index", index_name + " on " + table_name)) { + if (!interactive_validator_.Check(CmdType::kDrop, TargetType::kIndex, index_name + " on " + table_name)) { return {}; } ret = ns_ptr->DeleteIndex(db_name, table_name, index_name, msg); @@ -2061,7 +2061,7 @@ base::Status SQLClusterRouter::HandleSQLCreateTable(hybridse::node::CreatePlanNo if (!ns_ptr->CreateTable(table_info, create_node->GetIfNotExist(), msg)) { return base::Status(base::ReturnCode::kSQLCmdRunError, msg); } - if (interactive_ && table_info.column_key_size() == 0) { + if (interactive_validator_.Interactive() && table_info.column_key_size() == 0) { return base::Status{base::ReturnCode::kOk, "As there is no index specified, a default index type `absolute 0` will be created. " "The data attached to the index will never expire to be deleted. " @@ -3041,44 +3041,6 @@ ::hybridse::sdk::Status SQLClusterRouter::ParseNamesFromArgs(const std::string& return {}; } -bool SQLClusterRouter::CheckAnswerIfInteractive(const std::string& drop_type, const std::string& name) { - if (interactive_) { - std::string msg; - if (drop_type == "table") { - msg = "DROP TABLE is a dangerous operation. Once deleted, it is very difficult to recover. \n" - "You may also note that: \n" - "- If a snapshot of a partition is being generated while dropping a table, " - "the partition will not be deleted successfully.\n" - "- By default, the deleted data is moved to the folder `recycle`.\n" - "Please refer to this link for more details: " + base::NOTICE_URL; - } else if (drop_type == "deployment") { - msg = "- DROP DEPLOYMENT will not delete the index that is created automatically.\n" - "- DROP DEPLOYMENT will not delete data in the pre-aggregation table in the long window setting."; - } else if (drop_type == "index") { - msg = "DROP INDEX is a dangerous operation. Once deleted, it is very difficult to recover.\n" - "You may also note that: \n" - "- You have to wait for 2 garbage collection intervals (gc_interval) to create the same index.\n" - "- The index will not be deleted immediately, " - "it remains until after 2 garbage collection intervals.\n" - "Please refer to the doc for more details: " + base::NOTICE_URL; - } else if (drop_type == "function") { - msg = "This will lead to execution failure or system crash if any active deployment is using the function."; - } - if (!msg.empty()) { - printf("%s\n", msg.c_str()); - } - printf("Drop %s %s? yes/no\n", drop_type.c_str(), name.c_str()); - std::string input; - std::cin >> input; - std::transform(input.begin(), input.end(), input.begin(), ::tolower); - if (input != "yes") { - printf("'Drop %s' cmd is canceled!\n", name.c_str()); - return false; - } - } - return true; -} - std::string SQLClusterRouter::GetDatabase() { std::lock_guard<::openmldb::base::SpinMutex> lock(mu_); return db_; @@ -3089,7 +3051,7 @@ void SQLClusterRouter::SetDatabase(const std::string& db) { db_ = db; } -void SQLClusterRouter::SetInteractive(bool value) { interactive_ = value; } +void SQLClusterRouter::SetInteractive(bool value) { interactive_validator_.SetInteractive(value); } ::openmldb::base::Status SQLClusterRouter::SaveResultSet(const std::string& file_path, const std::shared_ptr& options_map, diff --git a/src/sdk/sql_cluster_router.h b/src/sdk/sql_cluster_router.h index 033bda8d090..27076732de6 100644 --- a/src/sdk/sql_cluster_router.h +++ b/src/sdk/sql_cluster_router.h @@ -34,6 +34,7 @@ #include "nameserver/system_table.h" #include "sdk/db_sdk.h" #include "sdk/file_option_parser.h" +#include "sdk/interactive.h" #include "sdk/sql_cache.h" #include "sdk/sql_router.h" #include "sdk/table_reader_impl.h" @@ -417,7 +418,7 @@ class SQLClusterRouter : public SQLRouter { std::string db_; std::map session_variables_; bool is_cluster_mode_; - bool interactive_; + InteractiveValidator interactive_validator_; DBSDK* cluster_sdk_; std::map>>> input_lru_cache_;