diff --git a/core/services/pg/channels.go b/core/services/pg/channels.go index 93213a2be19..736cd407962 100644 --- a/core/services/pg/channels.go +++ b/core/services/pg/channels.go @@ -4,5 +4,5 @@ package pg const ( ChannelInsertOnTx = "evm.insert_on_txes" ChannelInsertOnCosmosMsg = "insert_on_cosmos_msg" - ChannelInsertOnEVMLogs = "insert_on_evm_logs" + ChannelInsertOnEVMLogs = "evm.insert_on_logs" ) diff --git a/core/services/relay/evm/mercury/helpers_test.go b/core/services/relay/evm/mercury/helpers_test.go index 4a7b8f70d75..4e3587b5de6 100644 --- a/core/services/relay/evm/mercury/helpers_test.go +++ b/core/services/relay/evm/mercury/helpers_test.go @@ -175,7 +175,7 @@ func SetupTH(t *testing.T, feedID common.Hash) TestHarness { require.NoError(t, lp.Start(ctx)) t.Cleanup(func() { lp.Close() }) - eventBroadcaster.On("Subscribe", "insert_on_evm_logs", "").Return(subscription, nil) + eventBroadcaster.On("Subscribe", "evm.insert_on_logs", "").Return(subscription, nil) configPoller, err := NewConfigPoller(lggr, lp, verifierAddress, feedID, eventBroadcaster) require.NoError(t, err) diff --git a/core/store/migrate/migrations/0194_evm_schema.sql b/core/store/migrate/migrations/0194_evm_schema.sql index 76fc7af1807..fc55028c16e 100644 --- a/core/store/migrate/migrations/0194_evm_schema.sql +++ b/core/store/migrate/migrations/0194_evm_schema.sql @@ -17,8 +17,7 @@ ALTER TABLE evm.evm_log_poller_blocks RENAME TO log_poller_blocks; ALTER TABLE public.evm_log_poller_filters SET SCHEMA evm; ALTER TABLE evm.evm_log_poller_filters RENAME TO log_poller_filters; -ALTER TABLE public.evm_logs SET SCHEMA evm; -ALTER TABLE evm.evm_logs RENAME TO logs; + ALTER TABLE public.evm_upkeep_states SET SCHEMA evm; ALTER TABLE evm.evm_upkeep_states RENAME TO upkeep_states; @@ -29,8 +28,37 @@ ALTER TABLE evm.eth_receipts RENAME TO receipts; ALTER TABLE public.eth_tx_attempts SET SCHEMA evm; ALTER TABLE evm.eth_tx_attempts RENAME TO tx_attempts; --- Handle tx triggers +--------------------- +-- Handle log triggers +--------------------- +DROP TRIGGER IF EXISTS notify_insert_on_evm_logs_topics ON PUBLIC.evm_logs; +DROP FUNCTION IF EXISTS public.notifysavedlogtopics(); + +ALTER TABLE public.evm_logs SET SCHEMA evm; +ALTER TABLE evm.evm_logs RENAME TO logs; +-- +goose StatementBegin +CREATE OR REPLACE FUNCTION evm.notifysavedlogtopics() RETURNS trigger + LANGUAGE plpgsql +AS $$ +BEGIN + PERFORM pg_notify( + 'evm.insert_on_logs'::text, + -- hex encoded address plus comma separated list of hex encoded topic values + -- e.g. "
:," + encode(NEW.address, 'hex') || ':' || array_to_string(array(SELECT encode(unnest(NEW.topics), 'hex')), ',') + ); + RETURN NULL; +END +$$; + +DROP TRIGGER IF EXISTS notify_insert_on_logs_topics ON evm.logs; +CREATE TRIGGER notify_insert_on_logs_topics AFTER INSERT ON evm.logs FOR EACH ROW EXECUTE PROCEDURE evm.notifysavedlogtopics(); +-- +goose StatementEnd + +--------------------- +-- Handle tx triggers +--------------------- DROP TRIGGER IF EXISTS notify_eth_tx_insertion on public.eth_txes; DROP FUNCTION IF EXISTS public.notifyethtxinsertion(); @@ -70,9 +98,6 @@ ALTER TABLE public.log_poller_blocks RENAME TO evm_log_poller_blocks; ALTER TABLE evm.log_poller_filters SET SCHEMA public; ALTER TABLE public.log_poller_filters RENAME TO evm_log_poller_filters; -ALTER TABLE evm.logs SET SCHEMA public; -ALTER TABLE public.logs RENAME TO evm_logs; - ALTER TABLE evm.upkeep_states SET SCHEMA public; ALTER table public.upkeep_states RENAME TO evm_upkeep_states; @@ -83,6 +108,39 @@ ALTER TABLE evm.tx_attempts SET SCHEMA public; ALTER TABLE public.tx_attempts RENAME TO eth_tx_attempts; +--------------------- +-- Handle log triggers +--------------------- + +DROP TRIGGER IF EXISTS notify_insert_on_logs_topics ON evm.logs; +DROP FUNCTION IF EXISTS evm.notifysavedlogtopics(); + +ALTER TABLE evm.logs SET SCHEMA public; +ALTER TABLE public.logs RENAME TO evm_logs; + +-- +goose StatementBegin +CREATE OR REPLACE FUNCTION PUBLIC.notifysavedlogtopics() RETURNS trigger + LANGUAGE plpgsql +AS $$ +BEGIN + PERFORM pg_notify( + 'insert_on_evm_logs'::text, + -- hex encoded address plus comma separated list of hex encoded topic values + -- e.g. "
:," + encode(NEW.address, 'hex') || ':' || array_to_string(array(SELECT encode(unnest(NEW.topics), 'hex')), ',') + ); + RETURN NULL; +END +$$; + +DROP TRIGGER IF EXISTS notify_insert_on_evm_logs_topics ON PUBLIC.evm_logs; +CREATE TRIGGER notify_insert_on_evm_logs_topics AFTER INSERT ON PUBLIC.evm_logs FOR EACH ROW EXECUTE PROCEDURE PUBLIC.notifysavedlogtopics(); +-- +goose StatementEnd + +--------------------- +-- Handle tx triggers +--------------------- + DROP TRIGGER IF EXISTS notify_tx_insertion on evm.txes; DROP FUNCTION IF EXISTS evm.notifytxinsertion();