Skip to content

Commit

Permalink
Simple output improvements (also make simple output format default) (#519)
Browse files Browse the repository at this point in the history

* make two reporter implementations - zap and console

* make simple log format default and use new reporters

* separated metrics into several smaller files

* linter fixes

* completely separated metrics data from reporters

* linter fixes
  • Loading branch information
roman-kruglov authored Apr 28, 2022
1 parent 1db31d7 commit 8ac3e4e
Show file tree
Hide file tree
Showing 7 changed files with 294 additions and 208 deletions.
26 changes: 21 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (
"github.com/Arriven/db1000n/src/utils/templates"
)

// simpleLogFormat is the log-format flag value (and default) that selects
// the human-readable, message-only console output.
const simpleLogFormat = "simple"

func main() {
runnerConfigOptions := job.NewConfigOptionsWithFlags()
jobsGlobalConfig := job.NewGlobalConfigWithFlags()
Expand All @@ -56,7 +58,7 @@ func main() {
version := flag.Bool("version", false, "print version and exit")
debug := flag.Bool("debug", utils.GetEnvBoolDefault("DEBUG", false), "enable debug level logging and features")
logLevel := flag.String("log-level", utils.GetEnvStringDefault("LOG_LEVEL", "none"), "log level override for zap, leave empty to use default")
logFormat := flag.String("log-format", utils.GetEnvStringDefault("LOG_FORMAT", ""), "overrides the default (json) log output format,\n"+
logFormat := flag.String("log-format", utils.GetEnvStringDefault("LOG_FORMAT", simpleLogFormat), "overrides the default (simple) log output format,\n"+
"possible values are: json, console, simple\n"+
"simple is the most human readable format if you only look at the output in your terminal")

Expand Down Expand Up @@ -99,7 +101,9 @@ func main() {
metrics.InitOrFail(ctx, logger, *prometheusOn, *prometheusListenAddress, *prometheusPushGateways, jobsGlobalConfig.ClientID, country)

go cancelOnSignal(logger, cancel)
job.NewRunner(runnerConfigOptions, jobsGlobalConfig).Run(ctx, logger)

reporter := newReporter(*logFormat, logger)
job.NewRunner(runnerConfigOptions, jobsGlobalConfig, reporter).Run(ctx, logger)
}

func newZapLogger(debug bool, logLevel string, logFormat string) (*zap.Logger, error) {
Expand All @@ -108,14 +112,18 @@ func newZapLogger(debug bool, logLevel string, logFormat string) (*zap.Logger, e
cfg = zap.NewDevelopmentConfig()
}

if logFormat == "simple" {
if logFormat == simpleLogFormat {
// turn off all output except the message itself
cfg.Encoding = "console"
cfg.EncoderConfig.LevelKey = ""
cfg.EncoderConfig.TimeKey = ""
cfg.EncoderConfig.NameKey = ""
cfg.EncoderConfig.CallerKey = ""
cfg.EncoderConfig.StacktraceKey = ""

// turning these off for debug output would be undesirable
if !debug {
cfg.EncoderConfig.CallerKey = ""
cfg.EncoderConfig.StacktraceKey = ""
}
} else if logFormat != "" {
cfg.Encoding = logFormat
}
Expand Down Expand Up @@ -159,3 +167,11 @@ func cancelOnSignal(logger *zap.Logger, cancel context.CancelFunc) {
logger.Info("terminating")
cancel()
}

// newReporter selects the metrics reporter matching the chosen log format:
// plain console output on stdout for the simple format, a zap-backed
// reporter for everything else.
func newReporter(logFormat string, logger *zap.Logger) metrics.Reporter {
	switch logFormat {
	case simpleLogFormat:
		return metrics.NewConsoleReporter(os.Stdout)
	default:
		return metrics.NewZapReporter(logger)
	}
}
9 changes: 7 additions & 2 deletions src/job/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func singleRequestJob(ctx context.Context, args config.Args, globalConfig *Globa
return nil, err
}

requestSize, _ := req.WriteTo(metrics.NopWriter{})
requestSize, _ := req.WriteTo(nopWriter{})

if a != nil {
tgt := target(req.URI())
Expand Down Expand Up @@ -173,7 +173,7 @@ func fastHTTPJob(ctx context.Context, args config.Args, globalConfig *GlobalConf
continue
}

requestSize, _ := req.WriteTo(metrics.NopWriter{})
requestSize, _ := req.WriteTo(nopWriter{})

if a != nil {
tgt := target(req.URI())
Expand Down Expand Up @@ -229,3 +229,8 @@ func sendFastHTTPRequest(client http.Client, req *fasthttp.Request, resp *fastht

return nil
}

// nopWriter is an io.Writer that discards everything written to it while
// still reporting the byte count, so callers can measure how large a
// serialized request would be without buffering it.
type nopWriter struct{}

// Write discards p and reports its full length as written; it never fails.
func (nopWriter) Write(p []byte) (int, error) {
	return len(p), nil
}
33 changes: 17 additions & 16 deletions src/job/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ func NewConfigOptionsWithFlags() *ConfigOptions {
type Runner struct {
cfgOptions *ConfigOptions
globalJobsCfg *GlobalConfig
reporter metrics.Reporter
}

// NewRunner according to the config
func NewRunner(cfgOptions *ConfigOptions, globalJobsCfg *GlobalConfig) *Runner {
func NewRunner(cfgOptions *ConfigOptions, globalJobsCfg *GlobalConfig, reporter metrics.Reporter) *Runner {
return &Runner{
cfgOptions: cfgOptions,
globalJobsCfg: globalJobsCfg,
reporter: reporter,
}
}

Expand All @@ -84,10 +86,9 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger) {
defer refreshTimer.Stop()
metrics.IncClient()

var (
cancel context.CancelFunc
reporter *metrics.Reporter
)
var cancel context.CancelFunc

var metric *metrics.Metrics

for {
rawConfig := config.FetchRawMultiConfig(logger, strings.Split(r.cfgOptions.PathsCSV, ","),
Expand All @@ -105,14 +106,14 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger) {
cancel()
}

metric = &metrics.Metrics{} // clear info about previous targets and avoid old jobs from dumping old info to new metrics

if rawConfig.Encrypted {
logger.Info("config is encrypted, disabling logs")

reporter = nil
cancel = r.runJobs(ctx, cfg, reporter, zap.NewNop())
cancel = r.runJobs(ctx, cfg, nil, zap.NewNop())
} else {
reporter = metrics.NewReporter(r.globalJobsCfg.ClientID)
cancel = r.runJobs(ctx, cfg, reporter, logger)
cancel = r.runJobs(ctx, cfg, metric, logger)
}
} else {
logger.Info("the config has not changed. Keep calm and carry on!")
Expand All @@ -129,8 +130,8 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger) {
return
}

if reporter != nil {
reportMetrics(reporter, r.globalJobsCfg.ClientID, logger)
if r.reporter != nil {
reportMetrics(r.reporter, metric, r.globalJobsCfg.ClientID, logger)
}
}
}
Expand All @@ -151,7 +152,7 @@ func nonNilConfigOrDefault(c, defaultConfig *config.RawMultiConfig) *config.RawM
return defaultConfig
}

func (r *Runner) runJobs(ctx context.Context, cfg *config.MultiConfig, reporter *metrics.Reporter, logger *zap.Logger) (cancel context.CancelFunc) {
func (r *Runner) runJobs(ctx context.Context, cfg *config.MultiConfig, metric *metrics.Metrics, logger *zap.Logger) (cancel context.CancelFunc) {
ctx, cancel = context.WithCancel(ctx)

var jobInstancesCount int
Expand Down Expand Up @@ -189,7 +190,7 @@ func (r *Runner) runJobs(ctx context.Context, cfg *config.MultiConfig, reporter
go func(i int) {
defer utils.PanicHandler(logger)

if _, err := job(ctx, cfg.Jobs[i].Args, r.globalJobsCfg, reporter.NewAccumulator(uuid.NewString()), logger); err != nil {
if _, err := job(ctx, cfg.Jobs[i].Args, r.globalJobsCfg, metric.NewAccumulator(uuid.NewString()), logger); err != nil {
logger.Error("error running job",
zap.String("name", cfg.Jobs[i].Name),
zap.String("type", cfg.Jobs[i].Type),
Expand All @@ -206,10 +207,10 @@ func (r *Runner) runJobs(ctx context.Context, cfg *config.MultiConfig, reporter
return cancel
}

func reportMetrics(reporter *metrics.Reporter, clientID string, logger *zap.Logger) {
reporter.WriteSummary(logger)
func reportMetrics(reporter metrics.Reporter, metric *metrics.Metrics, clientID string, logger *zap.Logger) {
reporter.WriteSummary(metric)

if err := metrics.ReportStatistics(int64(reporter.Sum(metrics.BytesSentStat)), clientID); err != nil {
if err := metrics.ReportStatistics(int64(metric.Sum(metrics.BytesSentStat)), clientID); err != nil {
logger.Debug("error reporting statistics", zap.Error(err))
}
}
55 changes: 55 additions & 0 deletions src/utils/metrics/accumulator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package metrics

// Accumulator collects per-target statistical metrics for a single job.
// Counters are kept locally until Flush publishes them into the shared
// Metrics storage. Not concurrency-safe: each job owns its own Accumulator
// (see Clone / newAccumulator).
type Accumulator struct {
	jobID   string                      // identifies the owning job; becomes part of each flushed key
	stats   [NumStats]map[string]uint64 // Array of metrics by Stat. Each metric is a map of uint64 values by target.
	metrics *Metrics                    // destination storage written by Flush; shared with clones
}

// dimensions is the composite key under which a flushed counter is stored
// in Metrics: the job that produced the value and the target it applies to.
type dimensions struct {
	jobID  string
	target string
}

// Add increases the counter for the given Stat and target by n.
// Returns the receiver so calls can be chained.
func (a *Accumulator) Add(target string, s Stat, n uint64) *Accumulator {
	byTarget := a.stats[s]
	byTarget[target] += n

	return a
}

// Inc bumps the counter for the given Stat and target by one.
// Returns the receiver so calls can be chained.
func (a *Accumulator) Inc(target string, s Stat) *Accumulator {
	return a.Add(target, s, 1)
}

// Flush publishes every accumulated counter into the shared Metrics
// storage, keyed by this accumulator's job ID and the counter's target.
func (a *Accumulator) Flush() {
	for s := RequestsAttemptedStat; s < NumStats; s++ {
		key := dimensions{jobID: a.jobID}
		for target, count := range a.stats[s] {
			key.target = target
			a.metrics[s].Store(key, count)
		}
	}
}

// Clone returns a fresh, empty Accumulator for jobID that writes into the
// same Metrics storage as the original. A nil receiver yields nil, so a
// disabled accumulator stays disabled when cloned.
func (a *Accumulator) Clone(jobID string) *Accumulator {
	if a != nil {
		return newAccumulator(jobID, a.metrics)
	}

	return nil
}

// newAccumulator builds an Accumulator for jobID with an empty counter map
// for every tracked Stat, flushing into the given Metrics storage.
func newAccumulator(jobID string, data *Metrics) *Accumulator {
	a := &Accumulator{
		jobID:   jobID,
		metrics: data,
	}

	for stat := RequestsAttemptedStat; stat < NumStats; stat++ {
		a.stats[stat] = map[string]uint64{}
	}

	return a
}
Loading

0 comments on commit 8ac3e4e

Please sign in to comment.