Skip to content

Commit

Permalink
Merge pull request #440 from CodeLieutenant/feat/remove-stopflag
Browse files Browse the repository at this point in the history
refactor(stopFlag): completly remove stopFlag for context
  • Loading branch information
CodeLieutenant authored Dec 11, 2024
2 parents 545e784 + 5f374f2 commit 01d50a9
Show file tree
Hide file tree
Showing 13 changed files with 258 additions and 885 deletions.
58 changes: 0 additions & 58 deletions cmd/gemini/generators.go

This file was deleted.

74 changes: 39 additions & 35 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,13 @@ import (
"net/http"
"net/http/pprof"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"text/tabwriter"
"time"

"github.com/scylladb/gemini/pkg/auth"
"github.com/scylladb/gemini/pkg/builders"
"github.com/scylladb/gemini/pkg/generators"
"github.com/scylladb/gemini/pkg/jobs"
"github.com/scylladb/gemini/pkg/realrandom"
"github.com/scylladb/gemini/pkg/replication"
"github.com/scylladb/gemini/pkg/store"
"github.com/scylladb/gemini/pkg/typedef"
"github.com/scylladb/gemini/pkg/utils"

"github.com/scylladb/gemini/pkg/status"
"github.com/scylladb/gemini/pkg/stop"

"github.com/gocql/gocql"
"github.com/hailocab/go-hostpool"
"github.com/pkg/errors"
Expand All @@ -50,6 +39,17 @@ import (
"golang.org/x/exp/rand"
"golang.org/x/net/context"
"gonum.org/v1/gonum/stat/distuv"

"github.com/scylladb/gemini/pkg/auth"
"github.com/scylladb/gemini/pkg/builders"
"github.com/scylladb/gemini/pkg/generators"
"github.com/scylladb/gemini/pkg/jobs"
"github.com/scylladb/gemini/pkg/realrandom"
"github.com/scylladb/gemini/pkg/replication"
"github.com/scylladb/gemini/pkg/status"
"github.com/scylladb/gemini/pkg/store"
"github.com/scylladb/gemini/pkg/typedef"
"github.com/scylladb/gemini/pkg/utils"
)

var (
Expand Down Expand Up @@ -137,8 +137,11 @@ func readSchema(confFile string, schemaConfig typedef.SchemaConfig) (*typedef.Sc
}

func run(_ *cobra.Command, _ []string) error {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGABRT, syscall.SIGTERM, syscall.SIGINT)
defer cancel()

logger := createLogger(level)
globalStatus := status.NewGlobalStatus(1000)
globalStatus := status.NewGlobalStatus(int32(maxErrorsToStore))
defer utils.IgnoreError(logger.Sync)

if err := validateSeed(seed); err != nil {
Expand Down Expand Up @@ -242,15 +245,15 @@ func run(_ *cobra.Command, _ []string) error {
if dropSchema && mode != jobs.ReadMode {
for _, stmt := range generators.GetDropKeyspace(schema) {
logger.Debug(stmt)
if err = st.Mutate(context.Background(), typedef.SimpleStmt(stmt, typedef.DropKeyspaceStatementType)); err != nil {
if err = st.Mutate(ctx, typedef.SimpleStmt(stmt, typedef.DropKeyspaceStatementType)); err != nil {
return errors.Wrap(err, "unable to drop schema")
}
}
}

testKeyspace, oracleKeyspace := generators.GetCreateKeyspaces(schema)
if err = st.Create(
context.Background(),
ctx,
typedef.SimpleStmt(testKeyspace, typedef.CreateKeyspaceStatementType),
typedef.SimpleStmt(oracleKeyspace, typedef.CreateKeyspaceStatementType)); err != nil {
return errors.Wrap(err, "unable to create keyspace")
Expand All @@ -263,26 +266,23 @@ func run(_ *cobra.Command, _ []string) error {
}
}

ctx, done := context.WithTimeout(context.Background(), duration+warmup+time.Second*2)
stopFlag := stop.NewFlag("main")
warmupStopFlag := stop.NewFlag("warmup")
stop.StartOsSignalsTransmitter(logger, stopFlag, warmupStopFlag)
pump := jobs.NewPump(stopFlag, logger)
pump := jobs.NewPump(ctx, logger)

gens, err := createGenerators(schema, schemaConfig, intSeed, partitionCount, logger)
distFunc, err := createDistributionFunc(partitionKeyDistribution, partitionCount, intSeed, normalDistMean, normalDistSigma)
if err != nil {
return err
return errors.Wrapf(err, "Faile to create distribution function: %s", partitionKeyDistribution)
}
gens.StartAll(stopFlag)

gens := generators.New(ctx, schema, distFunc, schemaConfig.GetPartitionRangeConfig(), intSeed, partitionCount, pkBufferReuseSize, logger)
defer utils.IgnoreError(gens.Close)

if !nonInteractive {
sp := createSpinner(interactive())
ticker := time.NewTicker(time.Second)
go func() {
defer done()
for {
select {
case <-stopFlag.SignalChannel():
case <-ctx.Done():
return
case <-ticker.C:
sp.Set(" Running Gemini... %v", globalStatus)
Expand All @@ -291,20 +291,24 @@ func run(_ *cobra.Command, _ []string) error {
}()
}

if warmup > 0 && !stopFlag.IsHardOrSoft() {
jobsList := jobs.ListFromMode(jobs.WarmupMode, warmup, concurrency)
if err = jobsList.Run(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, warmupStopFlag, failFast, verbose); err != nil {
if warmup > 0 {
warmupCtx, warmupCancel := context.WithTimeout(ctx, warmup)
defer warmupCancel()

jobsList := jobs.ListFromMode(jobs.WarmupMode, concurrency)
if err = jobsList.Run(warmupCtx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, failFast, verbose); err != nil {
logger.Error("warmup encountered an error", zap.Error(err))
stopFlag.SetHard(true)
}
}

if !stopFlag.IsHardOrSoft() {
jobsList := jobs.ListFromMode(mode, duration, concurrency)
if err = jobsList.Run(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, stopFlag.CreateChild("workload"), failFast, verbose); err != nil {
logger.Debug("error detected", zap.Error(err))
}
jobsCtx, jobsCancel := context.WithTimeout(ctx, duration)
defer jobsCancel()

jobsList := jobs.ListFromMode(mode, concurrency)
if err = jobsList.Run(jobsCtx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, failFast, verbose); err != nil {
logger.Debug("error detected", zap.Error(err))
}

logger.Info("test finished")
globalStatus.PrintResult(outFile, schema, version)
if globalStatus.HasErrors() {
Expand Down
110 changes: 65 additions & 45 deletions pkg/generators/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
package generators

import (
"context"

"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/exp/rand"

"github.com/scylladb/gemini/pkg/routingkey"
"github.com/scylladb/gemini/pkg/stop"
"github.com/scylladb/gemini/pkg/typedef"
)

// TokenIndex represents the position of a token in the token ring.
// A token index is translated to a token by a generators. If the generators
// preserves the exact position, then the token index becomes the token;
// preserve the exact position, then the token index becomes the token;
// otherwise token index represents an approximation of the token.
//
// We use a token index approach, because our generators actually generate
Expand All @@ -37,7 +38,7 @@ type TokenIndex uint64

type DistributionFunc func() TokenIndex

type GeneratorInterface interface {
type Interface interface {
Get() *typedef.ValueWithToken
GetOld() *typedef.ValueWithToken
GiveOld(_ *typedef.ValueWithToken)
Expand All @@ -64,14 +65,6 @@ func (g *Generator) PartitionCount() uint64 {
return g.partitionCount
}

type Generators []*Generator

func (g Generators) StartAll(stopFlag *stop.Flag) {
for _, gen := range g {
gen.Start(stopFlag)
}
}

type Config struct {
PartitionsDistributionFunc DistributionFunc
PartitionsRangeConfig typedef.PartitionRangeConfig
Expand All @@ -80,9 +73,9 @@ type Config struct {
PkUsedBufferSize uint64
}

func NewGenerator(table *typedef.Table, config *Config, logger *zap.Logger) *Generator {
func NewGenerator(table *typedef.Table, config Config, logger *zap.Logger) Generator {
wakeUpSignal := make(chan struct{})
return &Generator{
return Generator{
partitions: NewPartitions(int(config.PartitionsCount), int(config.PkUsedBufferSize), wakeUpSignal),
partitionCount: config.PartitionsCount,
table: table,
Expand Down Expand Up @@ -135,47 +128,41 @@ func (g *Generator) ReleaseToken(token uint64) {
g.GetPartitionForToken(TokenIndex(token)).releaseToken(token)
}

func (g *Generator) Start(stopFlag *stop.Flag) {
go func() {
g.logger.Info("starting partition key generation loop")
defer g.partitions.CloseAll()
for {
g.fillAllPartitions(stopFlag)
select {
case <-stopFlag.SignalChannel():
g.logger.Debug("stopping partition key generation loop",
zap.Uint64("keys_created", g.cntCreated),
zap.Uint64("keys_emitted", g.cntEmitted))
return
case <-g.wakeUpSignal:
}
func (g *Generator) Start(ctx context.Context) {
defer g.partitions.Close()
g.logger.Info("starting partition key generation loop")
for {
g.fillAllPartitions(ctx)
select {
case <-ctx.Done():
g.logger.Debug("stopping partition key generation loop",
zap.Uint64("keys_created", g.cntCreated),
zap.Uint64("keys_emitted", g.cntEmitted))
return
case <-g.wakeUpSignal:
}
}()
}
}

func (g *Generator) FindAndMarkStalePartitions() {
r := rand.New(rand.NewSource(10))
nonStale := make([]bool, g.partitionCount)
for n := uint64(0); n < g.partitionCount*100; n++ {
values := CreatePartitionKeyValues(g.table, r, &g.partitionsConfig)
token, err := g.routingKeyCreator.GetHash(g.table, values)

for range g.partitionCount * 100 {
token, _, err := g.createPartitionKeyValues(r)
if err != nil {
g.logger.Panic(errors.Wrap(err, "failed to get primary key hash").Error())
g.logger.Panic("failed to get primary key hash", zap.Error(err))
}
nonStale[g.shardOf(token)] = true
}

for idx, val := range nonStale {
if !val {
g.partitions[idx].MarkStale()
if err = g.partition(token).MarkStale(); err != nil {
g.logger.Panic("failed to mark partition as stale", zap.Error(err))
}
}
}

// fillAllPartitions guarantees that each partition was tested to be full
// at least once since the function started and before it ended.
// In other words no partition will be starved.
func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) {
func (g *Generator) fillAllPartitions(ctx context.Context) {
pFilled := make([]bool, len(g.partitions))
allFilled := func() bool {
for idx, filled := range pFilled {
Expand All @@ -188,22 +175,30 @@ func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) {
}
return true
}
for !stopFlag.IsHardOrSoft() {
values := CreatePartitionKeyValues(g.table, g.r, &g.partitionsConfig)
token, err := g.routingKeyCreator.GetHash(g.table, values)

for {
select {
case <-ctx.Done():
return
default:
}

token, values, err := g.createPartitionKeyValues()
if err != nil {
g.logger.Panic(errors.Wrap(err, "failed to get primary key hash").Error())
g.logger.Panic("failed to get primary key hash", zap.Error(err))
}
g.cntCreated++
idx := token % g.partitionCount
partition := g.partitions[idx]

partition := g.partition(token)
if partition.Stale() || partition.inFlight.Has(token) {
continue
}

select {
case partition.values <- &typedef.ValueWithToken{Token: token, Value: values}:
g.cntEmitted++
default:
idx := g.shardOf(token)
if !pFilled[idx] {
pFilled[idx] = true
if allFilled() {
Expand All @@ -217,3 +212,28 @@ func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) {
func (g *Generator) shardOf(token uint64) int {
return int(token % g.partitionCount)
}

func (g *Generator) partition(token uint64) *Partition {
return g.partitions[g.shardOf(token)]
}

func (g *Generator) createPartitionKeyValues(r ...*rand.Rand) (uint64, []any, error) {
rnd := g.r

if len(r) > 0 && r[0] != nil {
rnd = r[0]
}

values := make([]any, 0, g.table.PartitionKeysLenValues())

for _, pk := range g.table.PartitionKeys {
values = append(values, pk.Type.GenValue(rnd, &g.partitionsConfig)...)
}

token, err := g.routingKeyCreator.GetHash(g.table, values)
if err != nil {
return 0, nil, errors.Wrap(err, "failed to get primary key hash")
}

return token, values, nil
}
Loading

0 comments on commit 01d50a9

Please sign in to comment.