From ef9f678d0b3a17b8f08df3f1ec5e9db163f90f40 Mon Sep 17 00:00:00 2001 From: Wout Slakhorst Date: Wed, 18 May 2022 12:05:47 +0200 Subject: [PATCH] send notification from DAG with AND without payload (#1112) (#1125) * send notification from DAG with AND without payload * separate observers for dag.STate * add integration test for tree updates for priate tx --- network/dag/interface.go | 14 +++-- network/dag/mock.go | 24 ++++++--- network/dag/publisher.go | 17 +++--- network/dag/state.go | 77 ++++++++++++++++++++------- network/dag/state_test.go | 37 +++++++++++-- network/network.go | 4 +- network/network_integration_test.go | 5 ++ network/transport/v2/protocol.go | 4 +- network/transport/v2/protocol_test.go | 6 +-- 9 files changed, 141 insertions(+), 47 deletions(-) diff --git a/network/dag/interface.go b/network/dag/interface.go index 2b95441c5f..49e490d081 100644 --- a/network/dag/interface.go +++ b/network/dag/interface.go @@ -43,7 +43,7 @@ type State interface { // Deprecated: remove with V1 protocol ReadManyPayloads(ctx context.Context, consumer func(context.Context, PayloadReader) error) error - // Add a transactions to the DAG. If it can't be added an error is returned. + // Add a transaction to the DAG. If it can't be added an error is returned. // If the transaction already exists, nothing is added and no observers are notified. // The payload may be passed as well. Allowing for better notification of observers Add(ctx context.Context, transactions Transaction, payload []byte) error @@ -61,9 +61,12 @@ type State interface { IsPresent(context.Context, hash.SHA256Hash) (bool, error) // PayloadHashes applies the visitor function to the payload hashes of all transactions, in random order. PayloadHashes(ctx context.Context, visitor func(payloadHash hash.SHA256Hash) error) error - // RegisterObserver allows observers to be notified when a transaction is added to the DAG. + // RegisterTransactionObserver allows observers to be notified when a transaction is added to the DAG. // If the observer needs to be called within the transaction, transactional must be true. - RegisterObserver(observer Observer, transactional bool) + RegisterTransactionObserver(observer Observer, transactional bool) + // RegisterPayloadObserver allows observers to be notified when a payload is written to the store. + // If the observer needs to be called within the transaction, transactional must be true. + RegisterPayloadObserver(observer PayloadObserver, transactional bool) // Subscribe lets an application subscribe to a specific type of transaction. When a new transaction is received // the `receiver` function is called. If an asterisk (`*`) is specified as `payloadType` the receiver is subscribed // to all payload types. @@ -159,7 +162,10 @@ type PayloadReader interface { } // Observer defines the signature of an observer which can be called by an Observable. -type Observer func(ctx context.Context, transaction Transaction, payload []byte) error +type Observer func(ctx context.Context, transaction Transaction) error + +// PayloadObserver defines the signature of an observer which can be called by an Observable. +type PayloadObserver func(ctx context.Context, transaction Transaction, payload []byte) error // MinTime returns the minimum value for time.Time func MinTime() time.Time { diff --git a/network/dag/mock.go b/network/dag/mock.go index 23bcc94c7f..12c7c55837 100644 --- a/network/dag/mock.go +++ b/network/dag/mock.go @@ -214,16 +214,28 @@ func (mr *MockStateMockRecorder) ReadPayload(ctx, payloadHash interface{}) *gomo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadPayload", reflect.TypeOf((*MockState)(nil).ReadPayload), ctx, payloadHash) } -// RegisterObserver mocks base method. -func (m *MockState) RegisterObserver(observer Observer, transactional bool) { +// RegisterPayloadObserver mocks base method. +func (m *MockState) RegisterPayloadObserver(observer PayloadObserver, transactional bool) { m.ctrl.T.Helper() - m.ctrl.Call(m, "RegisterObserver", observer, transactional) + m.ctrl.Call(m, "RegisterPayloadObserver", observer, transactional) } -// RegisterObserver indicates an expected call of RegisterObserver. -func (mr *MockStateMockRecorder) RegisterObserver(observer, transactional interface{}) *gomock.Call { +// RegisterPayloadObserver indicates an expected call of RegisterPayloadObserver. +func (mr *MockStateMockRecorder) RegisterPayloadObserver(observer, transactional interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterObserver", reflect.TypeOf((*MockState)(nil).RegisterObserver), observer, transactional) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterPayloadObserver", reflect.TypeOf((*MockState)(nil).RegisterPayloadObserver), observer, transactional) +} + +// RegisterTransactionObserver mocks base method. +func (m *MockState) RegisterTransactionObserver(observer Observer, transactional bool) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RegisterTransactionObserver", observer, transactional) +} + +// RegisterTransactionObserver indicates an expected call of RegisterTransactionObserver. +func (mr *MockStateMockRecorder) RegisterTransactionObserver(observer, transactional interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterTransactionObserver", reflect.TypeOf((*MockState)(nil).RegisterTransactionObserver), observer, transactional) } // Shutdown mocks base method. diff --git a/network/dag/publisher.go b/network/dag/publisher.go index ec8812c6a4..9764e2990d 100644 --- a/network/dag/publisher.go +++ b/network/dag/publisher.go @@ -53,17 +53,18 @@ type replayingDAGPublisher struct { func (s *replayingDAGPublisher) ConfigureCallbacks(state State) { // the publisher only signals the VDR, VCR and transaction state. These need to be called after the bbolt transaction is completed. - state.RegisterObserver(func(ctx context.Context, transaction Transaction, payload []byte) error { + state.RegisterTransactionObserver(func(ctx context.Context, transaction Transaction) error { s.publishMux.Lock() defer s.publishMux.Unlock() - if transaction != nil { - s.transactionAdded(ctx, transaction, payload) - } - if payload != nil { - s.payloadWritten(ctx, transaction, payload) - } - return nil + return s.transactionAdded(ctx, transaction, nil) + }, false) + + state.RegisterPayloadObserver(func(ctx context.Context, transaction Transaction, payload []byte) error { + s.publishMux.Lock() + defer s.publishMux.Unlock() + + return s.payloadWritten(ctx, transaction, payload) }, false) } diff --git a/network/dag/state.go b/network/dag/state.go index b3a13a2e25..f6f6deda20 100644 --- a/network/dag/state.go +++ b/network/dag/state.go @@ -48,16 +48,18 @@ const ( // State has references to the DAG and the payload store. type state struct { - db *bbolt.DB - graph *bboltDAG - payloadStore PayloadStore - transactionalObservers []Observer - nonTransactionalObservers []Observer - keyResolver types.KeyResolver - publisher Publisher - txVerifiers []Verifier - xorTree *bboltTree - ibltTree *bboltTree + db *bbolt.DB + graph *bboltDAG + payloadStore PayloadStore + transactionalObservers []Observer + nonTransactionalObservers []Observer + transactionalPayloadObservers []PayloadObserver + nonTransactionalPayloadObservers []PayloadObserver + keyResolver types.KeyResolver + publisher Publisher + txVerifiers []Verifier + xorTree *bboltTree + ibltTree *bboltTree } // NewState returns a new State. The State is used as entry point, it's methods will start transactions and will notify observers from within those transactions. @@ -88,23 +90,36 @@ func NewState(dataDir string, verifiers ...Verifier) (State, error) { newState.publisher = publisher xorTree := newBBoltTreeStore(db, "xorBucket", tree.New(tree.NewXor(), PageSize)) - newState.RegisterObserver(xorTree.dagObserver, true) newState.xorTree = xorTree ibltTree := newBBoltTreeStore(db, "ibltBucket", tree.New(tree.NewIblt(IbltNumBuckets), PageSize)) - newState.RegisterObserver(ibltTree.dagObserver, true) newState.ibltTree = ibltTree + newState.RegisterTransactionObserver(newState.treeObserver, true) return newState, nil } -func (s *state) RegisterObserver(observer Observer, transactional bool) { +func (s *state) RegisterTransactionObserver(observer Observer, transactional bool) { if transactional { s.transactionalObservers = append(s.transactionalObservers, observer) } else { s.nonTransactionalObservers = append(s.nonTransactionalObservers, observer) } +} + +func (s *state) RegisterPayloadObserver(observer PayloadObserver, transactional bool) { + if transactional { + s.transactionalPayloadObservers = append(s.transactionalPayloadObservers, observer) + } else { + s.nonTransactionalPayloadObservers = append(s.nonTransactionalPayloadObservers, observer) + } +} +func (s *state) treeObserver(ctx context.Context, transaction Transaction) error { + if err := s.ibltTree.dagObserver(ctx, transaction, nil); err != nil { + return err + } + return s.xorTree.dagObserver(ctx, transaction, nil) } func (s *state) Add(ctx context.Context, transaction Transaction, payload []byte) error { @@ -125,7 +140,7 @@ func (s *state) Add(ctx context.Context, transaction Transaction, payload []byte if !transaction.PayloadHash().Equals(payloadHash) { return errors.New("tx.PayloadHash does not match hash of payload") } - if err := s.payloadStore.WritePayload(contextWithTX, payloadHash, payload); err != nil { + if err := s.WritePayload(contextWithTX, transaction, payloadHash, payload); err != nil { return err } } @@ -133,7 +148,7 @@ func (s *state) Add(ctx context.Context, transaction Transaction, payload []byte return err } - return s.notifyObservers(contextWithTX, transaction, payload) + return s.notifyObservers(contextWithTX, transaction) }) } @@ -175,7 +190,7 @@ func (s *state) WritePayload(ctx context.Context, transaction Transaction, paylo err := s.payloadStore.WritePayload(contextWithTX, payloadHash, data) if err == nil { // ctx passed with bbolt transaction - return s.notifyObservers(contextWithTX, transaction, data) + return s.notifyPayloadObservers(contextWithTX, transaction, data) } return err }) @@ -311,16 +326,42 @@ func (s *state) Walk(ctx context.Context, visitor Visitor, startAt hash.SHA256Ha } // notifyObservers is called from a transactional context. The transactional observers need to be called with the TX context, the other observers after the commit. -func (s *state) notifyObservers(ctx context.Context, transaction Transaction, payload []byte) error { +func (s *state) notifyObservers(ctx context.Context, transaction Transaction) error { // apply TX context observers for _, observer := range s.transactionalObservers { - if err := observer(ctx, transaction, payload); err != nil { + if err := observer(ctx, transaction); err != nil { return fmt.Errorf("observer notification failed: %w", err) } } notifyNonTXObservers := func() { for _, observer := range s.nonTransactionalObservers { + if err := observer(context.Background(), transaction); err != nil { + log.Logger().Errorf("observer notification failed: %v", err) + } + } + } + // check if there's an active transaction + tx, txIsActive := storage.BBoltTX(ctx) + if txIsActive { // sanity check because there should always be a transaction + tx.OnCommit(notifyNonTXObservers) + } else { + notifyNonTXObservers() + } + return nil +} + +// notifyObservers is called from a transactional context. The transactional observers need to be called with the TX context, the other observers after the commit. +func (s *state) notifyPayloadObservers(ctx context.Context, transaction Transaction, payload []byte) error { + // apply TX context observers + for _, observer := range s.transactionalPayloadObservers { + if err := observer(ctx, transaction, payload); err != nil { + return fmt.Errorf("observer notification failed: %w", err) + } + } + + notifyNonTXObservers := func() { + for _, observer := range s.nonTransactionalPayloadObservers { if err := observer(context.Background(), transaction, payload); err != nil { log.Logger().Errorf("observer notification failed: %v", err) } diff --git a/network/dag/state_test.go b/network/dag/state_test.go index 187fc2ec52..b941885e21 100644 --- a/network/dag/state_test.go +++ b/network/dag/state_test.go @@ -170,7 +170,7 @@ func TestState_Observe(t *testing.T) { ctx := context.Background() txState := createState(t) var actual bool - txState.RegisterObserver(func(ctx context.Context, transaction Transaction, _ []byte) error { + txState.RegisterTransactionObserver(func(ctx context.Context, transaction Transaction) error { _, actual = storage.BBoltTX(ctx) return nil }, expected) @@ -187,7 +187,7 @@ func TestState_Observe(t *testing.T) { ctx := context.Background() txState := createState(t) var actual Transaction - txState.RegisterObserver(func(ctx context.Context, transaction Transaction, _ []byte) error { + txState.RegisterTransactionObserver(func(ctx context.Context, transaction Transaction) error { actual = transaction return nil }, false) @@ -203,8 +203,11 @@ func TestState_Observe(t *testing.T) { txState := createState(t) var actualTX Transaction var actualPayload []byte - txState.RegisterObserver(func(ctx context.Context, transaction Transaction, payload []byte) error { + txState.RegisterTransactionObserver(func(ctx context.Context, transaction Transaction) error { actualTX = transaction + return nil + }, false) + txState.RegisterPayloadObserver(func(ctx context.Context, transaction Transaction, payload []byte) error { actualPayload = payload return nil }, false) @@ -229,7 +232,7 @@ func TestState_Observe(t *testing.T) { ctx := context.Background() txState := createState(t) var actual []byte - txState.RegisterObserver(func(ctx context.Context, tx Transaction, payload []byte) error { + txState.RegisterPayloadObserver(func(ctx context.Context, tx Transaction, payload []byte) error { actual = payload return nil }, false) @@ -363,6 +366,32 @@ func TestState_IBLT(t *testing.T) { }) } +func TestState_treeObserver(t *testing.T) { + setup := func(t *testing.T) State { + txState := createState(t) + err := txState.Start() + if !assert.NoError(t, err) { + t.Fatal(err) + } + return txState + } + ctx := context.Background() + + t.Run("callback for public transaction without payload", func(t *testing.T) { + txState := setup(t) + tx := CreateTestTransactionWithJWK(1) + + err := txState.Add(ctx, tx, nil) + + if !assert.NoError(t, err) { + return + } + + xor, _ := txState.XOR(ctx, 1) + assert.False(t, hash.EmptyHash().Equals(xor)) + }) +} + func createState(t *testing.T, verifier ...Verifier) State { testDir := io.TestDirectory(t) s, _ := NewState(testDir, verifier...) diff --git a/network/network.go b/network/network.go index ce95d5b823..baeae5e4fe 100644 --- a/network/network.go +++ b/network/network.go @@ -212,8 +212,8 @@ func (n *Network) Configure(config core.ServerConfig) error { ) } - // register callback from DAG to other engines. - n.state.RegisterObserver(n.emitEvents, true) + // register callback from DAG to other engines, with payload only. + n.state.RegisterPayloadObserver(n.emitEvents, true) return nil } diff --git a/network/network_integration_test.go b/network/network_integration_test.go index 4f59262261..8dbac87ee3 100644 --- a/network/network_integration_test.go +++ b/network/network_integration_test.go @@ -426,6 +426,11 @@ func TestNetworkIntegration_PrivateTransaction(t *testing.T) { return } waitForTransaction(t, tx, "node2") + + // assert not only TX is transfered, but state is updates as well + xor1, _ := node1.state.XOR(context.Background(), math.MaxUint32) + xor2, _ := node2.state.XOR(context.Background(), math.MaxUint32) + assert.Equal(t, xor1.String(), xor2.String()) }) t.Run("event received", func(t *testing.T) { diff --git a/network/transport/v2/protocol.go b/network/transport/v2/protocol.go index a753984694..bda8a369a3 100644 --- a/network/transport/v2/protocol.go +++ b/network/transport/v2/protocol.go @@ -163,7 +163,7 @@ func (p *protocol) Configure(_ transport.PeerID) error { p.gManager.RegisterSender(p.sendGossip) // called after DAG is committed - p.state.RegisterObserver(p.gossipTransaction, false) + p.state.RegisterTransactionObserver(p.gossipTransaction, false) return nil } @@ -213,7 +213,7 @@ func (p *protocol) connectionStateCallback(peer transport.Peer, state transport. } // gossipTransaction is called when a transaction is added to the DAG -func (p *protocol) gossipTransaction(ctx context.Context, tx dag.Transaction, _ []byte) error { +func (p *protocol) gossipTransaction(ctx context.Context, tx dag.Transaction) error { if tx != nil { // can happen when payload is written for private TX xor, clock := p.state.XOR(ctx, math.MaxUint32) p.gManager.TransactionRegistered(tx.Ref(), xor, clock) diff --git a/network/transport/v2/protocol_test.go b/network/transport/v2/protocol_test.go index 53c0660801..95499b39f1 100644 --- a/network/transport/v2/protocol_test.go +++ b/network/transport/v2/protocol_test.go @@ -101,7 +101,7 @@ func TestDefaultConfig(t *testing.T) { func TestProtocol_Configure(t *testing.T) { testDID, _ := did.ParseDID("did:nuts:123") p, mocks := newTestProtocol(t, testDID) - mocks.State.EXPECT().RegisterObserver(gomock.Any(), false) + mocks.State.EXPECT().RegisterTransactionObserver(gomock.Any(), false) assert.NoError(t, p.Configure("")) } @@ -253,7 +253,7 @@ func TestProtocol_gossipTransaction(t *testing.T) { t.Run("ok - no transaction", func(t *testing.T) { proto, _ := newTestProtocol(t, nil) - proto.gossipTransaction(context.Background(), nil, nil) + proto.gossipTransaction(context.Background(), nil) }) t.Run("ok - to gossipManager", func(t *testing.T) { @@ -262,7 +262,7 @@ func TestProtocol_gossipTransaction(t *testing.T) { mocks.State.EXPECT().XOR(context.Background(), uint32(math.MaxUint32)) mocks.Gossip.EXPECT().TransactionRegistered(tx.Ref(), hash.EmptyHash(), uint32(0)) - proto.gossipTransaction(context.Background(), tx, nil) + proto.gossipTransaction(context.Background(), tx) }) }