Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Base event migration on minimum in-progress transaction id #406

Merged
merged 4 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions db/sequent_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
t.text "event_json", :null => false
t.integer "command_record_id", :null => false
t.integer "stream_record_id", :null => false
t.bigint "xact_id"
end

execute %Q{
ALTER TABLE event_records ALTER COLUMN xact_id SET DEFAULT pg_current_xact_id()::text::bigint
}
execute %Q{
CREATE UNIQUE INDEX unique_event_per_aggregate ON event_records (
aggregate_id,
Expand All @@ -24,6 +28,7 @@
add_index "event_records", ["command_record_id"], :name => "index_event_records_on_command_record_id"
add_index "event_records", ["event_type"], :name => "index_event_records_on_event_type"
add_index "event_records", ["created_at"], :name => "index_event_records_on_created_at"
add_index "event_records", ["xact_id"], :name => "index_event_records_on_xact_id"

create_table "command_records", :force => true do |t|
t.string "user_id"
Expand Down
7 changes: 3 additions & 4 deletions docs/docs/concepts/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Autoregistered classes will be appended to any already manually registered `comm
Sequent detects duplicates it will currently fail.
When setting `enable_autoregistration` to `true` in your `initializer`
any [CommandHandlers](command-handler.html), [Projectors](projector.html) and [Workflows](workflow.html) are
automatically registered in your Sequent configuration.
automatically registered in your Sequent configuration.
When you have base classes that you don't want to have automatically registered you can
set `self.abstract_class = true` for these classes. Another option to skip autoregistration is to set
`self.skip_autoregister` to `true`.
Expand Down Expand Up @@ -135,9 +135,8 @@ For the latest configuration possibilities please check the `Sequent::Configurat
| number_of_replay_processes | The [number of process](#number_of_replay_processes) used while offline migration | `4` |
| offline_replay_persistor_class | The class used to persist the `Projector`s during the offline migration part. | `Sequent::Core::Persistors::ActiveRecordPersistor` |
| online_replay_persistor_class | The class used to persist the `Projector`s. | `Sequent::Core::Persistors::ActiveRecordPersistor` |
| primary_database_key | A symbol indicating the primary database if multiple databases are specified within the provided db_config | `:primary` |
| primary_database_key | A symbol indicating the primary database if multiple databases are specified within the provided db_config | `:primary` |
| primary_database_role | A symbol indicating the primary database role if using multiple databases with active record | `:writing` |
| replayed_ids_table_name | The name of the table in which Sequent keeps track of which events are already replayed during a [migration](migrations.html) | `'sequent_replayed_ids'` |
| snapshot_event_class | The event class marking something as a [Snapshot event](snapshotting.html) | `Sequent::Core::SnapshotEvent` |
| stream_record_class | The [class](event_store.html) mapped to the `stream_records` table | `Sequent::Core::StreamRecord` |
| strict_check_attributes_on_apply_events | Whether or not sequent should fail on calling `apply` with invalid attributes. | `false`. Will be enabled by default in the next major release. |
Expand All @@ -146,4 +145,4 @@ For the latest configuration possibilities please check the `Sequent::Configurat
| uuid_generator | The UUID Generator used. Mainly useful for testing | `Sequent::Core::RandomUuidGenerator` |
| versions_table_name | The name of the table in which Sequent checks which [migration version](migrations.html) is currently active | `'sequent_versions'` |
| view_schema_name | The name of the view_schema in which the projections are created. | `'view_schema'` |
| enable_autoregistration | Enable autoregistration. This will autoregister `Sequent::CommandHandler`s, `Sequent::Projector`s and `Sequent::Workflow`s | `false` |
| enable_autoregistration | Enable autoregistration. This will autoregister `Sequent::CommandHandler`s, `Sequent::Projector`s and `Sequent::Workflow`s | `false` |
12 changes: 1 addition & 11 deletions lib/sequent/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
module Sequent
class Configuration
DEFAULT_VERSIONS_TABLE_NAME = 'sequent_versions'
DEFAULT_REPLAYED_IDS_TABLE_NAME = 'sequent_replayed_ids'

DEFAULT_MIGRATION_SQL_FILES_DIRECTORY = 'db/tables'
DEFAULT_DATABASE_CONFIG_DIRECTORY = 'db'
Expand Down Expand Up @@ -68,8 +67,7 @@ class Configuration
:enable_autoregistration

attr_reader :migrations_class_name,
:versions_table_name,
:replayed_ids_table_name
:versions_table_name

def self.instance
@instance ||= new
Expand Down Expand Up @@ -104,7 +102,6 @@ def initialize
self.event_publisher = Sequent::Core::EventPublisher.new
self.disable_event_handlers = false
self.versions_table_name = DEFAULT_VERSIONS_TABLE_NAME
self.replayed_ids_table_name = DEFAULT_REPLAYED_IDS_TABLE_NAME
self.migration_sql_files_directory = DEFAULT_MIGRATION_SQL_FILES_DIRECTORY
self.view_schema_name = DEFAULT_VIEW_SCHEMA_NAME
self.event_store_schema_name = DEFAULT_EVENT_STORE_SCHEMA_NAME
Expand Down Expand Up @@ -135,13 +132,6 @@ def can_use_multiple_databases?
enable_multiple_database_support && ActiveRecord.version > Gem::Version.new('6.1.0')
end

def replayed_ids_table_name=(table_name)
fail ArgumentError, 'table_name can not be nil' unless table_name

@replayed_ids_table_name = table_name
Sequent::Migrations::ReplayedIds.table_name = table_name
end

def versions_table_name=(table_name)
fail ArgumentError, 'table_name can not be nil' unless table_name

Expand Down
1 change: 1 addition & 0 deletions lib/sequent/core/event_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class EventRecord < Sequent::ApplicationRecord
include SerializesEvent

self.table_name = 'event_records'
self.ignored_columns = %w[xact_id]

belongs_to :stream_record
belongs_to :command_record
Expand Down
13 changes: 0 additions & 13 deletions lib/sequent/migrations/replayed_ids.rb

This file was deleted.

13 changes: 11 additions & 2 deletions lib/sequent/migrations/versions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def self.migration_sql
CREATE TABLE IF NOT EXISTS #{table_name} (version integer NOT NULL, CONSTRAINT version_pk PRIMARY KEY(version));
ALTER TABLE #{table_name} drop constraint if exists only_one_running;
ALTER TABLE #{table_name} ADD COLUMN IF NOT EXISTS status INTEGER DEFAULT NULL CONSTRAINT only_one_running CHECK (status in (1,2,3));
ALTER TABLE #{table_name} ADD COLUMN IF NOT EXISTS xmin_xact_id BIGINT;
DROP INDEX IF EXISTS single_migration_running;
CREATE UNIQUE INDEX single_migration_running ON #{table_name} ((status * 0)) where status is not null;
SQL
Expand All @@ -33,11 +34,15 @@ def self.version_currently_migrating
end

def self.latest_version
order('version desc').limit(1).first&.version
latest&.version
end

def self.latest
order('version desc').limit(1).first
end

def self.start_online!(new_version)
create!(version: new_version, status: MIGRATE_ONLINE_RUNNING)
create!(version: new_version, status: MIGRATE_ONLINE_RUNNING, xmin_xact_id: current_snapshot_xmin_xact_id)
rescue ActiveRecord::RecordNotUnique
raise ConcurrentMigration, "Migration for version #{new_version} is already running"
end
Expand Down Expand Up @@ -68,6 +73,10 @@ def self.start_offline!(new_version)
def self.end_offline!(new_version)
find_by!(version: new_version, status: MIGRATE_OFFLINE_RUNNING).update(status: DONE)
end

def self.current_snapshot_xmin_xact_id
connection.execute('SELECT pg_snapshot_xmin(pg_current_snapshot())::text::bigint AS xmin').first['xmin']
end
end
end
end
79 changes: 47 additions & 32 deletions lib/sequent/migrations/view_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
require_relative 'executor'
require_relative 'sql'
require_relative 'versions'
require_relative 'replayed_ids'

module Sequent
module Migrations
Expand Down Expand Up @@ -200,13 +199,16 @@ def migrate_online
in_view_schema do
Versions.start_online!(Sequent.new_version)

truncate_replay_ids_table!

drop_old_tables(Sequent.new_version)
executor.execute_online(plan)
end

replay!(Sequent.configuration.online_replay_persistor_class.new) if plan.projectors.any?
if plan.projectors.any?
replay!(
Sequent.configuration.online_replay_persistor_class.new,
maximum_xact_id_exclusive: Versions.running.first.xmin_xact_id,
)
end

in_view_schema do
executor.create_indexes_after_execute_online(plan)
Expand Down Expand Up @@ -238,7 +240,7 @@ def migrate_online
# 2.1 Rename current tables with the +current version+ as SUFFIX
# 2.2 Rename the new tables and remove the +new version+ suffix
# 2.3 Add the new version in the +Versions+ table
# 3. Performs cleanup of replayed event ids
# 3. Update the versions table to complete the migration
#
# If anything fails an exception is raised and everything is rolled back
#
Expand All @@ -257,7 +259,7 @@ def migrate_offline
if plan.projectors.any?
replay!(
Sequent.configuration.offline_replay_persistor_class.new,
exclude_ids: true,
minimum_xact_id_inclusive: Versions.running.first.xmin_xact_id,
group_exponent: 1,
)
end
Expand All @@ -269,9 +271,6 @@ def migrate_offline
# 2.3 Create migration record
Versions.end_offline!(Sequent.new_version)
end

# 3. Truncate replayed ids
truncate_replay_ids_table!
end
logger.info "Migrated to version #{Sequent.new_version}"
rescue ConcurrentMigration
Expand All @@ -292,7 +291,7 @@ def ensure_valid_plan!
def migrate_metadata_tables
Sequent::ApplicationRecord.transaction do
in_view_schema do
exec_sql([ReplayedIds.migration_sql, Versions.migration_sql].join("\n"))
exec_sql([Versions.migration_sql].join("\n"))
lvonk marked this conversation as resolved.
Show resolved Hide resolved
end
end
end
Expand All @@ -307,7 +306,13 @@ def ensure_version_correct!
end
end

def replay!(replay_persistor, projectors: plan.projectors, exclude_ids: false, group_exponent: 3)
def replay!(
replay_persistor,
projectors: plan.projectors,
minimum_xact_id_inclusive: nil,
maximum_xact_id_exclusive: nil,
group_exponent: 3
)
logger.info "group_exponent: #{group_exponent.inspect}"

with_sequent_config(replay_persistor, projectors) do
Expand All @@ -331,7 +336,14 @@ def replay!(replay_persistor, projectors: plan.projectors, exclude_ids: false, g
Group (#{aggregate_prefixes.first}-#{aggregate_prefixes.last}) #{index + 1}/#{number_of_groups} replayed
EOS
time(msg) do
replay_events(aggregate_prefixes, event_types, exclude_ids, replay_persistor, &insert_ids)
replay_events(
aggregate_prefixes,
event_types,
minimum_xact_id_inclusive,
maximum_xact_id_exclusive,
replay_persistor,
&on_progress
)
end
nil
rescue StandardError => e
Expand All @@ -346,10 +358,19 @@ def replay!(replay_persistor, projectors: plan.projectors, exclude_ids: false, g
end
end

def replay_events(aggregate_prefixes, event_types, exclude_already_replayed, replay_persistor, &on_progress)
def replay_events(
aggregate_prefixes,
event_types,
minimum_xact_id_inclusive,
maximum_xact_id_exclusive,
replay_persistor,
&on_progress
)
Sequent.configuration.event_store.replay_events_from_cursor(
block_size: 1000,
get_events: -> { event_stream(aggregate_prefixes, event_types, exclude_already_replayed) },
get_events: -> {
event_stream(aggregate_prefixes, event_types, minimum_xact_id_inclusive, maximum_xact_id_exclusive)
},
on_progress: on_progress,
)

Expand All @@ -364,15 +385,10 @@ def rollback_migration
establish_connection
drop_old_tables(Sequent.new_version)

truncate_replay_ids_table!
executor.reset_table_names(plan)
Versions.rollback!(Sequent.new_version)
end

def truncate_replay_ids_table!
exec_sql("truncate table #{ReplayedIds.table_name}")
end

def groups_of_aggregate_id_prefixes(number_of_groups)
all_prefixes = (0...16**LENGTH_OF_SUBSTRING_INDEX_ON_AGGREGATE_ID_IN_EVENT_STORE).to_a.map do |i|
i.to_s(16)
Expand Down Expand Up @@ -405,15 +421,8 @@ def drop_old_tables(new_version)
end
end

def insert_ids
def on_progress
->(progress, done, ids) do
unless ids.empty?
exec_sql(
"insert into #{ReplayedIds.table_name} (event_id) values #{ids.map do |id|
"(#{id})"
end.join(',')}",
)
end
Sequent::Core::EventStore::PRINT_PROGRESS[progress, done, ids] if progress > 0
end
end
Expand All @@ -436,18 +445,24 @@ def with_sequent_config(replay_persistor, projectors, &block)
Sequent::Configuration.restore(old_config)
end

def event_stream(aggregate_prefixes, event_types, exclude_already_replayed)
def event_stream(aggregate_prefixes, event_types, minimum_xact_id_inclusive, maximum_xact_id_exclusive)
fail ArgumentError, 'aggregate_prefixes is mandatory' unless aggregate_prefixes.present?

event_stream = Sequent.configuration.event_record_class.where(event_type: event_types)
event_stream = event_stream.where(<<~SQL, aggregate_prefixes)
substring(aggregate_id::text from 1 for #{LENGTH_OF_SUBSTRING_INDEX_ON_AGGREGATE_ID_IN_EVENT_STORE}) in (?)
SQL
if exclude_already_replayed
event_stream = event_stream
.where("NOT EXISTS (SELECT 1 FROM #{ReplayedIds.table_name} WHERE event_id = event_records.id)")
if minimum_xact_id_inclusive && maximum_xact_id_exclusive
event_stream = event_stream.where(
'xact_id >= ? AND xact_id < ?',
minimum_xact_id_inclusive,
maximum_xact_id_exclusive,
)
elsif minimum_xact_id_inclusive
event_stream = event_stream.where('xact_id >= ?', minimum_xact_id_inclusive)
elsif maximum_xact_id_exclusive
event_stream = event_stream.where('xact_id IS NULL OR xact_id < ?', maximum_xact_id_exclusive)
end
event_stream = event_stream.where('event_records.created_at > ?', 1.day.ago) if exclude_already_replayed
event_stream
.order('aggregate_id ASC, sequence_number ASC')
.select('id, event_type, event_json, sequence_number')
Expand Down
38 changes: 38 additions & 0 deletions lib/sequent/rake/migration_tasks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,44 @@ def register_tasks!
end
end

desc <<-EOS
Shows the current status of the migrations
EOS
task status: ['sequent:init', :init] do
ensure_sequent_env_set!
db_config = Sequent::Support::Database.read_config(@env)
view_schema = Sequent::Migrations::ViewSchema.new(db_config: db_config)

latest_done_version = Sequent::Migrations::Versions.done.latest
latest_version = Sequent::Migrations::Versions.latest
pending_version = Sequent.new_version
case latest_version.status
when Sequent::Migrations::Versions::DONE
if pending_version == latest_version.version
puts "Current version #{latest_version.version}, no pending changes"
else
puts "Current version #{latest_version.version}, pending version #{pending_version}"
end
when Sequent::Migrations::Versions::MIGRATE_ONLINE_RUNNING
puts "Online migration from #{latest_done_version.version} to #{latest_version.version} is running"
when Sequent::Migrations::Versions::MIGRATE_ONLINE_FINISHED
projectors = view_schema.plan.projectors
event_types = projectors.flat_map { |projector| projector.message_mapping.keys }.uniq.map(&:name)

current_snapshot_xmin_xact_id = Sequent::Migrations::Versions.current_snapshot_xmin_xact_id
pending_events = Sequent.configuration.event_record_class
.where(event_type: event_types)
.where('xact_id >= ?', current_snapshot_xmin_xact_id)
.count
print <<~EOS
Online migration from #{latest_done_version.version} to #{latest_version.version} is finished.
#{current_snapshot_xmin_xact_id - latest_version.xmin_xact_id} transactions behind current state (#{pending_events} pending events).
EOS
when Sequent::Migrations::Versions::MIGRATE_OFFLINE_RUNNING
puts "Offline migration from #{latest_done_version.version} to #{latest_version.version} is running"
end
end

desc <<~EOS
Migrates the Projectors while the app is running. Call +sequent:migrate:offline+ after this successfully completed.
EOS
Expand Down
Loading
Loading