From 94513f07c3bb657b79df87020e2d92bddac1b4cd Mon Sep 17 00:00:00 2001 From: Anne <102554163+alovew@users.noreply.github.com> Date: Wed, 4 Jan 2023 16:07:48 -0800 Subject: [PATCH] remove config repository from config fetch activity impl scheduling (#20908) * Remove config repository from config fetch activity impl for scheduling data --- .../activities/ConfigFetchActivityImpl.java | 97 ++++++++--- .../activities/ConfigFetchActivityTest.java | 162 +++++++++--------- 2 files changed, 153 insertions(+), 106 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java index 27df5e708f25..7aff6ee2c3cd 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java @@ -11,15 +11,17 @@ import datadog.trace.api.Trace; import io.airbyte.api.client.generated.ConnectionApi; import io.airbyte.api.client.invoker.generated.ApiException; +import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; import io.airbyte.api.client.model.generated.ConnectionRead; +import io.airbyte.api.client.model.generated.ConnectionSchedule; +import io.airbyte.api.client.model.generated.ConnectionScheduleDataBasicSchedule; +import io.airbyte.api.client.model.generated.ConnectionScheduleDataBasicSchedule.TimeUnitEnum; +import io.airbyte.api.client.model.generated.ConnectionScheduleDataCron; +import io.airbyte.api.client.model.generated.ConnectionScheduleType; import io.airbyte.api.client.model.generated.ConnectionStatus; import io.airbyte.commons.temporal.config.WorkerMode; import io.airbyte.commons.temporal.exception.RetryableException; -import io.airbyte.config.Cron; import io.airbyte.config.StandardSync; -import io.airbyte.config.StandardSync.ScheduleType; -import io.airbyte.config.StandardSync.Status; -import io.airbyte.config.helpers.ScheduleHelpers; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.metrics.lib.ApmTraceUtils; @@ -41,6 +43,7 @@ import java.util.Set; import java.util.TimeZone; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.joda.time.DateTimeZone; @@ -106,48 +109,48 @@ public StandardSync getStandardSync(final UUID connectionId) throws JsonValidati public ScheduleRetrieverOutput getTimeToWait(final ScheduleRetrieverInput input) { try { ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, input.getConnectionId())); - final StandardSync standardSync = configRepository.getStandardSync(input.getConnectionId()); - - if (standardSync.getScheduleType() != null) { - return this.getTimeToWaitFromScheduleType(standardSync, input.getConnectionId()); + final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(input.getConnectionId()); + final ConnectionRead connectionRead = connectionApi.getConnection(connectionIdRequestBody); + if (connectionRead.getScheduleType() != null) { + return this.getTimeToWaitFromScheduleType(connectionRead, input.getConnectionId()); } - return this.getTimeToWaitFromLegacy(standardSync, input.getConnectionId()); - } catch (final IOException | JsonValidationException | ConfigNotFoundException e) { + return this.getTimeToWaitFromLegacy(connectionRead, input.getConnectionId()); + } catch (final IOException | ApiException e) { throw new RetryableException(e); } } /** - * @param standardSync + * @param connectionRead * @param connectionId * @return * @throws IOException * * This method consumes the `scheduleType` and `scheduleData` fields. */ - private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final StandardSync standardSync, final UUID connectionId) throws IOException { - if (standardSync.getScheduleType() == ScheduleType.MANUAL || standardSync.getStatus() != Status.ACTIVE) { + private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final ConnectionRead connectionRead, final UUID connectionId) throws IOException { + if (connectionRead.getScheduleType() == ConnectionScheduleType.MANUAL || connectionRead.getStatus() != ConnectionStatus.ACTIVE) { // Manual syncs wait for their first run return new ScheduleRetrieverOutput(Duration.ofDays(100 * 365)); } final Optional previousJobOptional = jobPersistence.getLastReplicationJob(connectionId); - if (standardSync.getScheduleType() == ScheduleType.BASIC_SCHEDULE) { + if (connectionRead.getScheduleType() == ConnectionScheduleType.BASIC) { if (previousJobOptional.isEmpty()) { // Basic schedules don't wait for their first run. return new ScheduleRetrieverOutput(Duration.ZERO); } final Job previousJob = previousJobOptional.get(); final long prevRunStart = previousJob.getStartedAtInSecond().orElse(previousJob.getCreatedAtInSecond()); - final long nextRunStart = prevRunStart + ScheduleHelpers.getIntervalInSecond(standardSync.getScheduleData().getBasicSchedule()); + final long nextRunStart = prevRunStart + getIntervalInSecond(connectionRead.getScheduleData().getBasicSchedule()); final Duration timeToWait = Duration.ofSeconds( Math.max(0, nextRunStart - currentSecondsSupplier.get())); return new ScheduleRetrieverOutput(timeToWait); } - else { // standardSync.getScheduleType() == ScheduleType.CRON - final Cron scheduleCron = standardSync.getScheduleData().getCron(); + else { // connectionRead.getScheduleType() == ConnectionScheduleType.CRON + final ConnectionScheduleDataCron scheduleCron = connectionRead.getScheduleData().getCron(); final TimeZone timeZone = DateTimeZone.forID(scheduleCron.getCronTimeZone()).toTimeZone(); try { final CronExpression cronExpression = new CronExpression(scheduleCron.getCronExpression()); @@ -164,7 +167,7 @@ private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final StandardSync Duration timeToWait = Duration.ofSeconds( Math.max(0, nextRunStart.getTime() / MS_PER_SECOND - currentSecondsSupplier.get())); - timeToWait = addSchedulingNoiseForAllowListedWorkspace(timeToWait, standardSync); + timeToWait = addSchedulingNoiseForAllowListedWorkspace(timeToWait, connectionRead); return new ScheduleRetrieverOutput(timeToWait); } catch (final ParseException e) { throw (DateTimeException) new DateTimeException(e.getMessage()).initCause(e); @@ -172,10 +175,10 @@ private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final StandardSync } } - private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait, StandardSync standardSync) { + private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait, ConnectionRead connectionRead) { final UUID workspaceId; try { - workspaceId = workspaceHelper.getWorkspaceForConnectionId(standardSync.getConnectionId()); + workspaceId = workspaceHelper.getWorkspaceForConnectionId(connectionRead.getConnectionId()); } catch (JsonValidationException | ConfigNotFoundException e) { // We tolerate exceptions and fail open by doing nothing. return timeToWait; @@ -184,7 +187,7 @@ private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait, // Only apply to a specific set of workspaces. return timeToWait; } - if (!standardSync.getScheduleType().equals(ScheduleType.CRON)) { + if (!connectionRead.getScheduleType().equals(ConnectionScheduleType.CRON)) { // Only apply noise to cron connections. return timeToWait; } @@ -197,22 +200,22 @@ private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait, } /** - * @param standardSync + * @param connectionRead * @param connectionId * @return * @throws IOException * * This method consumes the `schedule` field. */ - private ScheduleRetrieverOutput getTimeToWaitFromLegacy(final StandardSync standardSync, final UUID connectionId) throws IOException { - if (standardSync.getSchedule() == null || standardSync.getStatus() != Status.ACTIVE) { + private ScheduleRetrieverOutput getTimeToWaitFromLegacy(final ConnectionRead connectionRead, final UUID connectionId) throws IOException { + if (connectionRead.getSchedule() == null || connectionRead.getStatus() != ConnectionStatus.ACTIVE) { // Manual syncs wait for their first run return new ScheduleRetrieverOutput(Duration.ofDays(100 * 365)); } final Optional previousJobOptional = jobPersistence.getLastReplicationJob(connectionId); - if (previousJobOptional.isEmpty() && standardSync.getSchedule() != null) { + if (previousJobOptional.isEmpty() && connectionRead.getSchedule() != null) { // Non-manual syncs don't wait for their first run return new ScheduleRetrieverOutput(Duration.ZERO); } @@ -220,7 +223,7 @@ private ScheduleRetrieverOutput getTimeToWaitFromLegacy(final StandardSync stand final Job previousJob = previousJobOptional.get(); final long prevRunStart = previousJob.getStartedAtInSecond().orElse(previousJob.getCreatedAtInSecond()); - final long nextRunStart = prevRunStart + ScheduleHelpers.getIntervalInSecond(standardSync.getSchedule()); + final long nextRunStart = prevRunStart + getIntervalInSecond(connectionRead.getSchedule()); final Duration timeToWait = Duration.ofSeconds( Math.max(0, nextRunStart - currentSecondsSupplier.get())); @@ -261,4 +264,46 @@ public Optional getStatus(final UUID connectionId) { } } + private Long getIntervalInSecond(final ConnectionScheduleDataBasicSchedule schedule) { + return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits(); + } + + private Long getIntervalInSecond(final ConnectionSchedule schedule) { + return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits(); + } + + private Long getSecondsInUnit(final TimeUnitEnum timeUnitEnum) { + switch (timeUnitEnum) { + case MINUTES: + return TimeUnit.MINUTES.toSeconds(1); + case HOURS: + return TimeUnit.HOURS.toSeconds(1); + case DAYS: + return TimeUnit.DAYS.toSeconds(1); + case WEEKS: + return TimeUnit.DAYS.toSeconds(1) * 7; + case MONTHS: + return TimeUnit.DAYS.toSeconds(1) * 30; + default: + throw new RuntimeException("Unhandled TimeUnitEnum: " + timeUnitEnum); + } + } + + private Long getSecondsInUnit(final ConnectionSchedule.TimeUnitEnum timeUnitEnum) { + switch (timeUnitEnum) { + case MINUTES: + return TimeUnit.MINUTES.toSeconds(1); + case HOURS: + return TimeUnit.HOURS.toSeconds(1); + case DAYS: + return TimeUnit.DAYS.toSeconds(1); + case WEEKS: + return TimeUnit.DAYS.toSeconds(1) * 7; + case MONTHS: + return TimeUnit.DAYS.toSeconds(1) * 30; + default: + throw new RuntimeException("Unhandled TimeUnitEnum: " + timeUnitEnum); + } + } + } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java index b7b7d8fae438..e6cb60e04d4a 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java @@ -8,13 +8,15 @@ import static org.mockito.Mockito.when; import io.airbyte.api.client.generated.ConnectionApi; -import io.airbyte.config.BasicSchedule; -import io.airbyte.config.Cron; -import io.airbyte.config.Schedule; -import io.airbyte.config.ScheduleData; -import io.airbyte.config.StandardSync; -import io.airbyte.config.StandardSync.ScheduleType; -import io.airbyte.config.StandardSync.Status; +import io.airbyte.api.client.invoker.generated.ApiException; +import io.airbyte.api.client.model.generated.ConnectionRead; +import io.airbyte.api.client.model.generated.ConnectionSchedule; +import io.airbyte.api.client.model.generated.ConnectionScheduleData; +import io.airbyte.api.client.model.generated.ConnectionScheduleDataBasicSchedule; +import io.airbyte.api.client.model.generated.ConnectionScheduleDataBasicSchedule.TimeUnitEnum; +import io.airbyte.api.client.model.generated.ConnectionScheduleDataCron; +import io.airbyte.api.client.model.generated.ConnectionScheduleType; +import io.airbyte.api.client.model.generated.ConnectionStatus; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.persistence.job.JobPersistence; @@ -62,45 +64,45 @@ class ConfigFetchActivityTest { private ConfigFetchActivityImpl configFetchActivity; private final static UUID connectionId = UUID.randomUUID(); - private final static StandardSync standardSyncWithLegacySchedule = new StandardSync() - .withSchedule(new Schedule() - .withTimeUnit(Schedule.TimeUnit.MINUTES) - .withUnits(5L)) - .withStatus(Status.ACTIVE); - - private final static StandardSync standardSyncWithManualScheduleType = new StandardSync() - .withScheduleType(ScheduleType.MANUAL) - .withStatus(Status.ACTIVE); - - private final static StandardSync standardSyncWithBasicScheduleType = new StandardSync() - .withScheduleType(ScheduleType.BASIC_SCHEDULE) - .withStatus(Status.ACTIVE) - .withScheduleData(new ScheduleData() - .withBasicSchedule(new BasicSchedule() - .withTimeUnit(BasicSchedule.TimeUnit.MINUTES) - .withUnits(5L))); + private final static ConnectionRead connectionReadWithLegacySchedule = new ConnectionRead() + .schedule(new ConnectionSchedule() + .timeUnit(ConnectionSchedule.TimeUnitEnum.MINUTES) + .units(5L)) + .status(ConnectionStatus.ACTIVE); + + private final static ConnectionRead connectionReadWithManualScheduleType = new ConnectionRead() + .scheduleType(ConnectionScheduleType.MANUAL) + .status(ConnectionStatus.ACTIVE); + + private final static ConnectionRead connectionReadWithBasicScheduleType = new ConnectionRead() + .scheduleType(ConnectionScheduleType.BASIC) + .status(ConnectionStatus.ACTIVE) + .scheduleData(new ConnectionScheduleData() + .basicSchedule(new ConnectionScheduleDataBasicSchedule() + .timeUnit(TimeUnitEnum.MINUTES) + .units(5L))); public static final String UTC = "UTC"; - private final static StandardSync standardSyncWithCronScheduleType = new StandardSync() - .withScheduleType(ScheduleType.CRON) - .withStatus(Status.ACTIVE) - .withScheduleData(new ScheduleData() - .withCron(new Cron() - .withCronExpression("0 0 12 * * ?") - .withCronTimeZone(UTC))); - - private final static StandardSync standardSyncWithScheduleDisable = new StandardSync() - .withSchedule(new Schedule() - .withTimeUnit(Schedule.TimeUnit.MINUTES) - .withUnits(5L)) - .withStatus(Status.INACTIVE); - - private final static StandardSync standardSyncWithScheduleDeleted = new StandardSync() - .withSchedule(new Schedule() - .withTimeUnit(Schedule.TimeUnit.MINUTES) - .withUnits(5L)) - .withStatus(Status.DEPRECATED); - private static final StandardSync standardSyncWithoutSchedule = new StandardSync(); + private final static ConnectionRead connectionReadWithCronScheduleType = new ConnectionRead() + .scheduleType(ConnectionScheduleType.CRON) + .status(ConnectionStatus.ACTIVE) + .scheduleData(new ConnectionScheduleData() + .cron(new ConnectionScheduleDataCron() + .cronExpression("0 0 12 * * ?") + .cronTimeZone(UTC))); + + private final static ConnectionRead connectionReadWithScheduleDisable = new ConnectionRead() + .schedule(new ConnectionSchedule() + .timeUnit(ConnectionSchedule.TimeUnitEnum.MINUTES) + .units(5L)) + .status(ConnectionStatus.INACTIVE); + + private final static ConnectionRead connectionReadWithScheduleDeleted = new ConnectionRead() + .schedule(new ConnectionSchedule() + .timeUnit(ConnectionSchedule.TimeUnitEnum.MINUTES) + .units(5L)) + .status(ConnectionStatus.DEPRECATED); + private static final ConnectionRead connectionReadWithoutSchedule = new ConnectionRead(); @BeforeEach void setup() { @@ -114,12 +116,12 @@ class TimeToWaitTest { @Test @DisplayName("Test that the job gets scheduled if it is not manual and if it is the first run with legacy schedule schema") - void testFirstJobNonManual() throws IOException, JsonValidationException, ConfigNotFoundException { + void testFirstJobNonManual() throws IOException, JsonValidationException, ConfigNotFoundException, ApiException { when(mJobPersistence.getLastReplicationJob(connectionId)) .thenReturn(Optional.empty()); - when(mConfigRepository.getStandardSync(connectionId)) - .thenReturn(standardSyncWithLegacySchedule); + when(mConnectionApi.getConnection(any())) + .thenReturn(connectionReadWithLegacySchedule); final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId); @@ -131,9 +133,9 @@ void testFirstJobNonManual() throws IOException, JsonValidationException, Config @Test @DisplayName("Test that the job will wait for a long time if it is manual in the legacy schedule schema") - void testManual() throws IOException, JsonValidationException, ConfigNotFoundException { - when(mConfigRepository.getStandardSync(connectionId)) - .thenReturn(standardSyncWithoutSchedule); + void testManual() throws ApiException { + when(mConnectionApi.getConnection(any())) + .thenReturn(connectionReadWithoutSchedule); final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId); @@ -145,9 +147,9 @@ void testManual() throws IOException, JsonValidationException, ConfigNotFoundExc @Test @DisplayName("Test that the job will wait for a long time if it is disabled") - void testDisable() throws IOException, JsonValidationException, ConfigNotFoundException { - when(mConfigRepository.getStandardSync(connectionId)) - .thenReturn(standardSyncWithScheduleDisable); + void testDisable() throws ApiException { + when(mConnectionApi.getConnection(any())) + .thenReturn(connectionReadWithScheduleDisable); final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId); @@ -159,9 +161,9 @@ void testDisable() throws IOException, JsonValidationException, ConfigNotFoundEx @Test @DisplayName("Test that the connection will wait for a long time if it is deleted") - void testDeleted() throws IOException, JsonValidationException, ConfigNotFoundException { - when(mConfigRepository.getStandardSync(connectionId)) - .thenReturn(standardSyncWithScheduleDeleted); + void testDeleted() throws ApiException { + when(mConnectionApi.getConnection(any())) + .thenReturn(connectionReadWithScheduleDeleted); final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId); @@ -173,7 +175,7 @@ void testDeleted() throws IOException, JsonValidationException, ConfigNotFoundEx @Test @DisplayName("Test we will wait the required amount of time with legacy config") - void testWait() throws IOException, JsonValidationException, ConfigNotFoundException { + void testWait() throws IOException, JsonValidationException, ConfigNotFoundException, ApiException { configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi); @@ -183,8 +185,8 @@ void testWait() throws IOException, JsonValidationException, ConfigNotFoundExcep when(mJobPersistence.getLastReplicationJob(connectionId)) .thenReturn(Optional.of(mJob)); - when(mConfigRepository.getStandardSync(connectionId)) - .thenReturn(standardSyncWithLegacySchedule); + when(mConnectionApi.getConnection(any())) + .thenReturn(connectionReadWithLegacySchedule); final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId); @@ -196,7 +198,7 @@ void testWait() throws IOException, JsonValidationException, ConfigNotFoundExcep @Test @DisplayName("Test we will not wait if we are late in the legacy schedule schema") - void testNotWaitIfLate() throws IOException, JsonValidationException, ConfigNotFoundException { + void testNotWaitIfLate() throws IOException, ApiException { configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10, mConnectionApi); @@ -206,8 +208,8 @@ void testNotWaitIfLate() throws IOException, JsonValidationException, ConfigNotF when(mJobPersistence.getLastReplicationJob(connectionId)) .thenReturn(Optional.of(mJob)); - when(mConfigRepository.getStandardSync(connectionId)) - .thenReturn(standardSyncWithLegacySchedule); + when(mConnectionApi.getConnection(any())) + .thenReturn(connectionReadWithLegacySchedule); final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId); @@ -221,9 +223,9 @@ void testNotWaitIfLate() throws IOException, JsonValidationException, ConfigNotF @Test @DisplayName("Test that the job will wait a long time if it is MANUAL scheduleType") - void testManualScheduleType() throws IOException, JsonValidationException, ConfigNotFoundException { - when(mConfigRepository.getStandardSync(connectionId)) - .thenReturn(standardSyncWithManualScheduleType); + void testManualScheduleType() throws ApiException { + when(mConnectionApi.getConnection(any())) + .thenReturn(connectionReadWithManualScheduleType); final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId); @@ -235,12 +237,12 @@ void testManualScheduleType() throws IOException, JsonValidationException, Confi @Test @DisplayName("Test that the job will be immediately scheduled if it is a BASIC_SCHEDULE type on the first run") - void testBasicScheduleTypeFirstRun() throws IOException, JsonValidationException, ConfigNotFoundException { + void testBasicScheduleTypeFirstRun() throws IOException, ApiException { when(mJobPersistence.getLastReplicationJob(connectionId)) .thenReturn(Optional.empty()); - when(mConfigRepository.getStandardSync(connectionId)) - .thenReturn(standardSyncWithBasicScheduleType); + when(mConnectionApi.getConnection(any())) + .thenReturn(connectionReadWithBasicScheduleType); final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId); @@ -252,7 +254,7 @@ void testBasicScheduleTypeFirstRun() throws IOException, JsonValidationException @Test @DisplayName("Test that we will wait the required amount of time with a BASIC_SCHEDULE type on a subsequent run") - void testBasicScheduleSubsequentRun() throws IOException, JsonValidationException, ConfigNotFoundException { + void testBasicScheduleSubsequentRun() throws IOException, ApiException { configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi); when(mJob.getStartedAtInSecond()) @@ -261,8 +263,8 @@ void testBasicScheduleSubsequentRun() throws IOException, JsonValidationExceptio when(mJobPersistence.getLastReplicationJob(connectionId)) .thenReturn(Optional.of(mJob)); - when(mConfigRepository.getStandardSync(connectionId)) - .thenReturn(standardSyncWithBasicScheduleType); + when(mConnectionApi.getConnection(any())) + .thenReturn(connectionReadWithBasicScheduleType); final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId); @@ -274,7 +276,7 @@ void testBasicScheduleSubsequentRun() throws IOException, JsonValidationExceptio @Test @DisplayName("Test that the job will wait to be scheduled if it is a CRON type") - void testCronScheduleSubsequentRun() throws IOException, JsonValidationException, ConfigNotFoundException { + void testCronScheduleSubsequentRun() throws IOException, JsonValidationException, ConfigNotFoundException, ApiException { final Calendar mockRightNow = Calendar.getInstance(TimeZone.getTimeZone(UTC)); mockRightNow.set(Calendar.HOUR_OF_DAY, 0); mockRightNow.set(Calendar.MINUTE, 0); @@ -290,8 +292,8 @@ void testCronScheduleSubsequentRun() throws IOException, JsonValidationException when(mJobPersistence.getLastReplicationJob(connectionId)) .thenReturn(Optional.of(mJob)); - when(mConfigRepository.getStandardSync(connectionId)) - .thenReturn(standardSyncWithCronScheduleType); + when(mConnectionApi.getConnection(any())) + .thenReturn(connectionReadWithCronScheduleType); final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId); @@ -303,7 +305,7 @@ void testCronScheduleSubsequentRun() throws IOException, JsonValidationException @Test @DisplayName("Test that the job will only be scheduled once per minimum cron interval") - void testCronScheduleMinimumInterval() throws IOException, JsonValidationException, ConfigNotFoundException { + void testCronScheduleMinimumInterval() throws IOException, JsonValidationException, ConfigNotFoundException, ApiException { final Calendar mockRightNow = Calendar.getInstance(TimeZone.getTimeZone(UTC)); mockRightNow.set(Calendar.HOUR_OF_DAY, 12); mockRightNow.set(Calendar.MINUTE, 0); @@ -320,8 +322,8 @@ void testCronScheduleMinimumInterval() throws IOException, JsonValidationExcepti when(mJobPersistence.getLastReplicationJob(connectionId)) .thenReturn(Optional.of(mJob)); - when(mConfigRepository.getStandardSync(connectionId)) - .thenReturn(standardSyncWithCronScheduleType); + when(mConnectionApi.getConnection(any())) + .thenReturn(connectionReadWithCronScheduleType); final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId); @@ -333,7 +335,7 @@ void testCronScheduleMinimumInterval() throws IOException, JsonValidationExcepti @Test @DisplayName("Test that for specific workspace ids, we add some noise in the cron scheduling") - void testCronSchedulingNoise() throws IOException, JsonValidationException, ConfigNotFoundException { + void testCronSchedulingNoise() throws IOException, JsonValidationException, ConfigNotFoundException, ApiException { final Calendar mockRightNow = Calendar.getInstance(TimeZone.getTimeZone(UTC)); mockRightNow.set(Calendar.HOUR_OF_DAY, 0); mockRightNow.set(Calendar.MINUTE, 0); @@ -350,8 +352,8 @@ void testCronSchedulingNoise() throws IOException, JsonValidationException, Conf when(mJobPersistence.getLastReplicationJob(connectionId)) .thenReturn(Optional.of(mJob)); - when(mConfigRepository.getStandardSync(connectionId)) - .thenReturn(standardSyncWithCronScheduleType); + when(mConnectionApi.getConnection(any())) + .thenReturn(connectionReadWithCronScheduleType); final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId);