Skip to content

Commit

Permalink
[native] Replace fixed worker port with ephemeral ports
Browse files Browse the repository at this point in the history
Previously the listener ports for the native works in the E2E tests
was hard coded to be 1234 + worker count.
The change looks in the OS for an available ephemeral port
and uses this value when spawning the native workers.

The native worker must then defer some configuration until the
port selection by the OS is known.
  • Loading branch information
czentgr committed Jun 20, 2024
1 parent 21667e2 commit f835b46
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 37 deletions.
100 changes: 66 additions & 34 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,35 +270,38 @@ void PrestoServer::run() {
address_);

initializeCoordinatorDiscoverer();
if (coordinatorDiscoverer_ != nullptr) {
announcer_ = std::make_unique<Announcer>(
address_,
httpsPort.has_value(),
httpsPort.has_value() ? httpsPort.value() : httpPort,
coordinatorDiscoverer_,
nodeVersion_,
environment_,
nodeId_,
nodeLocation_,
systemConfig->prestoNativeSidecar(),
catalogNames,
systemConfig->announcementMaxFrequencyMs(),
sslContext_);
updateAnnouncerDetails();
announcer_->start();

uint64_t heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs();
if (heartbeatFrequencyMs > 0) {
heartbeatManager_ = std::make_unique<PeriodicHeartbeatManager>(
auto startAnnouncerAndHeartbeatManager = [&](const bool& useHttps,
const int& port) {
if (coordinatorDiscoverer_ != nullptr) {
announcer_ = std::make_unique<Announcer>(
address_,
httpsPort.has_value() ? httpsPort.value() : httpPort,
useHttps,
port,
coordinatorDiscoverer_,
sslContext_,
[server = this]() { return server->fetchNodeStatus(); },
heartbeatFrequencyMs);
heartbeatManager_->start();
nodeVersion_,
environment_,
nodeId_,
nodeLocation_,
systemConfig->prestoNativeSidecar(),
catalogNames,
systemConfig->announcementMaxFrequencyMs(),
sslContext_);
updateAnnouncerDetails();
announcer_->start();

uint64_t heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs();
if (heartbeatFrequencyMs > 0) {
heartbeatManager_ = std::make_unique<PeriodicHeartbeatManager>(
address_,
port,
coordinatorDiscoverer_,
sslContext_,
[server = this]() { return server->fetchNodeStatus(); },
heartbeatFrequencyMs);
heartbeatManager_->start();
}
}
}
};

const bool reusePort = SystemConfig::instance()->httpServerReusePort();
auto httpConfig =
Expand Down Expand Up @@ -419,14 +422,16 @@ void PrestoServer::run() {
taskManager_ = std::make_unique<TaskManager>(
driverExecutor_.get(), httpSrvCpuExecutor_.get(), spillerExecutor_.get());

std::string taskUri;
if (httpsPort.has_value()) {
taskUri = fmt::format(kTaskUriFormat, kHttps, address_, httpsPort.value());
} else {
taskUri = fmt::format(kTaskUriFormat, kHttp, address_, httpPort);
}
auto setTaskUri = [&](const bool& useHttps, const int& port) {
std::string taskUri;
if (useHttps) {
taskUri = fmt::format(kTaskUriFormat, kHttps, address_, port);
} else {
taskUri = fmt::format(kTaskUriFormat, kHttp, address_, port);
}
taskManager_->setBaseUri(taskUri);
};

taskManager_->setBaseUri(taskUri);
taskManager_->setNodeId(nodeId_);
taskManager_->setOldTaskCleanUpMs(systemConfig->oldTaskCleanUpMs());

Expand Down Expand Up @@ -509,7 +514,34 @@ void PrestoServer::run() {

// Start everything. After the return from the following call we are shutting
// down.
httpServer_->start(getHttpServerFilters());
httpServer_->start(getHttpServerFilters(), [&](proxygen::HTTPServer* server) {
const auto addresses = server->addresses();
for (auto address : addresses) {
PRESTO_STARTUP_LOG(INFO) << fmt::format(
"Server listening at {}:{} - https {}",
address.address.getIPAddress().str(),
address.address.getPort(),
address.sslConfigs.size() != 0);
// We could be bound to both http and https ports.
// If https is set we must use that.
if (httpsPort.has_value() && address.sslConfigs.size() == 0) {
continue;
}
startAnnouncerAndHeartbeatManager(
httpsPort.has_value(), address.address.getPort());
setTaskUri(httpsPort.has_value(), address.address.getPort());
break;
}
VELOX_CHECK(
announcer_ != nullptr,
"The announcer is expected to have been created but wasn't.");
const auto heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs();
if (heartbeatFrequencyMs > 0) {
VELOX_CHECK(
heartbeatManager_ != nullptr,
"The heartbeat manager is expected to have been created but wasn't.");
}
});

if (announcer_ != nullptr) {
PRESTO_SHUTDOWN_LOG(INFO) << "Stopping announcer";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,13 +365,12 @@ public static Optional<BiFunction<Integer, URI, Process>> getExternalWorkerLaunc
Files.createDirectories(dir);
Path tempDirectoryPath = Files.createTempDirectory(dir, "worker");
log.info("Temp directory for Worker #%d: %s", workerIndex, tempDirectoryPath.toString());
int port = 1234 + workerIndex;

// Write config file
// Write config file - use an ephemeral port for the worker.
String configProperties = format("discovery.uri=%s%n" +
"presto.version=testversion%n" +
"system-memory-gb=4%n" +
"http-server.http.port=%d", discoveryUri, port);
"http-server.http.port=0%n", discoveryUri);

if (remoteFunctionServerUds.isPresent()) {
String jsonSignaturesPath = Resources.getResource(REMOTE_FUNCTION_JSON_SIGNATURES).getFile();
Expand Down

0 comments on commit f835b46

Please sign in to comment.