Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvement cql stmt generation #435

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ cmd/gemini/dist/
bin/
coverage.txt
dist/
results/*.log
13 changes: 13 additions & 0 deletions .run/Run Gemini Mixed.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Run Gemini Mixed" type="GoApplicationRunConfiguration" factoryName="Go Application">
<module name="gemini" />
<working_directory value="$PROJECT_DIR$" />
<go_parameters value="-gcflags &quot;all=-N -l&quot;" />
<parameters value="--dataset-size=small --cql-features all --duration 2m --drop-schema true --fail-fast --level info --non-interactive --materialized-views false --outfile ./results/gemini.log --test-statement-log-file ./results/gemini_test_statements.log --oracle-statement-log-file ./results/gemini_oracle_statements.log --test-host-selection-policy token-aware --oracle-host-selection-policy token-aware --test-cluster=192.168.100.2 --oracle-cluster=192.168.100.3 --outfile ./results/gemini_result.log --mode mixed --non-interactive --request-timeout 180s --connect-timeout 120s --use-server-timestamps false --async-objects-stabilization-attempts 10 --async-objects-stabilization-backoff 100ms --replication-strategy &quot;{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}&quot; --oracle-replication-strategy &quot;{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}&quot; --max-mutation-retries 5 --max-mutation-retries-backoff 1000ms --concurrency 1 --tracing-outfile ./results/gemini_tracing.log" />
<kind value="PACKAGE" />
<package value="github.com/scylladb/gemini/cmd/gemini" />
<directory value="$PROJECT_DIR$" />
<filePath value="$PROJECT_DIR$/cmd/gemini/main.go" />
<method v="2" />
</configuration>
</component>
13 changes: 13 additions & 0 deletions .run/Run Gemini Read.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Run Gemini Read" type="GoApplicationRunConfiguration" factoryName="Go Application">
<module name="gemini" />
<working_directory value="$PROJECT_DIR$" />
<go_parameters value="-gcflags &quot;all=-N -l&quot;" />
<parameters value="--dataset-size=small --cql-features all --warmup 0 --duration 2m --drop-schema true --fail-fast --level info --non-interactive --materialized-views false --outfile ./results/gemini.log --test-statement-log-file ./results/gemini_test_statements.log --oracle-statement-log-file ./results/gemini_oracle_statements.log --test-host-selection-policy token-aware --oracle-host-selection-policy token-aware --test-cluster=192.168.100.2 --oracle-cluster=192.168.100.3 --outfile ./results/gemini_result.log --mode read --non-interactive --request-timeout 180s --connect-timeout 120s --use-server-timestamps false --async-objects-stabilization-attempts 10 --async-objects-stabilization-backoff 100ms --replication-strategy &quot;{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}&quot; --oracle-replication-strategy &quot;{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}&quot; --max-mutation-retries 5 --max-mutation-retries-backoff 1000ms --concurrency 2 --tracing-outfile ./results/gemini_tracing.log" />
<kind value="PACKAGE" />
<package value="github.com/scylladb/gemini/cmd/gemini" />
<directory value="$PROJECT_DIR$" />
<filePath value="$PROJECT_DIR$/cmd/gemini/main.go" />
<method v="2" />
</configuration>
</component>
47 changes: 22 additions & 25 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"net/http"
"net/http/pprof"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"text/tabwriter"
"time"

Expand All @@ -38,7 +40,6 @@ import (
"github.com/scylladb/gemini/pkg/utils"

"github.com/scylladb/gemini/pkg/status"
"github.com/scylladb/gemini/pkg/stop"

"github.com/gocql/gocql"
"github.com/hailocab/go-hostpool"
Expand Down Expand Up @@ -168,6 +169,9 @@ func run(_ *cobra.Command, _ []string) error {
return err
}

ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGABRT, syscall.SIGTERM)
defer cancel()

go func() {
http.Handle("/metrics", promhttp.Handler())
_ = http.ListenAndServe(bind, nil)
Expand Down Expand Up @@ -258,31 +262,29 @@ func run(_ *cobra.Command, _ []string) error {

for _, stmt := range generators.GetCreateSchema(schema) {
logger.Debug(stmt)
if err = st.Mutate(context.Background(), typedef.SimpleStmt(stmt, typedef.CreateSchemaStatementType)); err != nil {
if err = st.Mutate(ctx, typedef.SimpleStmt(stmt, typedef.CreateSchemaStatementType)); err != nil {
return errors.Wrap(err, "unable to create schema")
}
}

ctx, done := context.WithTimeout(context.Background(), duration+warmup+time.Second*2)
stopFlag := stop.NewFlag("main")
warmupStopFlag := stop.NewFlag("warmup")
stop.StartOsSignalsTransmitter(logger, stopFlag, warmupStopFlag)
pump := jobs.NewPump(stopFlag, logger)
ctx, done := context.WithTimeout(ctx, duration+warmup+10*time.Second)

gens, err := createGenerators(schema, schemaConfig, intSeed, partitionCount, logger)
if err != nil {
return err
}
gens.StartAll(stopFlag)
gens.StartAll(ctx)

if !nonInteractive {
sp := createSpinner(interactive())
ticker := time.NewTicker(time.Second)

go func() {
defer done()
defer ticker.Stop()
for {
select {
case <-stopFlag.SignalChannel():
case <-ctx.Done():
return
case <-ticker.C:
sp.Set(" Running Gemini... %v", globalStatus)
Expand All @@ -291,36 +293,31 @@ func run(_ *cobra.Command, _ []string) error {
}()
}

if warmup > 0 && !stopFlag.IsHardOrSoft() {
jobsList := jobs.ListFromMode(jobs.WarmupMode, warmup, concurrency)
if err = jobsList.Run(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, warmupStopFlag, failFast, verbose); err != nil {
logger.Error("warmup encountered an error", zap.Error(err))
stopFlag.SetHard(true)
}
jobsList := jobs.New(mode, duration, concurrency, logger, schema, st, globalStatus, intSeed, gens, failFast, warmup)
if err = jobsList.Run(ctx); err != nil {
logger.Debug("error detected", zap.Error(err))
}

if !stopFlag.IsHardOrSoft() {
jobsList := jobs.ListFromMode(mode, duration, concurrency)
if err = jobsList.Run(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, stopFlag.CreateChild("workload"), failFast, verbose); err != nil {
logger.Debug("error detected", zap.Error(err))
}
}
logger.Info("test finished")
globalStatus.PrintResult(outFile, schema, version)

if globalStatus.HasErrors() {
return errors.Errorf("gemini encountered errors, exiting with non zero status")
}

return nil
}

func createFile(fname string, def *os.File) (*os.File, error) {
if fname != "" {
f, err := os.Create(fname)
func createFile(name string, def *os.File) (*os.File, error) {
if name != "" {
f, err := os.Create(name)
if err != nil {
return nil, errors.Wrapf(err, "Unable to open output file %s", fname)
return nil, errors.Wrapf(err, "Unable to open output file %s", name)
}

return f, nil
}

return def, nil
}

Expand Down
50 changes: 23 additions & 27 deletions pkg/jobs/pump.go → pkg/burst/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,36 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package jobs
package burst

import (
"context"
"math/rand/v2"
"time"
)

"github.com/scylladb/gemini/pkg/stop"
const ChannelSize = 10000

"go.uber.org/zap"
"golang.org/x/exp/rand"
)
func work(ctx context.Context, pump chan<- time.Duration, chance int, sleepDuration time.Duration) {
defer close(pump)
for {
select {
case <-ctx.Done():
return
default:
sleep := time.Duration(0)

func NewPump(stopFlag *stop.Flag, logger *zap.Logger) chan time.Duration {
pump := make(chan time.Duration, 10000)
logger = logger.Named("Pump")
go func() {
logger.Debug("pump channel opened")
defer func() {
close(pump)
logger.Debug("pump channel closed")
}()
for !stopFlag.IsHardOrSoft() {
pump <- newHeartBeat()
}
}()
if rand.Int()%chance == 0 {
sleep = sleepDuration
}

return pump
pump <- sleep
Comment on lines +32 to +38

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it should sleep? Can you explain me exactly how it acts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Burst mechanism, makes a user of this, sleep after change is hit. practically if you make change = 10.

It will sleep approximatly one in 10. This is just to not overload the server

}
}
}

func newHeartBeat() time.Duration {
r := rand.Intn(10)
switch r {
case 0:
return 10 * time.Millisecond
default:
return 0
}
func New(ctx context.Context, chance int, sleepDuration time.Duration) chan time.Duration {
pump := make(chan time.Duration, ChannelSize)
go work(ctx, pump, chance, sleepDuration)
return pump
}
41 changes: 21 additions & 20 deletions pkg/generators/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
package generators

import (
"context"

"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/exp/rand"

"github.com/scylladb/gemini/pkg/routingkey"
"github.com/scylladb/gemini/pkg/stop"
"github.com/scylladb/gemini/pkg/typedef"
)

Expand All @@ -37,12 +38,11 @@ type TokenIndex uint64

type DistributionFunc func() TokenIndex

type GeneratorInterface interface {
type Interface interface {
CodeLieutenant marked this conversation as resolved.
Show resolved Hide resolved
Get() *typedef.ValueWithToken
GetOld() *typedef.ValueWithToken
GiveOld(_ *typedef.ValueWithToken)
GiveOlds(_ []*typedef.ValueWithToken)
ReleaseToken(_ uint64)
GiveOld(...*typedef.ValueWithToken)
ReleaseToken(uint64)
}

type Generator struct {
Expand All @@ -66,9 +66,9 @@ func (g *Generator) PartitionCount() uint64 {

type Generators []*Generator

func (g Generators) StartAll(stopFlag *stop.Flag) {
func (g Generators) StartAll(ctx context.Context) {
for _, gen := range g {
gen.Start(stopFlag)
gen.Start(ctx)
}
}

Expand Down Expand Up @@ -118,15 +118,10 @@ func (g *Generator) GetOld() *typedef.ValueWithToken {
return targetPart.getOld()
}

// GiveOld returns the supplied value for later reuse unless
func (g *Generator) GiveOld(v *typedef.ValueWithToken) {
g.GetPartitionForToken(TokenIndex(v.Token)).giveOld(v)
}

// GiveOlds returns the supplied values for later reuse unless
func (g *Generator) GiveOlds(tokens []*typedef.ValueWithToken) {
// GiveOld returns the supplied value for later reuse
func (g *Generator) GiveOld(tokens ...*typedef.ValueWithToken) {
for _, token := range tokens {
g.GiveOld(token)
g.GetPartitionForToken(TokenIndex(token.Token)).giveOld(token)
}
}

Expand All @@ -135,14 +130,14 @@ func (g *Generator) ReleaseToken(token uint64) {
g.GetPartitionForToken(TokenIndex(token)).releaseToken(token)
}

func (g *Generator) Start(stopFlag *stop.Flag) {
func (g *Generator) Start(ctx context.Context) {
go func() {
g.logger.Info("starting partition key generation loop")
defer g.partitions.CloseAll()
for {
g.fillAllPartitions(stopFlag)
g.fillAllPartitions(ctx)
select {
case <-stopFlag.SignalChannel():
case <-ctx.Done():
g.logger.Debug("stopping partition key generation loop",
zap.Uint64("keys_created", g.cntCreated),
zap.Uint64("keys_emitted", g.cntEmitted))
Expand Down Expand Up @@ -175,7 +170,7 @@ func (g *Generator) FindAndMarkStalePartitions() {
// fillAllPartitions guarantees that each partition was tested to be full
// at least once since the function started and before it ended.
// In other words no partition will be starved.
func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) {
func (g *Generator) fillAllPartitions(ctx context.Context) {
pFilled := make([]bool, len(g.partitions))
allFilled := func() bool {
for idx, filled := range pFilled {
Expand All @@ -188,7 +183,13 @@ func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) {
}
return true
}
for !stopFlag.IsHardOrSoft() {
for {
select {
case <-ctx.Done():
return
default:
}

values := CreatePartitionKeyValues(g.table, g.r, &g.partitionsConfig)
token, err := g.routingKeyCreator.GetHash(g.table, values)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions pkg/generators/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,22 @@
package generators_test

import (
"context"
"sync/atomic"
"testing"

"go.uber.org/zap"

"github.com/scylladb/gemini/pkg/generators"
"github.com/scylladb/gemini/pkg/stop"
"github.com/scylladb/gemini/pkg/typedef"
)

func TestGenerator(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

table := &typedef.Table{
Name: "tbl",
PartitionKeys: generators.CreatePkColumns(1, "pk"),
Expand All @@ -47,7 +51,7 @@ func TestGenerator(t *testing.T) {
}
logger, _ := zap.NewDevelopment()
generator := generators.NewGenerator(table, cfg, logger)
generator.Start(stop.NewFlag("main_test"))
generator.Start(ctx)
for i := uint64(0); i < cfg.PartitionsCount; i++ {
atomic.StoreUint64(&current, i)
v := generator.Get()
Expand Down
2 changes: 1 addition & 1 deletion pkg/generators/statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func genTable(sc typedef.SchemaConfig, tableName string, r *rand.Rand) *typedef.
table.Indexes = indexes

var mvs []typedef.MaterializedView
if sc.CQLFeature > typedef.CQL_FEATURE_BASIC && sc.UseMaterializedViews && len(clusteringKeys) > 0 && columns.ValidColumnsForPrimaryKey().Len() != 0 {
if sc.UseMaterializedViews && sc.CQLFeature > typedef.CQL_FEATURE_BASIC && len(clusteringKeys) > 0 && columns.ValidColumnsForPrimaryKey().Len() != 0 {
mvs = CreateMaterializedViews(columns, table.Name, partitionKeys, clusteringKeys, r)
}

Expand Down
Loading