Skip to content

Commit

Permalink
Xuan Fix for syncing
Browse files Browse the repository at this point in the history
  • Loading branch information
PaulSnow committed Oct 4, 2019
1 parent cc10d15 commit 3b777da
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 74 deletions.
80 changes: 27 additions & 53 deletions engine/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,72 +5,46 @@
package engine

import (
"fmt"
"time"

"github.com/FactomProject/factomd/common/interfaces"
s "github.com/FactomProject/factomd/state"
"github.com/FactomProject/factomd/state"
)

var _ = (*s.State)(nil)

// Timer
// Provides a tick add inserts it into the TickerQueue to trigger EOM generation by
// leaders.
func Timer(stateI interfaces.IState) {
state := stateI.(*s.State)
time.Sleep(2 * time.Second)

tenthPeriod := state.GetMinuteDuration()

now := time.Now().UnixNano() // Time in billionths of a second

wait := tenthPeriod.Nanoseconds() - (now % tenthPeriod.Nanoseconds())

next := now + wait + tenthPeriod.Nanoseconds()

if state.GetOut() {
state.Print(fmt.Sprintf("Time: %v\r\n", time.Now()))
}

time.Sleep(time.Duration(wait))
s := stateI.(*state.State)

var last int64
for {
for i := 0; i < 10; i++ {
// Don't stuff messages into the system if the
// Leader is behind.
for j := 0; j < 10 && len(state.AckQueue()) > 1000; j++ {
time.Sleep(time.Millisecond * 10)
}
tenthPeriod := s.GetMinuteDuration().Nanoseconds() // The length of the minute can change, so do this each time
now := time.Now().UnixNano() // Get the current time
sleep := tenthPeriod - now%tenthPeriod
time.Sleep(time.Duration(sleep)) // Sleep the length of time from now to the next minute

// Delay some number of milliseconds. This is a debugging tool for testing how well we handle
// Leaders running with slightly different minutes in test environments.
time.Sleep(time.Duration(s.GetTimeOffset().GetTimeMilli()) * time.Millisecond)

if s.Leader {
now = time.Now().UnixNano()
if now > next {
next += tenthPeriod.Nanoseconds()
wait = next - now
} else {
wait = next - now
next += tenthPeriod.Nanoseconds()
issueTime := last
if s.EOMSyncEnd > s.EOMIssueTime {
issueTime = s.EOMIssueTime
}
time.Sleep(time.Duration(wait))

// Delay some number of milliseconds.
time.Sleep(time.Duration(state.GetTimeOffset().GetTimeMilli()) * time.Millisecond)

state.TickerQueue() <- -1 // -1 indicated this is real minute cadence
last = s.EOMIssueTime

tenthPeriod = state.GetMinuteDuration()
state.LogPrintf("ticker", "Tick! %d, wait=%s, tenthPeriod=%s", i, time.Duration(wait), time.Duration(tenthPeriod))
if s.EOMIssueTime-issueTime > tenthPeriod*8/100 {
s.EOMIssueTime = 0 // Don't skip more than one ticker
s.LogPrintf("ticker", "10%s Skip ticker Sleep %4.2f", s.FactomNodeName, float64(sleep)/1000000000)
continue
}
}
}
}

func PrintBusy(state interfaces.IState, i int) {
s := state.(*s.State)
s.LogPrintf("ticker", "10%s send ticker Sleep %4.2f", s.FactomNodeName, float64(sleep)/1000000000)
s.EOMSyncEnd = 0

if len(s.ShutdownChan) == 0 {
if state.GetOut() {
state.Print(fmt.Sprintf("\r%19s: %s %s",
"Timer",
state.String(),
(string)((([]byte)("-\\|/-\\|/-="))[i])))
}
s.TickerQueue() <- -1 // -1 indicated this is real minute cadence
}

}
3 changes: 2 additions & 1 deletion state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ type State struct {
DBFinished bool
RunLeader bool
BootTime int64 // Time in seconds that we last booted
EOMIssueTime int64
EOMSyncEnd int64

// Ignore missing messages for a period to allow rebooting a network where your
// own messages from the previously executing network can confuse you.
Expand Down Expand Up @@ -238,7 +240,6 @@ type State struct {
CurrentBlockStartTime int64

EOMsyncing bool
EOMSyncTime int64
EOM bool // Set to true when the first EOM is encountered
EOMLimit int
EOMProcessed int
Expand Down
11 changes: 7 additions & 4 deletions state/stateConsensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,6 @@ func (s *State) MoveStateToHeight(dbheight uint32, newMinute int) {
if s.LLeaderHeight != dbheight {
fmt.Fprintf(os.Stderr, "State move between non-sequential heights from %d to %d\n", s.LLeaderHeight, dbheight)
}

//force sync state to a rational state for between minutes
s.Syncing = false // movestatetoheight
s.EOM = false // movestatetoheight
Expand Down Expand Up @@ -977,7 +976,7 @@ func (s *State) repost(m interfaces.IMsg, delay int) {
time.Sleep(time.Duration(delay) * s.FactomSecond()) // delay in Factom seconds
}
//s.LogMessage("MsgQueue", fmt.Sprintf("enqueue_%s(%d)", whereAmI, len(s.msgQueue)), m)
s.LogMessage("MsgQueue", fmt.Sprintf("enqueue (%d)", len(s.msgQueue)), m)
s.LogMessage("MsgQueue", fmt.Sprintf("repost enqueue (%d)", len(s.msgQueue)), m)
s.msgQueue <- m // Goes in the "do this really fast" queue so we are prompt about EOM's while syncing
}()
}
Expand Down Expand Up @@ -1505,6 +1504,8 @@ func (s *State) LeaderExecuteEOM(m interfaces.IMsg) {
fix = true
}

s.EOMIssueTime = time.Now().UnixNano() // Time we issue the EOM

// make sure EOM has the right data
eom.DBHeight = s.LLeaderHeight
eom.VMIndex = s.LeaderVMIndex
Expand All @@ -1527,6 +1528,7 @@ func (s *State) LeaderExecuteEOM(m interfaces.IMsg) {
ack.SendOut(s, ack)
eom.SendOut(s, eom)
s.FollowerExecuteEOM(eom)
s.LogMessage("executeMsg", "issue EOM", eom)
s.UpdateState()
}

Expand Down Expand Up @@ -1985,13 +1987,15 @@ func (s *State) ProcessEOM(dbheight uint32, msg interfaces.IMsg) bool {
s.EOMProcessed--
if s.EOMProcessed <= 0 { // why less than or equal?
s.SendHeartBeat() // Only do this once per minute

s.LogPrintf("dbsig-eom", "ProcessEOM complete for %d", e.Minute)
// setup to sync next minute ...
s.Syncing = false // ProcessEOM (EOM complete)
s.EOM = false // ProcessEOM (EOM complete)
s.EOMDone = false // ProcessEOM (EOM complete)
s.EOMProcessed = 0 // ProcessEOM (EOM complete)

s.EOMSyncEnd = time.Now().UnixNano()

for _, vm := range pl.VMs {
vm.Synced = false // ProcessEOM (EOM complete)
}
Expand Down Expand Up @@ -2128,7 +2132,6 @@ func (s *State) ProcessEOM(dbheight uint32, msg interfaces.IMsg) bool {
// e.VMIndex, allfaults, s.EOMProcessed, s.EOMLimit, s.EOMDone))

s.EOMDone = true // ProcessEOM
s.EOMSyncTime = time.Now().UnixNano()
for _, eb := range pl.NewEBlocks {
eb.AddEndOfMinuteMarker(byte(e.Minute + 1))
}
Expand Down
21 changes: 5 additions & 16 deletions state/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,11 @@ func (s *State) ValidatorLoop() {
currentMinute = 9 // treat minute 10 as an extension of minute 9
}
if lastHeight == int(s.LLeaderHeight) && lastMinute == currentMinute && s.LeaderVMIndex == lastVM {
// This eom was already generated. We shouldn't generate it again.
// This does mean we missed an EOM boundary, and the next EOM won't occur for another
// "minute". This could cause some serious sliding, as minutes could be an addition 100%
// in length.
if c == -1 { // This means we received a normal eom cadence timer
c = 8 // Send 8 retries on a 1/10 of the normal minute period
}
if c > 0 {
go func() {
// We sleep for 1/10 of a minute, and try again
time.Sleep(s.GetMinuteDuration() / 10)
s.tickerQueue <- c - 1
}()
}
s.LogPrintf("timer", "retry %d", c)
s.LogPrintf("validator", "retry %d %d-:-%d %d", c, s.LLeaderHeight, currentMinute, s.LeaderVMIndex)

// Drop ticker

s.LogPrintf("timer", "drop %d", c)
s.LogPrintf("validator", "drop %d %d-:-%d %d", c, s.LLeaderHeight, currentMinute, s.LeaderVMIndex)
continue // Already generated this eom
}

Expand Down

0 comments on commit 3b777da

Please sign in to comment.