Skip to content

Commit

Permalink
Optimized file sync provider
Browse files Browse the repository at this point in the history
  • Loading branch information
gotoxu committed May 7, 2019
1 parent 36d65ce commit 63f4291
Showing 1 changed file with 29 additions and 30 deletions.
59 changes: 29 additions & 30 deletions channel/fsync/fsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,6 @@ func NewFileSyncProvider(chainMac common.ChainMac, chainID string, filename stri
pkiID common.PKIidType, adapter Adapter) (*FileSyncProvier, error) {

mac := GenerateMAC(chainMac, filename)

msgChan, _ := adapter.Accept(func(message interface{}) bool {
return message.(*protos.RKSyncMessage).IsDataMsg() &&
bytes.Equal(message.(*protos.RKSyncMessage).ChainMac, chainMac) &&
bytes.Equal([]byte(message.(*protos.RKSyncMessage).GetDataMsg().FileName), []byte(filename))
}, mac, false)

reqChan, _ := adapter.Accept(func(message interface{}) bool {
return message.(*protos.RKSyncMessage).IsDataReq() &&
bytes.Equal(message.(*protos.RKSyncMessage).ChainMac, chainMac) &&
bytes.Equal([]byte(message.(*protos.RKSyncMessage).GetDataReq().FileName), []byte(filename))
}, mac, false)

p := &FileSyncProvier{
Adapter: adapter,
chainMac: chainMac,
Expand All @@ -65,26 +52,38 @@ func NewFileSyncProvider(chainMac common.ChainMac, chainID string, filename stri
mode: mode,
leader: leader,
state: int32(0),
reqChan: reqChan,
msgChan: msgChan,
stopCh: make(chan struct{}, 1),
pkiID: pkiID,
}

p.reqChan, _ = adapter.Accept(func(message interface{}) bool {
return message.(*protos.RKSyncMessage).IsDataReq() &&
bytes.Equal(message.(*protos.RKSyncMessage).ChainMac, chainMac) &&
bytes.Equal([]byte(message.(*protos.RKSyncMessage).GetDataReq().FileName), []byte(filename))
}, mac, false)

p.done.Add(1)
go p.processDataReq()

if p.leader {
return p, nil
}

p.msgChan, _ = adapter.Accept(func(message interface{}) bool {
return message.(*protos.RKSyncMessage).IsDataMsg() &&
bytes.Equal(message.(*protos.RKSyncMessage).ChainMac, chainMac) &&
bytes.Equal([]byte(message.(*protos.RKSyncMessage).GetDataMsg().FileName), []byte(filename))
}, mac, false)

start, err := p.initPayloadBufferStart()
if err != nil {
return nil, err
}
p.payloads = NewPayloadBuffer(start)

if !leader {
p.done.Add(3)
go p.processDataReq()
go p.listen()
go p.periodicalInvocation(4 * time.Second)
} else {
p.done.Add(1)
go p.processDataReq()
}
p.done.Add(2)
go p.listen()
go p.periodicalInvocation(4 * time.Second)

return p, nil
}
Expand Down Expand Up @@ -142,8 +141,8 @@ func (p *FileSyncProvier) listen() {

for {
select {
case <-p.stopCh:
p.stopCh <- struct{}{}
case s := <-p.stopCh:
p.stopCh <- s
logging.Debug("Stop listening for new messages")
return
case msg := <-p.msgChan:
Expand All @@ -157,8 +156,8 @@ func (p *FileSyncProvier) periodicalInvocation(d time.Duration) {

for {
select {
case <-p.stopCh:
p.stopCh <- struct{}{}
case s := <-p.stopCh:
p.stopCh <- s
return
case <-time.After(d):
p.requestDataAppend()
Expand All @@ -177,8 +176,8 @@ func (p *FileSyncProvier) processDataReq() {
case msg := <-p.reqChan:
wg.Add(1)
go p.handleDataReq(msg, wg)
case <-p.stopCh:
p.stopCh <- struct{}{}
case s := <-p.stopCh:
p.stopCh <- s
wg.Wait()
return
}
Expand Down

0 comments on commit 63f4291

Please sign in to comment.