Commit c3a820e

fix logic, extend logging
Signed-off-by: Filipp Akinfiev <[email protected]>
fa-at-pulsit committed Aug 28, 2024
1 parent 560b653 commit c3a820e
Showing 1 changed file with 139 additions and 87 deletions.

pkg/producers/radosgwusage/radosgwusage.go
@@ -17,6 +17,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/rs/zerolog/log"
@@ -35,7 +36,8 @@ func collectUsageMetrics(cfg RadosGWUsageConfig) ([]UsageEntry, error) {
}

// Create a new RadosGW admin client using the provided configuration.
co, err := admin.New(cfg.AdminURL, cfg.AccessKey, cfg.SecretKey, nil)
httpClient := &http.Client{Timeout: 30 * time.Second}
co, err := admin.New(cfg.AdminURL, cfg.AccessKey, cfg.SecretKey, httpClient)
if err != nil {
return nil, err // Return an error if the client creation fails.
}
@@ -49,9 +51,11 @@ func collectUsageMetrics(cfg RadosGWUsageConfig) ([]UsageEntry, error) {
}

// Fetch usage statistics from RadosGW.
// usageCtx, usageCancel := context.WithTimeout(context.Background(), timeout)
// defer usageCancel()
usage, err := co.GetUsage(context.Background(), usageRequest)
if err != nil {
return nil, err // Return an error if fetching usage data fails.
return nil, fmt.Errorf("failed to fetch usage data: %w", err)
}

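The two changes above pair a transport-level timeout with, per the commented-out usageCtx lines, a possible per-call deadline. A minimal sketch of both together, assuming go-ceph's rgw admin package (which matches the admin.New and GetUsage calls in this file); the endpoint, credentials, and deadline values are placeholders:

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/ceph/go-ceph/rgw/admin"
)

func fetchUsage() (admin.Usage, error) {
	// Transport-level timeout: bounds every request made through this client.
	httpClient := &http.Client{Timeout: 30 * time.Second}
	co, err := admin.New("http://rgw.example.local:8080", "ACCESS_KEY", "SECRET_KEY", httpClient)
	if err != nil {
		return admin.Usage{}, err
	}

	// Per-call deadline, as the commented-out usageCtx/usageCancel lines suggest.
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()

	usage, err := co.GetUsage(ctx, admin.Usage{})
	if err != nil {
		return admin.Usage{}, fmt.Errorf("failed to fetch usage data: %w", err)
	}
	return usage, nil
}

func main() {
	if _, err := fetchUsage(); err != nil {
		fmt.Println(err)
	}
}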
// Fetch bucket data from RadosGW concurrently.
@@ -79,19 +83,27 @@ func collectUsageMetrics(cfg RadosGWUsageConfig) ([]UsageEntry, error) {
}(bucketName)
}

var bucketsProcessed, bucketsFailed int
for i := 0; i < len(bucketNames); i++ {
select {
case data := <-bucketDataCh:
bucketData = append(bucketData, data)
bucketsProcessed++
case err := <-errCh:
log.Error().
Err(err).
Msg("error received during bucket data collection")
bucketsFailed++
}
}
close(bucketDataCh)
close(errCh)

log.Info().
Int("buckets_processed", bucketsProcessed).
Int("buckets_failed", bucketsFailed).
Msg("bucket data collection completed")
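The counted select loop above (one message per spawned goroutine, success and failure tallies logged at the end) is the same fan-out/fan-in shape the function reuses below for users. A self-contained sketch with illustrative names:

package main

import (
	"fmt"
	"strings"
)

func collectAll(items []string, fetch func(string) (string, error)) []string {
	// Buffered to capacity so every goroutine can send even if the
	// receiver falls behind; no goroutine leaks on the error path.
	dataCh := make(chan string, len(items))
	errCh := make(chan error, len(items))

	for _, it := range items {
		go func(it string) {
			d, err := fetch(it)
			if err != nil {
				errCh <- err
				return
			}
			dataCh <- d
		}(it)
	}

	var results []string
	var processed, failed int
	// Each goroutine sends exactly once, so len(items) receives drain everything.
	for i := 0; i < len(items); i++ {
		select {
		case d := <-dataCh:
			results = append(results, d)
			processed++
		case err := <-errCh:
			fmt.Println("collection error:", err)
			failed++
		}
	}
	fmt.Printf("processed=%d failed=%d\n", processed, failed)
	return results
}

func main() {
	fetch := func(name string) (string, error) {
		if strings.HasPrefix(name, "bad") {
			return "", fmt.Errorf("cannot fetch %s", name)
		}
		return "stats for " + name, nil
	}
	fmt.Println(collectAll([]string{"alpha", "bad-bucket", "beta"}, fetch))
}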

// Fetch user data from RadosGW concurrently.
userIDs, err := co.GetUsers(context.Background())
if err != nil {
@@ -116,19 +128,28 @@ func collectUsageMetrics(cfg RadosGWUsageConfig) ([]UsageEntry, error) {
userDataCh <- userInfo
}(userName)
}

var usersProcessed, usersFailed int
for i := 0; i < len(*userIDs); i++ {
select {
case data := <-userDataCh:
userData = append(userData, data)
usersProcessed++
case err := <-errCh:
log.Error().
Err(err).
Msg("error received during user data collection")
usersFailed++
}
}
close(userDataCh)
close(errCh)

log.Info().
Int("users_processed", usersProcessed).
Int("users_failed", usersFailed).
Msg("user data collection completed")

// Initialize a dictionary to store usage metrics, organized by categories.
usageDict := make(map[string]map[string]map[string]UsageMetrics)
processUsageData(usage, usageDict) // Process the usage data into the usageDict.
@@ -146,7 +167,7 @@ func collectUsageMetrics(cfg RadosGWUsageConfig) ([]UsageEntry, error) {
// The function iterates through each entry in the usage data, categorizing metrics by user, bucket, and category.
func processUsageData(usage admin.Usage, usageDict map[string]map[string]map[string]UsageMetrics) {
for _, entry := range usage.Entries {
var bucketOwner string
bucketOwner := "unknown_user" // Use a placeholder for entries without a user

if entry.User != "" {
bucketOwner = entry.User
@@ -159,7 +180,7 @@ func processUsageData(usage admin.Usage, usageDict map[string]map[string]map[string]UsageMetrics) {
for _, bucket := range entry.Buckets {
bucketName := bucket.Bucket
if bucketName == "" {
bucketName = "bucket_root"
bucketName = "bucket_root" // Use a placeholder for root or unnamed buckets
}

if _, ok := usageDict[bucketOwner][bucketName]; !ok {
@@ -174,20 +195,35 @@ func processUsageData(usage admin.Usage, usageDict map[string]map[string]map[string]UsageMetrics) {
}

metrics := usageDict[bucketOwner][bucketName][categoryName]
metrics.Ops += category.Ops
metrics.SuccessfulOps += category.SuccessfulOps
metrics.BytesSent += category.BytesSent
metrics.BytesReceived += category.BytesReceived

// Accumulate metrics
if category.Ops > 0 {
metrics.Ops += category.Ops
}
if category.SuccessfulOps > 0 {
metrics.SuccessfulOps += category.SuccessfulOps
}
if category.BytesSent > 0 {
metrics.BytesSent += category.BytesSent
}
if category.BytesReceived > 0 {
metrics.BytesReceived += category.BytesReceived
}

usageDict[bucketOwner][bucketName][categoryName] = metrics
}
}
}
log.Info().
Int("total_entries_processed", len(usage.Entries)).
Msg("usage data processing completed")
}
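processUsageData lazily initializes a three-level map (owner, then bucket, then category) and, because Go map values are not addressable, reads the metrics struct out, mutates it, and stores it back. A compact sketch of that accumulation step, with UsageMetrics fields mirroring the ones above:

package main

import "fmt"

type UsageMetrics struct {
	Ops, SuccessfulOps, BytesSent, BytesReceived uint64
}

func accumulate(dict map[string]map[string]map[string]UsageMetrics, owner, bucket, category string, m UsageMetrics) {
	if _, ok := dict[owner]; !ok {
		dict[owner] = make(map[string]map[string]UsageMetrics)
	}
	if _, ok := dict[owner][bucket]; !ok {
		dict[owner][bucket] = make(map[string]UsageMetrics)
	}
	cur := dict[owner][bucket][category] // zero value on first sight
	cur.Ops += m.Ops
	cur.SuccessfulOps += m.SuccessfulOps
	cur.BytesSent += m.BytesSent
	cur.BytesReceived += m.BytesReceived
	dict[owner][bucket][category] = cur // map values are copies; store back
}

func main() {
	dict := make(map[string]map[string]map[string]UsageMetrics)
	accumulate(dict, "alice", "photos", "put_obj", UsageMetrics{Ops: 3, BytesReceived: 4096})
	accumulate(dict, "alice", "photos", "put_obj", UsageMetrics{Ops: 1, BytesReceived: 1024})
	fmt.Printf("%+v\n", dict["alice"]["photos"]["put_obj"])
}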

// processBucketData processes the bucket data and adds relevant details to the entries list.
// It also ensures that categories from the usageDict are correctly included in the output.
func processBucketData(cfg RadosGWUsageConfig, bucketData []admin.Bucket, usageDict map[string]map[string]map[string]UsageMetrics, entries *[]UsageEntry) {
var bucketsProcessed int

for _, bucket := range bucketData {
bucketName := bucket.Bucket
bucketOwner := bucket.Owner
@@ -262,8 +298,7 @@ func processBucketData(cfg RadosGWUsageConfig, bucketData []admin.Bucket, usageDict map[string]map[string]map[string]UsageMetrics, entries *[]UsageEntry) {
totalBytesReceived += metrics.BytesReceived
totalThroughputBytes += metrics.BytesSent + metrics.BytesReceived
totalLatencySeconds += float64(metrics.Ops) * 0.05 //FIXME Simulated latency (e.g., 50ms per operation)
//FIXME latency, currentOps and maxOps we will get from nats subject from s3 ops log
// maybe maxOps should be calculated in grafana
// FIXME: currentOps and maxOps to be retrieved from a NATS subject or similar source
if metrics.Ops > maxOps {
maxOps = metrics.Ops
}
@@ -298,58 +333,61 @@ func processBucketData(cfg RadosGWUsageConfig, bucketData []admin.Bucket, usageDict map[string]map[string]map[string]UsageMetrics, entries *[]UsageEntry) {
sizeKbUtilized = &calculatedSizeKbUtilized
}

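The assignments above populate *uint64 struct fields, where nil distinguishes "not reported" from a genuine zero, and a computed fall-back is exposed by taking the address of a local. A small sketch of the idea; kbUtilized is a hypothetical helper and the fall-back rule is an assumption:

// kbUtilized prefers the value RadosGW reported and otherwise derives one.
func kbUtilized(utilizedBytes uint64, reported *uint64) *uint64 {
	if reported != nil {
		return reported
	}
	calculated := utilizedBytes / 1024
	return &calculated // addressing a local is fine in Go; it escapes to the heap
}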
// Populate the entries with bucket details
*entries = append(*entries, UsageEntry{
User: bucketOwner,
Buckets: []BucketUsage{
{
Bucket: bucketName,
Owner: bucketOwner,
Zonegroup: bucketZonegroup,
Store: cfg.Store,
Usage: UsageStats{
RgwMain: struct {
Size *uint64 `json:"size"`
SizeActual *uint64 `json:"size_actual"`
SizeUtilized *uint64 `json:"size_utilized"`
SizeKb *uint64 `json:"size_kb"`
SizeKbActual *uint64 `json:"size_kb_actual"`
SizeKbUtilized *uint64 `json:"size_kb_utilized"`
NumObjects *uint64 `json:"num_objects"`
}{
Size: &bucketUsageBytes,
SizeActual: &bucketUsageBytes,
SizeUtilized: &bucketUtilizedBytes,
SizeKb: sizeKb,
SizeKbActual: sizeKbActual,
SizeKbUtilized: sizeKbUtilized,
NumObjects: &bucketUsageObjects,
},
},
BucketQuota: admin.QuotaSpec{
UID: bucketOwner,
Bucket: bucketName,
QuotaType: "bucket",
Enabled: &bucketQuotaEnabled,
CheckOnRaw: false,
MaxSize: &bucketQuotaMaxSize,
MaxSizeKb: &bucketQuotaMaxSizeBytes,
MaxObjects: &bucketQuotaMaxObjects,
},
NumShards: *bucketShards,
Categories: categories,
TotalOps: totalOps,
TotalBytesSent: totalBytesSent,
TotalBytesReceived: totalBytesReceived,
TotalThroughputBytes: totalThroughputBytes,
TotalLatencySeconds: totalLatencySeconds,
TotalRequests: totalRequests,
CurrentOps: currentOps,
MaxOps: maxOps,
// Find or create the UsageEntry for the bucket owner
entry := findOrCreateEntry(entries, bucketOwner)

// Append the bucket information to the user's entry
entry.Buckets = append(entry.Buckets, BucketUsage{
Bucket: bucketName,
Owner: bucketOwner,
Zonegroup: bucketZonegroup,
Store: cfg.Store,
Usage: UsageStats{
RgwMain: struct {
Size *uint64 `json:"size"`
SizeActual *uint64 `json:"size_actual"`
SizeUtilized *uint64 `json:"size_utilized"`
SizeKb *uint64 `json:"size_kb"`
SizeKbActual *uint64 `json:"size_kb_actual"`
SizeKbUtilized *uint64 `json:"size_kb_utilized"`
NumObjects *uint64 `json:"num_objects"`
}{
Size: &bucketUsageBytes,
SizeActual: &bucketUsageBytes,
SizeUtilized: &bucketUtilizedBytes,
SizeKb: sizeKb,
SizeKbActual: sizeKbActual,
SizeKbUtilized: sizeKbUtilized,
NumObjects: &bucketUsageObjects,
},
},
BucketQuota: admin.QuotaSpec{
UID: bucketOwner,
Bucket: bucketName,
QuotaType: "bucket",
Enabled: &bucketQuotaEnabled,
CheckOnRaw: false,
MaxSize: &bucketQuotaMaxSize,
MaxSizeKb: &bucketQuotaMaxSizeBytes,
MaxObjects: &bucketQuotaMaxObjects,
},
NumShards: *bucketShards,
Categories: categories,
TotalOps: totalOps,
TotalBytesSent: totalBytesSent,
TotalBytesReceived: totalBytesReceived,
TotalThroughputBytes: totalThroughputBytes,
TotalLatencySeconds: totalLatencySeconds,
TotalRequests: totalRequests,
CurrentOps: currentOps,
MaxOps: maxOps,
})

bucketsProcessed++
}
log.Info().
Int("buckets_processed", bucketsProcessed).
Msg("bucket data processing completed")
}
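findOrCreateEntry is called above but defined outside this hunk. A plausible sketch, assuming UsageEntry carries the User and Buckets fields shown in this file; the essential detail is returning a pointer into the slice so the caller's append to entry.Buckets is retained:

func findOrCreateEntry(entries *[]UsageEntry, user string) *UsageEntry {
	for i := range *entries {
		if (*entries)[i].User == user {
			return &(*entries)[i]
		}
	}
	*entries = append(*entries, UsageEntry{User: user})
	// The pointer is used immediately, before any later append can
	// reallocate the backing array out from under it.
	return &(*entries)[len(*entries)-1]
}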

// processUserData processes user data and updates the corresponding entries with user-specific information.
@@ -462,44 +500,58 @@ func StartRadosGWUsageExporter(cfg RadosGWUsageConfig) {
ticker := time.NewTicker(time.Duration(cfg.Interval) * time.Second)
defer ticker.Stop()

isRunning := false

// Run the loop indefinitely, collecting metrics on each tick
for range ticker.C {
// Start timing
startTime := time.Now()

// Collect usage metrics based on the configuration
entries, err := collectUsageMetrics(cfg)
if err != nil {
log.Error().
Err(err).
Msg("error collecting usage metrics")
continue // Skip to the next iteration if an error occurs
// Skip this tick if the previous run hasn't finished
if isRunning {
log.Trace().Msg("previous metrics collection is still running; skipping this tick")
continue
}
isRunning = true
go func() {
defer func() {
isRunning = false // Reset the flag after the function completes
}()

// Calculate duration and set it in the scrapeDurationSeconds metric
duration := time.Since(startTime).Seconds()
// Start timing
startTime := time.Now()

// If Prometheus is enabled, publish the collected metrics to Prometheus
if cfg.Prometheus {
publishToPrometheus(entries, duration, cfg)
}

// If NATS is enabled, publish the collected metrics to the specified NATS subject
if cfg.UseNats {
publishToNATS(nc, cfg.NatsSubject, entries)
} else {
// If NATS is not enabled, output the collected metrics as JSON to stdout
entriesJSON, err := json.MarshalIndent(entries, "", " ")
// Collect usage metrics based on the configuration
entries, err := collectUsageMetrics(cfg)
if err != nil {
log.Error().
Err(err).
Msg("error marshalling entries to JSON")
continue // Skip to the next iteration if an error occurs
Msg("error collecting usage metrics")
return // Abort this run; the next tick starts a fresh collection
}
if !cfg.Prometheus && !cfg.UseNats {
fmt.Println(string(entriesJSON)) // Print the JSON-formatted metrics to stdout

// Calculate duration and set it in the scrapeDurationSeconds metric
duration := time.Since(startTime).Seconds()

// If Prometheus is enabled, publish the collected metrics to Prometheus
if cfg.Prometheus {
publishToPrometheus(entries, duration, cfg)
}
log.Trace().Msg(string(entriesJSON))
}

// If NATS is enabled, publish the collected metrics to the specified NATS subject
if cfg.UseNats {
publishToNATS(nc, cfg.NatsSubject, entries)
} else {
// If NATS is not enabled, output the collected metrics as JSON to stdout
entriesJSON, err := json.MarshalIndent(entries, "", " ")
if err != nil {
log.Error().
Err(err).
Msg("error marshalling entries to JSON")
return // Abort this run; nothing to print for this tick
}
if !cfg.Prometheus && !cfg.UseNats {
fmt.Println(string(entriesJSON)) // Print the JSON-formatted metrics to stdout
}
log.Trace().Msg(string(entriesJSON))
}
}()
}
}
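One caveat in the loop above: isRunning is written both by the ticker loop and by the spawned goroutine with no synchronization, which Go's race detector would flag. A race-free sketch of the same skip-if-busy guard using sync/atomic (Go 1.19+); the sleep stands in for collectUsageMetrics:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var busy atomic.Bool
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for range ticker.C {
		// CompareAndSwap checks and claims the slot in one atomic step.
		if !busy.CompareAndSwap(false, true) {
			fmt.Println("previous collection still running; skipping tick")
			continue
		}
		go func() {
			defer busy.Store(false)
			time.Sleep(2500 * time.Millisecond) // stand-in for collectUsageMetrics
			fmt.Println("collection finished")
		}()
	}
}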
