Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trace Analyzer Command Line #54

Merged
merged 17 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions collector/processor/obfuscationprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,17 @@ func TestProcessTraces(t *testing.T) {
bVal.SetEmptyBytes().FromRaw(byteIDVal)

expected := map[string]pair{
"resource-attr": cryptPair(processor, resAttrKey, pcommon.NewValueStr(resAttrVal)),
"scope-attr": cryptPair(processor, scopeAttrKey, pcommon.NewValueStr(scopeAttrVal)),
"span-attr": cryptPair(processor, spanAttrKey, pcommon.NewValueStr(spanAttrVal)),
"span-link-attr": cryptPair(processor, spanLinkAttrKey, pcommon.NewValueStr(linkAttrVal)),
"span-event-attr": cryptPair(processor, spanEventAttrKey, pcommon.NewValueStr(eventAttrVal)),
"complex-span-attr": cryptPair(processor, "complex-span-attr", csVal),
"byte-id": cryptPair(processor, "byte-id", bVal),
"span-event-name": cryptPair(processor, "span-event-name", pcommon.NewValueStr(eventName)),
"resource-attr": cryptPair(processor, resAttrKey, pcommon.NewValueStr(resAttrVal)),
"scope-attr": cryptPair(processor, scopeAttrKey, pcommon.NewValueStr(scopeAttrVal)),
"span-attr": cryptPair(processor, spanAttrKey, pcommon.NewValueStr(spanAttrVal)),
"span-link-attr": cryptPair(processor, spanLinkAttrKey, pcommon.NewValueStr(linkAttrVal)),
"span-event-attr": cryptPair(processor, spanEventAttrKey, pcommon.NewValueStr(eventAttrVal)),
"complex-span-attr": cryptPair(processor, "complex-span-attr", csVal),
"byte-id": cryptPair(processor, "byte-id", bVal),
"span-event-name": cryptPair(processor, "span-event-name", pcommon.NewValueStr(eventName)),
"sensitive-key-name": cryptPair(processor, "sensitive-key-name", pcommon.NewValueStr("dummy name")),
"sensitive-version": cryptPair(processor, "sensitive-version", pcommon.NewValueStr("dummy version")),
"status-message": cryptPair(processor, "status-message", pcommon.NewValueStr("dummy error message")),
"sensitive-version": cryptPair(processor, "sensitive-version", pcommon.NewValueStr("dummy version")),
"status-message": cryptPair(processor, "status-message", pcommon.NewValueStr("dummy error message")),
}

processedTraces, err := processor.processTraces(context.Background(), traces)
Expand Down
163 changes: 96 additions & 67 deletions pkg/arrow/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package arrow

// This package contains functions for printing Arrow records to stdout.
// This is mostly used for debugging purposes.
// Package providing functions to print Arrow records to stdout for
// debugging and analysis.

import (
"fmt"
Expand All @@ -28,6 +28,7 @@ import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"

"github.com/open-telemetry/otel-arrow/pkg/otel/constants"
"github.com/open-telemetry/otel-arrow/pkg/werror"
)

Expand All @@ -38,53 +39,83 @@ const (
BoolCode int8 = 3
BinaryCode int8 = 4
CborCode int8 = 5

MaxColSize = 20
MaxStrSize = "%20s"
MaxValSize = "%20v"
MaxBinSize = "%20x"
)

// TextColumn buffers the rendered values of a single record column so the
// column width can be computed before any row is printed.
// MaxLen is used to determine the column width (it tracks the longest of the
// column name and all rendered values).
type TextColumn struct {
Name string
MaxLen int
Values []string
}

// PrintRecord prints the contents of an Arrow record to stdout.
func PrintRecord(name string, record arrow.Record, maxRows, countPrints, maxPrints int) {
println()
func PrintRecord(name string, record arrow.Record, maxRows int) {
PrintRecordWithProgression(name, record, maxRows, 0, 0)
}

// PrintRecordWithProgression prints the contents of an Arrow record to stdout.
func PrintRecordWithProgression(name string, record arrow.Record, maxRows, countPrints, maxPrints int) {
progression := ""
if maxPrints > 0 {
progression = fmt.Sprintf(", progression %d/%d", countPrints, maxPrints)
}

sortingColumns := ""
sc, ok := record.Schema().Metadata().GetValue(constants.SortingColumns)
if ok {
sortingColumns = fmt.Sprintf(", sort by %q", sc)
}

if record.NumRows() > int64(maxRows) {
fmt.Printf("Record %q -> #rows: %d/%d, prints: %d/%d\n", name, maxRows, record.NumRows(), countPrints, maxPrints)
fmt.Printf("Dump record %q: %d rows/%d (total)%s%s\n", name, maxRows, record.NumRows(), progression, sortingColumns)
} else {
fmt.Printf("Record %q -> #rows: %d, prints: %d/%d\n", name, record.NumRows(), countPrints, maxPrints)
fmt.Printf("Dump record %q: %d rows%s%s\n", name, record.NumRows(), progression, sortingColumns)
}

schema := record.Schema()
colNames := schemaColNames(schema)

columns := make([]TextColumn, len(colNames))
for i := 0; i < len(colNames); i++ {
print(strings.Repeat("-", MaxColSize), "-+")
columns[i].Name = colNames[i]
columns[i].MaxLen = len(colNames[i])
}
println()

for _, colName := range colNames {
if len(colName) > MaxColSize {
colName = colName[:MaxColSize]
maxColSize := 60
rows := int(math.Min(float64(maxRows), float64(record.NumRows())))
for row := 0; row < rows; row++ {
values := recordColValues(record, row)
for i, value := range values {
if len(value) > maxColSize {
value = value[:maxColSize] + "..."
}
columns[i].Values = append(columns[i].Values, value)
if columns[i].MaxLen < len(value) {
columns[i].MaxLen = len(value)
}
}
fmt.Printf(MaxStrSize, colName)
}

for i := 0; i < len(columns); i++ {
print(strings.Repeat("-", columns[i].MaxLen), "-+")
}
println()

for i := 0; i < len(columns); i++ {
fmt.Printf(fmt.Sprintf("%%%ds", columns[i].MaxLen), columns[i].Name)
print(" |")
}
println()

for i := 0; i < len(colNames); i++ {
print(strings.Repeat("-", MaxColSize), "-+")
for i := 0; i < len(columns); i++ {
print(strings.Repeat("-", columns[i].MaxLen), "-+")
}
println()

rows := int(math.Min(500.0, float64(record.NumRows())))
for row := 0; row < rows; row++ {
values := recordColValues(record, row)
for _, value := range values {
if len(value) > MaxColSize {
value = value[:MaxColSize]
}
fmt.Printf(MaxStrSize, value)
for i := 0; i < len(columns); i++ {
fmt.Printf(fmt.Sprintf("%%%ds", columns[i].MaxLen), columns[i].Values[row])
print(" |")
}
println()
Expand All @@ -94,7 +125,8 @@ func PrintRecord(name string, record arrow.Record, maxRows, countPrints, maxPrin
// schemaColNames returns the flattened list of column names for the schema,
// recursing into struct fields via fieldColNames.
func schemaColNames(schema *arrow.Schema) []string {
	var names []string
	for _, f := range schema.Fields() {
		names = append(names, fieldColNames("", &f)...)
	}
	return names
}
Expand All @@ -107,7 +139,8 @@ func fieldColNames(path string, field *arrow.Field) []string {
var names []string
path = path + "."
for _, structField := range st.Fields() {
names = append(names, fieldColNames(path, &structField)...)
childNames := fieldColNames(path, &structField)
names = append(names, childNames...)
}
return names
}
Expand All @@ -127,58 +160,50 @@ func recordColValues(record arrow.Record, row int) []string {

func arrayColValues(arr arrow.Array, row int) []string {
if arr.IsNull(row) {
return []string{fmt.Sprintf(MaxStrSize, "NULL")}
return []string{"NULL"}
}

switch c := arr.(type) {
case *array.Boolean:
return []string{fmt.Sprintf(MaxValSize, c.Value(row))}
return []string{fmt.Sprintf("%v", c.Value(row))}
// uints
case *array.Uint8:
return []string{fmt.Sprintf(MaxValSize, c.Value(row))}
return []string{fmt.Sprintf("%v", c.Value(row))}
case *array.Uint16:
return []string{fmt.Sprintf(MaxValSize, c.Value(row))}
return []string{fmt.Sprintf("%v", c.Value(row))}
case *array.Uint32:
return []string{fmt.Sprintf(MaxValSize, c.Value(row))}
return []string{fmt.Sprintf("%v", c.Value(row))}
case *array.Uint64:
return []string{fmt.Sprintf(MaxValSize, c.Value(row))}
return []string{fmt.Sprintf("%v", c.Value(row))}
// ints
case *array.Int8:
return []string{fmt.Sprintf(MaxValSize, c.Value(row))}
return []string{fmt.Sprintf("%v", c.Value(row))}
case *array.Int16:
return []string{fmt.Sprintf(MaxValSize, c.Value(row))}
return []string{fmt.Sprintf("%v", c.Value(row))}
case *array.Int32:
return []string{fmt.Sprintf(MaxValSize, c.Value(row))}
return []string{fmt.Sprintf("%v", c.Value(row))}
case *array.Int64:
return []string{fmt.Sprintf(MaxValSize, c.Value(row))}
return []string{fmt.Sprintf("%v", c.Value(row))}
// floats
case *array.Float32:
return []string{fmt.Sprintf(MaxValSize, c.Value(row))}
return []string{fmt.Sprintf("%v", c.Value(row))}
case *array.Float64:
return []string{fmt.Sprintf(MaxValSize, c.Value(row))}
return []string{fmt.Sprintf("%v", c.Value(row))}

case *array.String:
str := c.Value(row)
if len(str) > MaxColSize {
str = str[:MaxColSize]
}
return []string{fmt.Sprintf(MaxValSize, str)}

return []string{fmt.Sprintf("%v", escapeNonPrintable(str))}
case *array.Binary:
bin := c.Value(row)
if len(bin) > MaxColSize {
bin = bin[:MaxColSize]
}
return []string{fmt.Sprintf(MaxValSize, bin)}
return []string{fmt.Sprintf("%v", bin)}
case *array.FixedSizeBinary:
bin := c.Value(row)
if len(bin) > MaxColSize {
bin = bin[:MaxColSize]
}
return []string{fmt.Sprintf(MaxBinSize, bin)}
return []string{fmt.Sprintf("%v", bin)}
case *array.Timestamp:
return []string{fmt.Sprintf(MaxValSize, c.Value(row))}
return []string{fmt.Sprintf("%v", c.Value(row))}
case *array.Duration:
return []string{fmt.Sprintf(MaxValSize, c.Value(row))}
return []string{fmt.Sprintf("%v", c.Value(row))}
case *array.Dictionary:
switch arr := c.Dictionary().(type) {
case *array.Int32:
Expand All @@ -189,24 +214,15 @@ func arrayColValues(arr arrow.Array, row int) []string {
return []string{fmt.Sprintf("%d", arr.Value(c.GetValueIndex(row)))}
case *array.String:
str := arr.Value(c.GetValueIndex(row))
if len(str) > MaxColSize {
str = str[:MaxColSize]
}
return []string{fmt.Sprintf(MaxValSize, str)}
return []string{fmt.Sprintf("%v", escapeNonPrintable(str))}
case *array.Binary:
bin := arr.Value(c.GetValueIndex(row))
if len(bin) > MaxColSize {
bin = bin[:MaxColSize]
}
return []string{fmt.Sprintf(MaxValSize, bin)}
return []string{fmt.Sprintf("%v", bin)}
case *array.Duration:
return []string{fmt.Sprintf(MaxValSize, arr.Value(c.GetValueIndex(row)))}
return []string{fmt.Sprintf("%v", arr.Value(c.GetValueIndex(row)))}
case *array.FixedSizeBinary:
bin := arr.Value(c.GetValueIndex(row))
if len(bin) > MaxColSize {
bin = bin[:MaxColSize]
}
return []string{fmt.Sprintf(MaxBinSize, bin)}
return []string{fmt.Sprintf("%v", bin)}
default:
panic(fmt.Sprintf("unsupported dictionary type %T", arr))
}
Expand All @@ -226,6 +242,19 @@ func arrayColValues(arr arrow.Array, row int) []string {
return []string{}
}

// escapeNonPrintable returns str with non-printable characters replaced by
// escape sequences so control characters and non-ASCII runes are visible in
// dumped output.
//
// Printable ASCII (0x20..0x7e) passes through unchanged. Control characters,
// DEL, and runes up to 0xff are emitted as \xNN. Runes above 0xff are emitted
// as \uNNNN (or \UNNNNNNNN beyond the BMP): the previous \x%02x formatting
// printed more than two hex digits for such runes (e.g. "\x20ac" for U+20AC),
// which is ambiguous since \x conventionally escapes a single byte.
func escapeNonPrintable(str string) string {
	var sb strings.Builder
	sb.Grow(len(str)) // at least one output byte per input byte
	for _, r := range str {
		switch {
		case r >= 32 && r <= 126:
			sb.WriteRune(r)
		case r <= 0xff:
			sb.WriteString(fmt.Sprintf("\\x%02x", r))
		case r <= 0xffff:
			sb.WriteString(fmt.Sprintf("\\u%04x", r))
		default:
			sb.WriteString(fmt.Sprintf("\\U%08x", r))
		}
	}
	return sb.String()
}

func sparseUnionValue(union *array.SparseUnion, row int) string {
tcode := union.TypeCode(row)
fieldID := union.ChildID(row)
Expand Down
4 changes: 2 additions & 2 deletions pkg/arrow/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ func DataTypeToID(dt arrow.DataType) string {
return id
}

func ShowSchema(schema *arrow.Schema, prefix string) {
println(prefix + "Schema {")
func ShowSchema(schema *arrow.Schema, schemaName string, prefix string) {
println(prefix + "Schema " + schemaName + " {")
for _, f := range schema.Fields() {
ShowField(&f, prefix+" ")
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/benchmark/dataset/real_trace_dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,25 +91,25 @@ func tracesFromJSON(path string, compression string) (ptrace.Traces, int) {

tr := &traceReader{
unmarshaler: &ptrace.JSONUnmarshaler{},
bytesRead: 0,
bytesRead: 0,
}

if compression == benchmark.CompressionTypeZstd {
cr, err := zstd.NewReader(file)
if err != nil {
log.Fatal("Failed to create compressed reader: ", err)
log.Fatal("Failed to create compressed reader for `", path, "`: ", err)
}
tr.stringReader = bufio.NewReader(cr)
} else { // no compression
tr.stringReader = bufio.NewReader(file)
}

traces, err := tr.readAllTraces()
traces, err := tr.readAllTraces()
if err != nil {
if tr.bytesRead == 0 {
log.Fatal("Read zero bytes from file: ", err)
log.Fatal("Read zero bytes from file `", path, "`: ", err)
}
log.Print("Found error when reading file: ", err)
log.Print("Found error when reading file `", path, "`: ", err)
log.Print("Bytes read: ", tr.bytesRead)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/benchmark/profileable/arrow/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewLogsProfileable(tags []string, dataset dataset.LogsDataset, config *benc
logsProducerOptions = append(logsProducerOptions, cfg.WithNoZstd())
}
if config.Stats {
logsProducerOptions = append(logsProducerOptions, cfg.WithStats())
logsProducerOptions = append(logsProducerOptions, cfg.WithSchemaStats())
}

producer := arrow_record.NewProducerWithOptions(logsProducerOptions...)
Expand Down
2 changes: 1 addition & 1 deletion pkg/benchmark/profileable/arrow/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewMetricsProfileable(tags []string, dataset dataset.MetricsDataset, config
options = append(options, cfg.WithNoZstd())
}
if config.Stats {
options = append(options, cfg.WithStats())
options = append(options, cfg.WithSchemaStats())
}

producer := arrow_record.NewProducerWithOptions(options...)
Expand Down
2 changes: 1 addition & 1 deletion pkg/benchmark/profileable/arrow/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewTraceProfileable(tags []string, dataset dataset.TraceDataset, config *be
tracesProducerOptions = append(tracesProducerOptions, cfg.WithNoZstd())
}
if config.Stats {
tracesProducerOptions = append(tracesProducerOptions, cfg.WithStats())
tracesProducerOptions = append(tracesProducerOptions, cfg.WithSchemaStats())
}

return &TracesProfileable{
Expand Down
4 changes: 2 additions & 2 deletions pkg/benchmark/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ func (p *Profiler) ExportMetricsTimesCSV(filePrefix string) {
panic(fmt.Sprintf("failed closing the file: %s", err))
}

_, _ = fmt.Fprintf(p.writer, "Time meseasurements exported to %s\n", filename)
_, _ = fmt.Fprintf(p.writer, "Time measurements exported to %s\n", filename)
}

func (p *Profiler) ExportMetricsBytesCSV(filePrefix string) {
Expand Down Expand Up @@ -728,7 +728,7 @@ func (p *Profiler) ExportMetricsBytesCSV(filePrefix string) {
panic(fmt.Sprintf("failed closing the file: %s", err))
}

_, _ = fmt.Fprintf(p.writer, "Meseasurements of the message sizes exported to %s\n", filename)
_, _ = fmt.Fprintf(p.writer, "Measurements of the message sizes exported to %s\n", filename)
}

func min(a, b int) int {
Expand Down
Loading