From 751b14097e901049cccf42cfbffa0f0b8d4abf24 Mon Sep 17 00:00:00 2001 From: Christian Zentgraf Date: Thu, 16 May 2024 18:55:12 -0400 Subject: [PATCH] [native] Replace fixed worker port with ephemeral ports Previously the listener ports for the native works in the E2E tests was hard coded to be 1234 + worker count. The change looks in the OS for an available ephemeral port and uses this value when spawning the native workers. The native worker must then defer some configuration until the port selection by the OS is known. --- .../presto_cpp/main/PrestoServer.cpp | 98 ++++++++++++------- .../PrestoNativeQueryRunnerUtils.java | 5 +- 2 files changed, 67 insertions(+), 36 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 5bc0d5ef4b60d..41c405a14e13c 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -270,34 +270,37 @@ void PrestoServer::run() { address_); initializeCoordinatorDiscoverer(); - if (coordinatorDiscoverer_ != nullptr) { - announcer_ = std::make_unique( - address_, - httpsPort.has_value(), - httpsPort.has_value() ? httpsPort.value() : httpPort, - coordinatorDiscoverer_, - nodeVersion_, - environment_, - nodeId_, - nodeLocation_, - catalogNames, - systemConfig->announcementMaxFrequencyMs(), - sslContext_); - updateAnnouncerDetails(); - announcer_->start(); - - uint64_t heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs(); - if (heartbeatFrequencyMs > 0) { - heartbeatManager_ = std::make_unique( + auto startAnnouncerAndHeartbeatManager = [&](const bool& useHttps, + const int& port) { + if (coordinatorDiscoverer_ != nullptr) { + announcer_ = std::make_unique( address_, - httpsPort.has_value() ? httpsPort.value() : httpPort, + useHttps, + port, coordinatorDiscoverer_, - sslContext_, - [server = this]() { return server->fetchNodeStatus(); }, - heartbeatFrequencyMs); - heartbeatManager_->start(); + nodeVersion_, + environment_, + nodeId_, + nodeLocation_, + catalogNames, + systemConfig->announcementMaxFrequencyMs(), + sslContext_); + updateAnnouncerDetails(); + announcer_->start(); + + uint64_t heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs(); + if (heartbeatFrequencyMs > 0) { + heartbeatManager_ = std::make_unique( + address_, + port, + coordinatorDiscoverer_, + sslContext_, + [server = this]() { return server->fetchNodeStatus(); }, + heartbeatFrequencyMs); + heartbeatManager_->start(); + } } - } + }; const bool reusePort = SystemConfig::instance()->httpServerReusePort(); auto httpConfig = @@ -417,14 +420,16 @@ void PrestoServer::run() { taskManager_ = std::make_unique( driverExecutor_.get(), httpSrvCpuExecutor_.get(), spillerExecutor_.get()); - std::string taskUri; - if (httpsPort.has_value()) { - taskUri = fmt::format(kTaskUriFormat, kHttps, address_, httpsPort.value()); - } else { - taskUri = fmt::format(kTaskUriFormat, kHttp, address_, httpPort); - } + auto setTaskUri = [&](const bool& useHttps, const int& port) { + std::string taskUri; + if (useHttps) { + taskUri = fmt::format(kTaskUriFormat, kHttps, address_, port); + } else { + taskUri = fmt::format(kTaskUriFormat, kHttp, address_, port); + } + taskManager_->setBaseUri(taskUri); + }; - taskManager_->setBaseUri(taskUri); taskManager_->setNodeId(nodeId_); taskManager_->setOldTaskCleanUpMs(systemConfig->oldTaskCleanUpMs()); @@ -506,7 +511,34 @@ void PrestoServer::run() { // Start everything. After the return from the following call we are shutting // down. - httpServer_->start(getHttpServerFilters()); + httpServer_->start(getHttpServerFilters(), [&](proxygen::HTTPServer* server) { + const auto addresses = server->addresses(); + for (auto address : addresses) { + PRESTO_STARTUP_LOG(INFO) << fmt::format( + "Server listening at {}:{} - https {}", + address.address.getIPAddress().str(), + address.address.getPort(), + address.sslConfigs.size() != 0); + // We could be bound to both http and https ports. + // If https is set we must use that. + if (httpsPort.has_value() && address.sslConfigs.size() == 0) { + continue; + } + startAnnouncerAndHeartbeatManager( + httpsPort.has_value(), address.address.getPort()); + setTaskUri(httpsPort.has_value(), address.address.getPort()); + break; + } + VELOX_CHECK( + announcer_ != nullptr, + "The announcer is expected to have been created but wasn't."); + const auto heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs(); + if (heartbeatFrequencyMs > 0) { + VELOX_CHECK( + heartbeatManager_ != nullptr, + "The heartbeat manager is expected to have been created but wasn't."); + } + }); if (announcer_ != nullptr) { PRESTO_SHUTDOWN_LOG(INFO) << "Stopping announcer"; diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java index 32fe7ce79864e..e76a373550814 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java @@ -365,13 +365,12 @@ public static Optional> getExternalWorkerLaunc Files.createDirectories(dir); Path tempDirectoryPath = Files.createTempDirectory(dir, "worker"); log.info("Temp directory for Worker #%d: %s", workerIndex, tempDirectoryPath.toString()); - int port = 1234 + workerIndex; - // Write config file + // Write config file - use an ephemeral port for the worker. String configProperties = format("discovery.uri=%s%n" + "presto.version=testversion%n" + "system-memory-gb=4%n" + - "http-server.http.port=%d", discoveryUri, port); + "http-server.http.port=0%n", discoveryUri); if (remoteFunctionServerUds.isPresent()) { String jsonSignaturesPath = Resources.getResource(REMOTE_FUNCTION_JSON_SIGNATURES).getFile();