Skip to content
This repository has been archived by the owner on Jan 16, 2023. It is now read-only.

Commit

Permalink
remove config repository from config fetch activity impl scheduling (a…
Browse files Browse the repository at this point in the history
…irbytehq#20908)

* Remove config repository from config fetch activity impl for scheduling data
  • Loading branch information
alovew authored Jan 5, 2023
1 parent 16a591e commit 94513f0
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@
import datadog.trace.api.Trace;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.client.model.generated.ConnectionRead;
import io.airbyte.api.client.model.generated.ConnectionSchedule;
import io.airbyte.api.client.model.generated.ConnectionScheduleDataBasicSchedule;
import io.airbyte.api.client.model.generated.ConnectionScheduleDataBasicSchedule.TimeUnitEnum;
import io.airbyte.api.client.model.generated.ConnectionScheduleDataCron;
import io.airbyte.api.client.model.generated.ConnectionScheduleType;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.config.Cron;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.ScheduleType;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.helpers.ScheduleHelpers;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.metrics.lib.ApmTraceUtils;
Expand All @@ -41,6 +43,7 @@
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTimeZone;
Expand Down Expand Up @@ -106,48 +109,48 @@ public StandardSync getStandardSync(final UUID connectionId) throws JsonValidati
public ScheduleRetrieverOutput getTimeToWait(final ScheduleRetrieverInput input) {
try {
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, input.getConnectionId()));
final StandardSync standardSync = configRepository.getStandardSync(input.getConnectionId());

if (standardSync.getScheduleType() != null) {
return this.getTimeToWaitFromScheduleType(standardSync, input.getConnectionId());
final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(input.getConnectionId());
final ConnectionRead connectionRead = connectionApi.getConnection(connectionIdRequestBody);
if (connectionRead.getScheduleType() != null) {
return this.getTimeToWaitFromScheduleType(connectionRead, input.getConnectionId());
}
return this.getTimeToWaitFromLegacy(standardSync, input.getConnectionId());
} catch (final IOException | JsonValidationException | ConfigNotFoundException e) {
return this.getTimeToWaitFromLegacy(connectionRead, input.getConnectionId());
} catch (final IOException | ApiException e) {
throw new RetryableException(e);
}
}

