Skip to content

Commit

Permalink
Merge pull request #182 from GoogleCloudPlatform/5789B3FF9E750C15D6B7…
Browse files Browse the repository at this point in the history
…4524C048BD98

Project import generated by Copybara.
  • Loading branch information
skuruppu authored Aug 22, 2024
2 parents d7f0559 + e732e5a commit c25ad94
Show file tree
Hide file tree
Showing 94 changed files with 2,081 additions and 453 deletions.
8 changes: 4 additions & 4 deletions backend/actions/index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ absl::Status IndexEffector::Effect(const ActionContext* ctx,
Row base_row = MakeRow(op.columns, op.values);
ZETASQL_ASSIGN_OR_RETURN(Key index_key, ComputeIndexKey(base_row, index_));
ValueList index_values = ComputeIndexValues(base_row, index_);
if (ShouldFilterIndexKey(index_, index_key)) {
if (ShouldFilterIndexKeyOrValue(index_, index_key, base_row)) {
return absl::OkStatus();
}

Expand All @@ -92,7 +92,7 @@ absl::Status IndexEffector::Effect(const ActionContext* ctx,

// If a previous index entry existed, delete it.
ZETASQL_ASSIGN_OR_RETURN(Key old_index_key, ComputeIndexKey(base_row, index_));
if (!ShouldFilterIndexKey(index_, old_index_key)) {
if (!ShouldFilterIndexKeyOrValue(index_, old_index_key, base_row)) {
ctx->effects()->Delete(index_->index_data_table(), old_index_key);
}

Expand All @@ -102,7 +102,7 @@ absl::Status IndexEffector::Effect(const ActionContext* ctx,
}
ZETASQL_ASSIGN_OR_RETURN(Key new_index_key, ComputeIndexKey(base_row, index_));
ValueList index_values = ComputeIndexValues(base_row, index_);
if (ShouldFilterIndexKey(index_, new_index_key)) {
if (ShouldFilterIndexKeyOrValue(index_, new_index_key, base_row)) {
return absl::OkStatus();
}

Expand All @@ -125,7 +125,7 @@ absl::Status IndexEffector::Effect(const ActionContext* ctx,

// Compute the index key to delete.
ZETASQL_ASSIGN_OR_RETURN(Key index_key, ComputeIndexKey(base_row, index_));
if (ShouldFilterIndexKey(index_, index_key)) {
if (ShouldFilterIndexKeyOrValue(index_, index_key, base_row)) {
return absl::OkStatus();
}

Expand Down
31 changes: 22 additions & 9 deletions backend/common/indexing.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@

#include "backend/common/indexing.h"

#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "backend/common/rows.h"
#include "backend/datamodel/key.h"
#include "backend/datamodel/value.h"
#include "backend/schema/catalog/column.h"
#include "backend/schema/catalog/index.h"
#include "common/errors.h"
#include "common/limits.h"
#include "zetasql/base/status_macros.h"
Expand Down Expand Up @@ -60,15 +66,22 @@ ValueList ComputeIndexValues(const Row& base_row, const Index* index) {
return values;
}

bool ShouldFilterIndexKey(const Index* index, const Key& key) {
if (!index->is_null_filtered()) {
return false;
}

// Cloud Spanner only checks index key columns for null filtering.
for (int i = 0; i < index->key_columns().size(); ++i) {
if (key.ColumnValue(i).is_null()) {
return true;
bool ShouldFilterIndexKeyOrValue(const Index* index, const Key& key,
const Row& base_row) {
if (index->is_null_filtered()) {
// NULL_FILTERED index should filter the row if any of the key columns is
// NULL.
for (int i = 0; i < index->key_columns().size(); ++i) {
if (key.ColumnValue(i).is_null()) {
return true;
}
}
} else {
// Check for null filtered columns.
for (const Column* column : index->null_filtered_columns()) {
if (GetColumnValueOrNull(base_row, column->source_column()).is_null()) {
return true;
}
}
}
return false;
Expand Down
7 changes: 4 additions & 3 deletions backend/common/indexing.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ absl::StatusOr<Key> ComputeIndexKey(const Row& base_row, const Index* index);
// Computes an the ordered list of index row values using the given base row.
ValueList ComputeIndexValues(const Row& base_row, const Index* index);

// Returns true if no index entry should be added for the given base table key.
bool ShouldFilterIndexKey(const Index* index, const Key& key);

// Returns true if no index entry should be added for the given base table key
// or value.
bool ShouldFilterIndexKeyOrValue(const Index* index, const Key& key,
const Row& base_row);
} // namespace backend
} // namespace emulator
} // namespace spanner
Expand Down
1 change: 1 addition & 0 deletions backend/database/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ cc_test(
"//backend/schema/updater:schema_updater",
"//backend/transaction:read_only_transaction",
"//common:clock",
"//common:config",
"//common:errors",
"//tests/common:proto_matchers",
"@com_github_grpc_grpc//:grpc++",
Expand Down
6 changes: 6 additions & 0 deletions backend/database/database_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "backend/schema/updater/schema_updater.h"
#include "backend/transaction/options.h"
#include "common/clock.h"
#include "common/config.h"
#include "common/errors.h"

namespace google {
Expand Down Expand Up @@ -223,6 +224,9 @@ TEST_F(DatabaseTest, UpdateSchemaPartialSuccess) {
}

TEST_F(DatabaseTest, ConcurrentSchemaChangeIsAborted) {
auto current_probability = config::abort_current_transaction_probability();
config::set_abort_current_transaction_probability(0);

std::vector<std::string> create_statements = {R"(
CREATE TABLE T(
k1 INT64,
Expand Down Expand Up @@ -253,6 +257,8 @@ TEST_F(DatabaseTest, ConcurrentSchemaChangeIsAborted) {
db->UpdateSchema(SchemaChangeOperation{.statements = update_statements},
&completed_statements, &commit_ts, &backfill_status),
error::ConcurrentSchemaChangeOrReadWriteTxnInProgress());

config::set_abort_current_transaction_probability(current_probability);
}

TEST_F(DatabaseTest, SchemaChangeLocksSuccesfullyReleased) {
Expand Down
6 changes: 6 additions & 0 deletions backend/locking/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,18 @@ cc_library(
"//backend/common:ids",
"//backend/datamodel:key_range",
"//common:clock",
"//common:config",
"//common:errors",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/memory",
"@com_google_absl//absl/random",
"@com_google_absl//absl/random:distributions",
"@com_google_absl//absl/status",
"@com_google_absl//absl/status:statusor",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/time",
"@com_google_zetasql//zetasql/base:ret_check",
],
)

Expand All @@ -51,6 +56,7 @@ cc_test(
srcs = ["manager_test.cc"],
deps = [
":manager",
"//backend/common:ids",
"//tests/common:proto_matchers",
"@com_github_grpc_grpc//:grpc++",
"@com_google_absl//absl/time",
Expand Down
30 changes: 28 additions & 2 deletions backend/locking/handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,31 @@

#include "backend/locking/handle.h"

#include <functional>

#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "backend/locking/manager.h"
#include "common/errors.h"

namespace google {
namespace spanner {
namespace emulator {
namespace backend {

LockHandle::LockHandle(LockManager* manager, TransactionID tid,
const std::function<absl::Status()>& abort_fn,
TransactionPriority priority)
: manager_(manager), tid_(tid), priority_(priority) {}
: manager_(manager),
tid_(tid),
try_abort_transaction_fn_(abort_fn),
priority_(priority) {}

LockHandle::~LockHandle() = default;
LockHandle::~LockHandle() {
absl::MutexLock lock(&mu_);
try_abort_transaction_fn_ = nullptr;
}

void LockHandle::EnqueueLock(const LockRequest& request) {
manager_->EnqueueLock(this, request);
Expand Down Expand Up @@ -59,6 +70,21 @@ void LockHandle::Abort(const absl::Status& status) {
status_ = status;
}

absl::Status LockHandle::TryAbortTransaction(const absl::Status& status) {
if (mu_.TryLock()) {
if (try_abort_transaction_fn_ != nullptr) {
auto aborted = try_abort_transaction_fn_();
if (aborted.ok()) {
status_ = status;
mu_.Unlock();
return absl::OkStatus();
}
}
mu_.Unlock();
}
return error::CouldNotObtainLockHandleMutex(tid_);
}

void LockHandle::Reset() {
absl::MutexLock lock(&mu_);
status_ = absl::OkStatus();
Expand Down
12 changes: 12 additions & 0 deletions backend/locking/handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,16 @@ class LockHandle {
friend class LockManager;
friend std::unique_ptr<LockHandle>::deleter_type;
LockHandle(LockManager* manager, TransactionID tid,
const std::function<absl::Status()>& abort_fn,
TransactionPriority priority);
~LockHandle();

// Aborts the requests made by this handle (and puts it in a final state).
void Abort(const absl::Status& status) ABSL_LOCKS_EXCLUDED(mu_);
// Tries to abort this transaction. This is a best effort attempt and returns
// OK only if the transaction could successfully be aborted.
absl::Status TryAbortTransaction(const absl::Status& status)
ABSL_LOCKS_EXCLUDED(mu_);

// Resets the state of this handle.
void Reset() ABSL_LOCKS_EXCLUDED(mu_);
Expand All @@ -124,6 +129,13 @@ class LockHandle {
// The ID of the transaction which owns this lock handle.
TransactionID tid_;

// A function to try to abort the underlying transaction. The function will
// be called if another transaction wants to acquire the locks held by this
// transaction. The function will return OK only if the transaction was
// successfully aborted.
// This can be nullptr if the transaction cannot be aborted.
std::function<absl::Status()> try_abort_transaction_fn_;

// The priority of the transaction which owns this lock handle.
TransactionPriority priority_;

Expand Down
54 changes: 40 additions & 14 deletions backend/locking/manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,31 @@

#include "backend/locking/manager.h"

#include <functional>
#include <memory>

#include "absl/memory/memory.h"
#include "absl/random/random.h"
#include "absl/random/uniform_int_distribution.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/substitute.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "backend/common/ids.h"
#include "common/config.h"
#include "common/errors.h"
#include "zetasql/base/ret_check.h"

namespace google {
namespace spanner {
namespace emulator {
namespace backend {

std::unique_ptr<LockHandle> LockManager::CreateHandle(
TransactionID tid, TransactionPriority priority) {
return absl::WrapUnique(new LockHandle(this, tid, priority));
TransactionID tid, const std::function<absl::Status()>& abort_fn,
TransactionPriority priority) {
return absl::WrapUnique(new LockHandle(this, tid, abort_fn, priority));
}

void LockManager::EnqueueLock(LockHandle* handle, const LockRequest& request) {
Expand All @@ -44,31 +52,48 @@ void LockManager::EnqueueLock(LockHandle* handle, const LockRequest& request) {
}

// If there is no transaction holding the lock, we grant it.
if (active_tid_ == kInvalidTransactionID) {
active_tid_ = handle->tid();
if (active_handle_ == nullptr) {
active_handle_ = handle;
return;
}

// If the requesting transaction is already holding the lock, we grant it.
if (active_tid_ == handle->tid()) {
if (active_handle_->tid() == handle->tid()) {
return;
}

// If we reached here, another transaction is already holding the lock, deny.
handle->Abort(error::AbortConcurrentTransaction(handle->tid(), active_tid_));
// If we reached here, another transaction is already holding the lock.
// Randomly abort the current transaction to ensure that starting a new
// transaction is not blocked by the current transaction if this is waiting
// for a new transaction to finish.
absl::BitGen gen;
if (absl::uniform_int_distribution<int>(1, 100)(gen) <=
config::abort_current_transaction_probability()) {
auto could_be_aborted = active_handle_->TryAbortTransaction(
error::AbortCurrentTransaction(active_handle_->tid(), handle->tid()));
if (could_be_aborted.ok()) {
active_handle_ = handle;
return;
}
}

// Couldn't abort the transaction currently holding the lock, so abort the
// new transaction.
handle->Abort(
error::AbortConcurrentTransaction(handle->tid(), active_handle_->tid()));
}

void LockManager::UnlockAll(LockHandle* handle) {
absl::MutexLock lock(&mu_);

// If the transaction does not hold the lock, there is nothing to do.
if (active_tid_ != handle->tid()) {
if (active_handle_ == nullptr || active_handle_->tid() != handle->tid()) {
handle->Reset();
return;
}

// Clear the active transaction if it holds the lock.
active_tid_ = kInvalidTransactionID;
active_handle_ = nullptr;
handle->Reset();
}

Expand All @@ -79,11 +104,12 @@ absl::StatusOr<absl::Time> LockManager::ReserveCommitTimestamp(
// If there is no transaction holding the lock, we grant it to the transaction
// requesting commit timestamp. This can happen if transaction has empty
// mutations and write locks weren't thus acquired yet.
if (active_tid_ == kInvalidTransactionID) {
active_tid_ = handle->tid();
} else if (active_tid_ != handle->tid()) {
if (active_handle_ == nullptr) {
active_handle_ = handle;
} else if (active_handle_->tid() != handle->tid()) {
// There is another active transaction, abort this transaction.
return error::AbortConcurrentTransaction(handle->tid(), active_tid_);
return error::AbortConcurrentTransaction(handle->tid(),
active_handle_->tid());
}

pending_commit_timestamp_ = clock_->Now();
Expand All @@ -94,7 +120,7 @@ absl::Status LockManager::MarkCommitted(LockHandle* handle) {
absl::MutexLock lock(&mu_);

// This transaction should have been set as the active transaction.
ZETASQL_RET_CHECK_EQ(active_tid_, handle->tid())
ZETASQL_RET_CHECK_EQ(active_handle_->tid(), handle->tid())
<< absl::Substitute("Transaction $0 is not active.", handle->tid());

last_commit_timestamp_ = pending_commit_timestamp_;
Expand Down
9 changes: 5 additions & 4 deletions backend/locking/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ class LockManager {
// Returns a handle for a single transaction with the given id and priority.
// Subsequent communication between the transaction and the lock manager
// happens via the handle. See LockHandle methods for more details.
std::unique_ptr<LockHandle> CreateHandle(TransactionID id,
TransactionPriority priority);
std::unique_ptr<LockHandle> CreateHandle(
TransactionID id, const std::function<absl::Status()>& abort_fn,
TransactionPriority priority);

// Returns the timestamp at which last schema update or commit completed.
absl::Time LastCommitTimestamp();
Expand All @@ -70,8 +71,8 @@ class LockManager {
// Mutex to guard state below.
absl::Mutex mu_;

// The currently active transaction ID (only one transaction can be active).
TransactionID active_tid_ ABSL_GUARDED_BY(mu_) = kInvalidTransactionID;
// The currently active transaction (only one transaction can be active).
LockHandle* active_handle_ ABSL_GUARDED_BY(mu_) = nullptr;

// System wide monotonic clock used to provide commit and read timestamps.
Clock* clock_;
Expand Down
Loading

0 comments on commit c25ad94

Please sign in to comment.