Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Migration framework #1768

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ add_subdirectory(etlng)
add_subdirectory(feed)
add_subdirectory(rpc)
add_subdirectory(web)
add_subdirectory(migration)
kuznetsss marked this conversation as resolved.
Show resolved Hide resolved
add_subdirectory(app)
add_subdirectory(main)
2 changes: 1 addition & 1 deletion src/app/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
add_library(clio_app)
target_sources(clio_app PRIVATE CliArgs.cpp ClioApplication.cpp WebHandlers.cpp)

target_link_libraries(clio_app PUBLIC clio_etl clio_etlng clio_feed clio_web clio_rpc)
target_link_libraries(clio_app PUBLIC clio_etl clio_etlng clio_feed clio_web clio_rpc clio_migration)
10 changes: 10 additions & 0 deletions src/app/CliArgs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "app/CliArgs.hpp"

#include "migration/MigrationApplication.hpp"
#include "util/build/Build.hpp"

#include <boost/program_options/options_description.hpp>
Expand All @@ -45,6 +46,7 @@
("version,v", "print version and exit")
("conf,c", po::value<std::string>()->default_value(defaultConfigPath), "configuration file")
("ng-web-server,w", "Use ng-web-server")
("migrate", po::value<std::string>(),"start migration helper")
cindyyan317 marked this conversation as resolved.
Show resolved Hide resolved
;
// clang-format on
po::positional_options_description positional;
Expand All @@ -65,6 +67,14 @@
}

auto configPath = parsed["conf"].as<std::string>();

if (parsed.count("migrate") != 0u) {
auto const opt = parsed["migrate"].as<std::string>();
if (opt == "status")
return Action{Action::Migrate{std::move(configPath), MigrateSubCmd::status()}};

Check warning on line 74 in src/app/CliArgs.cpp

View check run for this annotation

Codecov / codecov/patch

src/app/CliArgs.cpp#L74

Added line #L74 was not covered by tests
return Action{Action::Migrate{std::move(configPath), MigrateSubCmd::migration(opt)}};
}

Check warning on line 76 in src/app/CliArgs.cpp

View check run for this annotation

Codecov / codecov/patch

src/app/CliArgs.cpp#L76

Added line #L76 was not covered by tests

return Action{Action::Run{.configPath = std::move(configPath), .useNgWebServer = parsed.count("ng-web-server") != 0}
};
}
Expand Down
12 changes: 10 additions & 2 deletions src/app/CliArgs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#pragma once

#include "migration/MigrationApplication.hpp"
#include "util/OverloadSet.hpp"

#include <string>
Expand Down Expand Up @@ -52,13 +53,20 @@ class CliArgs {
int exitCode; ///< Exit code.
};

/** @brief Migration action. */
struct Migrate {
std::string configPath;
MigrateSubCmd subCmd;
};

/**
* @brief Construct an action from a Run.
*
* @param action Run action.
*/
template <typename ActionType>
requires std::is_same_v<ActionType, Run> or std::is_same_v<ActionType, Exit>
requires std::is_same_v<ActionType, Run> or std::is_same_v<ActionType, Exit> or
std::is_same_v<ActionType, Migrate>
explicit Action(ActionType&& action) : action_(std::forward<ActionType>(action))
{
}
Expand All @@ -78,7 +86,7 @@ class CliArgs {
}

private:
std::variant<Run, Exit> action_;
std::variant<Run, Exit, Migrate> action_;
};

/**
Expand Down
19 changes: 19 additions & 0 deletions src/data/BackendInterface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,16 @@ class BackendInterface {
boost::asio::yield_context yield
) const;

/**
* @brief Fetches the status of migrator by name.
*
* @param migratorName The name of the migrator
* @param yield The coroutine context
* @return The status of the migrator if found; nullopt otherwise
*/
virtual std::optional<std::string>
fetchMigratorStatus(std::string const& migratorName, boost::asio::yield_context yield) const = 0;

