Skip to content
This repository has been archived by the owner on Jan 16, 2023. It is now read-only.

Commit

Permalink
Remove workspace helper from fetchConfigActivity (airbytehq#21048)
Browse files Browse the repository at this point in the history
* Remove workspace helper and replace with workspaceApi
  • Loading branch information
alovew authored Jan 5, 2023
1 parent 6c08f43 commit 84cd154
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.generated.WorkspaceApi;
import io.airbyte.api.client.invoker.generated.ApiClient;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.micronaut.context.BeanProvider;
Expand Down Expand Up @@ -73,6 +74,11 @@ public ConnectionApi connectionApi(final ApiClient apiClient) {
return new ConnectionApi(apiClient);
}

@Singleton
public WorkspaceApi workspaceApi(final ApiClient apiClient) {
return new WorkspaceApi(apiClient);
}

@Singleton
public HttpClient httpClient() {
return HttpClient.newHttpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
import io.airbyte.config.Notification;
import io.airbyte.config.Notification.NotificationType;
import io.airbyte.config.SlackNotificationConfiguration;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.notification.SlackNotificationClient;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.airbyte.workers.temporal.scheduling.activities.NotifySchemaChangeActivity;
import io.airbyte.workers.temporal.scheduling.activities.SlackConfigActivity;
import io.temporal.workflow.Workflow;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
Expand All @@ -25,6 +25,9 @@
@Slf4j
public class ConnectionNotificationWorkflowImpl implements ConnectionNotificationWorkflow {

private static final String GET_BREAKING_CHANGE_TAG = "get_breaking_change";
private static final int GET_BREAKING_CHANGE_VERSION = 1;

@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private NotifySchemaChangeActivity notifySchemaChangeActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
Expand All @@ -35,14 +38,20 @@ public class ConnectionNotificationWorkflowImpl implements ConnectionNotificatio
@Override
public boolean sendSchemaChangeNotification(final UUID connectionId)
throws IOException, InterruptedException, ApiException, ConfigNotFoundException, JsonValidationException {
final StandardSync standardSync = configFetchActivity.getStandardSync(connectionId);
final Optional<SlackNotificationConfiguration> slackConfig = slackConfigActivity.fetchSlackConfiguration(connectionId);
if (slackConfig.isPresent()) {
final Notification notification =
new Notification().withNotificationType(NotificationType.SLACK).withSendOnFailure(false).withSendOnSuccess(false)
.withSlackConfiguration(slackConfig.get());
final SlackNotificationClient notificationClient = new SlackNotificationClient(notification);
return notifySchemaChangeActivity.notifySchemaChange(notificationClient, connectionId, standardSync.getBreakingChange());
final int getBreakingChangeVersion =
Workflow.getVersion(GET_BREAKING_CHANGE_TAG, Workflow.DEFAULT_VERSION, GET_BREAKING_CHANGE_VERSION);
if (getBreakingChangeVersion >= GET_BREAKING_CHANGE_VERSION) {
final Optional<Boolean> breakingChange = configFetchActivity.getBreakingChange(connectionId);
final Optional<SlackNotificationConfiguration> slackConfig = slackConfigActivity.fetchSlackConfiguration(connectionId);
if (slackConfig.isPresent() && breakingChange.isPresent()) {
final Notification notification =
new Notification().withNotificationType(NotificationType.SLACK).withSendOnFailure(false).withSendOnSuccess(false)
.withSlackConfiguration(slackConfig.get());
final SlackNotificationClient notificationClient = new SlackNotificationClient(notification);
return notifySchemaChangeActivity.notifySchemaChange(notificationClient, connectionId, breakingChange.get());
} else {
return false;
}
} else {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@
package io.airbyte.workers.temporal.scheduling.activities;

import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
Expand All @@ -27,6 +23,9 @@ public interface ConfigFetchActivity {
@ActivityMethod
Optional<ConnectionStatus> getStatus(UUID connectionId);

@ActivityMethod
public Optional<Boolean> getBreakingChange(final UUID connectionId);

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand All @@ -45,8 +44,6 @@ class ScheduleRetrieverOutput {

}

StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException;

/**
* Return how much time to wait before running the next sync. It will query the DB to get the last
* starting time of the latest terminal job (Failed, canceled or successful) and return the amount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.google.common.annotations.VisibleForTesting;
import datadog.trace.api.Trace;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.generated.WorkspaceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.client.model.generated.ConnectionRead;
Expand All @@ -19,16 +20,12 @@
import io.airbyte.api.client.model.generated.ConnectionScheduleDataCron;
import io.airbyte.api.client.model.generated.ConnectionScheduleType;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.api.client.model.generated.WorkspaceRead;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.validation.json.JsonValidationException;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Named;
Expand Down Expand Up @@ -67,43 +64,25 @@ public class ConfigFetchActivityImpl implements ConfigFetchActivity {
UUID.fromString("226edbc1-4a9c-4401-95a9-90435d667d9d"));
private static final long SCHEDULING_NOISE_CONSTANT = 15;

private final ConfigRepository configRepository;
private final JobPersistence jobPersistence;
private final WorkspaceHelper workspaceHelper;
private final WorkspaceApi workspaceApi;
private final Integer syncJobMaxAttempts;
private final Supplier<Long> currentSecondsSupplier;
private final ConnectionApi connectionApi;

public ConfigFetchActivityImpl(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier,
final ConnectionApi connectionApi) {
this(configRepository, jobPersistence, new WorkspaceHelper(configRepository, jobPersistence), syncJobMaxAttempts, currentSecondsSupplier,
connectionApi);
}

@VisibleForTesting
protected ConfigFetchActivityImpl(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final WorkspaceHelper workspaceHelper,
protected ConfigFetchActivityImpl(final JobPersistence jobPersistence,
final WorkspaceApi workspaceApi,
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier,
final ConnectionApi connectionApi) {
this.configRepository = configRepository;
this.jobPersistence = jobPersistence;
this.workspaceHelper = workspaceHelper;
this.workspaceApi = workspaceApi;
this.syncJobMaxAttempts = syncJobMaxAttempts;
this.currentSecondsSupplier = currentSecondsSupplier;
this.connectionApi = connectionApi;
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException {
return configRepository.getStandardSync(connectionId);
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public ScheduleRetrieverOutput getTimeToWait(final ScheduleRetrieverInput input) {
Expand Down Expand Up @@ -176,10 +155,12 @@ private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final ConnectionRe
}

private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait, ConnectionRead connectionRead) {
final UUID workspaceId;
UUID workspaceId;
try {
workspaceId = workspaceHelper.getWorkspaceForConnectionId(connectionRead.getConnectionId());
} catch (JsonValidationException | ConfigNotFoundException e) {
ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(connectionRead.getConnectionId());
final WorkspaceRead workspaceRead = workspaceApi.getWorkspaceByConnectionId(connectionIdRequestBody);
workspaceId = workspaceRead.getWorkspaceId();
} catch (ApiException e) {
// We tolerate exceptions and fail open by doing nothing.
return timeToWait;
}
Expand Down Expand Up @@ -264,6 +245,19 @@ public Optional<ConnectionStatus> getStatus(final UUID connectionId) {
}
}

@Override
public Optional<Boolean> getBreakingChange(final UUID connectionId) {
try {
final io.airbyte.api.client.model.generated.ConnectionIdRequestBody requestBody =
new io.airbyte.api.client.model.generated.ConnectionIdRequestBody().connectionId(connectionId);
final ConnectionRead connectionRead = connectionApi.getConnection(requestBody);
return Optional.ofNullable(connectionRead.getBreakingChange());
} catch (ApiException e) {
log.info("Encountered an error fetching the connection's breaking change status: ", e);
return Optional.empty();
}
}

private Long getIntervalInSecond(final ConnectionScheduleDataBasicSchedule schedule) {
return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.commons.temporal.scheduling.ConnectionNotificationWorkflow;
import io.airbyte.config.SlackNotificationConfiguration;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.notification.SlackNotificationClient;
import io.airbyte.validation.json.JsonValidationException;
Expand Down Expand Up @@ -102,7 +101,7 @@ void sendSchemaChangeNotificationNonBreakingChangeTest()

final UUID connectionId = UUID.randomUUID();

when(mConfigFetchActivity.getStandardSync(connectionId)).thenReturn(new StandardSync().withBreakingChange(false));
when(mConfigFetchActivity.getBreakingChange(connectionId)).thenReturn(Optional.of(false));
workflow.sendSchemaChangeNotification(connectionId);

verify(mNotifySchemaChangeActivity, times(1)).notifySchemaChange(any(SlackNotificationClient.class), any(UUID.class), any(boolean.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.mockito.Mockito.when;

import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.generated.WorkspaceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionRead;
import io.airbyte.api.client.model.generated.ConnectionSchedule;
Expand All @@ -17,10 +18,9 @@
import io.airbyte.api.client.model.generated.ConnectionScheduleDataCron;
import io.airbyte.api.client.model.generated.ConnectionScheduleType;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.api.client.model.generated.WorkspaceRead;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverInput;
Expand All @@ -46,15 +46,11 @@ class ConfigFetchActivityTest {

private static final Integer SYNC_JOB_MAX_ATTEMPTS = 3;

@Mock
private ConfigRepository mConfigRepository;

@Mock
private JobPersistence mJobPersistence;

@Mock
private WorkspaceHelper mWorkspaceHelper;

private WorkspaceApi mWorkspaceApi;
@Mock
private Job mJob;

Expand Down Expand Up @@ -107,7 +103,7 @@ class ConfigFetchActivityTest {
@BeforeEach
void setup() {
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS,
() -> Instant.now().getEpochSecond(), mConnectionApi);
}

Expand Down Expand Up @@ -177,7 +173,7 @@ void testDeleted() throws ApiException {
@DisplayName("Test we will wait the required amount of time with legacy config")
void testWait() throws IOException, JsonValidationException, ConfigNotFoundException, ApiException {
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);
new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);

when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand All @@ -200,7 +196,7 @@ void testWait() throws IOException, JsonValidationException, ConfigNotFoundExcep
@DisplayName("Test we will not wait if we are late in the legacy schedule schema")
void testNotWaitIfLate() throws IOException, ApiException {
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10, mConnectionApi);
new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10, mConnectionApi);

when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand Down Expand Up @@ -255,7 +251,7 @@ void testBasicScheduleTypeFirstRun() throws IOException, ApiException {
@Test
@DisplayName("Test that we will wait the required amount of time with a BASIC_SCHEDULE type on a subsequent run")
void testBasicScheduleSubsequentRun() throws IOException, ApiException {
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);
configFetchActivity = new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);

when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand Down Expand Up @@ -283,10 +279,10 @@ void testCronScheduleSubsequentRun() throws IOException, JsonValidationException
mockRightNow.set(Calendar.SECOND, 0);
mockRightNow.set(Calendar.MILLISECOND, 0);

when(mWorkspaceHelper.getWorkspaceForConnectionId(any())).thenReturn(UUID.randomUUID());
when(mWorkspaceApi.getWorkspaceByConnectionId(any())).thenReturn(new WorkspaceRead().workspaceId(UUID.randomUUID()));

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS,
() -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi);

when(mJobPersistence.getLastReplicationJob(connectionId))
Expand All @@ -312,10 +308,10 @@ void testCronScheduleMinimumInterval() throws IOException, JsonValidationExcepti
mockRightNow.set(Calendar.SECOND, 0);
mockRightNow.set(Calendar.MILLISECOND, 0);

when(mWorkspaceHelper.getWorkspaceForConnectionId(any())).thenReturn(UUID.randomUUID());
when(mWorkspaceApi.getWorkspaceByConnectionId(any())).thenReturn(new WorkspaceRead().workspaceId(UUID.randomUUID()));

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS,
() -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi);

when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L));
Expand All @@ -342,10 +338,11 @@ void testCronSchedulingNoise() throws IOException, JsonValidationException, Conf
mockRightNow.set(Calendar.SECOND, 0);
mockRightNow.set(Calendar.MILLISECOND, 0);

when(mWorkspaceHelper.getWorkspaceForConnectionId(any())).thenReturn(UUID.fromString("226edbc1-4a9c-4401-95a9-90435d667d9d"));
when(mWorkspaceApi.getWorkspaceByConnectionId(any()))
.thenReturn(new WorkspaceRead().workspaceId(UUID.fromString("226edbc1-4a9c-4401-95a9-90435d667d9d")));

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS,
() -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi);

when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L));
Expand All @@ -371,7 +368,7 @@ class TestGetMaxAttempt {
void testGetMaxAttempt() {
final int maxAttempt = 15031990;
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, maxAttempt, () -> Instant.now().getEpochSecond(), mConnectionApi);
new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, maxAttempt, () -> Instant.now().getEpochSecond(), mConnectionApi);
Assertions.assertThat(configFetchActivity.getMaxAttempt().getMaxAttempt())
.isEqualTo(maxAttempt);
}
Expand Down

0 comments on commit 84cd154

Please sign in to comment.