Skip to content

Commit

Permalink
[native] Replace fixed worker port with ephemeral ports
Browse files Browse the repository at this point in the history
Previously the listener ports for the native works in the E2E tests
was hard coded to be 1234 + worker count.
The change looks in the OS for an available ephemeral port
and uses this value when spawning the native workers.

The native worker must then defer some configuration until the
port selection by the OS is known.
  • Loading branch information
czentgr committed Jul 15, 2024
1 parent 6b77a1b commit c1066d6
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 37 deletions.
102 changes: 68 additions & 34 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,35 +272,38 @@ void PrestoServer::run() {
address_);

initializeCoordinatorDiscoverer();
if (coordinatorDiscoverer_ != nullptr) {
announcer_ = std::make_unique<Announcer>(
address_,
httpsPort.has_value(),
httpsPort.has_value() ? httpsPort.value() : httpPort,
coordinatorDiscoverer_,
nodeVersion_,
environment_,
nodeId_,
nodeLocation_,
systemConfig->prestoNativeSidecar(),
catalogNames,
systemConfig->announcementMaxFrequencyMs(),
sslContext_);
updateAnnouncerDetails();
announcer_->start();

uint64_t heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs();
if (heartbeatFrequencyMs > 0) {
heartbeatManager_ = std::make_unique<PeriodicHeartbeatManager>(
auto startAnnouncerAndHeartbeatManager = [&](const bool& useHttps,
const int& port) {
if (coordinatorDiscoverer_ != nullptr) {
announcer_ = std::make_unique<Announcer>(
address_,
httpsPort.has_value() ? httpsPort.value() : httpPort,
useHttps,
port,
coordinatorDiscoverer_,
sslContext_,
[server = this]() { return server->fetchNodeStatus(); },
heartbeatFrequencyMs);
heartbeatManager_->start();
nodeVersion_,
environment_,
nodeId_,
nodeLocation_,
systemConfig->prestoNativeSidecar(),
catalogNames,
systemConfig->announcementMaxFrequencyMs(),
sslContext_);
updateAnnouncerDetails();
announcer_->start();

uint64_t heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs();
if (heartbeatFrequencyMs > 0) {
heartbeatManager_ = std::make_unique<PeriodicHeartbeatManager>(
address_,
port,
coordinatorDiscoverer_,
sslContext_,
[server = this]() { return server->fetchNodeStatus(); },
heartbeatFrequencyMs);
heartbeatManager_->start();
}
}
}
};

const bool reusePort = SystemConfig::instance()->httpServerReusePort();
auto httpConfig =
Expand Down Expand Up @@ -450,14 +453,16 @@ void PrestoServer::run() {
taskManager_ = std::make_unique<TaskManager>(
driverExecutor_.get(), httpSrvCpuExecutor_.get(), spillerExecutor_.get());

std::string taskUri;
if (httpsPort.has_value()) {
taskUri = fmt::format(kTaskUriFormat, kHttps, address_, httpsPort.value());
} else {
taskUri = fmt::format(kTaskUriFormat, kHttp, address_, httpPort);
}
auto setTaskUri = [&](const bool& useHttps, const int& port) {
std::string taskUri;
if (useHttps) {
taskUri = fmt::format(kTaskUriFormat, kHttps, address_, port);
} else {
taskUri = fmt::format(kTaskUriFormat, kHttp, address_, port);
}
taskManager_->setBaseUri(taskUri);
};

taskManager_->setBaseUri(taskUri);
taskManager_->setNodeId(nodeId_);
taskManager_->setOldTaskCleanUpMs(systemConfig->oldTaskCleanUpMs());

Expand Down Expand Up @@ -540,7 +545,36 @@ void PrestoServer::run() {

// Start everything. After the return from the following call we are shutting
// down.
httpServer_->start(getHttpServerFilters());
httpServer_->start(getHttpServerFilters(), [&](proxygen::HTTPServer* server) {
const auto addresses = server->addresses();
for (auto address : addresses) {
PRESTO_STARTUP_LOG(INFO) << fmt::format(
"Server listening at {}:{} - https {}",
address.address.getIPAddress().str(),
address.address.getPort(),
address.sslConfigs.size() != 0);
// We could be bound to both http and https ports.
// If https is set we must use that.
if (httpsPort.has_value() && address.sslConfigs.size() == 0) {
continue;
}
startAnnouncerAndHeartbeatManager(
httpsPort.has_value(), address.address.getPort());
setTaskUri(httpsPort.has_value(), address.address.getPort());
break;
}
if (coordinatorDiscoverer_ != nullptr) {
VELOX_CHECK(
announcer_ != nullptr,
"The announcer is expected to have been created but wasn't.");
const auto heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs();
if (heartbeatFrequencyMs > 0) {
VELOX_CHECK(
heartbeatManager_ != nullptr,
"The heartbeat manager is expected to have been created but wasn't.");
}
}
});

if (announcer_ != nullptr) {
PRESTO_SHUTDOWN_LOG(INFO) << "Stopping announcer";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,13 +420,12 @@ public static Optional<BiFunction<Integer, URI, Process>> getExternalWorkerLaunc
Files.createDirectories(dir);
Path tempDirectoryPath = Files.createTempDirectory(dir, "worker");
log.info("Temp directory for Worker #%d: %s", workerIndex, tempDirectoryPath.toString());
int port = 1234 + workerIndex;

// Write config file
// Write config file - use an ephemeral port for the worker.
String configProperties = format("discovery.uri=%s%n" +
"presto.version=testversion%n" +
"system-memory-gb=4%n" +
"http-server.http.port=%d", discoveryUri, port);
"http-server.http.port=0%n", discoveryUri);

if (remoteFunctionServerUds.isPresent()) {
String jsonSignaturesPath = Resources.getResource(REMOTE_FUNCTION_JSON_SIGNATURES).getFile();
Expand Down

0 comments on commit c1066d6

Please sign in to comment.