diff --git a/channel/channel.go b/channel/channel.go index e96bd7b..1d141a8 100644 --- a/channel/channel.go +++ b/channel/channel.go @@ -48,11 +48,11 @@ type Channel interface { // RemoveMember removes member contained in the channel RemoveMember(common.PKIidType) (*protos.ChainState, error) - // AddFile adds file to the channel - AddFile(common.FileSyncInfo) (*protos.ChainState, error) + // AddFile adds files to the channel + AddFile([]*common.FileSyncInfo) (*protos.ChainState, error) // RemoveFile removes file contained in the channel - RemoveFile(string) (*protos.ChainState, error) + RemoveFile([]string) (*protos.ChainState, error) // Stop the channel's activity Stop() diff --git a/channel/service.go b/channel/service.go index 581d7ad..c05ce89 100644 --- a/channel/service.go +++ b/channel/service.go @@ -252,11 +252,7 @@ func (gc *gossipChannel) RemoveMember(member common.PKIidType) (*protos.ChainSta return gc.chainStateMsg, nil } -func (gc *gossipChannel) AddFile(file common.FileSyncInfo) (*protos.ChainState, error) { - if gc.fileState.lookupFSyncProviderByFilename(file.Path) != nil { - return nil, errors.Errorf("File %s has already exists ", file.Path) - } - +func (gc *gossipChannel) AddFile(files []*common.FileSyncInfo) (*protos.ChainState, error) { gc.Lock() defer gc.Unlock() @@ -265,19 +261,25 @@ func (gc *gossipChannel) AddFile(file common.FileSyncInfo) (*protos.ChainState, return nil, err } - mode, exists := protos.File_Mode_value[file.Mode] - if !exists { - return nil, errors.Errorf("Unknow file mode: %s", file.Mode) - } + for _, file := range files { + if gc.fileState.lookupFSyncProviderByFilename(file.Path) != nil { + logging.Warningf("File %s has already exists", file.Path) + continue + } - for _, f := range stateInfo.Properties.Files { - if f.Path == file.Path { - return gc.chainStateMsg, nil + mode, exists := protos.File_Mode_value[file.Mode] + if !exists { + return nil, errors.Errorf("Unknow file mode: %s", file.Mode) } - } - f := &protos.File{Path: file.Path, Mode: protos.File_Mode(mode)} - stateInfo.Properties.Files = append(stateInfo.Properties.Files, f) + f := &protos.File{Path: file.Path, Mode: protos.File_Mode(mode)} + stateInfo.Properties.Files = append(stateInfo.Properties.Files, f) + + err = gc.fileState.createProvider(file.Path, protos.File_Mode(mode), file.Metadata, true) + if err != nil { + return nil, errors.Wrap(err, "Failed creating file sync provider") + } + } envp, err := msg.Sign(func(msg []byte) ([]byte, error) { return gc.idMapper.Sign(msg) @@ -287,12 +289,10 @@ func (gc *gossipChannel) AddFile(file common.FileSyncInfo) (*protos.ChainState, } gc.chainStateMsg.Envelope = envp gc.chainStateMsg.SeqNum = uint64(time.Now().UnixNano()) - - gc.fileState.createProvider(file.Path, protos.File_Mode(mode), file.Metadata, true) return gc.chainStateMsg, nil } -func (gc *gossipChannel) RemoveFile(filename string) (*protos.ChainState, error) { +func (gc *gossipChannel) RemoveFile(filenames []string) (*protos.ChainState, error) { gc.Lock() defer gc.Unlock() @@ -301,19 +301,18 @@ func (gc *gossipChannel) RemoveFile(filename string) (*protos.ChainState, error) return nil, err } - var found bool - n := len(stateInfo.Properties.Files) - for i := 0; i < n; i++ { - f := stateInfo.Properties.Files[i] - if f.Path == filename { - stateInfo.Properties.Files = append(stateInfo.Properties.Files[:i], stateInfo.Properties.Files[i+1:]...) - found = true - break - } - } + for _, filename := range filenames { + gc.fileState.closeFSyncProvider(filename) + gc.Unregister(fsync.GenerateMAC(gc.chainMac, filename)) - if !found { - return gc.chainStateMsg, nil + n := len(stateInfo.Properties.Files) + for i := 0; i < n; i++ { + f := stateInfo.Properties.Files[i] + if f.Path == filename { + stateInfo.Properties.Files = append(stateInfo.Properties.Files[:i], stateInfo.Properties.Files[i+1:]...) + break + } + } } envp, err := msg.Sign(func(msg []byte) ([]byte, error) { @@ -326,9 +325,6 @@ func (gc *gossipChannel) RemoveFile(filename string) (*protos.ChainState, error) gc.chainStateMsg.Envelope = envp gc.chainStateMsg.SeqNum = uint64(time.Now().UnixNano()) - gc.fileState.closeFSyncProvider(filename) - gc.Unregister(fsync.GenerateMAC(gc.chainMac, filename)) - return gc.chainStateMsg, nil } diff --git a/gossip/batcher.go b/gossip/batcher.go index de9e2bb..d499a4d 100644 --- a/gossip/batcher.go +++ b/gossip/batcher.go @@ -70,6 +70,9 @@ type batchedMessage struct { } func (p *batchingEmitterImpl) Add(message interface{}) { + if p.toDie() { + return + } if p.iterations == 0 { return } diff --git a/gossip/file_test.go b/gossip/file_test.go index 23e07af..8b01025 100644 --- a/gossip/file_test.go +++ b/gossip/file_test.go @@ -37,9 +37,9 @@ func TestFileSync(t *testing.T) { assert.NoError(t, err) _, err = gossipSvc1.AddMemberToChain(mac, gossipSvc2.SelfPKIid()) assert.NoError(t, err) - _, err = gossipSvc1.AddFileToChain(mac, common.FileSyncInfo{Path: "https-cert.pem", Mode: "Append"}) + _, err = gossipSvc1.AddFileToChain(mac, []*common.FileSyncInfo{&common.FileSyncInfo{Path: "https-cert.pem", Mode: "Append"}}) assert.NoError(t, err) - _, err = gossipSvc1.AddFileToChain(mac, common.FileSyncInfo{Path: "https-key.pem", Mode: "Append"}) + _, err = gossipSvc1.AddFileToChain(mac, []*common.FileSyncInfo{&common.FileSyncInfo{Path: "https-key.pem", Mode: "Append"}}) assert.NoError(t, err) time.Sleep(5 * time.Second) @@ -95,7 +95,7 @@ func TestChainStateDynamicUpdate(t *testing.T) { assert.Len(t, state.Properties.Members, 2) assert.Len(t, state.Properties.Files, 3) - _, err = gossipSvc1.AddFileToChain(mac, common.FileSyncInfo{Path: "https-cert.pem", Mode: "Append"}) + _, err = gossipSvc1.AddFileToChain(mac, []*common.FileSyncInfo{&common.FileSyncInfo{Path: "https-cert.pem", Mode: "Append"}}) assert.NoError(t, err) time.Sleep(5 * time.Second) diff --git a/gossip/gossip.go b/gossip/gossip.go index 6f48b2f..4039342 100644 --- a/gossip/gossip.go +++ b/gossip/gossip.go @@ -39,10 +39,10 @@ type Gossip interface { RemoveMemberWithChain(chainMac common.ChainMac, member common.PKIidType) (*protos.ChainState, error) // AddFileToChain adds file to channel - AddFileToChain(chainMac common.ChainMac, file common.FileSyncInfo) (*protos.ChainState, error) + AddFileToChain(chainMac common.ChainMac, files []*common.FileSyncInfo) (*protos.ChainState, error) // RemoveFileWithChain removes file contained in the channel - RemoveFileWithChain(chainMac common.ChainMac, filename string) (*protos.ChainState, error) + RemoveFileWithChain(chainMac common.ChainMac, filenames []string) (*protos.ChainState, error) // GetPKIidOfCert returns the PKI-ID of a certificate GetPKIidOfCert(nodeID string, cert *x509.Certificate) (common.PKIidType, error) diff --git a/gossip/service.go b/gossip/service.go index bbf7561..828a68d 100644 --- a/gossip/service.go +++ b/gossip/service.go @@ -204,22 +204,22 @@ func (g *gossipService) RemoveMemberWithChain(chainMac common.ChainMac, member c return gc.RemoveMember(member) } -func (g *gossipService) AddFileToChain(chainMac common.ChainMac, file common.FileSyncInfo) (*protos.ChainState, error) { +func (g *gossipService) AddFileToChain(chainMac common.ChainMac, files []*common.FileSyncInfo) (*protos.ChainState, error) { gc := g.chanState.getChannelByMAC(chainMac) if gc == nil { return nil, errors.Errorf("Channel %s not yet created", chainMac) } - return gc.AddFile(file) + return gc.AddFile(files) } -func (g *gossipService) RemoveFileWithChain(chainMac common.ChainMac, filename string) (*protos.ChainState, error) { +func (g *gossipService) RemoveFileWithChain(chainMac common.ChainMac, filenames []string) (*protos.ChainState, error) { gc := g.chanState.getChannelByMAC(chainMac) if gc == nil { return nil, errors.Errorf("Channel %s not yet created", chainMac) } - return gc.RemoveFile(filename) + return gc.RemoveFile(filenames) } func (g *gossipService) GetPKIidOfCert(nodeID string, cert *x509.Certificate) (common.PKIidType, error) { @@ -327,8 +327,8 @@ func (g *gossipService) Stop() { logging.Info("Stopping gossip") defer logging.Info("Stopped gossip") g.chanState.stop() - g.discAdapter.close() g.disc.Stop() + g.discAdapter.close() g.toDieChan <- struct{}{} g.emitter.Stop() g.ChannelDeMultiplexer.Close() @@ -579,8 +579,6 @@ func (g *gossipService) isInChannel(m protos.ReceivedMessage) bool { } func (g *gossipService) forwardDiscoveryMsg(msg protos.ReceivedMessage) { - g.discAdapter.RLock() - defer g.discAdapter.RUnlock() if g.discAdapter.toDie() { return } @@ -717,14 +715,10 @@ type discoveryAdapter struct { gossipFunc func(message *protos.SignedRKSyncMessage) forwardFunc func(message protos.ReceivedMessage) disclosurePolicy discovery.DisclosurePolicy - sync.RWMutex } func (da *discoveryAdapter) close() { atomic.StoreInt32(&da.stopping, int32(1)) - - da.Lock() - defer da.Unlock() close(da.incChan) } @@ -733,8 +727,6 @@ func (da *discoveryAdapter) toDie() bool { } func (da *discoveryAdapter) Gossip(msg *protos.SignedRKSyncMessage) { - da.RLock() - defer da.RUnlock() if da.toDie() { return } @@ -743,8 +735,6 @@ func (da *discoveryAdapter) Gossip(msg *protos.SignedRKSyncMessage) { } func (da *discoveryAdapter) Forward(msg protos.ReceivedMessage) { - da.RLock() - defer da.RUnlock() if da.toDie() { return } @@ -753,8 +743,6 @@ func (da *discoveryAdapter) Forward(msg protos.ReceivedMessage) { } func (da *discoveryAdapter) SendToPeer(peer *common.NetworkMember, msg *protos.SignedRKSyncMessage) { - da.RLock() - defer da.RUnlock() if da.toDie() { return } diff --git a/server_test.go b/server_test.go index 776ad39..f770ca3 100644 --- a/server_test.go +++ b/server_test.go @@ -190,7 +190,7 @@ func TestRKSyncServiceServe(t *testing.T) { chainInfo := srv2.gossip.SelfChainInfo("testchannel") assert.NotNil(t, chainInfo) - err = srv1.AddFileToChan("testchannel", "rfc2616.txt", "Append", []byte{}) + err = srv1.AddFileToChan("testchannel", []*common.FileSyncInfo{&common.FileSyncInfo{Path: "rfc2616.txt", Mode: "Append", Metadata: []byte{}}}) assert.NoError(t, err) time.Sleep(5 * time.Second) diff --git a/service.go b/service.go index 6502e1a..14cdc1c 100644 --- a/service.go +++ b/service.go @@ -218,19 +218,16 @@ func (srv *Server) RemoveMemberWithChan(chainID string, nodeID string, cert *x50 } // AddFileToChan adds a file to the channel -func (srv *Server) AddFileToChan(chainID string, filepath string, filemode string, metadata []byte) error { +func (srv *Server) AddFileToChan(chainID string, files []*common.FileSyncInfo) error { if chainID == "" { return errors.New("Channel ID must be provided") } - if filepath == "" { - return errors.New("File path must be provided") - } - if filemode == "" { - return errors.New("File mode must be provided") + if len(files) == 0 { + return errors.New("files can't be nil or empty") } mac := channel.GenerateMAC(srv.gossip.SelfPKIid(), chainID) - chainState, err := srv.gossip.AddFileToChain(mac, common.FileSyncInfo{Path: filepath, Mode: filemode, Metadata: metadata}) + chainState, err := srv.gossip.AddFileToChain(mac, files) if err != nil { return err } @@ -239,16 +236,16 @@ func (srv *Server) AddFileToChan(chainID string, filepath string, filemode strin } // RemoveFileWithChan removes file contained in the channel -func (srv *Server) RemoveFileWithChan(chainID string, filename string) error { +func (srv *Server) RemoveFileWithChan(chainID string, filenames []string) error { if chainID == "" { return errors.New("Channel ID must be provided") } - if filename == "" { - return errors.New("File name must be provided") + if len(filenames) == 0 { + return errors.New("files can't be nil or empty") } mac := channel.GenerateMAC(srv.gossip.SelfPKIid(), chainID) - chainState, err := srv.gossip.RemoveFileWithChain(mac, filename) + chainState, err := srv.gossip.RemoveFileWithChain(mac, filenames) if err != nil { return err }