Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revisit behavior of multiple mutations for same record in transaction in Consensus Commit #2340

Conversation

brfrn169
Copy link
Collaborator

@brfrn169 brfrn169 commented Nov 14, 2024

Description

This PR revisits the behavior of multiple mutations for the same record in a transaction in Consensus Commit.

Expected behaviors for combinations of mutations on the same record are as follows:

  1. Insert then Insert: the second Insert fails.
  2. Insert then Upsert: merge Insert and Upsert (with implicit pre-read disabled), then execute.
  3. Insert then Update: merge Insert and Update (with implicit pre-read disabled), then execute.
  4. Insert then Delete: only execute Delete. not allowed in Consensus Commit.
  5. Upsert then Insert: Insert fails.
  6. Upsert then Upsert: merge the two Upserts, then execute.
  7. Upsert then Update: merge Upsert and Update, then execute.
  8. Upsert then Delete: only execute Delete.
  9. Update then Insert: if the target record exists, Insert fails; if not, only execute Insert.
  10. Update then Upsert: if the target record exists, only execute Upsert; if not, merge Update and Upsert, then execute.
  11. Update then Update: if the target record exists, merge the two Updates; if not, do nothing.
  12. Update then Delete: only execute Delete.
  13. Delete then Insert: not allowed in Consensus Commit.
  14. Delete then Upsert: not allowed in Consensus Commit.
  15. Delete then Update: do nothing.
  16. Delete then Delete: only execute the second Delete.

I've added some test cases to ConsensusCommitIntegrationTestBase.java to check the behavior.

To achieve this behavior, we need to make the following changes:

  • Consider the write set for conditional updates.
  • Add validation to ensure that Insert is not executed on an already written record in the same transaction.
  • Updated the merge logic for the write set. Please see the inline comment for details.

[ADDED]

For the expected behaviors 1, 2, 3, and 4 (where the first operation is Insert), as @komamitsu pointed out in his comment, if the target record already exists, the transaction should fail.

For cases 1, 2, and 3, the transaction correctly fails in such situations. However, for case 4, it does not fail, as @komamitsu mentioned in the following comment:
#2340 (comment)

I think this behavior is confusing for users. To address this, we can make the following additional change:

  • Add validation to ensure that Delete is not executed on a record that has already been inserted in the same transaction.

Related issues and/or PRs

N/A

Changes made

  • Use the result that's merged with mutations in the write set for conditional updates.
  • Added validation to ensure that Insert is not executed for already written record.
  • Updated the merge logic for the write set.
  • Refactored Snapshot.

Checklist

  • I have commented my code, particularly in hard-to-understand areas.
  • I have updated the documentation to reflect the changes.
  • Any remaining open issues linked to this PR are documented and up-to-date (Jira, GitHub, etc.).
  • Tests (unit, integration, etc.) have been added for the changes.
  • My changes generate no new warnings.
  • Any dependent changes in other PRs have been merged and published.

Additional notes (optional)

The Insert, Upsert, and Update are converted to Put internally. See the following for details:

static Put createPutForInsert(Insert insert) {
PutBuilder.Buildable buildable =
Put.newBuilder()
.namespace(insert.forNamespace().orElse(null))
.table(insert.forTable().orElse(null))
.partitionKey(insert.getPartitionKey());
insert.getClusteringKey().ifPresent(buildable::clusteringKey);
insert.getColumns().values().forEach(buildable::value);
buildable.enableInsertMode();
return buildable.build();
}
static Put createPutForUpsert(Upsert upsert) {
PutBuilder.Buildable buildable =
Put.newBuilder()
.namespace(upsert.forNamespace().orElse(null))
.table(upsert.forTable().orElse(null))
.partitionKey(upsert.getPartitionKey());
upsert.getClusteringKey().ifPresent(buildable::clusteringKey);
upsert.getColumns().values().forEach(buildable::value);
buildable.enableImplicitPreRead();
return buildable.build();
}
static Put createPutForUpdate(Update update) {
PutBuilder.Buildable buildable =
Put.newBuilder()
.namespace(update.forNamespace().orElse(null))
.table(update.forTable().orElse(null))
.partitionKey(update.getPartitionKey());
update.getClusteringKey().ifPresent(buildable::clusteringKey);
update.getColumns().values().forEach(buildable::value);
if (update.getCondition().isPresent()) {
if (update.getCondition().get() instanceof UpdateIf) {
update
.getCondition()
.ifPresent(c -> buildable.condition(ConditionBuilder.putIf(c.getExpressions())));
} else {
assert update.getCondition().get() instanceof UpdateIfExists;
buildable.condition(ConditionBuilder.putIfExists());
}
} else {
buildable.condition(ConditionBuilder.putIfExists());
}
buildable.enableImplicitPreRead();
return buildable.build();
}

