Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport to branch(3) : Optimize Coordinator#getStateForGroupCommit() by first looking up the coordinator table with parent transaction ID #2338

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,41 +90,30 @@ public Optional<Coordinator.State> getState(String id) throws CoordinatorExcepti

@VisibleForTesting
Optional<Coordinator.State> getStateForGroupCommit(String fullId) throws CoordinatorException {
// Reading a coordinator state is likely to occur during lazy recovery, as follows:
// 1. Transaction T1 starts and creates PREPARED state records but hasn't committed or aborted
// yet.
// 2. Transaction T2 starts and reads the PREPARED state records created by T1.
// 3. T2 reads the coordinator table record for T1 to decide whether to roll back or roll
// forward.
//
// The likelihood of step 2 would increase if T1 is delayed.
//
// With the group commit feature enabled, delayed transactions are isolated from a normal group
// that is looked up by a parent ID into a delayed group that is looked up by a full ID.
// Therefore, looking up with the full transaction ID should be tried first to minimize read
// operations as much as possible.

// Scan with the full ID for a delayed group that contains only a single transaction.
// The normal lookup logic can be used as is.
Optional<State> stateOfDelayedTxn = get(createGetWith(fullId));
if (stateOfDelayedTxn.isPresent()) {
return stateOfDelayedTxn;
}

// Scan with the parent ID for a normal group that contains multiple transactions.
Keys<String, String, String> idForGroupCommit = keyManipulator.keysFromFullKey(fullId);

String parentId = idForGroupCommit.parentKey;
String childId = idForGroupCommit.childKey;
Get get = createGetWith(parentId);
Optional<State> state = get(get);
return state.flatMap(
s -> {
if (s.getChildIds().contains(childId)) {
return state;
}
return Optional.empty();
});
// The current implementation is optimized for cases where most transactions are
// group-committed. It first looks up a transaction state using the parent ID with a single read
// operation. If no matching transaction state is found (i.e., the transaction was delayed and
// committed individually), it issues an additional read operation using the full ID.
Optional<State> stateContainingTargetTxId =
state.flatMap(
s -> {
if (s.getChildIds().contains(childId)) {
return state;
}
return Optional.empty();
});
if (stateContainingTargetTxId.isPresent()) {
return stateContainingTargetTxId;
}

return get(createGetWith(fullId));
}

public void putState(Coordinator.State state) throws CoordinatorException {
Expand Down Expand Up @@ -253,7 +242,8 @@ private void putStateForLazyRecoveryRollbackForGroupCommit(String id)
putState(new Coordinator.State(id, TransactionState.ABORTED));
}

private Get createGetWith(String id) {
@VisibleForTesting
Get createGetWith(String id) {
return new Get(new Key(Attribute.toIdValue(id)))
.withConsistency(Consistency.LINEARIZABLE)
.forNamespace(coordinatorNamespace)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,19 +264,9 @@ public void getState_TransactionIdForGroupCommitGivenAndParentIdAndChildIdMatch_
// The IDs used to find the state are:
// - parentId:childId1
// - parentId:childId2
doReturn(
// For the first call,
// - The first get with the full ID shouldn't find a state.
Optional.empty(),
// - The second get with the parent ID should return the state.
Optional.of(resultForGroupCommitState),
// For the second call,
// - The first get with the full ID shouldn't find a state.
Optional.empty(),
// - The second get with the parent ID should return the state.
Optional.of(resultForGroupCommitState))
doReturn(Optional.of(resultForGroupCommitState))
.when(storage)
.get(any(Get.class));
.get(coordinator.createGetWith(parentId));

// Act
Optional<Coordinator.State> state1 = spiedCoordinator.getState(fullId1);
Expand All @@ -291,9 +281,9 @@ public void getState_TransactionIdForGroupCommitGivenAndParentIdAndChildIdMatch_
assertThat(state1.get().getCreatedAt()).isEqualTo(ANY_TIME_1);
verify(spiedCoordinator).getStateForGroupCommit(fullId1);
verify(spiedCoordinator).getStateForGroupCommit(fullId2);
verify(storage, times(4)).get(getArgumentCaptor.capture());
verify(storage, times(2)).get(getArgumentCaptor.capture());
assertGetArgumentCaptorForGetState(
getArgumentCaptor.getAllValues(), Arrays.asList(fullId1, parentId, fullId2, parentId));
getArgumentCaptor.getAllValues(), Arrays.asList(parentId, parentId));
}

