Skip to content

Commit

Permalink
refactor(stopFlag): completely remove stopFlag in favor of context
Browse files Browse the repository at this point in the history
`context` is the Go package used mainly to signal cancellation.
The `stopFlag` package inside of gemini was used the same way.
It only added overhead and maintainability issues in the long
run, as it was just a wrapper around `context.Context` that called all
of the `cancel` funcs at once at the end of the program. This removal
makes the code easier to follow for everybody working on it. The `context`
package is well known in the Go community and can serve many purposes
beyond just canceling the background workers (`jobs` in gemini
terminology).

Hard kill in gemini was used to signal the immediate stoppage
of gemini, without printing the results, whereas a soft kill
could be triggered in a few ways:

- gemini validation failed
- gemini mutation failed
- SIG(INT,TERM) was sent

There is no reason to have a HARD kill in the application. If we need a hard
kill, SIGKILL can be sent and everything will be stopped; there will be
no need to do the cleanup, as the kernel will ensure that happens.

`context.Context` works as a soft kill, and if something bad happens
in gemini (validation fails or mutation fails) `globalStatus.HasErrors()`
stops all other `goroutines` from continuing if the `failFast` CLI flag
is passed, so explicitly setting a soft kill (`softKill`) is not needed.

Signed-off-by: Dusan Malusev <[email protected]>
  • Loading branch information