Release notes

Fixed the behavior of multiple mutations for the same record in a transaction in Consensus Commit.

@brfrn169 brfrn169 self-assigned this Nov 14, 2024
@@ -113,47 +115,55 @@ Isolation getIsolation() {

// Although this class is not thread-safe, this method is actually thread-safe because the readSet
// is a concurrent map
public void put(Key key, Optional<TransactionResult> result) {
public void putIntoReadSet(Key key, Optional<TransactionResult> result) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed the put methods in Snapshot to make them more explicit.

Comment on lines +187 to +209
public Optional<TransactionResult> getResult(Key key) throws CrudException {
Optional<TransactionResult> result = readSet.getOrDefault(key, Optional.empty());
return mergeResult(key, result);
}

public Optional<TransactionResult> getResult(Key key, Get get) throws CrudException {
Optional<TransactionResult> result = getSet.getOrDefault(get, Optional.empty());
return mergeResult(key, result, get.getConjunctions());
}

public Optional<Map<Snapshot.Key, TransactionResult>> getResults(Scan scan) throws CrudException {
if (!scanSet.containsKey(scan)) {
return Optional.empty();
}

Map<Key, TransactionResult> results = new LinkedHashMap<>();
for (Entry<Snapshot.Key, TransactionResult> entry : scanSet.get(scan).entrySet()) {
mergeResult(entry.getKey(), Optional.of(entry.getValue()))
.ifPresent(result -> results.put(entry.getKey(), result));
}

return Optional.of(results);
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added getResult() and getResults() methods to Snapshot. These methods return results that are merged with mutations in the write set, if necessary. Following this change, we can make the mergeResult() methods private.

@@ -213,10 +207,10 @@ public void put(Put put) throws CrudException {
read(key, createGet(key));
}
mutationConditionsValidator.checkIfConditionIsSatisfied(
put, snapshot.getFromReadSet(key).orElse(null));
put, snapshot.getResult(key).orElse(null));
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the added getResult() method to use the result that's merged with a mutation for the same record in the write set for conditional updates.

@@ -227,10 +221,10 @@ public void delete(Delete delete) throws CrudException {
read(key, createGet(key));
}
mutationConditionsValidator.checkIfConditionIsSatisfied(
delete, snapshot.getFromReadSet(key).orElse(null));
delete, snapshot.getResult(key).orElse(null));
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto. Use the added getResult() method to use the result that's merged with a mutation for the same record in the write set for conditional deletes.

Comment on lines +138 to +141
if (put.isInsertModeEnabled()) {
throw new IllegalArgumentException(
CoreError.CONSENSUS_COMMIT_INSERTING_ALREADY_WRITTEN_DATA_NOT_ALLOWED.buildMessage());
}
Copy link
Collaborator Author

@brfrn169 brfrn169 Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added validation to ensure that Insert is not executed on an already written record in the same transaction.

Comment on lines +145 to +156
PutBuilder.BuildableFromExisting putBuilder = Put.newBuilder(originalPut);
put.getColumns().values().forEach(putBuilder::value);

// If the implicit pre-read is enabled for the new put, it should also be enabled for the
// merged put. However, if the previous put is in insert mode, this doesn’t apply. This is
// because, in insert mode, the read set is not used during the preparation phase. Therefore,
// we only need to enable the implicit pre-read if the previous put is not in insert mode
if (put.isImplicitPreReadEnabled() && !originalPut.isInsertModeEnabled()) {
putBuilder.enableImplicitPreRead();
}

writeSet.put(key, putBuilder.build());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the merge logic for the write set. We should also merge the implicit pre-read, with one exception. Please see the comments for details.

@komamitsu
Copy link
Contributor

Insert then Upsert: merge Insert and Upsert (with implicit pre-read disabled), then execute.
Insert then Update: merge Insert and Update (with implicit pre-read disabled), then execute.
Insert then Delete: only execute Delete.

@brfrn169 I assume these 3 cases should fail if a record with the same key already exists, since the first Inserts should fail from semantic perspective. Is my understanding correct?

@brfrn169
Copy link
Collaborator Author

Insert then Upsert: merge Insert and Upsert (with implicit pre-read disabled), then execute.
Insert then Update: merge Insert and Update (with implicit pre-read disabled), then execute.
Insert then Delete: only execute Delete.

@brfrn169 I assume these 3 cases should fail if a record with the same key already exists, since the first Inserts should fail from semantic perspective. Is my understanding correct?

@komamitsu Yes, that's correct.

Strictly speaking, in Consensus Commit, when inserting a record, we don’t check if the target record exists or not. As a result, users encounter a conflict error only when committing the transaction, not when inserting the record.

@komamitsu
Copy link
Contributor

@brfrn169 Thanks for the details!

I executed this code with the implementation of this PR, but it didn't fail. I expected this failed since the INSERT conflicted with the existing record... What do you think?

    TransactionFactory factory = TransactionFactory.create("/tmp/local-pg.properties");
    DistributedTransactionManager transactionManager = factory.getTransactionManager();
    try {
      DistributedTransaction tx = transactionManager.begin();
      System.out.println(">>>> " + tx.get(
          Get.newBuilder()
              .namespace("ycsb").table("usertable")
              .partitionKey(Key.ofInt("ycsb_key", 1))
              .build()));          // >>>> Optional[FilteredResult{payload=null, ycsb_key=1}]
      tx.insert(
          Insert.newBuilder()
              .namespace("ycsb").table("usertable")
              .partitionKey(Key.ofInt("ycsb_key", 1))
              .build());
      tx.delete(
          Delete.newBuilder()
              .namespace("ycsb").table("usertable")
              .partitionKey(Key.ofInt("ycsb_key", 1))
              .build());
      tx.commit();
    }
    finally{
      transactionManager.close();
    }

@brfrn169
Copy link
Collaborator Author

brfrn169 commented Nov 15, 2024

I executed this code with the implementation of this PR, but it didn't fail. I expected this failed since the INSERT conflicted with the existing record... What do you think?

@komamitsu That's a really good point.

As I mentioned in the previous comment, we don’t check whether the target record exists when inserting a record in Consensus Commit. As a result, if an Insert is followed by a Delete, the Insert is removed, and only the Delete is executed when committing the transaction, even if the record exists. This is a limitation of Consensus Commit.

By the way, the reason we don’t check whether the target record exists during an insert is for performance optimization. Checking for the existence of a target record requires an additional read from the database, which we wanted to avoid.

@brfrn169
Copy link
Collaborator Author

@komamitsu One possible idea is to disallow deleting a record that has been inserted in the same transaction by adding validation. What do you think?

@komamitsu
Copy link
Contributor

komamitsu commented Nov 15, 2024

@brfrn169 Thanks for the explanation. It matches my understanding from the source code.

One possible idea is to disallow deleting a record that has been inserted in the same transaction by adding validation

This seems better to me in terms of consistency. But it might be a bit too strict in terms of convenience? I want to hear opinions from @feeblefakie as well.

BTW, I also tried Upsert and Update after Insert as follows.

    TransactionFactory factory = TransactionFactory.create("/Users/mitsunorikomatsu/tmp/local-pg.properties");
    DistributedTransactionManager transactionManager = factory.getTransactionManager();
    try {
      DistributedTransaction tx = transactionManager.begin();
      System.out.println(">>>> " + tx.get(
          Get.newBuilder()
              .namespace("ycsb").table("usertable")
              .partitionKey(Key.ofInt("ycsb_key", 1))
              .build()));
      tx.insert(
          Insert.newBuilder()
              .namespace("ycsb").table("usertable")
              .partitionKey(Key.ofInt("ycsb_key", 1))
              .build());
      tx.upsert(
          Upsert.newBuilder()
              .namespace("ycsb").table("usertable")
              .partitionKey(Key.ofInt("ycsb_key", 1))
              .value(TextColumn.of("payload", "hello"))
              .build());
      tx.commit();
    }
    finally{
      transactionManager.close();
    }

But it failed and the error message "The record being prepared already exists" seemed a bit odd to me. It also happens with the current master branch, though.

Exception in thread "main" com.scalar.db.exception.transaction.CommitConflictException: CORE-20013: The record being prepared already exists. Transaction ID: GMiUdvxZCcPyxLLaJt5yKnfR$70452dbf-7653-46d7-af8c-85b4f3337d65
	at com.scalar.db.transaction.consensuscommit.CommitHandler.commit(CommitHandler.java:124)
	at com.scalar.db.transaction.consensuscommit.ConsensusCommit.commit(ConsensusCommit.java:227)
	at com.scalar.db.common.DecoratedDistributedTransaction.commit(DecoratedDistributedTransaction.java:129)
	at com.scalar.db.common.TransactionDecorationDistributedTransactionManager$StateManagedTransaction.commit(TransactionDecorationDistributedTransactionManager.java:147)
	at com.scalar.db.common.DecoratedDistributedTransaction.commit(DecoratedDistributedTransaction.java:129)
	at com.scalar.db.common.ActiveTransactionManagedDistributedTransactionManager$ActiveTransaction.commit(ActiveTransactionManagedDistributedTransactionManager.java:168)
	at com.scalar.db.transaction.consensuscommit.CommitHandler.main(CommitHandler.java:361)
Caused by: com.scalar.db.exception.transaction.PreparationConflictException: CORE-20013: The record being prepared already exists. Transaction ID: GMiUdvxZCcPyxLLaJt5yKnfR$70452dbf-7653-46d7-af8c-85b4f3337d65
	at com.scalar.db.transaction.consensuscommit.CommitHandler.prepare(CommitHandler.java:184)
	at com.scalar.db.transaction.consensuscommit.CommitHandler.commit(CommitHandler.java:119)
	... 6 more
Caused by: com.scalar.db.exception.storage.NoMutationException: CORE-20000: No mutation was applied
	at com.scalar.db.storage.jdbc.JdbcDatabase.put(JdbcDatabase.java:115)
	at com.scalar.db.storage.multistorage.MultiStorage.put(MultiStorage.java:105)
	at com.scalar.db.storage.multistorage.MultiStorage.mutate(MultiStorage.java:130)
	at com.scalar.db.transaction.consensuscommit.CommitHandler.lambda$prepareRecords$0(CommitHandler.java:206)
	at com.scalar.db.transaction.consensuscommit.ParallelExecutor.lambda$executeTasksInParallel$0(ParallelExecutor.java:181)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
Caused by: com.scalar.db.exception.transaction.PreparationConflictException: CORE-20013: The record being prepared already exists. Transaction ID: GMiUdvxZCcPyxLLaJt5yKnfR$70452dbf-7653-46d7-af8c-85b4f3337d65

	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
Caused by: com.scalar.db.exception.storage.NoMutationException: CORE-20000: No mutation was applied

	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

@brfrn169
Copy link
Collaborator Author

But it failed and the error message "The record being prepared already exists" seemed a bit odd to me. It also happens with the current master branch, though.

@komamitsu I think the error indicates that you tried to insert a record that already exists. Maybe even if you remove the Upsert from the code, you still encounter the same error, right?

@komamitsu
Copy link
Contributor

@brfrn169 Oh, sorry, I forgot to call rollback() as error handling. Probably that's the cause 🙇 Please ignore the comment about Upsert and Update.

Copy link
Contributor

@Torch3333 Torch3333 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thank you!

Copy link
Member

@josh-wong josh-wong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thank you!🙇🏻‍♂️

Copy link
Contributor

@feeblefakie feeblefakie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thank you!

@brfrn169
Copy link
Collaborator Author

@feeblefakie @komamitsu As discussed, we will add additional validation to ensure that Delete is not executed on a record that has already been inserted in the same transaction. Thanks.

@komamitsu
Copy link
Contributor

komamitsu commented Nov 18, 2024

@brfrn169 Thanks for letting me know that!

Similar validation will be introduced for the following cases?

Insert then Upsert: merge Insert and Upsert (with implicit pre-read disabled), then execute.
Insert then Update: merge Insert and Update (with implicit pre-read disabled), then execute.

@brfrn169
Copy link
Collaborator Author

Similar validation will be introduced for the following cases?

Insert then Upsert: merge Insert and Upsert (with implicit pre-read disabled), then execute.
Insert then Update: merge Insert and Update (with implicit pre-read disabled), then execute.

@komamitsu In those cases, the transaction will correctly fail with the current logic, so I don’t think we need to add validation for them. Thanks.

Comment on lines +671 to +672
CONSENSUS_COMMIT_DELETING_ALREADY_INSERTED_DATA_NOT_ALLOWED(
Category.USER_ERROR, "0147", "Deleting already-inserted data is not allowed", "", ""),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@josh-wong Sorry, could you please review this message as well? 🙇

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brfrn169 I ran a small code to reproduce this error. I noticed the namespace, table and key of the target record weren't shown in any error message. How about adding those information into this error message? I think this is a nice-to-have improvement and won't block this PR from being approved, though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brfrn169

@josh-wong Sorry, could you please review this message as well? 🙇

LGTM!👍

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@komamitsu Okay. I think the other messages also don’t include that information, so let’s address it in a separate PR. I’ll add it to my to-do list. Thanks.

@brfrn169
Copy link
Collaborator Author

@feeblefakie @komamitsu I've added the validation in 2d103da. Please take a look when you have time! 🙇

@brfrn169 brfrn169 requested a review from feeblefakie November 18, 2024 07:49
Comment on lines 163 to 171
Put put = writeSet.get(key);
if (put != null) {
if (put.isInsertModeEnabled()) {
throw new IllegalArgumentException(
CoreError.CONSENSUS_COMMIT_DELETING_ALREADY_INSERTED_DATA_NOT_ALLOWED.buildMessage());
}

writeSet.remove(key);
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added validation to ensure that Delete is not executed on a record that has already been inserted in the same transaction.

Copy link
Contributor

@komamitsu komamitsu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thank you!

Copy link
Contributor

@feeblefakie feeblefakie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants