Skip to content
This repository has been archived by the owner on Jan 16, 2023. It is now read-only.

Commit

Permalink
Progress Bar Read APIs (airbytehq#20937)
Browse files Browse the repository at this point in the history
Follow up PR to airbytehq#20787 . Make stats available to the read apis so these are available to the webapp.

After this, all that is left is writing these stats as the job progresses.

Add the required logic in JobHistoryHandler.java.

Took the chance to also rename our internal Attempt models field from id to attemptNumber to better reflect that the field stores not the row's database id, but the job's attempt number. Most of the files changes here are due to that rename.
  • Loading branch information
davinchia authored Jan 6, 2023
1 parent 7e097b7 commit 3a2b040
Show file tree
Hide file tree
Showing 11 changed files with 322 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import io.airbyte.commons.enums.Enums;
Expand Down Expand Up @@ -72,6 +73,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.InsertValuesStepN;
Expand Down Expand Up @@ -511,6 +513,72 @@ public AttemptStats getAttemptStats(final long jobId, final int attemptNumber) t
});
}

@Override
public Map<JobAttemptPair, AttemptStats> getAttemptStats(final List<Long> jobIds) throws IOException {
final var jobIdsStr = StringUtils.join(jobIds, ',');
return jobDatabase.query(ctx -> {
// Instead of one massive join query, separate this query into two queries for better readability
// for now.
// We can combine the queries at a later date if this still proves to be not efficient enough.
final Map<JobAttemptPair, AttemptStats> attemptStats = hydrateSyncStats(jobIdsStr, ctx);
hydrateStreamStats(jobIdsStr, ctx, attemptStats);
return attemptStats;
});
}

private static Map<JobAttemptPair, AttemptStats> hydrateSyncStats(final String jobIdsStr, final DSLContext ctx) {
final var attemptStats = new HashMap<JobAttemptPair, AttemptStats>();
final var syncResults = ctx.fetch(
"SELECT atmpt.attempt_number, atmpt.job_id,"
+ "stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted "
+ "FROM sync_stats stats "
+ "INNER JOIN attempts atmpt ON stats.attempt_id = atmpt.id "
+ "WHERE job_id IN ( " + jobIdsStr + ");");
syncResults.forEach(r -> {
final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER));
final var syncStats = new SyncStats()
.withBytesEmitted(r.get(SYNC_STATS.BYTES_EMITTED))
.withRecordsEmitted(r.get(SYNC_STATS.RECORDS_EMITTED))
.withEstimatedRecords(r.get(SYNC_STATS.ESTIMATED_RECORDS))
.withEstimatedBytes(r.get(SYNC_STATS.ESTIMATED_BYTES));
attemptStats.put(key, new AttemptStats(syncStats, Lists.newArrayList()));
});
return attemptStats;
}

/**
* This method needed to be called after
* {@link DefaultJobPersistence#hydrateSyncStats(String, DSLContext)} as it assumes hydrateSyncStats
* has prepopulated the map.
*/
private static void hydrateStreamStats(final String jobIdsStr, final DSLContext ctx, final Map<JobAttemptPair, AttemptStats> attemptStats) {
final var streamResults = ctx.fetch(
"SELECT atmpt.attempt_number, atmpt.job_id, "
+ "stats.stream_name, stats.stream_namespace, stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted "
+ "FROM stream_stats stats "
+ "INNER JOIN attempts atmpt ON atmpt.id = stats.attempt_id "
+ "WHERE attempt_id IN "
+ "( SELECT id FROM attempts WHERE job_id IN ( " + jobIdsStr + "));");

streamResults.forEach(r -> {
final var streamSyncStats = new StreamSyncStats()
.withStreamNamespace(r.get(STREAM_STATS.STREAM_NAMESPACE))
.withStreamName(r.get(STREAM_STATS.STREAM_NAME))
.withStats(new SyncStats()
.withBytesEmitted(r.get(STREAM_STATS.BYTES_EMITTED))
.withRecordsEmitted(r.get(STREAM_STATS.RECORDS_EMITTED))
.withEstimatedRecords(r.get(STREAM_STATS.ESTIMATED_RECORDS))
.withEstimatedBytes(r.get(STREAM_STATS.ESTIMATED_BYTES)));

final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER));
if (!attemptStats.containsKey(key)) {
LOGGER.error("{} stream stats entry does not have a corresponding sync stats entry. This suggest the database is in a bad state.", key);
return;
}
attemptStats.get(key).perStreamStats().add(streamSyncStats);
});
}