CodeLieutenant committed Nov 28, 2024
1 parent f3aa0ab commit 16dd84c
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 731 deletions.
66 changes: 34 additions & 32 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,13 @@ import (
"net/http"
"net/http/pprof"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"text/tabwriter"
"time"

"github.com/scylladb/gemini/pkg/auth"
"github.com/scylladb/gemini/pkg/builders"
"github.com/scylladb/gemini/pkg/generators"
"github.com/scylladb/gemini/pkg/jobs"
"github.com/scylladb/gemini/pkg/realrandom"
"github.com/scylladb/gemini/pkg/replication"
"github.com/scylladb/gemini/pkg/store"
"github.com/scylladb/gemini/pkg/typedef"
"github.com/scylladb/gemini/pkg/utils"

"github.com/scylladb/gemini/pkg/status"
"github.com/scylladb/gemini/pkg/stop"

"github.com/gocql/gocql"
"github.com/hailocab/go-hostpool"
"github.com/pkg/errors"
Expand All @@ -50,6 +39,17 @@ import (
"golang.org/x/exp/rand"
"golang.org/x/net/context"
"gonum.org/v1/gonum/stat/distuv"

"github.com/scylladb/gemini/pkg/auth"
"github.com/scylladb/gemini/pkg/builders"
"github.com/scylladb/gemini/pkg/generators"
"github.com/scylladb/gemini/pkg/jobs"
"github.com/scylladb/gemini/pkg/realrandom"
"github.com/scylladb/gemini/pkg/replication"
"github.com/scylladb/gemini/pkg/status"
"github.com/scylladb/gemini/pkg/store"
"github.com/scylladb/gemini/pkg/typedef"
"github.com/scylladb/gemini/pkg/utils"
)

var (
Expand Down Expand Up @@ -137,8 +137,11 @@ func readSchema(confFile string, schemaConfig typedef.SchemaConfig) (*typedef.Sc
}

func run(_ *cobra.Command, _ []string) error {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGABRT, syscall.SIGTERM, syscall.SIGINT)
defer cancel()

logger := createLogger(level)
globalStatus := status.NewGlobalStatus(1000)
globalStatus := status.NewGlobalStatus(int32(maxErrorsToStore))
defer utils.IgnoreError(logger.Sync)

if err := validateSeed(seed); err != nil {
Expand Down Expand Up @@ -242,15 +245,15 @@ func run(_ *cobra.Command, _ []string) error {
if dropSchema && mode != jobs.ReadMode {
for _, stmt := range generators.GetDropKeyspace(schema) {
logger.Debug(stmt)
if err = st.Mutate(context.Background(), typedef.SimpleStmt(stmt, typedef.DropKeyspaceStatementType)); err != nil {
if err = st.Mutate(ctx, typedef.SimpleStmt(stmt, typedef.DropKeyspaceStatementType)); err != nil {
return errors.Wrap(err, "unable to drop schema")
}
}
}

testKeyspace, oracleKeyspace := generators.GetCreateKeyspaces(schema)
if err = st.Create(
context.Background(),
ctx,
typedef.SimpleStmt(testKeyspace, typedef.CreateKeyspaceStatementType),
typedef.SimpleStmt(oracleKeyspace, typedef.CreateKeyspaceStatementType)); err != nil {
return errors.Wrap(err, "unable to create keyspace")
Expand All @@ -263,11 +266,7 @@ func run(_ *cobra.Command, _ []string) error {
}
}

ctx, done := context.WithTimeout(context.Background(), duration+warmup+time.Second*2)
stopFlag := stop.NewFlag("main")
warmupStopFlag := stop.NewFlag("warmup")
stop.StartOsSignalsTransmitter(logger, stopFlag, warmupStopFlag)
pump := jobs.NewPump(stopFlag, logger)
pump := jobs.NewPump(ctx, logger)

distFunc, err := createDistributionFunc(partitionKeyDistribution, partitionCount, intSeed, normalDistMean, normalDistSigma)
if err != nil {
Expand All @@ -281,10 +280,9 @@ func run(_ *cobra.Command, _ []string) error {
sp := createSpinner(interactive())
ticker := time.NewTicker(time.Second)
go func() {
defer done()
for {
select {
case <-stopFlag.SignalChannel():
case <-ctx.Done():
return
case <-ticker.C:
sp.Set(" Running Gemini... %v", globalStatus)
Expand All @@ -293,20 +291,24 @@ func run(_ *cobra.Command, _ []string) error {
}()
}

if warmup > 0 && !stopFlag.IsHardOrSoft() {
jobsList := jobs.ListFromMode(jobs.WarmupMode, warmup, concurrency)
if err = jobsList.Run(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, warmupStopFlag, failFast, verbose); err != nil {
if warmup > 0 {
warmupCtx, warmupCancel := context.WithTimeout(ctx, warmup)
defer warmupCancel()

jobsList := jobs.ListFromMode(jobs.WarmupMode, concurrency)
if err = jobsList.Run(warmupCtx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, failFast, verbose); err != nil {
logger.Error("warmup encountered an error", zap.Error(err))
stopFlag.SetHard(true)
}
}

if !stopFlag.IsHardOrSoft() {
jobsList := jobs.ListFromMode(mode, duration, concurrency)
if err = jobsList.Run(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, stopFlag.CreateChild("workload"), failFast, verbose); err != nil {
logger.Debug("error detected", zap.Error(err))
}
jobsCtx, jobsCancel := context.WithTimeout(ctx, duration)
defer jobsCancel()

jobsList := jobs.ListFromMode(mode, concurrency)
if err = jobsList.Run(jobsCtx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, failFast, verbose); err != nil {
logger.Debug("error detected", zap.Error(err))
}

logger.Info("test finished")
globalStatus.PrintResult(outFile, schema, version)
if globalStatus.HasErrors() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/generators/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package generators

import (
"context"

"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/exp/rand"
Expand All @@ -26,7 +27,7 @@ import (

// TokenIndex represents the position of a token in the token ring.
// A token index is translated to a token by a generators. If the generators
// preserves the exact position, then the token index becomes the token;
// preserve the exact position, then the token index becomes the token;
// otherwise token index represents an approximation of the token.
//
// We use a token index approach, because our generators actually generate
Expand Down
8 changes: 4 additions & 4 deletions pkg/generators/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
package generators_test

import (
"context"
"sync/atomic"
"testing"

"go.uber.org/zap"

"github.com/scylladb/gemini/pkg/generators"
"github.com/scylladb/gemini/pkg/stop"
"github.com/scylladb/gemini/pkg/typedef"
)

Expand All @@ -32,7 +32,7 @@ func TestGenerator(t *testing.T) {
PartitionKeys: generators.CreatePkColumns(1, "pk"),
}
var current uint64
cfg := &generators.Config{
cfg := generators.Config{
PartitionsRangeConfig: typedef.PartitionRangeConfig{
MaxStringLength: 10,
MinStringLength: 0,
Expand All @@ -47,8 +47,8 @@ func TestGenerator(t *testing.T) {
}
logger, _ := zap.NewDevelopment()
generator := generators.NewGenerator(table, cfg, logger)
generator.Start(stop.NewFlag("main_test"))
for i := uint64(0); i < cfg.PartitionsCount; i++ {
generator.Start(context.Background())
for i := range cfg.PartitionsCount {
atomic.StoreUint64(&current, i)
v := generator.Get()
n := generator.Get()
Expand Down
2 changes: 1 addition & 1 deletion pkg/generators/generators.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
)

type Generators struct {
Generators []Generator
wg *sync.WaitGroup
cancel context.CancelFunc
Generators []Generator
}

func New(
Expand Down
10 changes: 5 additions & 5 deletions pkg/generators/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
package generators

import (
"go.uber.org/multierr"
"sync/atomic"

"go.uber.org/multierr"

"github.com/scylladb/gemini/pkg/inflight"
"github.com/scylladb/gemini/pkg/typedef"
)
Expand Down Expand Up @@ -113,12 +114,11 @@ func (s *Partition) safelyGetOldValuesChannel() chan *typedef.ValueWithToken {
}

func (s *Partition) Close() error {
for !s.closed.CompareAndSwap(false, true) {
if s.closed.CompareAndSwap(false, true) {
close(s.values)
close(s.oldValues)
}

close(s.values)
close(s.oldValues)

return nil
}

Expand Down
49 changes: 16 additions & 33 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/scylladb/gemini/pkg/generators"
"github.com/scylladb/gemini/pkg/joberror"
"github.com/scylladb/gemini/pkg/status"
"github.com/scylladb/gemini/pkg/stop"
"github.com/scylladb/gemini/pkg/store"
"github.com/scylladb/gemini/pkg/typedef"
)
Expand All @@ -53,10 +52,9 @@ var (
)

type List struct {
name string
jobs []job
duration time.Duration
workers uint64
name string
jobs []job
workers uint64
}

type job struct {
Expand All @@ -72,16 +70,16 @@ type job struct {
*generators.Generator,
*status.GlobalStatus,
*zap.Logger,
*stop.Flag,
bool,
bool,
) error
name string
}

func ListFromMode(mode string, duration time.Duration, workers uint64) List {
func ListFromMode(mode string, workers uint64) List {
jobs := make([]job, 0, 2)
name := "work cycle"

switch mode {
case WriteMode:
jobs = append(jobs, mutate)
Expand All @@ -93,11 +91,11 @@ func ListFromMode(mode string, duration time.Duration, workers uint64) List {
default:
jobs = append(jobs, mutate, validate)
}

return List{
name: name,
jobs: jobs,
duration: duration,
workers: workers,
name: name,
jobs: jobs,
workers: workers,
}
}

Expand All @@ -111,16 +109,10 @@ func (l List) Run(
globalStatus *status.GlobalStatus,
logger *zap.Logger,
seed uint64,
stopFlag *stop.Flag,
failFast, verbose bool,
) error {
logger = logger.Named(l.name)
ctx = stopFlag.CancelContextOnSignal(ctx, stop.SignalHardStop)
g, gCtx := errgroup.WithContext(ctx)
time.AfterFunc(l.duration, func() {
logger.Info("jobs time is up, begins jobs completion")
stopFlag.SetSoft(true)
})

partitionRangeConfig := schemaConfig.GetPartitionRangeConfig()
logger.Info("start jobs")
Expand All @@ -131,7 +123,7 @@ func (l List) Run(
jobF := l.jobs[idx].function
r := rand.New(rand.NewSource(seed))
g.Go(func() error {
return jobF(gCtx, pump, schema, schemaConfig, table, s, r, &partitionRangeConfig, generator, globalStatus, logger, stopFlag, failFast, verbose)
return jobF(gCtx, pump, schema, schemaConfig, table, s, r, &partitionRangeConfig, generator, globalStatus, logger, failFast, verbose)
})
}
}
Expand All @@ -154,7 +146,6 @@ func mutationJob(
g *generators.Generator,
globalStatus *status.GlobalStatus,
logger *zap.Logger,
stopFlag *stop.Flag,
failFast, verbose bool,
) error {
schemaConfig := &schemaCfg
Expand All @@ -164,11 +155,8 @@ func mutationJob(
logger.Info("ending mutation loop")
}()
for {
if stopFlag.IsHardOrSoft() {
return nil
}
select {
case <-stopFlag.SignalChannel():
case <-ctx.Done():
logger.Debug("mutation job terminated")
return nil
case hb := <-pump:
Expand All @@ -187,7 +175,6 @@ func mutationJob(
}
}
if failFast && globalStatus.HasErrors() {
stopFlag.SetSoft(true)
return nil
}
}
Expand All @@ -207,7 +194,6 @@ func validationJob(
g *generators.Generator,
globalStatus *status.GlobalStatus,
logger *zap.Logger,
stopFlag *stop.Flag,
failFast, _ bool,
) error {
schemaConfig := &schemaCfg
Expand All @@ -218,11 +204,8 @@ func validationJob(
}()

for {
if stopFlag.IsHardOrSoft() {
return nil
}
select {
case <-stopFlag.SignalChannel():
case <-ctx.Done():
return nil
case hb := <-pump:
time.Sleep(hb)
Expand Down Expand Up @@ -262,7 +245,6 @@ func validationJob(
}

if failFast && globalStatus.HasErrors() {
stopFlag.SetSoft(true)
return nil
}
}
Expand All @@ -282,7 +264,6 @@ func warmupJob(
g *generators.Generator,
globalStatus *status.GlobalStatus,
logger *zap.Logger,
stopFlag *stop.Flag,
failFast, _ bool,
) error {
schemaConfig := &schemaCfg
Expand All @@ -292,18 +273,20 @@ func warmupJob(
logger.Info("ending warmup loop")
}()
for {
if stopFlag.IsHardOrSoft() {
select {
case <-ctx.Done():
logger.Debug("warmup job terminated")
return nil
default:
}

// Do we care about errors during warmup?
err := mutation(ctx, schema, schemaConfig, table, s, r, p, g, globalStatus, false, logger)
if err != nil {
return err
}

if failFast && globalStatus.HasErrors() {
stopFlag.SetSoft(true)
return nil
}
}
Expand Down
Loading

0 comments on commit 16dd84c

Please sign in to comment.