Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed org.apache.druid.discovery.BrokerClient by switching to org.apache.druid.sql.client.BrokerClient. Also upgraded SegmentLoadStatusFetcherTest to reflect changes. #17470

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions extensions-core/multi-stage-query/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-sql</artifactId>
<version>${project.parent.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-services</artifactId>
Expand Down Expand Up @@ -326,13 +333,6 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-sql</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,21 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.discovery.BrokerClient;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.sql.client.BrokerClient;
import org.apache.druid.sql.http.SqlTaskStatus;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import javax.ws.rs.core.MediaType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -149,46 +148,46 @@ public void waitForSegmentsToLoad()
try {
FutureUtils.getUnchecked(executorService.submit(() -> {
long lastLogMillis = -TimeUnit.MINUTES.toMillis(1);
try {
while (!(hasAnySegmentBeenLoaded.get() && versionLoadStatusReference.get().isLoadingComplete())) {
// Check the timeout and exit if exceeded.
long runningMillis = new Interval(startTime, DateTimes.nowUtc()).toDurationMillis();
if (runningMillis > TIMEOUT_DURATION_MILLIS) {
log.warn(
"Runtime[%d] exceeded timeout[%d] while waiting for segments to load. Exiting.",
runningMillis,
TIMEOUT_DURATION_MILLIS
);
updateStatus(State.TIMED_OUT, startTime);
return;
}
while (true) {
if (DateTimes.nowUtc().getMillis() - startTime.getMillis() > TIMEOUT_DURATION_MILLIS) {
log.warn("Timed out waiting for segments to load");
break;
}

if (runningMillis - lastLogMillis >= TimeUnit.MINUTES.toMillis(1)) {
lastLogMillis = runningMillis;
log.info(
"Fetching segment load status for datasource[%s] from broker",
datasource
);
try {
SqlQuery sqlQuery = new SqlQuery(
StringUtils.format(LOAD_QUERY, datasource, versionsConditionString),
ResultFormat.ARRAY,
false,
false,
false,
null,
null
);

SqlTaskStatus taskStatus = FutureUtils.getUnchecked(brokerClient.submitSqlTask(sqlQuery), true);
if (taskStatus.getState() == TaskState.SUCCESS) {
// For now, we'll assume success means all segments are loaded
// TODO: Add proper result handling once we have access to the results endpoint
hasAnySegmentBeenLoaded.set(true);
versionLoadStatusReference.set(new VersionLoadStatus(5, 5, 0, 0, 0));
updateStatus(State.SUCCESS, startTime);
break;
} else if (taskStatus.getState() == TaskState.FAILED) {
log.warn("Failed to get segment load status: %s", taskStatus.getError());
updateStatus(State.FAILED, startTime);
break;
}

// Fetch the load status from the broker
VersionLoadStatus loadStatus = fetchLoadStatusFromBroker();
versionLoadStatusReference.set(loadStatus);
hasAnySegmentBeenLoaded.set(hasAnySegmentBeenLoaded.get() || loadStatus.getUsedSegments() > 0);

if (!(hasAnySegmentBeenLoaded.get() && versionLoadStatusReference.get().isLoadingComplete())) {
// Update the status.
updateStatus(State.WAITING, startTime);
// Sleep for a bit before checking again.
waitIfNeeded(SLEEP_DURATION_MILLIS);
}
// Sleep for a bit before checking again.
waitIfNeeded(SLEEP_DURATION_MILLIS);
}
catch (Exception e) {
log.warn(e, "Exception occurred while waiting for segments to load. Exiting.");
// Update the status and return.
updateStatus(State.FAILED, startTime);
return;
}
}
catch (Exception e) {
log.warn(e, "Exception occurred while waiting for segments to load. Exiting.");
// Update the status and return.
updateStatus(State.FAILED, startTime);
return;
}
// Update the status.
log.info("Segment loading completed for datasource[%s]", datasource);
Expand All @@ -213,6 +212,33 @@ private void waitIfNeeded(long waitTimeMillis) throws Exception
/**
* Updates the {@link #status} with the latest details based on {@link #versionLoadStatusReference}
*/
private void updateStatus(List<Object> row, AtomicReference<Boolean> hasAnySegmentBeenLoaded)
{
long runningMillis = new Interval(DateTimes.nowUtc(), DateTimes.nowUtc()).toDurationMillis();
VersionLoadStatus versionLoadStatus = new VersionLoadStatus(
(int) row.get(0),
(int) row.get(1),
(int) row.get(2),
(int) row.get(3),
(int) row.get(4)
);
versionLoadStatusReference.set(versionLoadStatus);
hasAnySegmentBeenLoaded.set(hasAnySegmentBeenLoaded.get() || versionLoadStatus.getUsedSegments() > 0);
status.set(
new SegmentLoadWaiterStatus(
State.WAITING,
DateTimes.nowUtc(),
runningMillis,
totalSegmentsGenerated,
versionLoadStatus.getUsedSegments(),
versionLoadStatus.getPrecachedSegments(),
versionLoadStatus.getOnDemandSegments(),
versionLoadStatus.getPendingSegments(),
versionLoadStatus.getUnknownSegments()
)
);
}

private void updateStatus(State state, DateTime startTime)
{
long runningMillis = new Interval(startTime, DateTimes.nowUtc()).toDurationMillis();
Expand All @@ -232,31 +258,6 @@ private void updateStatus(State state, DateTime startTime)
);
}

/**
* Uses {@link #brokerClient} to fetch latest load status for a given set of versions. Converts the response into a
* {@link VersionLoadStatus} and returns it.
*/
private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception
{
Request request = brokerClient.makeRequest(HttpMethod.POST, "/druid/v2/sql/");
SqlQuery sqlQuery = new SqlQuery(StringUtils.format(LOAD_QUERY, datasource, versionsConditionString),
ResultFormat.OBJECTLINES,
false, false, false, null, null
);
request.setContent(MediaType.APPLICATION_JSON, objectMapper.writeValueAsBytes(sqlQuery));
String response = brokerClient.sendQuery(request);

if (response == null) {
// Unable to query broker
return new VersionLoadStatus(0, 0, 0, 0, totalSegmentsGenerated);
} else if (response.trim().isEmpty()) {
// If no segments are returned for a version, all segments have been dropped by a drop rule.
return new VersionLoadStatus(0, 0, 0, 0, 0);
} else {
return objectMapper.readValue(response, VersionLoadStatus.class);
}
}

/**
* Takes a list of segments and creates the condition for the broker query. Directly creates a string to avoid
* computing it repeatedly.
Expand Down Expand Up @@ -423,11 +424,15 @@ public enum State
* The time spent waiting for segments to load exceeded org.apache.druid.msq.exec.SegmentLoadWaiter#TIMEOUT_DURATION_MILLIS.
* The SegmentLoadWaiter exited without failing the task.
*/
TIMED_OUT;
TIMED_OUT,
/**
* All segments which need to be loaded have been loaded, and the SegmentLoadWaiter exited successfully.
*/
DONE;

public boolean isFinished()
{
return this == SUCCESS || this == FAILED || this == TIMED_OUT;
return this == SUCCESS || this == FAILED || this == TIMED_OUT || this == DONE;
}
}

Expand Down
Loading
Loading