/**
* @param standardSync
* @param connectionRead
* @param connectionId
* @return
* @throws IOException
*
* This method consumes the `scheduleType` and `scheduleData` fields.
*/
private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final StandardSync standardSync, final UUID connectionId) throws IOException {
if (standardSync.getScheduleType() == ScheduleType.MANUAL || standardSync.getStatus() != Status.ACTIVE) {
private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final ConnectionRead connectionRead, final UUID connectionId) throws IOException {
if (connectionRead.getScheduleType() == ConnectionScheduleType.MANUAL || connectionRead.getStatus() != ConnectionStatus.ACTIVE) {
// Manual syncs wait for their first run
return new ScheduleRetrieverOutput(Duration.ofDays(100 * 365));
}

final Optional<Job> previousJobOptional = jobPersistence.getLastReplicationJob(connectionId);

if (standardSync.getScheduleType() == ScheduleType.BASIC_SCHEDULE) {
if (connectionRead.getScheduleType() == ConnectionScheduleType.BASIC) {
if (previousJobOptional.isEmpty()) {
// Basic schedules don't wait for their first run.
return new ScheduleRetrieverOutput(Duration.ZERO);
}
final Job previousJob = previousJobOptional.get();
final long prevRunStart = previousJob.getStartedAtInSecond().orElse(previousJob.getCreatedAtInSecond());
final long nextRunStart = prevRunStart + ScheduleHelpers.getIntervalInSecond(standardSync.getScheduleData().getBasicSchedule());
final long nextRunStart = prevRunStart + getIntervalInSecond(connectionRead.getScheduleData().getBasicSchedule());
final Duration timeToWait = Duration.ofSeconds(
Math.max(0, nextRunStart - currentSecondsSupplier.get()));
return new ScheduleRetrieverOutput(timeToWait);
}

else { // standardSync.getScheduleType() == ScheduleType.CRON
final Cron scheduleCron = standardSync.getScheduleData().getCron();
else { // connectionRead.getScheduleType() == ConnectionScheduleType.CRON
final ConnectionScheduleDataCron scheduleCron = connectionRead.getScheduleData().getCron();
final TimeZone timeZone = DateTimeZone.forID(scheduleCron.getCronTimeZone()).toTimeZone();
try {
final CronExpression cronExpression = new CronExpression(scheduleCron.getCronExpression());
Expand All @@ -164,18 +167,18 @@ private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final StandardSync
Duration timeToWait = Duration.ofSeconds(
Math.max(0, nextRunStart.getTime() / MS_PER_SECOND - currentSecondsSupplier.get()));

timeToWait = addSchedulingNoiseForAllowListedWorkspace(timeToWait, standardSync);
timeToWait = addSchedulingNoiseForAllowListedWorkspace(timeToWait, connectionRead);
return new ScheduleRetrieverOutput(timeToWait);
} catch (final ParseException e) {
throw (DateTimeException) new DateTimeException(e.getMessage()).initCause(e);
}
}
}

private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait, StandardSync standardSync) {
private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait, ConnectionRead connectionRead) {
final UUID workspaceId;
try {
workspaceId = workspaceHelper.getWorkspaceForConnectionId(standardSync.getConnectionId());
workspaceId = workspaceHelper.getWorkspaceForConnectionId(connectionRead.getConnectionId());
} catch (JsonValidationException | ConfigNotFoundException e) {
// We tolerate exceptions and fail open by doing nothing.
return timeToWait;
Expand All @@ -184,7 +187,7 @@ private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait,
// Only apply to a specific set of workspaces.
return timeToWait;
}
if (!standardSync.getScheduleType().equals(ScheduleType.CRON)) {
if (!connectionRead.getScheduleType().equals(ConnectionScheduleType.CRON)) {
// Only apply noise to cron connections.
return timeToWait;
}
Expand All @@ -197,30 +200,30 @@ private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait,
}

/**
* @param standardSync
* @param connectionRead
* @param connectionId
* @return
* @throws IOException
*
* This method consumes the `schedule` field.
*/
private ScheduleRetrieverOutput getTimeToWaitFromLegacy(final StandardSync standardSync, final UUID connectionId) throws IOException {
if (standardSync.getSchedule() == null || standardSync.getStatus() != Status.ACTIVE) {
private ScheduleRetrieverOutput getTimeToWaitFromLegacy(final ConnectionRead connectionRead, final UUID connectionId) throws IOException {
if (connectionRead.getSchedule() == null || connectionRead.getStatus() != ConnectionStatus.ACTIVE) {
// Manual syncs wait for their first run
return new ScheduleRetrieverOutput(Duration.ofDays(100 * 365));
}

final Optional<Job> previousJobOptional = jobPersistence.getLastReplicationJob(connectionId);

if (previousJobOptional.isEmpty() && standardSync.getSchedule() != null) {
if (previousJobOptional.isEmpty() && connectionRead.getSchedule() != null) {
// Non-manual syncs don't wait for their first run
return new ScheduleRetrieverOutput(Duration.ZERO);
}

final Job previousJob = previousJobOptional.get();
final long prevRunStart = previousJob.getStartedAtInSecond().orElse(previousJob.getCreatedAtInSecond());

final long nextRunStart = prevRunStart + ScheduleHelpers.getIntervalInSecond(standardSync.getSchedule());
final long nextRunStart = prevRunStart + getIntervalInSecond(connectionRead.getSchedule());

final Duration timeToWait = Duration.ofSeconds(
Math.max(0, nextRunStart - currentSecondsSupplier.get()));
Expand Down Expand Up @@ -261,4 +264,46 @@ public Optional<ConnectionStatus> getStatus(final UUID connectionId) {
}
}

private Long getIntervalInSecond(final ConnectionScheduleDataBasicSchedule schedule) {
return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits();
}

private Long getIntervalInSecond(final ConnectionSchedule schedule) {
return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits();
}

private Long getSecondsInUnit(final TimeUnitEnum timeUnitEnum) {
switch (timeUnitEnum) {
case MINUTES:
return TimeUnit.MINUTES.toSeconds(1);
case HOURS:
return TimeUnit.HOURS.toSeconds(1);
case DAYS:
return TimeUnit.DAYS.toSeconds(1);
case WEEKS:
return TimeUnit.DAYS.toSeconds(1) * 7;
case MONTHS:
return TimeUnit.DAYS.toSeconds(1) * 30;
default:
throw new RuntimeException("Unhandled TimeUnitEnum: " + timeUnitEnum);
}
}

private Long getSecondsInUnit(final ConnectionSchedule.TimeUnitEnum timeUnitEnum) {
switch (timeUnitEnum) {
case MINUTES:
return TimeUnit.MINUTES.toSeconds(1);
case HOURS:
return TimeUnit.HOURS.toSeconds(1);
case DAYS:
return TimeUnit.DAYS.toSeconds(1);
case WEEKS:
return TimeUnit.DAYS.toSeconds(1) * 7;
case MONTHS:
return TimeUnit.DAYS.toSeconds(1) * 30;
default:
throw new RuntimeException("Unhandled TimeUnitEnum: " + timeUnitEnum);
}
}

}
Loading

0 comments on commit 94513f0

Please sign in to comment.