diff --git a/src/client/tablet_client.cc b/src/client/tablet_client.cc index e1d3ef9fa28..2a03c9a6b81 100644 --- a/src/client/tablet_client.cc +++ b/src/client/tablet_client.cc @@ -483,6 +483,17 @@ bool TabletClient::UpdateTTL(uint32_t tid, uint32_t pid, const ::openmldb::type: return false; } +bool TabletClient::Refresh() { + ::openmldb::api::RefreshRequest request; + ::openmldb::api::GeneralResponse response; + bool ret = client_.SendRequest(&::openmldb::api::TabletServer_Stub::Refresh, &request, &response, + FLAGS_request_timeout_ms, FLAGS_request_max_retry); + if (!ret || response.code() != 0) { + return false; + } + return true; +} + bool TabletClient::Refresh(uint32_t tid) { ::openmldb::api::RefreshRequest request; request.set_tid(tid); diff --git a/src/client/tablet_client.h b/src/client/tablet_client.h index 895960b9bdc..dfbec7ed8a2 100644 --- a/src/client/tablet_client.h +++ b/src/client/tablet_client.h @@ -236,6 +236,7 @@ class TabletClient : public Client { bool DropProcedure(const std::string& db_name, const std::string& sp_name); + bool Refresh(); bool Refresh(uint32_t tid); bool SubQuery(const ::openmldb::api::QueryRequest& request, diff --git a/src/cmd/sql_cmd_test.cc b/src/cmd/sql_cmd_test.cc index 1c9fd6b1dc6..3e97587f690 100644 --- a/src/cmd/sql_cmd_test.cc +++ b/src/cmd/sql_cmd_test.cc @@ -2998,8 +2998,15 @@ TEST_P(DBSDKTest, LongWindowsCleanup) { HandleSQL("create database test2;"); HandleSQL("use test2;"); HandleSQL(create_sql); + // sleep(5); // sleep to avoid tablet metadata // revert sleep to check error + ASSERT_TRUE(sr->RefreshCatalog()); // avoid cache in sdk + HandleSQL("show deployments;"); // ns deployment metadata, not tablet + // TODO if refresh is not good, sleep more + // sp_cache_->ProcedureExist in tablet get deployment here, but nameserver no deployment + // refresh won't effet sp_cache_ in tablet sr->ExecuteSQL(deploy_sql, &status); - ASSERT_TRUE(status.IsOK()); + // may get error `Fail to transform data_provider op: table test2.trans not exist!` TODO + ASSERT_TRUE(status.IsOK()) << "deploy failed on " << status.ToString(); std::string msg; std::string result_sql = "select * from __INTERNAL_DB.PRE_AGG_META_INFO;"; auto rs = sr->ExecuteSQL("", result_sql, &status); @@ -3014,6 +3021,8 @@ TEST_P(DBSDKTest, LongWindowsCleanup) { ASSERT_FALSE(cs->GetNsClient()->DropTable("test2", "trans", msg)); ASSERT_TRUE(cs->GetNsClient()->DropProcedure("test2", "demo1", msg)) << msg; ASSERT_TRUE(cs->GetNsClient()->DropTable("test2", "trans", msg)) << msg; + + ASSERT_TRUE(sr->RefreshCatalog()); // avoid cache in sdk // helpful for debug HandleSQL("show tables;"); HandleSQL("show deployments;"); @@ -3032,10 +3041,10 @@ TEST_P(DBSDKTest, CreateWithoutIndexCol) { "c8 date, index(ts=c7));"; hybridse::sdk::Status status; sr->ExecuteSQL(create_sql, &status); - ASSERT_TRUE(status.IsOK()); + ASSERT_TRUE(status.IsOK()) << status.ToString(); std::string msg; - ASSERT_TRUE(cs->GetNsClient()->DropTable("test2", "trans", msg)); - ASSERT_TRUE(cs->GetNsClient()->DropDatabase("test2", msg)); + ASSERT_TRUE(cs->GetNsClient()->DropTable("test2", "trans", msg)) << msg; + ASSERT_TRUE(cs->GetNsClient()->DropDatabase("test2", msg)) << msg; } TEST_P(DBSDKTest, CreateIfNotExists) { diff --git a/src/nameserver/name_server_impl.cc b/src/nameserver/name_server_impl.cc index 252fefd5917..ca61290eead 100644 --- a/src/nameserver/name_server_impl.cc +++ b/src/nameserver/name_server_impl.cc @@ -8586,7 +8586,7 @@ bool NameServerImpl::AddIndexToTableInfo(const std::string& name, const std::str endpoint_set.insert(meta.endpoint()); } } - // locked on top + // locked on top TODO(hw): for (const auto& tablet : tablets_) { if (!tablet.second->Health()) { continue; @@ -9428,6 +9428,8 @@ void NameServerImpl::DropProcedure(RpcController* controller, const api::DropPro db_sp_info_map_.erase(db_name); } NotifyTableChanged(::openmldb::type::NotifyType::kTable); + // Refresh on tablet to avoid meta inconsistent, notify may be slow TODO refresh works on procedure? + // RefreshHealthTabletsUnlockWith([](const std::shared_ptr& tablet_info) { return true; }); } response->set_code(::openmldb::base::ReturnCode::kOk); response->set_msg("ok"); @@ -10512,5 +10514,19 @@ base::Status NameServerImpl::CreateDeployOP(const DeploySQLRequest& request, uin return {}; } +template +inline bool NameServerImpl::RefreshHealthTabletsUnlockWith(T pred) { + bool ret = true; + for (const auto& tablet : tablets_) { + if (!tablet.second->Health()) { + continue; + } + if (pred(tablet.second)) { + ret &= tablet.second->client_->Refresh(); + } + } + return ret; +} + } // namespace nameserver } // namespace openmldb diff --git a/src/nameserver/name_server_impl.h b/src/nameserver/name_server_impl.h index d8fc2245401..d1aa84fba7f 100644 --- a/src/nameserver/name_server_impl.h +++ b/src/nameserver/name_server_impl.h @@ -682,6 +682,8 @@ class NameServerImpl : public NameServer { bool IsExistDataBase(const std::string& db); + template bool RefreshHealthTabletsUnlockWith(T pred); + private: std::mutex mu_; Tablets tablets_; diff --git a/src/tablet/sql_cluster_availability_test.cc b/src/tablet/sql_cluster_availability_test.cc index 267b9144a3d..431ed83fd66 100644 --- a/src/tablet/sql_cluster_availability_test.cc +++ b/src/tablet/sql_cluster_availability_test.cc @@ -235,8 +235,10 @@ TEST_F(SqlClusterTest, RecoverProcedure) { ::openmldb::tablet::TabletImpl* tablet2 = new ::openmldb::tablet::TabletImpl(); StartTablet(&tb_server2, tablet2); sleep(3); + // ensure tablet added in sdk + ASSERT_TRUE(router->RefreshCatalog()); rs = router->CallProcedure(db, sp_name, request_row, &status); - if (!rs) FAIL() << "call procedure failed"; + if (!rs) FAIL() << "call procedure failed, " + status.ToString(); schema = rs->GetSchema(); ASSERT_EQ(schema->GetColumnCnt(), 3); ASSERT_TRUE(rs->Next()); diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index 03335c60a7a..bcdedc393b1 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -553,8 +553,13 @@ void TabletImpl::Refresh(RpcController* controller, const ::openmldb::api::Refre ::openmldb::api::GeneralResponse* response, Closure* done) { brpc::ClosureGuard done_guard(done); if (IsClusterMode()) { - if (RefreshSingleTable(request->tid())) { - PDLOG(INFO, "refresh success. tid %u", request->tid()); + if(request->has_tid()) { + if (RefreshSingleTable(request->tid())) { + PDLOG(INFO, "refresh success. tid %u", request->tid()); + } + } else { + LOG(INFO) << "refresh all by rpc without tid"; + RefreshTableInfo(); } } } @@ -5222,7 +5227,8 @@ void TabletImpl::CreateProcedure(RpcController* controller, const openmldb::api: if (sp_cache_->ProcedureExist(db_name, sp_name)) { response->set_code(::openmldb::base::ReturnCode::kProcedureAlreadyExists); response->set_msg("store procedure already exists"); - PDLOG(WARNING, "store procedure[%s] already exists in db[%s]", sp_name.c_str(), db_name.c_str()); + // print endpoint for ut debug + PDLOG(WARNING, "store procedure[%s] already exists in db[%s] on %s", sp_name.c_str(), db_name.c_str(), endpoint_.c_str()); return; } ::hybridse::base::Status status; @@ -5283,7 +5289,7 @@ void TabletImpl::CreateProcedure(RpcController* controller, const openmldb::api: response->set_code(::openmldb::base::ReturnCode::kOk); response->set_msg("ok"); - LOG(INFO) << "create procedure success! sp_name: " << sp_name << ", db: " << db_name << ", sql: " << sql; + LOG(INFO) << "create procedure success! sp_name: " << sp_name << ", db: " << db_name << ", sql: " << sql << " on " << endpoint_; } void TabletImpl::DropProcedure(RpcController* controller, const ::openmldb::api::DropProcedureRequest* request, @@ -5311,7 +5317,7 @@ void TabletImpl::DropProcedure(RpcController* controller, const ::openmldb::api: } response->set_code(::openmldb::base::ReturnCode::kOk); response->set_msg("ok"); - PDLOG(INFO, "drop procedure success. db_name[%s] sp_name[%s]", db_name.c_str(), sp_name.c_str()); + PDLOG(INFO, "drop procedure success. db_name[%s] sp_name[%s] on %s", db_name.c_str(), sp_name.c_str(), endpoint_.c_str()); } void TabletImpl::RunRequestQuery(RpcController* ctrl, const openmldb::api::QueryRequest& request, @@ -5395,7 +5401,7 @@ void TabletImpl::CreateProcedure(const std::shared_ptrInsertSQLProcedureCacheEntry(db_name, sp_name, sp_info, session.GetCompileInfo(), batch_session.GetCompileInfo()); - + // only called by RefreshTableInfo LOG(INFO) << "refresh procedure success! sp_name: " << sp_name << ", db: " << db_name << ", sql: " << sql; }