From c1066d64b2b81f45119d1ebfc2b4611807a41143 Mon Sep 17 00:00:00 2001 From: Christian Zentgraf Date: Tue, 2 Jul 2024 11:29:51 -0400 Subject: [PATCH] [native] Replace fixed worker port with ephemeral ports Previously the listener ports for the native works in the E2E tests was hard coded to be 1234 + worker count. The change looks in the OS for an available ephemeral port and uses this value when spawning the native workers. The native worker must then defer some configuration until the port selection by the OS is known. --- .../presto_cpp/main/PrestoServer.cpp | 102 ++++++++++++------ .../PrestoNativeQueryRunnerUtils.java | 5 +- 2 files changed, 70 insertions(+), 37 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 66e65c467b353..1da59bf5a661c 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -272,35 +272,38 @@ void PrestoServer::run() { address_); initializeCoordinatorDiscoverer(); - if (coordinatorDiscoverer_ != nullptr) { - announcer_ = std::make_unique( - address_, - httpsPort.has_value(), - httpsPort.has_value() ? httpsPort.value() : httpPort, - coordinatorDiscoverer_, - nodeVersion_, - environment_, - nodeId_, - nodeLocation_, - systemConfig->prestoNativeSidecar(), - catalogNames, - systemConfig->announcementMaxFrequencyMs(), - sslContext_); - updateAnnouncerDetails(); - announcer_->start(); - - uint64_t heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs(); - if (heartbeatFrequencyMs > 0) { - heartbeatManager_ = std::make_unique( + auto startAnnouncerAndHeartbeatManager = [&](const bool& useHttps, + const int& port) { + if (coordinatorDiscoverer_ != nullptr) { + announcer_ = std::make_unique( address_, - httpsPort.has_value() ? httpsPort.value() : httpPort, + useHttps, + port, coordinatorDiscoverer_, - sslContext_, - [server = this]() { return server->fetchNodeStatus(); }, - heartbeatFrequencyMs); - heartbeatManager_->start(); + nodeVersion_, + environment_, + nodeId_, + nodeLocation_, + systemConfig->prestoNativeSidecar(), + catalogNames, + systemConfig->announcementMaxFrequencyMs(), + sslContext_); + updateAnnouncerDetails(); + announcer_->start(); + + uint64_t heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs(); + if (heartbeatFrequencyMs > 0) { + heartbeatManager_ = std::make_unique( + address_, + port, + coordinatorDiscoverer_, + sslContext_, + [server = this]() { return server->fetchNodeStatus(); }, + heartbeatFrequencyMs); + heartbeatManager_->start(); + } } - } + }; const bool reusePort = SystemConfig::instance()->httpServerReusePort(); auto httpConfig = @@ -450,14 +453,16 @@ void PrestoServer::run() { taskManager_ = std::make_unique( driverExecutor_.get(), httpSrvCpuExecutor_.get(), spillerExecutor_.get()); - std::string taskUri; - if (httpsPort.has_value()) { - taskUri = fmt::format(kTaskUriFormat, kHttps, address_, httpsPort.value()); - } else { - taskUri = fmt::format(kTaskUriFormat, kHttp, address_, httpPort); - } + auto setTaskUri = [&](const bool& useHttps, const int& port) { + std::string taskUri; + if (useHttps) { + taskUri = fmt::format(kTaskUriFormat, kHttps, address_, port); + } else { + taskUri = fmt::format(kTaskUriFormat, kHttp, address_, port); + } + taskManager_->setBaseUri(taskUri); + }; - taskManager_->setBaseUri(taskUri); taskManager_->setNodeId(nodeId_); taskManager_->setOldTaskCleanUpMs(systemConfig->oldTaskCleanUpMs()); @@ -540,7 +545,36 @@ void PrestoServer::run() { // Start everything. After the return from the following call we are shutting // down. - httpServer_->start(getHttpServerFilters()); + httpServer_->start(getHttpServerFilters(), [&](proxygen::HTTPServer* server) { + const auto addresses = server->addresses(); + for (auto address : addresses) { + PRESTO_STARTUP_LOG(INFO) << fmt::format( + "Server listening at {}:{} - https {}", + address.address.getIPAddress().str(), + address.address.getPort(), + address.sslConfigs.size() != 0); + // We could be bound to both http and https ports. + // If https is set we must use that. + if (httpsPort.has_value() && address.sslConfigs.size() == 0) { + continue; + } + startAnnouncerAndHeartbeatManager( + httpsPort.has_value(), address.address.getPort()); + setTaskUri(httpsPort.has_value(), address.address.getPort()); + break; + } + if (coordinatorDiscoverer_ != nullptr) { + VELOX_CHECK( + announcer_ != nullptr, + "The announcer is expected to have been created but wasn't."); + const auto heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs(); + if (heartbeatFrequencyMs > 0) { + VELOX_CHECK( + heartbeatManager_ != nullptr, + "The heartbeat manager is expected to have been created but wasn't."); + } + } + }); if (announcer_ != nullptr) { PRESTO_SHUTDOWN_LOG(INFO) << "Stopping announcer"; diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java index b4b8dba60eca5..004603b377d95 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java @@ -420,13 +420,12 @@ public static Optional> getExternalWorkerLaunc Files.createDirectories(dir); Path tempDirectoryPath = Files.createTempDirectory(dir, "worker"); log.info("Temp directory for Worker #%d: %s", workerIndex, tempDirectoryPath.toString()); - int port = 1234 + workerIndex; - // Write config file + // Write config file - use an ephemeral port for the worker. String configProperties = format("discovery.uri=%s%n" + "presto.version=testversion%n" + "system-memory-gb=4%n" + - "http-server.http.port=%d", discoveryUri, port); + "http-server.http.port=0%n", discoveryUri); if (remoteFunctionServerUds.isPresent()) { String jsonSignaturesPath = Resources.getResource(REMOTE_FUNCTION_JSON_SIGNATURES).getFile();