diff --git a/VERSION b/VERSION index a4c853ea2e..133cad286f 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -6.4.2 +6.4.3 diff --git a/engine/timer.go b/engine/timer.go index 50b4f1309f..00acab09d7 100644 --- a/engine/timer.go +++ b/engine/timer.go @@ -5,72 +5,46 @@ package engine import ( - "fmt" "time" "github.com/FactomProject/factomd/common/interfaces" - s "github.com/FactomProject/factomd/state" + "github.com/FactomProject/factomd/state" ) -var _ = (*s.State)(nil) - +// Timer +// Provides a tick and inserts it into the TickerQueue to trigger EOM generation by +// leaders. func Timer(stateI interfaces.IState) { - state := stateI.(*s.State) - time.Sleep(2 * time.Second) - - tenthPeriod := state.GetMinuteDuration() - - now := time.Now().UnixNano() // Time in billionths of a second - - wait := tenthPeriod.Nanoseconds() - (now % tenthPeriod.Nanoseconds()) - - next := now + wait + tenthPeriod.Nanoseconds() - - if state.GetOut() { - state.Print(fmt.Sprintf("Time: %v\r\n", time.Now())) - } - - time.Sleep(time.Duration(wait)) + s := stateI.(*state.State) + var last int64 for { - for i := 0; i < 10; i++ { - // Don't stuff messages into the system if the - // Leader is behind. - for j := 0; j < 10 && len(state.AckQueue()) > 1000; j++ { - time.Sleep(time.Millisecond * 10) - } + tenthPeriod := s.GetMinuteDuration().Nanoseconds() // The length of the minute can change, so do this each time + now := time.Now().UnixNano() // Get the current time + sleep := tenthPeriod - now%tenthPeriod + time.Sleep(time.Duration(sleep)) // Sleep the length of time from now to the next minute + // Delay some number of milliseconds. This is a debugging tool for testing how well we handle + // Leaders running with slightly different minutes in test environments. 
+ time.Sleep(time.Duration(s.GetTimeOffset().GetTimeMilli()) * time.Millisecond) + + if s.Leader { now = time.Now().UnixNano() - if now > next { - next += tenthPeriod.Nanoseconds() - wait = next - now - } else { - wait = next - now - next += tenthPeriod.Nanoseconds() + issueTime := last + if s.EOMSyncEnd > s.EOMIssueTime { + issueTime = s.EOMIssueTime } - time.Sleep(time.Duration(wait)) - - // Delay some number of milliseconds. - time.Sleep(time.Duration(state.GetTimeOffset().GetTimeMilli()) * time.Millisecond) - - state.TickerQueue() <- -1 // -1 indicated this is real minute cadence + last = s.EOMIssueTime - tenthPeriod = state.GetMinuteDuration() - state.LogPrintf("ticker", "Tick! %d, wait=%s, tenthPeriod=%s", i, time.Duration(wait), time.Duration(tenthPeriod)) + if s.EOMIssueTime-issueTime > tenthPeriod*8/100 { + s.EOMIssueTime = 0 // Don't skip more than one ticker + s.LogPrintf("ticker", "10%s Skip ticker Sleep %4.2f", s.FactomNodeName, float64(sleep)/1000000000) + continue + } } - } -} - -func PrintBusy(state interfaces.IState, i int) { - s := state.(*s.State) + s.LogPrintf("ticker", "10%s send ticker Sleep %4.2f", s.FactomNodeName, float64(sleep)/1000000000) + s.EOMSyncEnd = 0 - if len(s.ShutdownChan) == 0 { - if state.GetOut() { - state.Print(fmt.Sprintf("\r%19s: %s %s", - "Timer", - state.String(), - (string)((([]byte)("-\\|/-\\|/-="))[i]))) - } + s.TickerQueue() <- -1 // -1 indicates this is real minute cadence } - } diff --git a/state/state.go b/state/state.go index 81017ff070..9966e2e911 100644 --- a/state/state.go +++ b/state/state.go @@ -210,6 +210,8 @@ type State struct { DBFinished bool RunLeader bool BootTime int64 // Time in seconds that we last booted + EOMIssueTime int64 + EOMSyncEnd int64 // Ignore missing messages for a period to allow rebooting a network where your // own messages from the previously executing network can confuse you. 
@@ -238,7 +240,6 @@ type State struct { CurrentBlockStartTime int64 EOMsyncing bool - EOMSyncTime int64 EOM bool // Set to true when the first EOM is encountered EOMLimit int EOMProcessed int diff --git a/state/stateConsensus.go b/state/stateConsensus.go index 03d9b1038e..8db29d8c52 100644 --- a/state/stateConsensus.go +++ b/state/stateConsensus.go @@ -776,7 +776,6 @@ func (s *State) MoveStateToHeight(dbheight uint32, newMinute int) { if s.LLeaderHeight != dbheight { fmt.Fprintf(os.Stderr, "State move between non-sequential heights from %d to %d\n", s.LLeaderHeight, dbheight) } - //force sync state to a rational state for between minutes s.Syncing = false // movestatetoheight s.EOM = false // movestatetoheight @@ -977,7 +976,7 @@ func (s *State) repost(m interfaces.IMsg, delay int) { time.Sleep(time.Duration(delay) * s.FactomSecond()) // delay in Factom seconds } //s.LogMessage("MsgQueue", fmt.Sprintf("enqueue_%s(%d)", whereAmI, len(s.msgQueue)), m) - s.LogMessage("MsgQueue", fmt.Sprintf("enqueue (%d)", len(s.msgQueue)), m) + s.LogMessage("MsgQueue", fmt.Sprintf("repost enqueue (%d)", len(s.msgQueue)), m) s.msgQueue <- m // Goes in the "do this really fast" queue so we are prompt about EOM's while syncing }() } @@ -1505,6 +1504,8 @@ func (s *State) LeaderExecuteEOM(m interfaces.IMsg) { fix = true } + s.EOMIssueTime = time.Now().UnixNano() // Time we issue the EOM + // make sure EOM has the right data eom.DBHeight = s.LLeaderHeight eom.VMIndex = s.LeaderVMIndex @@ -1527,6 +1528,7 @@ func (s *State) LeaderExecuteEOM(m interfaces.IMsg) { ack.SendOut(s, ack) eom.SendOut(s, eom) s.FollowerExecuteEOM(eom) + s.LogMessage("executeMsg", "issue EOM", eom) s.UpdateState() } @@ -1985,13 +1987,15 @@ func (s *State) ProcessEOM(dbheight uint32, msg interfaces.IMsg) bool { s.EOMProcessed-- if s.EOMProcessed <= 0 { // why less than or equal? 
s.SendHeartBeat() // Only do this once per minute - s.LogPrintf("dbsig-eom", "ProcessEOM complete for %d", e.Minute) // setup to sync next minute ... s.Syncing = false // ProcessEOM (EOM complete) s.EOM = false // ProcessEOM (EOM complete) s.EOMDone = false // ProcessEOM (EOM complete) s.EOMProcessed = 0 // ProcessEOM (EOM complete) + + s.EOMSyncEnd = time.Now().UnixNano() + for _, vm := range pl.VMs { vm.Synced = false // ProcessEOM (EOM complete) } @@ -2128,7 +2132,6 @@ func (s *State) ProcessEOM(dbheight uint32, msg interfaces.IMsg) bool { // e.VMIndex, allfaults, s.EOMProcessed, s.EOMLimit, s.EOMDone)) s.EOMDone = true // ProcessEOM - s.EOMSyncTime = time.Now().UnixNano() for _, eb := range pl.NewEBlocks { eb.AddEndOfMinuteMarker(byte(e.Minute + 1)) } diff --git a/state/validation.go b/state/validation.go index 6932e9a35a..4efd0c1e1e 100644 --- a/state/validation.go +++ b/state/validation.go @@ -100,22 +100,11 @@ func (s *State) ValidatorLoop() { currentMinute = 9 // treat minute 10 as an extension of minute 9 } if lastHeight == int(s.LLeaderHeight) && lastMinute == currentMinute && s.LeaderVMIndex == lastVM { - // This eom was already generated. We shouldn't generate it again. - // This does mean we missed an EOM boundary, and the next EOM won't occur for another - // "minute". This could cause some serious sliding, as minutes could be an addition 100% - // in length. 
- if c == -1 { // This means we received a normal eom cadence timer - c = 8 // Send 8 retries on a 1/10 of the normal minute period - } - if c > 0 { - go func() { - // We sleep for 1/10 of a minute, and try again - time.Sleep(s.GetMinuteDuration() / 10) - s.tickerQueue <- c - 1 - }() - } - s.LogPrintf("timer", "retry %d", c) - s.LogPrintf("validator", "retry %d %d-:-%d %d", c, s.LLeaderHeight, currentMinute, s.LeaderVMIndex) + + // Drop ticker + + s.LogPrintf("timer", "drop %d", c) + s.LogPrintf("validator", "drop %d %d-:-%d %d", c, s.LLeaderHeight, currentMinute, s.LeaderVMIndex) continue // Already generated this eom }