@ParameterizedTest
Expand All @@ -310,6 +300,20 @@ public void getState_TransactionIdForSingleCommitGivenAndFullIdMatches_ShouldRet
String childId = UUID.randomUUID().toString();
String fullId = keyManipulator.fullKey(parentId, childId);
List<String> childIds = Collections.emptyList();
String dummyChildId1 = UUID.randomUUID().toString();
String dummyChildId2 = UUID.randomUUID().toString();
List<String> dummyChildIds = Arrays.asList(dummyChildId1, dummyChildId2);

Result resultForGroupCommitState = mock(Result.class);
when(resultForGroupCommitState.getValue(Attribute.ID))
.thenReturn(Optional.of(new TextValue(Attribute.ID, parentId)));
when(resultForGroupCommitState.getValue(Attribute.CHILD_IDS))
.thenReturn(
Optional.of(new TextValue(Attribute.CHILD_IDS, Joiner.on(',').join(dummyChildIds))));
when(resultForGroupCommitState.getValue(Attribute.STATE))
.thenReturn(Optional.of(new IntValue(Attribute.STATE, transactionState.get())));
when(resultForGroupCommitState.getValue(Attribute.CREATED_AT))
.thenReturn(Optional.of(new BigIntValue(Attribute.CREATED_AT, ANY_TIME_1)));

Result resultForSingleCommitState = mock(Result.class);
when(resultForSingleCommitState.getValue(Attribute.ID))
Expand All @@ -323,13 +327,18 @@ public void getState_TransactionIdForSingleCommitGivenAndFullIdMatches_ShouldRet

// Assuming these states exist:
//
// id | child_ids | state
// ------------------+-----------+----------
// parentId:childId | [] | COMMITTED
// id | child_ids | state
// ------------------+----------------------+----------
// parentId:childId | [childId1, childId2] | COMMITTED
//
// The IDs used to find the state are:
// - parentId:childId
doReturn(Optional.of(resultForSingleCommitState)).when(storage).get(any(Get.class));
doReturn(Optional.of(resultForGroupCommitState))
.when(storage)
.get(coordinator.createGetWith(parentId));
doReturn(Optional.of(resultForSingleCommitState))
.when(storage)
.get(coordinator.createGetWith(fullId));

// Act
Optional<Coordinator.State> state = spiedCoordinator.getState(fullId);
Expand All @@ -341,9 +350,9 @@ public void getState_TransactionIdForSingleCommitGivenAndFullIdMatches_ShouldRet
Assertions.assertThat(state.get().getState()).isEqualTo(transactionState);
assertThat(state.get().getCreatedAt()).isEqualTo(ANY_TIME_1);
verify(spiedCoordinator).getStateForGroupCommit(fullId);
verify(storage).get(getArgumentCaptor.capture());
verify(storage, times(2)).get(getArgumentCaptor.capture());
assertGetArgumentCaptorForGetState(
getArgumentCaptor.getAllValues(), Collections.singletonList(fullId));
getArgumentCaptor.getAllValues(), Arrays.asList(parentId, fullId));
}

@ParameterizedTest
Expand Down Expand Up @@ -381,14 +390,11 @@ public void getState_TransactionIdForGroupCommitGivenAndOnlyParentIdMatches_Shou
//
// The IDs used to find the state are:
// - parentId:childIdX
doReturn(
// The first get with the full ID should return empty.
Optional.empty(),
// The second get with the parent ID should return a state, but it doesn't contain the
// child ID.
Optional.of(resultForGroupCommitState))
doReturn(Optional.of(resultForGroupCommitState))
.when(storage)
.get(any(Get.class));
.get(coordinator.createGetWith(parentId));

doReturn(Optional.empty()).when(storage).get(coordinator.createGetWith(targetFullId));

// Act
Optional<Coordinator.State> state = spiedCoordinator.getState(targetFullId);
Expand All @@ -398,7 +404,7 @@ public void getState_TransactionIdForGroupCommitGivenAndOnlyParentIdMatches_Shou
verify(spiedCoordinator).getStateForGroupCommit(targetFullId);
verify(storage, times(2)).get(getArgumentCaptor.capture());
assertGetArgumentCaptorForGetState(
getArgumentCaptor.getAllValues(), Arrays.asList(targetFullId, parentId));
getArgumentCaptor.getAllValues(), Arrays.asList(parentId, targetFullId));
}

