Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return costs #155

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions api/v1/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,20 @@ func (s *ScrapeResults) Errorf(e error, msg string, args ...interface{}) ScrapeR
return *s
}

type CostData struct {
ExternalID string
ExternalType string
LineItems []LineItem
}

type LineItem struct {
ExternalID string
CostPerMin float64
Cost1d float64
Cost7d float64
Cost30d float64
}

// ScrapeResult ...
// +kubebuilder:object:generate=false
type ScrapeResult struct {
Expand All @@ -129,6 +143,7 @@ type ScrapeResult struct {
Format string `json:"format,omitempty"`
Tags JSONStringMap `json:"tags,omitempty"`
BaseScraper BaseScraper `json:"-"`
Costs *CostData `json:"-"`
Error error `json:"-"`
AnalysisResult *AnalysisResult `json:"analysis,omitempty"`
Changes []ChangeResult `json:"-"`
Expand Down
2 changes: 1 addition & 1 deletion fixtures/file.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
file:
- type: $.Config.InstanceType
id: $.Config.InstanceId
path:
paths:
- config*.json
- test*.json
78 changes: 22 additions & 56 deletions scrapers/aws/cost.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import (

"github.com/aws/aws-sdk-go-v2/service/sts"
"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/db"
v1 "github.com/flanksource/config-db/api/v1"
athena "github.com/uber/athenadriver/go"
)

Expand Down Expand Up @@ -72,40 +71,31 @@ func getAWSAthenaConfig(ctx *v1.ScrapeContext, awsConfig v1.AWS) (*athena.Config
return conf, nil
}

type LineItemRow struct {
ProductCode string
ResourceID string
Cost1h float64
Cost1d float64
Cost7d float64
Cost30d float64
}

func FetchCosts(ctx *v1.ScrapeContext, config v1.AWS) ([]LineItemRow, error) {
var lineItemRows []LineItemRow
func fetchCosts(ctx *v1.ScrapeContext, config v1.AWS) ([]v1.LineItem, error) {
var lineItemRows []v1.LineItem

athenaConf, err := getAWSAthenaConfig(ctx, config)
if err != nil {
return lineItemRows, err
return nil, fmt.Errorf("failed to get athen conf: %w", err)
adityathebe marked this conversation as resolved.
Show resolved Hide resolved
}

athenaDB, err := sql.Open(athena.DriverName, athenaConf.Stringify())
if err != nil {
return lineItemRows, err
return nil, fmt.Errorf("failed to open sql connection to %s: %w", athena.DriverName, err)
}

table := fmt.Sprintf("%s.%s", config.CostReporting.Database, config.CostReporting.Table)
query := strings.ReplaceAll(costQueryTemplate, "$table", table)

rows, err := athenaDB.Query(query)
if err != nil {
return lineItemRows, err
return nil, fmt.Errorf("failed to query athena: %w", err)
}

for rows.Next() {
var productCode, resourceID, cost1h, cost1d, cost7d, cost30d string
if err := rows.Scan(&productCode, &resourceID, &cost1h, &cost1d, &cost7d, &cost30d); err != nil {
logger.Errorf("Error scanning athena database rows: %v", err)
logger.Errorf("error scanning athena database rows: %v", err)
continue
}

Expand All @@ -114,13 +104,12 @@ func FetchCosts(ctx *v1.ScrapeContext, config v1.AWS) ([]LineItemRow, error) {
cost7dFloat, _ := strconv.ParseFloat(cost7d, 64)
cost30dFloat, _ := strconv.ParseFloat(cost30d, 64)

lineItemRows = append(lineItemRows, LineItemRow{
ProductCode: productCode,
ResourceID: resourceID,
Cost1h: cost1hFloat,
Cost1d: cost1dFloat,
Cost7d: cost7dFloat,
Cost30d: cost30dFloat,
lineItemRows = append(lineItemRows, v1.LineItem{
ExternalID: fmt.Sprintf("%s/%s", productCode, resourceID),
CostPerMin: cost1hFloat / 60,
Cost1d: cost1dFloat,
Cost7d: cost7dFloat,
Cost30d: cost30dFloat,
})
}

Expand All @@ -137,49 +126,26 @@ func (awsCost CostScraper) Scrape(ctx *v1.ScrapeContext, config v1.ConfigScraper
if err != nil {
return results.Errorf(err, "failed to create AWS session")
}

stsClient := sts.NewFromConfig(*session)
caller, err := stsClient.GetCallerIdentity(ctx, nil)
if err != nil {
return results.Errorf(err, "failed to get identity")
}
accountID := *caller.Account

rows, err := FetchCosts(ctx, awsConfig)
rows, err := fetchCosts(ctx, awsConfig)
if err != nil {
return results.Errorf(err, "failed to fetch costs")
}

gormDB := db.DefaultDB()
var accountTotal1h, accountTotal1d, accountTotal7d, accountTotal30d float64
for _, row := range rows {
tx := gormDB.Exec(`
UPDATE config_items SET cost_per_minute = ?, cost_total_1d = ?, cost_total_7d = ?, cost_total_30d = ?
WHERE ? = ANY(external_id)`, row.Cost1h/60, row.Cost1d, row.Cost7d, row.Cost30d, fmt.Sprintf("%s/%s", row.ProductCode, row.ResourceID))

if tx.Error != nil {
logger.Errorf("Error updating costs for config_item: %v", err)
continue
}

if tx.RowsAffected == 0 {
accountTotal1h += row.Cost1h
accountTotal1d += row.Cost1d
accountTotal7d += row.Cost7d
accountTotal30d += row.Cost30d
continue
}
logger.Infof("Updated cost for AWS Resource: %s/%s", row.ProductCode, row.ResourceID)
}

err = gormDB.Exec(`
UPDATE config_items SET cost_per_minute = ?, cost_total_1d = ?, cost_total_7d = ?, cost_total_30d = ?
WHERE external_type = 'AWS::::Account' AND ? = ANY(external_id)`,
accountTotal1h/60, accountTotal1d, accountTotal7d, accountTotal30d, accountID,
).Error
if err != nil {
logger.Errorf("Error updating costs for account: %v", err)
}
logger.Infof("Updated cost for AWS Account: %s", accountID)
results = append(results, v1.ScrapeResult{
Costs: &v1.CostData{
LineItems: rows,
ExternalType: "AWS::::Account",
adityathebe marked this conversation as resolved.
Show resolved Hide resolved
ExternalID: accountID,
},
})
}

return results
Expand Down
4 changes: 2 additions & 2 deletions scrapers/processors/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (e Extract) WithoutItems() Extract {
}
}

func (e Extract) WithouTransform() Extract {
func (e Extract) WithoutTransform() Extract {
return Extract{
ID: e.ID,
Type: e.Type,
Expand Down Expand Up @@ -178,7 +178,7 @@ func (e Extract) Extract(inputs ...v1.ScrapeResult) ([]v1.ScrapeResult, error) {

if e.Items != nil {
items := e.Items.Get(o)
logger.Debugf("Exctracted %d items with %s", len(items), *e.Items)
logger.Debugf("Extracted %d items with %s", len(items), *e.Items)
for _, item := range items {
extracted, err := e.WithoutItems().Extract(input.Clone(item))
if err != nil {
Expand Down
47 changes: 47 additions & 0 deletions scrapers/runscrapers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func Run(ctx *v1.ScrapeContext, configs ...v1.ConfigScraper) ([]v1.ScrapeResult,
if err := db.PersistJobHistory(&jobHistory); err != nil {
logger.Errorf("Error persisting job history: %v", err)
}

for _, result := range scraper.Scrape(ctx, config) {
if result.AnalysisResult != nil {
if rule, ok := analysis.Rules[result.AnalysisResult.Analyzer]; ok {
Expand All @@ -36,6 +37,49 @@ func Run(ctx *v1.ScrapeContext, configs ...v1.ConfigScraper) ([]v1.ScrapeResult,
}
}

if result.Costs != nil {
gormDB := db.DefaultDB()
var accountTotal1h, accountTotal1d, accountTotal7d, accountTotal30d float64
for _, item := range result.Costs.LineItems {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is missing external_type, Did we also not extract this into a common clause so that we con't have to keep repeating it ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@moshloop external type wasn't being used in aws/cost.go which is where this code comes from. Should I add it?

tx := gormDB.Exec(`UPDATE config_items SET cost_per_minute = ?, cost_total_1d = ?, cost_total_7d = ?, cost_total_30d = ? WHERE ? = ANY(external_id)`,
item.CostPerMin,
item.Cost1d,
item.Cost7d,
item.Cost30d,
item.ExternalID,
)

if tx.Error != nil {
logger.Errorf("error updating costs for config_item (externalID=%s): %v", item.ExternalID, tx.Error)
continue
}

if tx.RowsAffected == 0 {
accountTotal1h += item.CostPerMin
accountTotal1d += item.Cost1d
accountTotal7d += item.Cost7d
accountTotal30d += item.Cost30d
continue
}

logger.Infof("updated cost (externalID=%s)", item.ExternalID)
}

err := gormDB.Exec(`UPDATE config_items SET cost_per_minute = ?, cost_total_1d = ?, cost_total_7d = ?, cost_total_30d = ? WHERE external_type = ? AND ? = ANY(external_id)`,
accountTotal1h,
accountTotal1d,
accountTotal7d,
accountTotal30d,
result.Costs.ExternalType,
result.Costs.ExternalID,
).Error
if err != nil {
logger.Errorf("error updating costs (type=%s) (externalID=%s): %v", result.Costs.ExternalType, result.Costs.ExternalID, err)
}

logger.Infof("updated total cost (externalID=%s)", result.Costs.ExternalID)
}

result.Changes = changes.ProcessRules(result)

if result.Config == nil && (result.AnalysisResult != nil || len(result.Changes) > 0) {
Expand All @@ -57,17 +101,20 @@ func Run(ctx *v1.ScrapeContext, configs ...v1.ConfigScraper) ([]v1.ScrapeResult,

results = append(results, scraped...)
}

if result.Error != nil {
jobHistory.AddError(result.Error.Error())
} else {
jobHistory.IncrSuccess()
}
}

jobHistory.End()
if err := db.PersistJobHistory(&jobHistory); err != nil {
logger.Errorf("Error persisting job history: %v", err)
}
}
}

return results, nil
}