Skip to content

Commit

Permalink
Merge pull request #85 from hyperledger/no-blocking-improvements
Browse files Browse the repository at this point in the history
improve logic for no wait submission mode
  • Loading branch information
Chengxuan authored Aug 5, 2024
2 parents 46e1c3b + 3bc26d3 commit c42392f
Showing 1 changed file with 22 additions and 9 deletions.
31 changes: 22 additions & 9 deletions internal/perf/perf.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,10 @@ func (pr *perfRunner) Start() (err error) {
assumedTokenCountPerSecond := 0
for _, testConf := range pr.cfg.Tests {
assumedTokenCountPerSecond += testConf.ActionsPerLoop * ((1 + testConf.Workers) * testConf.Workers / 2)
if pr.cfg.MaxSubmissionsPerSecond > testConf.Workers {
assumedTokenCountPerSecond = assumedTokenCountPerSecond * ((pr.cfg.MaxSubmissionsPerSecond / testConf.Workers) + 1)
}
// if we over pre-mint it doesn't matter.
}

assumedTotalTokenRequired := (assumedTokenCountPerSecond * int(pr.cfg.Length.Seconds()) * 120 /*allow 20% extra*/) / 100
Expand Down Expand Up @@ -478,7 +482,7 @@ func (pr *perfRunner) Start() (err error) {
}
}

if prepEventTrackingID != "" && !pr.cfg.NoWaitSubmission && !pr.cfg.SkipMintConfirmations {
if prepEventTrackingID != "" {
log.Infof("Waiting for tracking %s", prepEventTrackingID)
<-pr.wsReceivers[prepEventTrackingID]
log.Infof("Prep action completed")
Expand Down Expand Up @@ -587,23 +591,30 @@ perfLoop:

pr.stopping = true

tallyStart := time.Now()
idleStart := time.Now()

if pr.cfg.NoWaitSubmission {
eventsCount := getMetricVal(receivedEventsCounter)
submissionCount := getMetricVal(totalActionsCounter)
log.Infof("<No wait submission mode> Wait for the event count %f to reach request sent count %f, within 30s", eventsCount, submissionCount)
previousEventCount := eventsCount
log.Infof("<No wait submission mode> Wait for the event count %f to reach request sent count %f", eventsCount, submissionCount)
for {
previousEventCount = eventsCount
eventsCount = getMetricVal(receivedEventsCounter)
if previousEventCount < eventsCount {
// reset idle start time if there are new events
idleStart = time.Now()
}
if eventsCount == submissionCount {
break
} else if eventsCount > submissionCount {
log.Warnf("The number of events received %f is greater than the number of requests sent %f.", eventsCount, submissionCount)
break
}

// Check if more than 1 minute has passed
if time.Since(tallyStart) > 30*time.Second {
log.Errorf("The number of events received %f doesn't tally up to the number of requests sent %f after %s.", eventsCount, submissionCount, time.Since(time.Unix(pr.startTime, 0)))
// Check if more than 30 seconds has passed
if time.Since(idleStart) > 30*time.Second {
log.Errorf("The number of events received %f doesn't tally up to the number of requests sent %f after 30s idle time, total tally time: %s.", eventsCount, submissionCount, time.Since(time.Unix(pr.startTime, 0)))
break
}

Expand Down Expand Up @@ -655,7 +666,9 @@ perfLoop:
log.Infof(" - Measured send TPS: %2f", tps.SendRate)
log.Infof(" - Measured throughput: %2f", tps.Throughput)
log.Infof(" - Measured send duration: %s", pr.sendTime)
log.Infof(" - Measured event receiving duration: %s", pr.receiveTime)
if !pr.cfg.NoWaitSubmission {
log.Infof(" - Measured event receiving duration: %s", pr.receiveTime)
}
log.Infof(" - Measured total duration: %s", pr.totalTime)

return nil
Expand Down Expand Up @@ -814,7 +827,7 @@ func (pr *perfRunner) batchEventLoop(nodeURL string, wsconn wsclient.WSClient) (

pr.recordCompletedAction()
// Release worker so it can continue to its next task
if !pr.stopping && !pr.cfg.NoWaitSubmission && !pr.cfg.SkipMintConfirmations {
if (!pr.stopping && !pr.cfg.NoWaitSubmission && !pr.cfg.SkipMintConfirmations) || pr.eventPrefixForCurrentStage == preparePrefix {
if workerID >= 0 {
preFixedWorkerID := fmt.Sprintf("%s%d", pr.eventPrefixForCurrentStage, workerID)
// No need for locking as channel have built in support
Expand Down Expand Up @@ -902,7 +915,7 @@ func (pr *perfRunner) eventLoop(nodeURL string, wsconn wsclient.WSClient) (err e
wsconn.Send(context.Background(), ackJSON)
pr.recordCompletedAction()
// Release worker so it can continue to its next task
if !pr.stopping && !pr.cfg.NoWaitSubmission && !pr.cfg.SkipMintConfirmations {
if (!pr.stopping && !pr.cfg.NoWaitSubmission && !pr.cfg.SkipMintConfirmations) || pr.eventPrefixForCurrentStage == preparePrefix {
if workerID >= 0 {
preFixedWorkerID := fmt.Sprintf("%s%d", pr.eventPrefixForCurrentStage, workerID)
pr.wsReceivers[preFixedWorkerID] <- nodeURL
Expand Down

0 comments on commit c42392f

Please sign in to comment.