Skip to content

Commit

Permalink
Merge pull request #195 from GoogleCloudPlatform/5789B3FF9E750C15D6B7…
Browse files Browse the repository at this point in the history
…4524C048BD98

Project import generated by Copybara.
  • Loading branch information
olavloite authored Nov 29, 2024
2 parents 7f22a69 + df84e0b commit 0084b2c
Show file tree
Hide file tree
Showing 97 changed files with 5,565 additions and 269 deletions.
1 change: 1 addition & 0 deletions backend/actions/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ cc_test(
":ops",
"//tests/common:actions",
"//tests/common:proto_matchers",
"//tests/common:scoped_feature_flags_setter",
"//tests/common:test_schema_constructor",
"@com_github_grpc_grpc//:grpc++",
"@com_google_absl//absl/memory",
Expand Down
7 changes: 7 additions & 0 deletions backend/actions/foreign_key.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "backend/actions/foreign_key.h"

#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "backend/actions/action.h"
Expand All @@ -38,6 +39,9 @@ ForeignKeyReferencingVerifier::ForeignKeyReferencingVerifier(

absl::Status ForeignKeyReferencingVerifier::Verify(const ActionContext* ctx,
const InsertOp& op) const {
if (!foreign_key_->enforced()) {
return absl::OkStatus();
}
// Check that the corresponding row exists in the referenced index. Exclude
// any extra columns from the primary key that are not used by the foreign
// key.
Expand All @@ -62,6 +66,9 @@ ForeignKeyReferencedVerifier::ForeignKeyReferencedVerifier(

absl::Status ForeignKeyReferencedVerifier::Verify(const ActionContext* ctx,
const DeleteOp& op) const {
if (!foreign_key_->enforced()) {
return absl::OkStatus();
}
// Check that the corresponding row does not exist in the referencing index.
// Exclude any extra columns from the primary key that are not used by the
// foreign key.
Expand Down
31 changes: 29 additions & 2 deletions backend/actions/foreign_key_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "backend/actions/ops.h"
#include "tests/common/actions.h"
#include "tests/common/schema_constructor.h"
#include "tests/common/scoped_feature_flags_setter.h"

namespace google {
namespace spanner {
Expand All @@ -42,7 +43,10 @@ using zetasql_base::testing::StatusIs;
class ForeignKeyTest : public test::ActionsTest {
public:
ForeignKeyTest()
: schema_(emulator::test::CreateSchemaFromDDL({R"(
: flag_setter_({
.enable_fk_enforcement_option = true,
}),
schema_(emulator::test::CreateSchemaFromDDL({R"(
CREATE TABLE T (
A INT64,
B INT64,
Expand All @@ -55,11 +59,13 @@ class ForeignKeyTest : public test::ActionsTest {
Y INT64,
Z INT64,
CONSTRAINT C FOREIGN KEY (Y, Z) REFERENCES T (B, C),
CONSTRAINT D FOREIGN KEY (Y, Z) REFERENCES T (B, C) NOT ENFORCED,
) PRIMARY KEY(X)
)"},
&type_factory_)
.value()),
foreign_key_(schema_->FindTable("U")->FindForeignKey("C")),
unenforced_foreign_key_(schema_->FindTable("U")->FindForeignKey("D")),
referencing_data_(
foreign_key_->referencing_index()->index_data_table()),
referencing_columns_(referencing_data_->columns()),
Expand All @@ -68,21 +74,32 @@ class ForeignKeyTest : public test::ActionsTest {
referencing_verifier_(
std::make_unique<ForeignKeyReferencingVerifier>(foreign_key_)),
referenced_verifier_(
std::make_unique<ForeignKeyReferencedVerifier>(foreign_key_)) {}
std::make_unique<ForeignKeyReferencedVerifier>(foreign_key_)),
unenforced_referencing_verifier_(
std::make_unique<ForeignKeyReferencingVerifier>(
unenforced_foreign_key_)),
unenforced_referenced_verifier_(
std::make_unique<ForeignKeyReferencedVerifier>(
unenforced_foreign_key_)) {}

protected:
// Test components.
const ::google::spanner::emulator::test::ScopedEmulatorFeatureFlagsSetter
flag_setter_;
zetasql::TypeFactory type_factory_;
std::unique_ptr<const Schema> schema_;

// Test variables.
const ForeignKey* foreign_key_;
const ForeignKey* unenforced_foreign_key_;
const Table* referencing_data_;
absl::Span<const Column* const> referencing_columns_;
const Table* referenced_data_;
absl::Span<const Column* const> referenced_columns_;
std::unique_ptr<Verifier> referencing_verifier_;
std::unique_ptr<Verifier> referenced_verifier_;
std::unique_ptr<Verifier> unenforced_referencing_verifier_;
std::unique_ptr<Verifier> unenforced_referenced_verifier_;
};

TEST_F(ForeignKeyTest, InsertReferencingRowWithReferencedRow) {
Expand All @@ -97,6 +114,9 @@ TEST_F(ForeignKeyTest, InsertReferencingRowWithReferencedRow) {
ZETASQL_EXPECT_OK(referencing_verifier_->Verify(
ctx(), Insert(referencing_data_, Key({Int64(1), Int64(2), Int64(4)}),
referencing_columns_, {Int64(1), Int64(2), Int64(4)})));
ZETASQL_EXPECT_OK(unenforced_referencing_verifier_->Verify(
ctx(), Insert(referencing_data_, Key({Int64(1), Int64(2), Int64(4)}),
referencing_columns_, {Int64(1), Int64(2), Int64(4)})));
}

TEST_F(ForeignKeyTest, InsertReferencingRowWithoutReferencedRow) {
Expand All @@ -107,6 +127,9 @@ TEST_F(ForeignKeyTest, InsertReferencingRowWithoutReferencedRow) {
ctx(), Insert(referencing_data_, Key({Int64(1), Int64(2), Int64(3)}),
referencing_columns_, {Int64(1), Int64(2), Int64(3)})),
StatusIs(absl::StatusCode::kFailedPrecondition));
ZETASQL_EXPECT_OK(unenforced_referencing_verifier_->Verify(
ctx(), Insert(referencing_data_, Key({Int64(1), Int64(2), Int64(3)}),
referencing_columns_, {Int64(1), Int64(2), Int64(3)})));
}

TEST_F(ForeignKeyTest, DeleteReferencedRowWithReferencingRow) {
Expand All @@ -122,6 +145,8 @@ TEST_F(ForeignKeyTest, DeleteReferencedRowWithReferencingRow) {
referenced_verifier_->Verify(
ctx(), Delete(referenced_data_, Key({Int64(1), Int64(2), Int64(3)}))),
StatusIs(absl::StatusCode::kFailedPrecondition));
ZETASQL_EXPECT_OK(unenforced_referenced_verifier_->Verify(
ctx(), Delete(referenced_data_, Key({Int64(1), Int64(2), Int64(3)}))));
}

TEST_F(ForeignKeyTest, DeleteReferencedRowWithoutReferencingRow) {
Expand All @@ -140,6 +165,8 @@ TEST_F(ForeignKeyTest, DeleteReferencedRowWithoutReferencingRow) {
// should succeed.
ZETASQL_EXPECT_OK(referenced_verifier_->Verify(
ctx(), Delete(referenced_data_, Key({Int64(4), Int64(5), Int64(6)}))));
ZETASQL_EXPECT_OK(unenforced_referenced_verifier_->Verify(
ctx(), Delete(referenced_data_, Key({Int64(4), Int64(5), Int64(6)}))));
}

} // namespace
Expand Down
11 changes: 11 additions & 0 deletions backend/actions/manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,23 @@ void ActionRegistry::BuildActionRegistry() {

// Actions for foreign keys.
for (const ForeignKey* foreign_key : table->foreign_keys()) {
if (!foreign_key->enforced()) {
// Not enforced foreign keys doesn't verify referential integrity on
// data.
continue;
}
table_verifiers_[foreign_key->referencing_data_table()].emplace_back(
std::make_unique<ForeignKeyReferencingVerifier>(foreign_key));
}
for (const ForeignKey* foreign_key : table->referencing_foreign_keys()) {
if (!foreign_key->enforced()) {
// Not enforced foreign keys has no actions, and doesn't verify
// referential integrity on data.
continue;
}
table_verifiers_[foreign_key->referenced_data_table()].emplace_back(
std::make_unique<ForeignKeyReferencedVerifier>(foreign_key));

if (foreign_key->on_delete_action() == ForeignKey::Action::kCascade) {
table_effectors_[table].emplace_back(
std::make_unique<ForeignKeyActionEffector>(foreign_key));
Expand Down
6 changes: 2 additions & 4 deletions backend/database/change_stream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,20 @@ cc_library(
deps = [
"//backend/access:read",
"//backend/access:write",
"//backend/actions:manager",
"//backend/common:ids",
"//backend/datamodel:key_set",
"//backend/locking:manager",
"//backend/schema/backfills:schema_backfillers",
"//backend/schema/catalog:schema",
"//backend/schema/catalog:versioned_catalog",
"//backend/transaction:read_write_transaction",
"//common:change_stream",
"//common:clock",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/log",
"@com_google_absl//absl/random",
"@com_google_absl//absl/status",
"@com_google_absl//absl/status:statusor",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/time",
Expand Down
22 changes: 22 additions & 0 deletions backend/database/change_stream/change_stream_partition_churner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "backend/transaction/options.h"
#include "backend/transaction/read_write_transaction.h"
#include "common/change_stream.h"
#include "common/clock.h"
#include "zetasql/base/ret_check.h"
#include "zetasql/base/status_macros.h"

Expand All @@ -63,6 +64,11 @@ ABSL_FLAG(
ABSL_FLAG(bool, enable_change_stream_churning, true,
"Whether to enable change stream churning.");

ABSL_FLAG(
int, override_change_stream_partition_token_alive_seconds, -1,
"If set to X seconds, and it's greater than 0, then override the default "
"partition token alive time from 20-40 seconds to X-2X seconds.");

namespace google {
namespace spanner {
namespace emulator {
Expand All @@ -71,6 +77,22 @@ namespace backend {
using zetasql::values::String;
using zetasql::values::StringArray;

ChangeStreamPartitionChurner::ChangeStreamPartitionChurner(
CreateReadWriteTransactionFn create_read_write_transaction_fn, Clock* clock)
: create_read_write_transaction_fn_(create_read_write_transaction_fn),
clock_(clock) {
int oerridden_partition_token_alive_seconds =
absl::GetFlag(FLAGS_override_change_stream_partition_token_alive_seconds);
if (oerridden_partition_token_alive_seconds > 0) {
absl::SetFlag(&FLAGS_change_stream_churning_interval,
absl::Seconds(oerridden_partition_token_alive_seconds));
absl::SetFlag(&FLAGS_change_stream_churn_thread_sleep_interval,
absl::Seconds(oerridden_partition_token_alive_seconds));
absl::SetFlag(&FLAGS_change_stream_churn_thread_retry_sleep_interval,
absl::Seconds(oerridden_partition_token_alive_seconds));
}
}

void ChangeStreamPartitionChurner::CreateChurningThread(
absl::string_view change_stream_name) {
mu_.AssertHeld();
Expand Down
29 changes: 21 additions & 8 deletions backend/database/change_stream/change_stream_partition_churner.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,25 @@
#ifndef THIRD_PARTY_CLOUD_SPANNER_EMULATOR_BACKEND_DATABASE_CHANGE_STREAM_CHANGE_STREAM_PARTITION_CHURNER_H_
#define THIRD_PARTY_CLOUD_SPANNER_EMULATOR_BACKEND_DATABASE_CHANGE_STREAM_CHANGE_STREAM_PARTITION_CHURNER_H_

#include <functional>
#include <memory>
#include <string>
#include <thread> // NOLINT
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/random/random.h"
#include "backend/actions/manager.h"
#include "backend/common/ids.h"
#include "backend/locking/manager.h"
#include "backend/schema/catalog/versioned_catalog.h"
#include "absl/container/flat_hash_map.h"
#include "absl/flags/declare.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "backend/schema/catalog/schema.h"
#include "backend/transaction/options.h"
#include "backend/transaction/read_write_transaction.h"
#include "common/clock.h"

// How often to terminate currently active change stream partitions.
ABSL_DECLARE_FLAG(absl::Duration, change_stream_churning_interval);
Expand All @@ -44,6 +54,11 @@ ABSL_DECLARE_FLAG(int, change_stream_churn_thread_retry_jitter);
// Whether the change stream churning should be enabled.
ABSL_DECLARE_FLAG(bool, enable_change_stream_churning);

// If set to X seconds, and it's greater than 0, then
// override the default partition token alive seconds from 20-40 seconds to X-2X
// seconds.
ABSL_DECLARE_FLAG(int, override_change_stream_partition_token_alive_seconds);

namespace google {
namespace spanner {
namespace emulator {
Expand Down Expand Up @@ -72,9 +87,7 @@ class ChangeStreamPartitionChurner {

ChangeStreamPartitionChurner(
CreateReadWriteTransactionFn create_read_write_transaction_fn,
Clock* clock)
: create_read_write_transaction_fn_(create_read_write_transaction_fn),
clock_(clock) {}
Clock* clock);

~ChangeStreamPartitionChurner() { ClearAllChurningThreads(); }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ class ChangeStreamPartitionChurnerTest : public ::testing::Test {
};

void SetUp() override {
absl::SetFlag(&FLAGS_change_stream_churning_interval, absl::Seconds(1));
absl::SetFlag(&FLAGS_change_stream_churn_thread_sleep_interval,
absl::Seconds(1));
absl::SetFlag(&FLAGS_override_change_stream_partition_token_alive_seconds,
1);
std::vector<std::string> create_statements = {R"(
CREATE TABLE T(
k1 INT64,
Expand Down
11 changes: 10 additions & 1 deletion backend/query/catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,14 @@ absl::Status Catalog::GetFunctions(
return absl::OkStatus();
}

absl::Status Catalog::GetTableValuedFunctions(
absl::flat_hash_set<const zetasql::TableValuedFunction*>* output) const {
for (const auto& [unused_name, function] : tvfs_) {
output->insert(function.get());
}
return absl::OkStatus();
}

zetasql::Catalog* Catalog::GetInformationSchemaCatalog() const {
absl::MutexLock lock(&mu_);
auto spanner_sys_catalog = GetSpannerSysCatalogWithoutLocks();
Expand Down Expand Up @@ -532,7 +540,8 @@ zetasql::Catalog* Catalog::GetPGCatalog() const {
absl::MutexLock lock(&mu_);
if (schema_->dialect() == database_api::DatabaseDialect::POSTGRESQL) {
if (!pg_catalog_) {
pg_catalog_ = std::make_unique<postgres_translator::PGCatalog>(schema_);
pg_catalog_ =
std::make_unique<postgres_translator::PGCatalog>(this, schema_);
}
return pg_catalog_.get();
}
Expand Down
3 changes: 3 additions & 0 deletions backend/query/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ class Catalog : public zetasql::EnumerableCatalog {
absl::flat_hash_set<const zetasql::Type*>* output) const final;
absl::Status GetFunctions(
absl::flat_hash_set<const zetasql::Function*>* output) const final;
absl::Status GetTableValuedFunctions(
absl::flat_hash_set<const zetasql::TableValuedFunction*>* output)
const final;

absl::Status GetType(const std::string& name, const zetasql::Type** type,
const FindOptions& options) final;
Expand Down
Loading

0 comments on commit 0084b2c

Please sign in to comment.