From 1696160c10b3ed92ab638d4a7cd19b81c2653b03 Mon Sep 17 00:00:00 2001 From: Christian Zentgraf Date: Thu, 16 May 2024 18:55:12 -0400 Subject: [PATCH] [native] Replace fixed worker port with ephemeral ports Previously the listener ports for the native works in the E2E tests was hard coded to be 1234 + worker count. The change looks in the OS for an available ephemeral port and uses this value when spawning the native workers. The native worker must then defer some configuration until the port selection by the OS is known. --- .../presto_cpp/main/PrestoServer.cpp | 100 ++++++++++++------ .../PrestoNativeQueryRunnerUtils.java | 5 +- 2 files changed, 68 insertions(+), 37 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 538d2df2cac3f..b0270e5d63df7 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -276,35 +276,38 @@ void PrestoServer::run() { address_); initializeCoordinatorDiscoverer(); - if (coordinatorDiscoverer_ != nullptr) { - announcer_ = std::make_unique( - address_, - httpsPort.has_value(), - httpsPort.has_value() ? httpsPort.value() : httpPort, - coordinatorDiscoverer_, - nodeVersion_, - environment_, - nodeId_, - nodeLocation_, - systemConfig->prestoNativeSidecar(), - catalogNames, - systemConfig->announcementMaxFrequencyMs(), - sslContext_); - updateAnnouncerDetails(); - announcer_->start(); - - uint64_t heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs(); - if (heartbeatFrequencyMs > 0) { - heartbeatManager_ = std::make_unique( + auto startAnnouncerAndHeartbeatManager = [&](const bool& useHttps, + const int& port) { + if (coordinatorDiscoverer_ != nullptr) { + announcer_ = std::make_unique( address_, - httpsPort.has_value() ? httpsPort.value() : httpPort, + useHttps, + port, coordinatorDiscoverer_, - sslContext_, - [server = this]() { return server->fetchNodeStatus(); }, - heartbeatFrequencyMs); - heartbeatManager_->start(); + nodeVersion_, + environment_, + nodeId_, + nodeLocation_, + systemConfig->prestoNativeSidecar(), + catalogNames, + systemConfig->announcementMaxFrequencyMs(), + sslContext_); + updateAnnouncerDetails(); + announcer_->start(); + + uint64_t heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs(); + if (heartbeatFrequencyMs > 0) { + heartbeatManager_ = std::make_unique( + address_, + port, + coordinatorDiscoverer_, + sslContext_, + [server = this]() { return server->fetchNodeStatus(); }, + heartbeatFrequencyMs); + heartbeatManager_->start(); + } } - } + }; const bool reusePort = SystemConfig::instance()->httpServerReusePort(); auto httpConfig = @@ -439,14 +442,16 @@ void PrestoServer::run() { taskManager_ = std::make_unique( driverExecutor_.get(), httpSrvCpuExecutor_.get(), spillerExecutor_.get()); - std::string taskUri; - if (httpsPort.has_value()) { - taskUri = fmt::format(kTaskUriFormat, kHttps, address_, httpsPort.value()); - } else { - taskUri = fmt::format(kTaskUriFormat, kHttp, address_, httpPort); - } + auto setTaskUri = [&](const bool& useHttps, const int& port) { + std::string taskUri; + if (useHttps) { + taskUri = fmt::format(kTaskUriFormat, kHttps, address_, port); + } else { + taskUri = fmt::format(kTaskUriFormat, kHttp, address_, port); + } + taskManager_->setBaseUri(taskUri); + }; - taskManager_->setBaseUri(taskUri); taskManager_->setNodeId(nodeId_); taskManager_->setOldTaskCleanUpMs(systemConfig->oldTaskCleanUpMs()); @@ -529,7 +534,34 @@ void PrestoServer::run() { // Start everything. After the return from the following call we are shutting // down. - httpServer_->start(getHttpServerFilters()); + httpServer_->start(getHttpServerFilters(), [&](proxygen::HTTPServer* server) { + const auto addresses = server->addresses(); + for (auto address : addresses) { + PRESTO_STARTUP_LOG(INFO) << fmt::format( + "Server listening at {}:{} - https {}", + address.address.getIPAddress().str(), + address.address.getPort(), + address.sslConfigs.size() != 0); + // We could be bound to both http and https ports. + // If https is set we must use that. + if (httpsPort.has_value() && address.sslConfigs.size() == 0) { + continue; + } + startAnnouncerAndHeartbeatManager( + httpsPort.has_value(), address.address.getPort()); + setTaskUri(httpsPort.has_value(), address.address.getPort()); + break; + } + VELOX_CHECK( + announcer_ != nullptr, + "The announcer is expected to have been created but wasn't."); + const auto heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs(); + if (heartbeatFrequencyMs > 0) { + VELOX_CHECK( + heartbeatManager_ != nullptr, + "The heartbeat manager is expected to have been created but wasn't."); + } + }); if (announcer_ != nullptr) { PRESTO_SHUTDOWN_LOG(INFO) << "Stopping announcer"; diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java index af10adc032dd4..bd2dd738aba0c 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java @@ -377,13 +377,12 @@ public static Optional> getExternalWorkerLaunc Files.createDirectories(dir); Path tempDirectoryPath = Files.createTempDirectory(dir, "worker"); log.info("Temp directory for Worker #%d: %s", workerIndex, tempDirectoryPath.toString()); - int port = 1234 + workerIndex; - // Write config file + // Write config file - use an ephemeral port for the worker. String configProperties = format("discovery.uri=%s%n" + "presto.version=testversion%n" + "system-memory-gb=4%n" + - "http-server.http.port=%d", discoveryUri, port); + "http-server.http.port=0%n", discoveryUri); if (remoteFunctionServerUds.isPresent()) { String jsonSignaturesPath = Resources.getResource(REMOTE_FUNCTION_JSON_SIGNATURES).getFile();