Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reporter: Upload symbols only if in given allowlist #3035

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,11 @@ relabel_configs:
- source_labels: [__meta_process_executable_compiler]
target_label: compiler
action: replace

symbol_upload:
  # Substrings matched against executable file names; an empty list means
  # symbols are uploaded for every executable (no filtering).
  allowlist: []
  # - parca
  # GPU workloads
  # - ollama
  # - cuda
  # - torch
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ var ErrEmptyConfig = errors.New("empty config")
// Config holds all the configuration information for Parca Agent.
type Config struct {
	// RelabelConfigs are Prometheus-style relabeling rules applied to
	// discovered targets/labels.
	RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
	// SymbolUpload configures filtering of which executables have their
	// symbols uploaded.
	SymbolUpload SymbolUpload `yaml:"symbol_upload,omitempty"`
}

// SymbolUpload holds configuration for symbol uploading.
type SymbolUpload struct {
	// Allowlist is a list of substrings matched against executable file
	// names; only matching executables have symbols uploaded. An empty
	// list disables filtering (all symbols are uploaded).
	Allowlist []string `yaml:"allowlist,omitempty"`
}

func (c Config) String() string {
Expand Down
29 changes: 20 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func mainWithExitCode() flags.ExitCode {

log.Info("report sent successfully")

if exiterr, ok := err.(*exec.ExitError); ok { //nolint: errorlint
if exiterr, ok := err.(*exec.ExitError); ok { // nolint: errorlint
return flags.ExitCode(exiterr.ExitCode())
}

Expand Down Expand Up @@ -283,7 +283,10 @@ func mainWithExitCode() flags.ExitCode {
return flags.Failure("Failed to parse the included tracers: %s", err)
}

var relabelConfigs []*relabel.Config
var (
relabelConfigs []*relabel.Config
symbolsUploadAllowlist []string
)
if f.ConfigPath == "" {
log.Info("no config file provided, using default config")
} else {
Expand All @@ -297,11 +300,11 @@ func mainWithExitCode() flags.ExitCode {
if cfgFile != nil {
log.Infof("using config file: %s", f.ConfigPath)
relabelConfigs = cfgFile.RelabelConfigs
symbolsUploadAllowlist = cfgFile.SymbolUpload.Allowlist
}
}

traceHandlerCacheSize :=
traceCacheSize(f.Profiling.Duration, f.Profiling.CPUSamplingFrequency, uint16(presentCores))
traceHandlerCacheSize := traceCacheSize(f.Profiling.Duration, f.Profiling.CPUSamplingFrequency, uint16(presentCores))

intervals := times.New(5*time.Second, f.Profiling.Duration, f.Profiling.ProbabilisticInterval)
times.StartRealtimeSync(mainCtx, f.ClockSyncInterval)
Expand Down Expand Up @@ -332,6 +335,7 @@ func mainWithExitCode() flags.ExitCode {
f.Debuginfo.Strip,
f.Debuginfo.UploadMaxParallel,
f.Debuginfo.UploadDisable || isOfflineMode,
symbolsUploadAllowlist,
int64(f.Profiling.CPUSamplingFrequency),
traceHandlerCacheSize,
f.Debuginfo.UploadQueueSize,
Expand Down Expand Up @@ -478,8 +482,11 @@ func getTelemetryMetadata(numCPU int) map[string]string {
// Simply increasing traceCacheIntervals is problematic when maxElementsPerInterval is large
// (e.g. too many CPU cores present) as we end up using too much memory. A minimum size is
// therefore used here.
func traceCacheSize(monitorInterval time.Duration, samplesPerSecond int,
presentCPUCores uint16) uint32 {
func traceCacheSize(
monitorInterval time.Duration,
samplesPerSecond int,
presentCPUCores uint16,
) uint32 {
const (
traceCacheIntervals = 6
traceCacheMinSize = 65536
Expand All @@ -494,8 +501,11 @@ func traceCacheSize(monitorInterval time.Duration, samplesPerSecond int,
return util.NextPowerOfTwo(size)
}

func maxElementsPerInterval(monitorInterval time.Duration, samplesPerSecond int,
presentCPUCores uint16) uint32 {
// maxElementsPerInterval returns the expected number of trace events
// generated during one monitor interval across all present CPU cores,
// i.e. samples/second * interval seconds * core count.
func maxElementsPerInterval(
	monitorInterval time.Duration,
	samplesPerSecond int,
	presentCPUCores uint16,
) uint32 {
	samplesPerCore := uint32(samplesPerSecond) * uint32(monitorInterval.Seconds())
	return samplesPerCore * uint32(presentCPUCores)
}

Expand All @@ -504,7 +514,8 @@ func getTracePipe() (*os.File, error) {
"/sys/kernel/debug/tracing",
"/sys/kernel/tracing",
"/tracing",
"/trace"} {
"/trace",
} {
t, err := os.Open(mnt + "/trace_pipe")
if err == nil {
return t, nil
Expand Down
82 changes: 54 additions & 28 deletions reporter/parca_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"os"
"path"
"strings"
Expand All @@ -29,8 +30,6 @@ import (
"github.com/apache/arrow/go/v16/arrow/memory"
lru "github.com/elastic/go-freelru"
"github.com/klauspost/compress/zstd"
"github.com/parca-dev/parca-agent/metrics"
"github.com/parca-dev/parca-agent/reporter/metadata"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -42,6 +41,9 @@ import (
"go.opentelemetry.io/ebpf-profiler/libpf/xsync"
otelmetrics "go.opentelemetry.io/ebpf-profiler/metrics"
"go.opentelemetry.io/ebpf-profiler/reporter"

"github.com/parca-dev/parca-agent/metrics"
"github.com/parca-dev/parca-agent/reporter/metadata"
)

// Assert that we implement the full Reporter interface.
Expand Down Expand Up @@ -117,6 +119,9 @@ type ParcaReporter struct {
// disableSymbolUpload disables the symbol upload.
disableSymbolUpload bool

// symbolUploadAllowlist is checked before uploading symbols.
symbolUploadAllowlist []string

// reportInterval is the interval at which to report data.
reportInterval time.Duration

Expand Down Expand Up @@ -167,8 +172,8 @@ func (r *ParcaReporter) SupportsReportTraceEvent() bool { return true }

// ReportTraceEvent enqueues reported trace events for the OTLP reporter.
func (r *ParcaReporter) ReportTraceEvent(trace *libpf.Trace,
meta *reporter.TraceEventMeta) {

meta *reporter.TraceEventMeta,
) {
// This is an LRU so we need to check every time if the stack is already
// known, as it might have been evicted.
if _, exists := r.stacks.Get(trace.Hash); !exists {
Expand Down Expand Up @@ -266,7 +271,6 @@ func (r *ParcaReporter) ExecutableKnown(fileID libpf.FileID) bool {
// ExecutableMetadata accepts a fileID with the corresponding filename
// and caches this information.
func (r *ParcaReporter) ExecutableMetadata(args *reporter.ExecutableMetadataArgs) {

if args.Interp != libpf.Native {
r.executables.Add(args.FileID, metadata.ExecInfo{
FileName: args.FileName,
Expand All @@ -275,6 +279,20 @@ func (r *ParcaReporter) ExecutableMetadata(args *reporter.ExecutableMetadataArgs
return
}

if len(r.symbolUploadAllowlist) > 0 {
var allowed bool
for _, s := range r.symbolUploadAllowlist {
if strings.Contains(args.FileName, s) {
log.Infof("executable found in allowlist, file: '%s' matches '%s'", args.FileName, s)
allowed = true
break
}
}
if !allowed {
log.Debugf("ignoring executable, not found in allowlist, file: %s", args.FileName)
}
}

// Always attempt to upload, the uploader is responsible for deduplication.
if !r.disableSymbolUpload {
r.uploader.Upload(context.TODO(), args.FileID, args.GnuBuildID, args.Open)
Expand Down Expand Up @@ -363,8 +381,12 @@ func (r *ParcaReporter) ReportHostMetadata(metadataMap map[string]string) {
}

// ReportHostMetadataBlocking enqueues host metadata.
func (r *ParcaReporter) ReportHostMetadataBlocking(_ context.Context,
metadataMap map[string]string, _ int, _ time.Duration) error {
// ReportHostMetadataBlocking enqueues host metadata. The Parca reporter
// does not consume host metadata, so this is deliberately a no-op that
// always succeeds.
func (r *ParcaReporter) ReportHostMetadataBlocking(_ context.Context, _ map[string]string, _ int, _ time.Duration) error {
	return nil
}
Expand Down Expand Up @@ -460,6 +482,7 @@ func New(
stripTextSection bool,
symbolUploadConcurrency int,
disableSymbolUpload bool,
symbolUploadAllowlist []string,
samplesPerSecond int64,
cacheSize uint32,
uploaderQueueSize uint32,
Expand Down Expand Up @@ -524,21 +547,24 @@ func New(
reg.MustRegister(sampleWriteRequestBytes)
reg.MustRegister(stacktraceWriteRequestBytes)

slog.Info("reporter found allowlist", "list", symbolUploadAllowlist)

r := &ParcaReporter{
stopSignal: make(chan libpf.Void),
client: nil,
executables: executables,
labels: labels,
frames: frames,
sampleWriter: NewSampleWriter(mem),
stacks: stacks,
mem: mem,
externalLabels: externalLabels,
samplesPerSecond: samplesPerSecond,
disableSymbolUpload: disableSymbolUpload,
reportInterval: reportInterval,
nodeName: nodeName,
relabelConfigs: relabelConfigs,
stopSignal: make(chan libpf.Void),
client: nil,
executables: executables,
labels: labels,
frames: frames,
sampleWriter: NewSampleWriter(mem),
stacks: stacks,
mem: mem,
externalLabels: externalLabels,
samplesPerSecond: samplesPerSecond,
disableSymbolUpload: disableSymbolUpload,
symbolUploadAllowlist: symbolUploadAllowlist,
reportInterval: reportInterval,
nodeName: nodeName,
relabelConfigs: relabelConfigs,
metadataProviders: []metadata.MetadataProvider{
metadata.NewProcessMetadataProvider(),
metadata.NewMainExecutableMetadataProvider(executables),
Expand Down Expand Up @@ -575,8 +601,10 @@ func New(
return r, nil
}

const DATA_FILE_EXTENSION string = ".padata"
const DATA_FILE_COMPRESSED_EXTENSION string = ".padata.zst"
// File-name extensions used for offline-mode storage files.
const (
	// DATA_FILE_EXTENSION marks an uncompressed offline-mode data file.
	DATA_FILE_EXTENSION string = ".padata"
	// DATA_FILE_COMPRESSED_EXTENSION marks a zstd-compressed data file
	// produced by log rotation.
	DATA_FILE_COMPRESSED_EXTENSION string = ".padata.zst"
)

// initialScan inspects the storage directory to determine its size, and whether there are any
// uncompressed files lying around.
Expand Down Expand Up @@ -615,7 +643,7 @@ func initialScan(storagePath string) (map[string]uint64, []string, uint64, error
}

func compressFile(file io.Reader, fpath, compressedFpath string) error {
compressedLog, err := os.OpenFile(compressedFpath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0660)
compressedLog, err := os.OpenFile(compressedFpath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o660)
if err != nil {
return fmt.Errorf("Failed to create compressed file %s for log rotation: %w", compressedFpath, err)
}
Expand All @@ -641,7 +669,7 @@ func compressFile(file io.Reader, fpath, compressedFpath string) error {

func setupOfflineModeLog(fpath string) (*os.File, error) {
// Open the log file
file, err := os.OpenFile(fpath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0660)
file, err := os.OpenFile(fpath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o660)
if err != nil {
return nil, fmt.Errorf("failed to create new offline mode file %s: %w", fpath, err)
}
Expand All @@ -661,7 +689,6 @@ func (r *ParcaReporter) rotateOfflineModeLog() error {
logFile, err := setupOfflineModeLog(fpath)
if err != nil {
return fmt.Errorf("Failed to create new log %s for offline mode: %w", fpath, err)

}
// We are connected to the new log, let's take the old one and compress it
r.offlineModeLogMu.Lock()
Expand Down Expand Up @@ -727,7 +754,7 @@ func (r *ParcaReporter) Start(mainCtx context.Context) error {
}

if r.offlineModeConfig != nil {
if err := os.MkdirAll(r.offlineModeConfig.StoragePath, 0770); err != nil {
if err := os.MkdirAll(r.offlineModeConfig.StoragePath, 0o770); err != nil {
return fmt.Errorf("error creating offline mode storage: %v", err)
}
go func() {
Expand Down Expand Up @@ -993,7 +1020,6 @@ func (r *ParcaReporter) reportDataToBackend(ctx context.Context, buf *bytes.Buff
}

rec, err = r.buildStacktraceRecord(ctx, stacktraceIDs)

if err != nil {
return err
}
Expand Down
Loading