@ParameterizedTest
Expand All @@ -413,11 +419,23 @@ public void getState_TransactionIdForGroupCommitGivenAndOnlyParentIdMatches_Shou
CoordinatorGroupCommitKeyManipulator keyManipulator =
new CoordinatorGroupCommitKeyManipulator();
String parentId = keyManipulator.generateParentKey();
List<String> childIds =
Arrays.asList(UUID.randomUUID().toString(), UUID.randomUUID().toString());

// Look up with the same parent ID and a wrong child ID.
// But the full ID matches the single committed state.
String targetFullId = keyManipulator.fullKey(parentId, UUID.randomUUID().toString());

Result resultForGroupCommitState = mock(Result.class);
when(resultForGroupCommitState.getValue(Attribute.ID))
.thenReturn(Optional.of(new TextValue(Attribute.ID, parentId)));
when(resultForGroupCommitState.getValue(Attribute.CHILD_IDS))
.thenReturn(Optional.of(new TextValue(Attribute.CHILD_IDS, Joiner.on(',').join(childIds))));
when(resultForGroupCommitState.getValue(Attribute.STATE))
.thenReturn(Optional.of(new IntValue(Attribute.STATE, transactionState.get())));
when(resultForGroupCommitState.getValue(Attribute.CREATED_AT))
.thenReturn(Optional.of(new BigIntValue(Attribute.CREATED_AT, ANY_TIME_1)));

Result resultForSingleCommitState = mock(Result.class);
when(resultForSingleCommitState.getValue(Attribute.ID))
.thenReturn(Optional.of(new TextValue(Attribute.ID, targetFullId)));
Expand All @@ -437,7 +455,12 @@ public void getState_TransactionIdForGroupCommitGivenAndOnlyParentIdMatches_Shou
//
// The IDs used to find the state are:
// - parentId:childIdX
doReturn(Optional.of(resultForSingleCommitState)).when(storage).get(any(Get.class));
doReturn(Optional.of(resultForGroupCommitState))
.when(storage)
.get(coordinator.createGetWith(parentId));
doReturn(Optional.of(resultForSingleCommitState))
.when(storage)
.get(coordinator.createGetWith(targetFullId));

// Act
Optional<Coordinator.State> state = spiedCoordinator.getState(targetFullId);
Expand All @@ -449,9 +472,9 @@ public void getState_TransactionIdForGroupCommitGivenAndOnlyParentIdMatches_Shou
Assertions.assertThat(state.get().getState()).isEqualTo(transactionState);
assertThat(state.get().getCreatedAt()).isEqualTo(ANY_TIME_1);
verify(spiedCoordinator).getStateForGroupCommit(targetFullId);
verify(storage).get(getArgumentCaptor.capture());
verify(storage, times(2)).get(getArgumentCaptor.capture());
assertGetArgumentCaptorForGetState(
getArgumentCaptor.getAllValues(), Collections.singletonList(targetFullId));
getArgumentCaptor.getAllValues(), Arrays.asList(parentId, targetFullId));
}

@ParameterizedTest
Expand Down Expand Up @@ -491,7 +514,10 @@ public void getState_TransactionIdGivenButNoIdMatches_ShouldReturnEmpty(
//
// The IDs used to find the state are:
// - parentId:childIdY
when(storage.get(any(Get.class))).thenReturn(Optional.empty());
doReturn(Optional.of(resultForGroupCommitState))
.when(storage)
.get(coordinator.createGetWith(parentId));
doReturn(Optional.empty()).when(storage).get(coordinator.createGetWith(targetFullId));

// Act
Optional<Coordinator.State> state = spiedCoordinator.getState(targetFullId);
Expand All @@ -501,7 +527,7 @@ public void getState_TransactionIdGivenButNoIdMatches_ShouldReturnEmpty(
verify(spiedCoordinator).getStateForGroupCommit(targetFullId);
verify(storage, times(2)).get(getArgumentCaptor.capture());
assertGetArgumentCaptorForGetState(
getArgumentCaptor.getAllValues(), Arrays.asList(targetFullId, parentId));
getArgumentCaptor.getAllValues(), Arrays.asList(parentId, targetFullId));
}

@Test
Expand Down