Skip to content

Commit

Permalink
Fix a file syncer bus
Browse files Browse the repository at this point in the history
  • Loading branch information
gotoxu committed Apr 10, 2019
1 parent 1e029de commit bb2b265
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 24 deletions.
2 changes: 1 addition & 1 deletion channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Channel interface {
HandleMessage(protos.ReceivedMessage)

// Initialize allocates the ChainState and should be invoked once per channel per creation
Initialize(string, []common.PKIidType, []common.FileSyncInfo) (*protos.ChainState, error)
Initialize(string, []common.PKIidType, []*common.FileSyncInfo) (*protos.ChainState, error)

// InitializeWithChanState allocates the ChainState message
InitializeWithChainState(*protos.ChainState) error
Expand Down
6 changes: 4 additions & 2 deletions channel/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (gc *gossipChannel) InitializeWithChainState(chainState *protos.ChainState)
return nil
}

func (gc *gossipChannel) Initialize(chainID string, members []common.PKIidType, files []common.FileSyncInfo) (*protos.ChainState, error) {
func (gc *gossipChannel) Initialize(chainID string, members []common.PKIidType, files []*common.FileSyncInfo) (*protos.ChainState, error) {
gc.Lock()
defer gc.Unlock()

Expand Down Expand Up @@ -272,7 +272,7 @@ func (gc *gossipChannel) AddFile(files []*common.FileSyncInfo) (*protos.ChainSta
return nil, errors.Errorf("Unknow file mode: %s", file.Mode)
}

f := &protos.File{Path: file.Path, Mode: protos.File_Mode(mode)}
f := &protos.File{Path: file.Path, Mode: protos.File_Mode(mode), Metadata: file.Metadata}
stateInfo.Properties.Files = append(stateInfo.Properties.Files, f)

err = gc.fileState.createProvider(file.Path, protos.File_Mode(mode), file.Metadata, true)
Expand Down Expand Up @@ -372,6 +372,8 @@ func (gc *gossipChannel) HandleMessage(msg protos.ReceivedMessage) {
err = gc.updateChainState(m.GetState(), msg.GetConnectionInfo().ID)
if err == nil {
gc.Forward(msg)
} else {
logging.Errorf("Failed updating chain state message: %s", err)
}
}

Expand Down
6 changes: 3 additions & 3 deletions gossip/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestChannelInit(t *testing.T) {

fmt.Println("Create channel")
mac := channel.GenerateMAC(gossipSvc1.SelfPKIid(), "testchannel")
_, err = gossipSvc1.CreateChain(mac, "testchannel", []common.FileSyncInfo{})
_, err = gossipSvc1.CreateChain(mac, "testchannel", []*common.FileSyncInfo{})
assert.NoError(t, err)
fmt.Println("Add member to channel")
_, err = gossipSvc1.AddMemberToChain(mac, gossipSvc2.SelfPKIid())
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestChannelClose(t *testing.T) {
defer gossipSvc2.Stop()

mac := channel.GenerateMAC(gossipSvc1.SelfPKIid(), "closechannel")
_, err = gossipSvc1.CreateChain(mac, "closechannel", []common.FileSyncInfo{})
_, err = gossipSvc1.CreateChain(mac, "closechannel", []*common.FileSyncInfo{})
assert.NoError(t, err)

_, err = gossipSvc1.AddMemberToChain(mac, gossipSvc2.SelfPKIid())
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestRemoveMemberWithChain(t *testing.T) {
defer gossipSvc2.Stop()

mac := channel.GenerateMAC(gossipSvc1.SelfPKIid(), "channel3")
_, err = gossipSvc1.CreateChain(mac, "channel3", []common.FileSyncInfo{})
_, err = gossipSvc1.CreateChain(mac, "channel3", []*common.FileSyncInfo{})
assert.NoError(t, err)

_, err = gossipSvc1.AddMemberToChain(mac, gossipSvc2.SelfPKIid())
Expand Down
68 changes: 56 additions & 12 deletions gossip/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func TestFileSync(t *testing.T) {
defer gossipSvc2.Stop()

mac := channel.GenerateMAC(gossipSvc1.SelfPKIid(), "testchannel")
_, err = gossipSvc1.CreateChain(mac, "testchannel", []common.FileSyncInfo{
common.FileSyncInfo{Path: "101.png", Mode: "Append"},
common.FileSyncInfo{Path: "config.yaml", Mode: "Append"},
common.FileSyncInfo{Path: "rfc2616.txt", Mode: "Append"},
_, err = gossipSvc1.CreateChain(mac, "testchannel", []*common.FileSyncInfo{
&common.FileSyncInfo{Path: "101.png", Mode: "Append"},
&common.FileSyncInfo{Path: "config.yaml", Mode: "Append"},
&common.FileSyncInfo{Path: "rfc2616.txt", Mode: "Append"},
})
assert.NoError(t, err)
_, err = gossipSvc1.AddMemberToChain(mac, gossipSvc2.SelfPKIid())
Expand Down Expand Up @@ -63,10 +63,10 @@ func TestChainStateDynamicUpdate(t *testing.T) {
defer gossipSvc1.Stop()

mac := channel.GenerateMAC(gossipSvc1.SelfPKIid(), "testchain")
_, err = gossipSvc1.CreateChain(mac, "testchain", []common.FileSyncInfo{
common.FileSyncInfo{Path: "101.png", Mode: "Append"},
common.FileSyncInfo{Path: "config.yaml", Mode: "Append"},
common.FileSyncInfo{Path: "rfc2616.txt", Mode: "Append"},
_, err = gossipSvc1.CreateChain(mac, "testchain", []*common.FileSyncInfo{
&common.FileSyncInfo{Path: "101.png", Mode: "Append"},
&common.FileSyncInfo{Path: "config.yaml", Mode: "Append"},
&common.FileSyncInfo{Path: "rfc2616.txt", Mode: "Append"},
})
assert.NoError(t, err)

Expand Down Expand Up @@ -129,10 +129,9 @@ func TestFileMetadata(t *testing.T) {
defer gossipSvc1.Stop()

mac := channel.GenerateMAC(gossipSvc1.SelfPKIid(), "testchain")
_, err = gossipSvc1.CreateChain(mac, "testchain", []common.FileSyncInfo{
common.FileSyncInfo{Path: "101.png", Mode: "Append", Metadata: createMetadata("101.png", "png")},
common.FileSyncInfo{Path: "config.yaml", Mode: "Append", Metadata: createMetadata("config.yaml", "yaml")},
common.FileSyncInfo{Path: "rfc2616.txt", Mode: "Append", Metadata: createMetadata("rfc2616.txt", "txt")},
_, err = gossipSvc1.CreateChain(mac, "testchain", []*common.FileSyncInfo{
&common.FileSyncInfo{Path: "101.png", Mode: "Append", Metadata: createMetadata("101.png", "png")},
&common.FileSyncInfo{Path: "config.yaml", Mode: "Append", Metadata: createMetadata("config.yaml", "yaml")},
})
assert.NoError(t, err)

Expand All @@ -156,6 +155,51 @@ func TestFileMetadata(t *testing.T) {
state := msg.GetStateInfo()
assert.NotNil(t, state)
assert.Len(t, state.Properties.Members, 2)
assert.Len(t, state.Properties.Files, 2)

for _, f := range state.Properties.Files {
m := metadata{}
err := json.Unmarshal(f.Metadata, &m)
assert.NoError(t, err)
assert.Equal(t, f.Path, m.Name)
assert.Equal(t, filepath.Ext(f.Path), fmt.Sprintf(".%s", m.Type))
}

_, err = gossipSvc1.AddFileToChain(mac, []*common.FileSyncInfo{
&common.FileSyncInfo{Path: "rfc2616.txt", Mode: "Append", Metadata: createMetadata("rfc2616.txt", "txt")},
})
assert.NoError(t, err)

chain = gossipSvc1.SelfChainInfo("testchain")
assert.NotNil(t, chain)

msg, err = chain.Envelope.ToRKSyncMessage()
assert.NoError(t, err)

state = msg.GetStateInfo()
assert.NotNil(t, state)
assert.Len(t, state.Properties.Members, 2)
assert.Len(t, state.Properties.Files, 3)

for _, f := range state.Properties.Files {
m := metadata{}
err := json.Unmarshal(f.Metadata, &m)
assert.NoError(t, err)
assert.Equal(t, f.Path, m.Name)
assert.Equal(t, filepath.Ext(f.Path), fmt.Sprintf(".%s", m.Type))
}

time.Sleep(5 * time.Second)

chain = gossipSvc2.SelfChainInfo("testchain")
assert.NotNil(t, chain)

msg, err = chain.Envelope.ToRKSyncMessage()
assert.NoError(t, err)

state = msg.GetStateInfo()
assert.NotNil(t, state)
assert.Len(t, state.Properties.Members, 2)
assert.Len(t, state.Properties.Files, 3)

for _, f := range state.Properties.Files {
Expand Down
2 changes: 1 addition & 1 deletion gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Gossip interface {
InitializeChain(chainMac common.ChainMac, chainState *protos.ChainState) error

// CreateChain creates a channel
CreateChain(chainMac common.ChainMac, chainID string, files []common.FileSyncInfo) (*protos.ChainState, error)
CreateChain(chainMac common.ChainMac, chainID string, files []*common.FileSyncInfo) (*protos.ChainState, error)

// CloseChain closes a channel
CloseChain(chainMac common.ChainMac, notify bool) error
Expand Down
2 changes: 1 addition & 1 deletion gossip/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (g *gossipService) GetPKIidOfCert(nodeID string, cert *x509.Certificate) (c
return digest, nil
}

func (g *gossipService) CreateChain(chainMac common.ChainMac, chainID string, files []common.FileSyncInfo) (*protos.ChainState, error) {
func (g *gossipService) CreateChain(chainMac common.ChainMac, chainID string, files []*common.FileSyncInfo) (*protos.ChainState, error) {
if len(chainMac) == 0 {
return nil, errors.New("Channel mac can't be nil or empty")
}
Expand Down
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (srv *Server) Stop() {
}

// CreateChannel creates a channel
func (srv *Server) CreateChannel(chainID string, files []common.FileSyncInfo) error {
func (srv *Server) CreateChannel(chainID string, files []*common.FileSyncInfo) error {
logging.Debugf("Creating channel, ID: %s", chainID)

if err := validateChannelID(chainID); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ func TestRKSyncServiceServe(t *testing.T) {
assert.NoError(t, err)
defer srv2.Stop()

err = srv1.CreateChannel("testchannel", []common.FileSyncInfo{
common.FileSyncInfo{Path: "101.png", Mode: "Append"},
common.FileSyncInfo{Path: "config.yaml", Mode: "Append"},
err = srv1.CreateChannel("testchannel", []*common.FileSyncInfo{
&common.FileSyncInfo{Path: "101.png", Mode: "Append"},
&common.FileSyncInfo{Path: "config.yaml", Mode: "Append"},
})
assert.NoError(t, err)

Expand Down

0 comments on commit bb2b265

Please sign in to comment.