diff --git a/src/server/main_service.cc b/src/server/main_service.cc index a33a86739af9..d61808242049 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -884,7 +884,11 @@ void Service::Shutdown() { VLOG(1) << "Service::Shutdown"; // We mark that we are shutting down. After this incoming requests will be - // rejected + // rejected. + mu_.lock(); + global_state_ = GlobalState::SHUTTING_DOWN; + mu_.unlock(); + pp_.AwaitFiberOnAll([](ProactorBase* pb) { ServerState::tlocal()->EnterLameDuck(); facade::Connection::ShutdownThreadLocal(); @@ -2503,27 +2507,20 @@ GlobalState Service::SwitchState(GlobalState from, GlobalState to) { return to; } -void Service::RequestLoadingState() { - bool switch_state = false; - { +bool Service::RequestLoadingState() { + if (SwitchState(GlobalState::ACTIVE, GlobalState::LOADING) == GlobalState::LOADING) { util::fb2::LockGuard lk(mu_); - ++loading_state_counter_; - if (global_state_ != GlobalState::LOADING) { - DCHECK_EQ(global_state_, GlobalState::ACTIVE); - switch_state = true; - } - } - if (switch_state) { - SwitchState(GlobalState::ACTIVE, GlobalState::LOADING); + loading_state_counter_++; + return true; } + return false; } void Service::RemoveLoadingState() { bool switch_state = false; { util::fb2::LockGuard lk(mu_); - DCHECK_EQ(global_state_, GlobalState::LOADING); - DCHECK_GT(loading_state_counter_, 0u); + CHECK_GT(loading_state_counter_, 0u); --loading_state_counter_; switch_state = loading_state_counter_ == 0; } @@ -2532,11 +2529,6 @@ void Service::RemoveLoadingState() { } } -GlobalState Service::GetGlobalState() const { - util::fb2::LockGuard lk(mu_); - return global_state_; -} - void Service::ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) { // We skip authentication on privileged listener if the flag admin_nopass is set // We also skip authentication if requirepass is empty diff --git a/src/server/main_service.h b/src/server/main_service.h index bbaee3e62da2..151f61c08518 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -78,11 +78,9 @@ class Service : public facade::ServiceInterface { // Upon switch, updates cached global state in threadlocal ServerState struct. GlobalState SwitchState(GlobalState from, GlobalState to) ABSL_LOCKS_EXCLUDED(mu_); - void RequestLoadingState() ABSL_LOCKS_EXCLUDED(mu_); + bool RequestLoadingState() ABSL_LOCKS_EXCLUDED(mu_); void RemoveLoadingState() ABSL_LOCKS_EXCLUDED(mu_); - GlobalState GetGlobalState() const ABSL_LOCKS_EXCLUDED(mu_); - void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) final; void OnConnectionClose(facade::ConnectionContext* cntx) final; diff --git a/src/server/replica.cc b/src/server/replica.cc index 977ec0f885d4..f1f109cd7350 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -417,7 +417,11 @@ error_code Replica::InitiatePSync() { io::PrefixSource ps{io_buf.InputBuffer(), Sock()}; // Set LOADING state. - service_.RequestLoadingState(); + if (!service_.RequestLoadingState()) { + return cntx_.ReportError(std::make_error_code(errc::state_not_recoverable), + "Failed to enter LOADING state"); + } + absl::Cleanup cleanup = [this]() { service_.RemoveLoadingState(); }; if (slot_range_.has_value()) { @@ -502,10 +506,14 @@ error_code Replica::InitiateDflySync() { for (auto& flow : shard_flows_) flow->Cancel(); }; + RETURN_ON_ERR(cntx_.SwitchErrorHandler(std::move(err_handler))); // Make sure we're in LOADING state. - service_.RequestLoadingState(); + if (!service_.RequestLoadingState()) { + return cntx_.ReportError(std::make_error_code(errc::state_not_recoverable), + "Failed to enter LOADING state"); + } // Start full sync flows. state_mask_.fetch_or(R_SYNCING); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index dea8d5d297fd..5674ac30bba6 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1168,10 +1168,12 @@ void ServerFamily::SnapshotScheduling() { return; } - const auto loading_check_interval = std::chrono::seconds(10); - while (service_.GetGlobalState() == GlobalState::LOADING) { - schedule_done_.WaitFor(loading_check_interval); - } + ServerState* ss = ServerState::tlocal(); + do { + if (schedule_done_.WaitFor(100ms)) { + return; + } + } while (ss->gstate() == GlobalState::LOADING); while (true) { const std::chrono::time_point now = std::chrono::system_clock::now(); @@ -1657,9 +1659,10 @@ GenericError ServerFamily::DoSave(bool ignore_state) { GenericError ServerFamily::DoSaveCheckAndStart(bool new_version, string_view basename, Transaction* trans, bool ignore_state) { - auto state = service_.GetGlobalState(); + auto state = ServerState::tlocal()->gstate(); + // In some cases we want to create a snapshot even if server is not active, f.e in takeover - if (!ignore_state && (state != GlobalState::ACTIVE)) { + if (!ignore_state && (state != GlobalState::ACTIVE && state != GlobalState::SHUTTING_DOWN)) { return GenericError{make_error_code(errc::operation_in_progress), StrCat(GlobalStateName(state), " - can not save database")}; } @@ -2242,6 +2245,8 @@ void ServerFamily::Info(CmdArgList args, const CommandContext& cmd_cntx) { absl::StrAppend(&info, a1, ":", a2, "\r\n"); }; + ServerState* ss = ServerState::tlocal(); + if (should_enter("SERVER")) { auto kind = ProactorBase::me()->GetKind(); const char* multiplex_api = (kind == ProactorBase::IOURING) ? "iouring" : "epoll"; @@ -2467,7 +2472,7 @@ void ServerFamily::Info(CmdArgList args, const CommandContext& cmd_cntx) { append("last_saved_file", save_info.file_name); append("last_success_save_duration_sec", save_info.success_duration_sec); - size_t is_loading = service_.GetGlobalState() == GlobalState::LOADING; + unsigned is_loading = (ss->gstate() == GlobalState::LOADING); append("loading", is_loading); append("saving", is_saving); append("current_save_duration_sec", curent_durration_sec); @@ -2752,7 +2757,8 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply util::fb2::LockGuard lk(replicaof_mu_); // Only one REPLICAOF command can run at a time // We should not execute replica of command while loading from snapshot. - if (ServerState::tlocal()->is_master && service_.GetGlobalState() == GlobalState::LOADING) { + ServerState* ss = ServerState::tlocal(); + if (ss->is_master && ss->gstate() == GlobalState::LOADING) { builder->SendError(kLoadingErr); return; } @@ -2766,7 +2772,7 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply // If NO ONE was supplied, just stop the current replica (if it exists) if (replicaof_args->IsReplicaOfNoOne()) { - if (!ServerState::tlocal()->is_master) { + if (!ss->is_master) { CHECK(replica_); SetMasterFlagOnAllThreads(true); // Flip flag before clearing replica @@ -2776,8 +2782,8 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply StopAllClusterReplicas(); } - CHECK_EQ(service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE), GlobalState::ACTIVE) - << "Server is set to replica no one, yet state is not active!"; + // May not switch to ACTIVE if the process is, for example, shutting down at the same time. + service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE); return builder->SendOk(); } diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index a2695e5b2e81..5b0341df3c68 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -1976,18 +1976,22 @@ async def test_replicaof_reject_on_load(df_factory, df_seeder_factory): replica.stop() replica.start() c_replica = replica.client() + + @assert_eventually + async def check_replica_isloading(): + persistence = await c_replica.info("PERSISTENCE") + assert persistence["loading"] == 1 + + # If this fails adjust `keys` and the `assert dbsize >= 30000` above. + # Keep in mind that if the assert False is triggered below, it doesn't mean + # that there is a bug because it could be the case that while executing + # INFO PERSISTENCE df is in loading state but when we call REPLICAOF df + # is no longer in loading state and the assertion false is triggered. + await check_replica_isloading() + # Check replica of not alowed while loading snapshot - try: - # If this fails adjust `keys` and the `assert dbsize >= 30000` above. - # Keep in mind that if the assert False is triggered below, it doesn't mean - # that there is a bug because it could be the case that while executing - # INFO PERSISTENCE df is in loading state but when we call REPLICAOF df - # is no longer in loading state and the assertion false is triggered. - assert "loading:1" in (await c_replica.execute_command("INFO PERSISTENCE")) + with pytest.raises(aioredis.BusyLoadingError): await c_replica.execute_command(f"REPLICAOF localhost {master.port}") - assert False - except aioredis.BusyLoadingError as e: - assert "Dragonfly is loading the dataset in memory" in str(e) # Check one we finish loading snapshot replicaof success await wait_available_async(c_replica, timeout=180)