/**
* @brief Synchronously fetches the ledger range from DB.
*
Expand Down Expand Up @@ -673,6 +683,15 @@ class BackendInterface {
bool
finishWrites(std::uint32_t ledgerSequence);

/**
*@brief Mark the migration status of a migrator as Migrated in the database
cindyyan317 marked this conversation as resolved.
Show resolved Hide resolved
*
*@param migratorName The name of the migrator
*@param status The status to set
*/
virtual void
writeMigratorStatus(std::string const& migratorName, std::string const& status) = 0;

/**
* @return true if database is overwhelmed; false otherwise
*/
Expand Down
35 changes: 33 additions & 2 deletions src/data/CassandraBackend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include <stdexcept>
#include <string>
#include <tuple>
#include <unordered_set>
#include <utility>
#include <vector>

Expand All @@ -72,13 +73,15 @@ class BasicCassandraBackend : public BackendInterface {

SettingsProviderType settingsProvider_;
Schema<SettingsProviderType> schema_;

std::atomic_uint32_t ledgerSequence_ = 0u;

protected:
Handle handle_;

// have to be mutable because BackendInterface constness :(
mutable ExecutionStrategyType executor_;

std::atomic_uint32_t ledgerSequence_ = 0u;

public:
/**
* @brief Create a new cassandra/scylla backend instance.
Expand Down Expand Up @@ -835,6 +838,26 @@ class BasicCassandraBackend : public BackendInterface {
return results;
}

std::optional<std::string>
fetchMigratorStatus(std::string const& migratorName, boost::asio::yield_context yield) const override
{
auto const res = executor_.read(yield, schema_->selectMigratorStatus, Text(migratorName));
if (not res) {
LOG(log_.error()) << "Could not fetch migrator status: " << res.error();
return {};
}

auto const& results = res.value();
if (not results) {
return {};
}

for (auto [statusString] : extract<std::string>(results))
kuznetsss marked this conversation as resolved.
Show resolved Hide resolved
return statusString;

return {};
}

void
doWriteLedgerObject(std::string&& key, std::uint32_t const seq, std::string&& blob) override
{
Expand Down Expand Up @@ -962,6 +985,14 @@ class BasicCassandraBackend : public BackendInterface {
// probably was used in PG to start a transaction or smth.
}

void
writeMigratorStatus(std::string const& migratorName, std::string const& status) override
{
executor_.writeSync(
schema_->insertMigratorStatus, data::cassandra::Text{migratorName}, data::cassandra::Text(status)
);
}

bool
isTooBusy() const override
{
Expand Down
12 changes: 12 additions & 0 deletions src/data/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,15 @@ CREATE TABLE clio.nf_token_transactions (
```

The `nf_token_transactions` table serves as the NFT counterpart to `account_tx`, inspired by the same motivations and fulfilling a similar role within this context. It drives the `nft_history` API.

### migrator_status

```
CREATE TABLE clio.migrator_status (
migrator_name TEXT, # The name of the migrator
status TEXT, # The status of the migrator
PRIMARY KEY (migrator_name)
)
```

The `migrator_status` table stores the status of the migratior in this database. If a migrator's status is `migrated`, it means this database has finished data migration for this migrator.
34 changes: 34 additions & 0 deletions src/data/cassandra/Schema.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,18 @@ class Schema {
qualifiedTableName(settingsProvider_.get(), "mp_token_holders")
));

statements.emplace_back(fmt::format(
R"(
CREATE TABLE IF NOT EXISTS {}
(
migrator_name TEXT,
status TEXT,
PRIMARY KEY (migrator_name)
)
)",
qualifiedTableName(settingsProvider_.get(), "migrator_status")
));

return statements;
}();

Expand Down Expand Up @@ -466,6 +478,17 @@ class Schema {
));
}();

PreparedStatement insertMigratorStatus = [this]() {
return handle_.get().prepare(fmt::format(
R"(
INSERT INTO {}
(migrator_name, status)
VALUES (?, ?)
)",
qualifiedTableName(settingsProvider_.get(), "migrator_status")
));
}();

//
// Select queries
//
Expand Down Expand Up @@ -768,6 +791,17 @@ class Schema {
qualifiedTableName(settingsProvider_.get(), "ledger_range")
));
}();

PreparedStatement selectMigratorStatus = [this]() {
return handle_.get().prepare(fmt::format(
R"(
SELECT status
FROM {}
WHERE migrator_name = ?
)",
qualifiedTableName(settingsProvider_.get(), "migrator_status")
));
}();
};

/**
Expand Down
22 changes: 22 additions & 0 deletions src/data/cassandra/Types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

#include <cstdint>
#include <expected>
#include <string>
#include <utility>

namespace data::cassandra {

Expand Down Expand Up @@ -55,6 +57,26 @@ struct Limit {
int32_t limit;
};

/**
* @brief A strong type wrapper for string
*
* This is unfortunately needed right now to support TEXT properly
* because clio uses string to represent BLOB
* If we want to bind TEXT with string, we need to use this type
*/
struct Text {
std::string text;

/**
* @brief Construct a new Text object from string type
*
* @param text The text to wrap
*/
explicit Text(std::string text) : text{std::move(text)}
{
}
};

class Handle;
class CassandraError;

Expand Down
3 changes: 3 additions & 0 deletions src/data/cassandra/impl/Statement.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ class Statement : public ManagedObject<CassStatement> {
// reinterpret_cast is needed here :'(
auto const rc = bindBytes(reinterpret_cast<unsigned char const*>(value.data()), value.size());
throwErrorIfNeeded(rc, "Bind string (as bytes)");
} else if constexpr (std::is_convertible_v<DecayedType, Text>) {
auto const rc = cass_statement_bind_string(*this, idx, value.text.c_str());
cindyyan317 marked this conversation as resolved.
Show resolved Hide resolved
throwErrorIfNeeded(rc, "Bind string (as TEXT)");
} else if constexpr (std::is_same_v<DecayedType, UintTupleType> ||
std::is_same_v<DecayedType, UintByteTupleType>) {
auto const rc = cass_statement_bind_tuple(*this, idx, Tuple{std::forward<Type>(value)});
Expand Down
11 changes: 11 additions & 0 deletions src/main/Main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "app/CliArgs.hpp"
#include "app/ClioApplication.hpp"
#include "migration/MigrationApplication.hpp"
#include "rpc/common/impl/HandlerProvider.hpp"
#include "util/TerminationHandler.hpp"
#include "util/config/Config.hpp"
Expand Down Expand Up @@ -46,6 +47,16 @@
app::ClioApplication clio{config};

return clio.run(run.useNgWebServer);
},
[](app::CliArgs::Action::Migrate const& migrate) {

Check warning on line 51 in src/main/Main.cpp

View check run for this annotation

Codecov / codecov/patch

src/main/Main.cpp#L50-L51

Added lines #L50 - L51 were not covered by tests
auto const config = util::ConfigReader::open(migrate.configPath);
cindyyan317 marked this conversation as resolved.
Show resolved Hide resolved
if (!config) {
std::cerr << "Couldnt parse migration config '" << migrate.configPath << "'." << std::endl;
return EXIT_FAILURE;

Check warning on line 55 in src/main/Main.cpp

View check run for this annotation

Codecov / codecov/patch

src/main/Main.cpp#L55

Added line #L55 was not covered by tests
}
util::LogService::init(config);
app::MigratorApplication migrator{config, migrate.subCmd};
return migrator.run();
}
);
} catch (std::exception const& e) {
Expand Down
8 changes: 8 additions & 0 deletions src/migration/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
add_library(clio_migration)

target_sources(
clio_migration PRIVATE MigrationApplication.cpp impl/MigrationManagerFactory.cpp MigratorStatus.cpp
cassandra/impl/ObjectsAdapter.cpp cassandra/impl/TransactionsAdapter.cpp
)

target_link_libraries(clio_migration PRIVATE clio_util clio_etl)
Loading
Loading