@Override
public List<NormalizationSummary> getNormalizationSummary(final long jobId, final int attemptNumber) throws IOException {
return jobDatabase
Expand Down Expand Up @@ -849,7 +917,7 @@ private static Job getJobFromRecord(final Record record) {

private static Attempt getAttemptFromRecord(final Record record) {
return new Attempt(
record.get(ATTEMPT_NUMBER, Long.class),
record.get(ATTEMPT_NUMBER, int.class),
record.get(JOB_ID, Long.class),
Path.of(record.get("log_path", String.class)),
record.get("attempt_output", String.class) == null ? null : Jsons.deserialize(record.get("attempt_output", String.class), JobOutput.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public interface JobPersistence {
*/
record AttemptStats(SyncStats combinedStats, List<StreamSyncStats> perStreamStats) {}

record JobAttemptPair(long id, int attemptNumber) {}

/**
* Retrieve the combined and per stream stats for a single attempt.
*
Expand All @@ -57,6 +59,19 @@ record AttemptStats(SyncStats combinedStats, List<StreamSyncStats> perStreamStat
*/
AttemptStats getAttemptStats(long jobId, int attemptNumber) throws IOException;

/**
* Alternative method to retrieve combined and per stream stats per attempt for a list of jobs to
* avoid overloading the database with too many queries.
* <p>
* This implementation is intended to utilise complex joins under the hood to reduce the potential
* N+1 database pattern.
*
* @param jobIds
* @return
* @throws IOException
*/
Map<JobAttemptPair, AttemptStats> getAttemptStats(List<Long> jobIds) throws IOException;

List<NormalizationSummary> getNormalizationSummary(long jobId, int attemptNumber) throws IOException;

Job getJob(long jobId) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

public class Attempt {

private final long id;
private final int attemptNumber;
private final long jobId;
private final JobOutput output;
private final AttemptStatus status;
Expand All @@ -23,7 +23,7 @@ public class Attempt {
private final long createdAtInSecond;
private final Long endedAtInSecond;

public Attempt(final long id,
public Attempt(final int attemptNumber,
final long jobId,
final Path logPath,
final @Nullable JobOutput output,
Expand All @@ -32,7 +32,7 @@ public Attempt(final long id,
final long createdAtInSecond,
final long updatedAtInSecond,
final @Nullable Long endedAtInSecond) {
this.id = id;
this.attemptNumber = attemptNumber;
this.jobId = jobId;
this.output = output;
this.status = status;
Expand All @@ -43,8 +43,8 @@ public Attempt(final long id,
this.endedAtInSecond = endedAtInSecond;
}

public long getId() {
return id;
public int getAttemptNumber() {
return attemptNumber;
}

public long getJobId() {
Expand Down Expand Up @@ -92,7 +92,7 @@ public boolean equals(final Object o) {
return false;
}
final Attempt attempt = (Attempt) o;
return id == attempt.id &&
return attemptNumber == attempt.attemptNumber &&
jobId == attempt.jobId &&
updatedAtInSecond == attempt.updatedAtInSecond &&
createdAtInSecond == attempt.createdAtInSecond &&
Expand All @@ -105,13 +105,13 @@ public boolean equals(final Object o) {

@Override
public int hashCode() {
return Objects.hash(id, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond);
return Objects.hash(attemptNumber, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond);
}

@Override
public String toString() {
return "Attempt{" +
"id=" + id +
"id=" + attemptNumber +
", jobId=" + jobId +
", output=" + output +
", status=" + status +
Expand Down
Loading

0 comments on commit 3a2b040

Please sign in to comment.