Skip to content
This repository has been archived by the owner on Jan 16, 2023. It is now read-only.

Commit

Permalink
Remove the quarantine status (airbytehq#21088)
Browse files Browse the repository at this point in the history
* Rm temporal version

* Remove temporal version

* Update the replayed workflow

* Remove quarantine information
  • Loading branch information
benmoriceau authored Jan 6, 2023
1 parent f9fc56c commit 3c52168
Show file tree
Hide file tree
Showing 6 changed files with 3 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,6 @@ public ConnectionManagerWorkflow getConnectionManagerWorkflow(final WorkflowClie
String.format("ConnectionManagerWorkflow for connection %s is unreachable due to having COMPLETED status.", connectionId));
}

if (workflowState.isQuarantined()) {
throw new UnreachableWorkflowException(
String.format("ConnectionManagerWorkflow for connection %s is unreachable due to being in a quarantined state.", connectionId));
}

return connectionManagerWorkflow;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
Expand Down Expand Up @@ -84,22 +83,4 @@ class JobInformation {
@QueryMethod
JobInformation getJobInformation();

@Data
@NoArgsConstructor
@AllArgsConstructor
class QuarantinedInformation {

private UUID connectionId;
private long jobId;
private int attemptId;
private boolean isQuarantined;

}

/**
* Return if a job is stuck or not with the job information
*/
@QueryMethod
QuarantinedInformation getQuarantinedInformation();

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.ChangedStateEvent;
import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.StateField;
import java.util.UUID;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;

Expand All @@ -32,6 +33,8 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan
private final boolean resetConnection = false;
@Deprecated
private final boolean continueAsReset = false;
@Deprecated
@Getter(AccessLevel.NONE)
private boolean quarantined = false;
private boolean success = true;
private boolean cancelledForReset = false;
Expand Down Expand Up @@ -88,14 +91,6 @@ public void setFailed(final boolean failed) {
this.failed = failed;
}

public void setQuarantined(final boolean quarantined) {
final ChangedStateEvent event = new ChangedStateEvent(
StateField.QUARANTINED,
quarantined);
stateChangedListener.addEvent(id, event);
this.quarantined = quarantined;
}

public void setSuccess(final boolean success) {
final ChangedStateEvent event = new ChangedStateEvent(
StateField.SUCCESS,
Expand Down Expand Up @@ -138,7 +133,6 @@ public void reset() {
this.setCancelled(false);
this.setFailed(false);
this.setSuccess(false);
this.setQuarantined(false);
this.setDoneWaiting(false);
this.setSkipSchedulingNextWorkflow(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ enum StateField {
FAILED,
RESET,
CONTINUE_AS_RESET,
QUARANTINED,
SUCCESS,
CANCELLED_FOR_RESET,
RESET_WITH_SCHEDULING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,54 +734,12 @@ void testResetConnectionDeletedWorkflow() throws IOException {

}

@Test
@DisplayName("Test manual operation on quarantined workflow causes a restart")
void testManualOperationOnQuarantinedWorkflow() {
final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
final WorkflowState mWorkflowState = mock(WorkflowState.class);
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
when(mWorkflowState.isQuarantined()).thenReturn(true);

final ConnectionManagerWorkflow mNewConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
final WorkflowState mNewWorkflowState = mock(WorkflowState.class);
when(mNewConnectionManagerWorkflow.getState()).thenReturn(mNewWorkflowState);
when(mNewWorkflowState.isRunning()).thenReturn(false).thenReturn(true);
when(mNewConnectionManagerWorkflow.getJobInformation()).thenReturn(new JobInformation(JOB_ID, ATTEMPT_ID));
when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mNewConnectionManagerWorkflow);
final BatchRequest mBatchRequest = mock(BatchRequest.class);
when(workflowClient.newSignalWithStartRequest()).thenReturn(mBatchRequest);

when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow, mConnectionManagerWorkflow,
mNewConnectionManagerWorkflow);

final WorkflowStub mWorkflowStub = mock(WorkflowStub.class);
when(workflowClient.newUntypedWorkflowStub(anyString())).thenReturn(mWorkflowStub);

final ManualOperationResult result = temporalClient.startNewManualSync(CONNECTION_ID);

assertTrue(result.getJobId().isPresent());
assertEquals(JOB_ID, result.getJobId().get());
assertFalse(result.getFailingReason().isPresent());
verify(workflowClient).signalWithStart(mBatchRequest);
verify(mWorkflowStub).terminate(anyString());

// Verify that the submitManualSync signal was passed to the batch request by capturing the
// argument,
// executing the signal, and verifying that the desired signal was executed
final ArgumentCaptor<Proc> batchRequestAddArgCaptor = ArgumentCaptor.forClass(Proc.class);
verify(mBatchRequest).add(batchRequestAddArgCaptor.capture());
final Proc signal = batchRequestAddArgCaptor.getValue();
signal.apply();
verify(mNewConnectionManagerWorkflow).submitManualSync();
}

@Test
@DisplayName("Test manual operation on completed workflow causes a restart")
void testManualOperationOnCompletedWorkflow() {
final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
final WorkflowState mWorkflowState = mock(WorkflowState.class);
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
when(mWorkflowState.isQuarantined()).thenReturn(false);
when(mWorkflowState.isDeleted()).thenReturn(false);
when(workflowServiceBlockingStub.describeWorkflowExecution(any()))
.thenReturn(DescribeWorkflowExecutionResponse.newBuilder().setWorkflowExecutionInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,19 +495,6 @@ public JobInformation getJobInformation() {
attemptNumber == null ? NON_RUNNING_ATTEMPT_ID : attemptNumber);
}

@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
@Override
public QuarantinedInformation getQuarantinedInformation() {
final Long jobId = workflowInternalState.getJobId() != null ? workflowInternalState.getJobId() : NON_RUNNING_JOB_ID;
final Integer attemptNumber = workflowInternalState.getAttemptNumber();
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionId, JOB_ID_KEY, jobId));
return new QuarantinedInformation(
connectionId,
jobId,
attemptNumber == null ? NON_RUNNING_ATTEMPT_ID : attemptNumber,
workflowState.isQuarantined());
}

/**
* return true if the workflow is in a state that require it to continue. If the state is to process
* an update or delete the workflow, it won't continue with a run of the {@link SyncWorkflow} but it
Expand Down

0 comments on commit 3c52168

Please sign in to comment.