diff --git a/providers/aws/connection/clients.go b/providers/aws/connection/clients.go index 67ee8f754e..0cfc680558 100644 --- a/providers/aws/connection/clients.go +++ b/providers/aws/connection/clients.go @@ -13,6 +13,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/autoscaling" "github.com/aws/aws-sdk-go-v2/service/backup" "github.com/aws/aws-sdk-go-v2/service/inspector2" + "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/cloudfront" "github.com/aws/aws-sdk-go-v2/service/cloudtrail" @@ -485,6 +486,30 @@ func (t *AwsConnection) Sns(region string) *sns.Client { return client } +func (t *AwsConnection) Sqs(region string) *sqs.Client { + // if no region value is sent in, use the configured region + if len(region) == 0 { + region = t.cfg.Region + } + cacheVal := "_sqs_" + region + + // check for cached client and return it if it exists + c, ok := t.clientcache.Load(cacheVal) + if ok { + log.Debug().Msg("use cached sqs client") + return c.Data.(*sqs.Client) + } + + // create the client + cfg := t.cfg.Copy() + cfg.Region = region + client := sqs.NewFromConfig(cfg) + + // cache it + t.clientcache.Store(cacheVal, &CacheEntry{Data: client}) + return client +} + func (t *AwsConnection) Ssm(region string) *ssm.Client { // if no region value is sent in, use the configured region if len(region) == 0 { diff --git a/providers/aws/go.mod b/providers/aws/go.mod index fe90892d82..fbafc5e5a6 100644 --- a/providers/aws/go.mod +++ b/providers/aws/go.mod @@ -51,6 +51,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.29.1 github.com/aws/aws-sdk-go-v2/service/securityhub v1.48.4 github.com/aws/aws-sdk-go-v2/service/sns v1.29.8 + github.com/aws/aws-sdk-go-v2/service/sqs v1.32.3 github.com/aws/aws-sdk-go-v2/service/ssm v1.50.4 github.com/aws/aws-sdk-go-v2/service/sts v1.28.10 github.com/aws/aws-sdk-go-v2/service/wafv2 v1.49.1 diff --git a/providers/aws/go.sum b/providers/aws/go.sum index b8b0cf75f4..534f5bb1d1 100644 --- a/providers/aws/go.sum +++ b/providers/aws/go.sum @@ -190,6 +190,8 @@ github.com/aws/aws-sdk-go-v2/service/securityhub v1.48.4 h1:J/5XSSUxCUIy6Ya/1qBe github.com/aws/aws-sdk-go-v2/service/securityhub v1.48.4/go.mod h1:Ypax6FsjjJFd0fojZ85aErP+hwfVaXW4gsInyTbwL6Q= github.com/aws/aws-sdk-go-v2/service/sns v1.29.8 h1:CQicXbvanE/nn+MJQVuDzBplQSFj7M+gLLtArzDVZS4= github.com/aws/aws-sdk-go-v2/service/sns v1.29.8/go.mod h1:oP1vkszM8xdAqHMdBstE5TF3xc+yHwQYrAvkNharymc= +github.com/aws/aws-sdk-go-v2/service/sqs v1.32.3 h1:K0kIvRVzlVB/7onxMnRoqJkBqRdukIeaQ5GwGAmzggM= +github.com/aws/aws-sdk-go-v2/service/sqs v1.32.3/go.mod h1:xPN9AEzpZ3Ny+HpzsyLBrdXoTFOz7tig6xuYOQ3A0bQ= github.com/aws/aws-sdk-go-v2/service/ssm v1.50.4 h1:SgDxM/2kJEeSavji5ob+oluTPo3CQOQmP56F3yUz/kE= github.com/aws/aws-sdk-go-v2/service/ssm v1.50.4/go.mod h1:uRCbiDLweN10yl6W80fLygiLUDTIonz8/RpH+6lsEnY= github.com/aws/aws-sdk-go-v2/service/sso v1.20.9 h1:aD7AGQhvPuAxlSUfo0CWU7s6FpkbyykMhGYMvlqTjVs= diff --git a/providers/aws/resources/aws.lr b/providers/aws/resources/aws.lr index 8279f485fb..15a8b89d19 100644 --- a/providers/aws/resources/aws.lr +++ b/providers/aws/resources/aws.lr @@ -1836,6 +1836,46 @@ private aws.dynamodb.table @defaults("name region") { status string } +// Amazon Simple Queue Service (SQS) +aws.sqs { + // List of Amazon SQS queues + queues() []aws.sqs.queue +} + +// Amazon Simple Queue Service (SQS) Queue +private aws.sqs.queue { + // ARN for the queue + arn() string + // Time when the queue was created + createdAt() time + // Dead letter SQS queue, if any + deadLetterQueue() aws.sqs.queue + // Delay seconds set on the queue + deliveryDelaySeconds() int + // KMS Key for SSE, if any + kmsKey() aws.kms.key + // Time when the queue was last modified + lastModified() time + // Maximum amount of messages that can be received by the queue + maxReceiveCount() int + // Maximum message size for the queue + maximumMessageSize() int + // Time in seconds the queue retains messages + messageRetentionPeriodSeconds() int + // Time in seconds the queue waits for messages + receiveMessageWaitTimeSeconds() int + // Region for the queue + region string + // Whether SSE is enabled for the queue + sqsManagedSseEnabled() bool + // The type of queue: Fifo or Standard + queueType() string + // URL for the queue + url string + // Visibility timeout for the queue + visibilityTimeoutSeconds() int +} + // Amazon Relational Database Service (RDS) aws.rds { // List of database instances diff --git a/providers/aws/resources/aws.lr.go b/providers/aws/resources/aws.lr.go index a83861d510..297f8cffab 100644 --- a/providers/aws/resources/aws.lr.go +++ b/providers/aws/resources/aws.lr.go @@ -486,6 +486,14 @@ func init() { Init: initAwsDynamodbTable, Create: createAwsDynamodbTable, }, + "aws.sqs": { + // to override args, implement: initAwsSqs(runtime *plugin.Runtime, args map[string]*llx.RawData) (map[string]*llx.RawData, plugin.Resource, error) + Create: createAwsSqs, + }, + "aws.sqs.queue": { + // to override args, implement: initAwsSqsQueue(runtime *plugin.Runtime, args map[string]*llx.RawData) (map[string]*llx.RawData, plugin.Resource, error) + Create: createAwsSqsQueue, + }, "aws.rds": { // to override args, implement: initAwsRds(runtime *plugin.Runtime, args map[string]*llx.RawData) (map[string]*llx.RawData, plugin.Resource, error) Create: createAwsRds, @@ -2795,6 +2803,54 @@ var getDataFields = map[string]func(r plugin.Resource) *plugin.DataRes{ "aws.dynamodb.table.status": func(r plugin.Resource) *plugin.DataRes { return (r.(*mqlAwsDynamodbTable).GetStatus()).ToDataRes(types.String) }, + "aws.sqs.queues": func(r plugin.Resource) *plugin.DataRes { + return (r.(*mqlAwsSqs).GetQueues()).ToDataRes(types.Array(types.Resource("aws.sqs.queue"))) + }, + "aws.sqs.queue.arn": func(r plugin.Resource) *plugin.DataRes { + return (r.(*mqlAwsSqsQueue).GetArn()).ToDataRes(types.String) + }, + "aws.sqs.queue.createdAt": func(r plugin.Resource) *plugin.DataRes { + return (r.(*mqlAwsSqsQueue).GetCreatedAt()).ToDataRes(types.Time) + }, + "aws.sqs.queue.deadLetterQueue": func(r plugin.Resource) *plugin.DataRes { + return (r.(*mqlAwsSqsQueue).GetDeadLetterQueue()).ToDataRes(types.Resource("aws.sqs.queue")) + }, + "aws.sqs.queue.deliveryDelaySeconds": func(r plugin.Resource) *plugin.DataRes { + return (r.(*mqlAwsSqsQueue).GetDeliveryDelaySeconds()).ToDataRes(types.Int) + }, + "aws.sqs.queue.kmsKey": func(r plugin.Resource) *plugin.DataRes { + return (r.(*mqlAwsSqsQueue).GetKmsKey()).ToDataRes(types.Resource("aws.kms.key")) + }, + "aws.sqs.queue.lastModified": func(r plugin.Resource) *plugin.DataRes { + return (r.(*mqlAwsSqsQueue).GetLastModified()).ToDataRes(types.Time) + }, + "aws.sqs.queue.maxReceiveCount": func(r plugin.Resource) *plugin.DataRes { + return (r.(*mqlAwsSqsQueue).GetMaxReceiveCount()).ToDataRes(types.Int) + }, + "aws.sqs.queue.maximumMessageSize": func(r plugin.Resource) *plugin.DataRes { + return (r.(*mqlAwsSqsQueue).GetMaximumMessageSize()).ToDataRes(types.Int) + }, + "aws.sqs.queue.messageRetentionPeriodSeconds": func(r plugin.Resource) *plugin.DataRes { + return (r.(*mqlAwsSqsQueue).GetMessageRetentionPeriodSeconds()).ToDataRes(types.Int) + }, + "aws.sqs.queue.receiveMessageWaitTimeSeconds": func(r plugin.Resource) *plugin.DataRes { + return (r.(*mqlAwsSqsQueue).GetReceiveMessageWaitTimeSeconds()).ToDataRes(types.Int) + }, + "aws.sqs.queue.region": func(r plugin.Resource) *plugin.DataRes { + return (r.(*mqlAwsSqsQueue).GetRegion()).ToDataRes(types.String) + }, + "aws.sqs.queue.sqsManagedSseEnabled": func(r plugin.Resource) *plugin.DataRes { + return (r.(*mqlAwsSqsQueue).GetSqsManagedSseEnabled()).ToDataRes(types.Bool) + }, + "aws.sqs.queue.queueType": func(r plugin.Resource) *plugin.DataRes { + return (r.(*mqlAwsSqsQueue).GetQueueType()).ToDataRes(types.String) + }, + "aws.sqs.queue.url": func(r plugin.Resource) *plugin.DataRes { + return (r.(*mqlAwsSqsQueue).GetUrl()).ToDataRes(types.String) + }, + "aws.sqs.queue.visibilityTimeoutSeconds": func(r plugin.Resource) *plugin.DataRes { + return (r.(*mqlAwsSqsQueue).GetVisibilityTimeoutSeconds()).ToDataRes(types.Int) + }, "aws.rds.dbInstances": func(r plugin.Resource) *plugin.DataRes { return (r.(*mqlAwsRds).GetDbInstances()).ToDataRes(types.Array(types.Resource("aws.rds.dbinstance"))) }, @@ -7313,6 +7369,78 @@ var setDataFields = map[string]func(r plugin.Resource, v *llx.RawData) bool { r.(*mqlAwsDynamodbTable).Status, ok = plugin.RawToTValue[string](v.Value, v.Error) return }, + "aws.sqs.__id": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqs).__id, ok = v.Value.(string) + return + }, + "aws.sqs.queues": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqs).Queues, ok = plugin.RawToTValue[[]interface{}](v.Value, v.Error) + return + }, + "aws.sqs.queue.__id": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqsQueue).__id, ok = v.Value.(string) + return + }, + "aws.sqs.queue.arn": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqsQueue).Arn, ok = plugin.RawToTValue[string](v.Value, v.Error) + return + }, + "aws.sqs.queue.createdAt": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqsQueue).CreatedAt, ok = plugin.RawToTValue[*time.Time](v.Value, v.Error) + return + }, + "aws.sqs.queue.deadLetterQueue": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqsQueue).DeadLetterQueue, ok = plugin.RawToTValue[*mqlAwsSqsQueue](v.Value, v.Error) + return + }, + "aws.sqs.queue.deliveryDelaySeconds": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqsQueue).DeliveryDelaySeconds, ok = plugin.RawToTValue[int64](v.Value, v.Error) + return + }, + "aws.sqs.queue.kmsKey": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqsQueue).KmsKey, ok = plugin.RawToTValue[*mqlAwsKmsKey](v.Value, v.Error) + return + }, + "aws.sqs.queue.lastModified": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqsQueue).LastModified, ok = plugin.RawToTValue[*time.Time](v.Value, v.Error) + return + }, + "aws.sqs.queue.maxReceiveCount": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqsQueue).MaxReceiveCount, ok = plugin.RawToTValue[int64](v.Value, v.Error) + return + }, + "aws.sqs.queue.maximumMessageSize": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqsQueue).MaximumMessageSize, ok = plugin.RawToTValue[int64](v.Value, v.Error) + return + }, + "aws.sqs.queue.messageRetentionPeriodSeconds": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqsQueue).MessageRetentionPeriodSeconds, ok = plugin.RawToTValue[int64](v.Value, v.Error) + return + }, + "aws.sqs.queue.receiveMessageWaitTimeSeconds": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqsQueue).ReceiveMessageWaitTimeSeconds, ok = plugin.RawToTValue[int64](v.Value, v.Error) + return + }, + "aws.sqs.queue.region": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqsQueue).Region, ok = plugin.RawToTValue[string](v.Value, v.Error) + return + }, + "aws.sqs.queue.sqsManagedSseEnabled": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqsQueue).SqsManagedSseEnabled, ok = plugin.RawToTValue[bool](v.Value, v.Error) + return + }, + "aws.sqs.queue.queueType": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqsQueue).QueueType, ok = plugin.RawToTValue[string](v.Value, v.Error) + return + }, + "aws.sqs.queue.url": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqsQueue).Url, ok = plugin.RawToTValue[string](v.Value, v.Error) + return + }, + "aws.sqs.queue.visibilityTimeoutSeconds": func(r plugin.Resource, v *llx.RawData) (ok bool) { + r.(*mqlAwsSqsQueue).VisibilityTimeoutSeconds, ok = plugin.RawToTValue[int64](v.Value, v.Error) + return + }, "aws.rds.__id": func(r plugin.Resource, v *llx.RawData) (ok bool) { r.(*mqlAwsRds).__id, ok = v.Value.(string) return @@ -18988,6 +19116,232 @@ func (c *mqlAwsDynamodbTable) GetStatus() *plugin.TValue[string] { return &c.Status } +// mqlAwsSqs for the aws.sqs resource +type mqlAwsSqs struct { + MqlRuntime *plugin.Runtime + __id string + // optional: if you define mqlAwsSqsInternal it will be used here + Queues plugin.TValue[[]interface{}] +} + +// createAwsSqs creates a new instance of this resource +func createAwsSqs(runtime *plugin.Runtime, args map[string]*llx.RawData) (plugin.Resource, error) { + res := &mqlAwsSqs{ + MqlRuntime: runtime, + } + + err := SetAllData(res, args) + if err != nil { + return res, err + } + + if res.__id == "" { + res.__id, err = res.id() + if err != nil { + return nil, err + } + } + + if runtime.HasRecording { + args, err = runtime.ResourceFromRecording("aws.sqs", res.__id) + if err != nil || args == nil { + return res, err + } + return res, SetAllData(res, args) + } + + return res, nil +} + +func (c *mqlAwsSqs) MqlName() string { + return "aws.sqs" +} + +func (c *mqlAwsSqs) MqlID() string { + return c.__id +} + +func (c *mqlAwsSqs) GetQueues() *plugin.TValue[[]interface{}] { + return plugin.GetOrCompute[[]interface{}](&c.Queues, func() ([]interface{}, error) { + if c.MqlRuntime.HasRecording { + d, err := c.MqlRuntime.FieldResourceFromRecording("aws.sqs", c.__id, "queues") + if err != nil { + return nil, err + } + if d != nil { + return d.Value.([]interface{}), nil + } + } + + return c.queues() + }) +} + +// mqlAwsSqsQueue for the aws.sqs.queue resource +type mqlAwsSqsQueue struct { + MqlRuntime *plugin.Runtime + __id string + mqlAwsSqsQueueInternal + Arn plugin.TValue[string] + CreatedAt plugin.TValue[*time.Time] + DeadLetterQueue plugin.TValue[*mqlAwsSqsQueue] + DeliveryDelaySeconds plugin.TValue[int64] + KmsKey plugin.TValue[*mqlAwsKmsKey] + LastModified plugin.TValue[*time.Time] + MaxReceiveCount plugin.TValue[int64] + MaximumMessageSize plugin.TValue[int64] + MessageRetentionPeriodSeconds plugin.TValue[int64] + ReceiveMessageWaitTimeSeconds plugin.TValue[int64] + Region plugin.TValue[string] + SqsManagedSseEnabled plugin.TValue[bool] + QueueType plugin.TValue[string] + Url plugin.TValue[string] + VisibilityTimeoutSeconds plugin.TValue[int64] +} + +// createAwsSqsQueue creates a new instance of this resource +func createAwsSqsQueue(runtime *plugin.Runtime, args map[string]*llx.RawData) (plugin.Resource, error) { + res := &mqlAwsSqsQueue{ + MqlRuntime: runtime, + } + + err := SetAllData(res, args) + if err != nil { + return res, err + } + + if res.__id == "" { + res.__id, err = res.id() + if err != nil { + return nil, err + } + } + + if runtime.HasRecording { + args, err = runtime.ResourceFromRecording("aws.sqs.queue", res.__id) + if err != nil || args == nil { + return res, err + } + return res, SetAllData(res, args) + } + + return res, nil +} + +func (c *mqlAwsSqsQueue) MqlName() string { + return "aws.sqs.queue" +} + +func (c *mqlAwsSqsQueue) MqlID() string { + return c.__id +} + +func (c *mqlAwsSqsQueue) GetArn() *plugin.TValue[string] { + return plugin.GetOrCompute[string](&c.Arn, func() (string, error) { + return c.arn() + }) +} + +func (c *mqlAwsSqsQueue) GetCreatedAt() *plugin.TValue[*time.Time] { + return plugin.GetOrCompute[*time.Time](&c.CreatedAt, func() (*time.Time, error) { + return c.createdAt() + }) +} + +func (c *mqlAwsSqsQueue) GetDeadLetterQueue() *plugin.TValue[*mqlAwsSqsQueue] { + return plugin.GetOrCompute[*mqlAwsSqsQueue](&c.DeadLetterQueue, func() (*mqlAwsSqsQueue, error) { + if c.MqlRuntime.HasRecording { + d, err := c.MqlRuntime.FieldResourceFromRecording("aws.sqs.queue", c.__id, "deadLetterQueue") + if err != nil { + return nil, err + } + if d != nil { + return d.Value.(*mqlAwsSqsQueue), nil + } + } + + return c.deadLetterQueue() + }) +} + +func (c *mqlAwsSqsQueue) GetDeliveryDelaySeconds() *plugin.TValue[int64] { + return plugin.GetOrCompute[int64](&c.DeliveryDelaySeconds, func() (int64, error) { + return c.deliveryDelaySeconds() + }) +} + +func (c *mqlAwsSqsQueue) GetKmsKey() *plugin.TValue[*mqlAwsKmsKey] { + return plugin.GetOrCompute[*mqlAwsKmsKey](&c.KmsKey, func() (*mqlAwsKmsKey, error) { + if c.MqlRuntime.HasRecording { + d, err := c.MqlRuntime.FieldResourceFromRecording("aws.sqs.queue", c.__id, "kmsKey") + if err != nil { + return nil, err + } + if d != nil { + return d.Value.(*mqlAwsKmsKey), nil + } + } + + return c.kmsKey() + }) +} + +func (c *mqlAwsSqsQueue) GetLastModified() *plugin.TValue[*time.Time] { + return plugin.GetOrCompute[*time.Time](&c.LastModified, func() (*time.Time, error) { + return c.lastModified() + }) +} + +func (c *mqlAwsSqsQueue) GetMaxReceiveCount() *plugin.TValue[int64] { + return plugin.GetOrCompute[int64](&c.MaxReceiveCount, func() (int64, error) { + return c.maxReceiveCount() + }) +} + +func (c *mqlAwsSqsQueue) GetMaximumMessageSize() *plugin.TValue[int64] { + return plugin.GetOrCompute[int64](&c.MaximumMessageSize, func() (int64, error) { + return c.maximumMessageSize() + }) +} + +func (c *mqlAwsSqsQueue) GetMessageRetentionPeriodSeconds() *plugin.TValue[int64] { + return plugin.GetOrCompute[int64](&c.MessageRetentionPeriodSeconds, func() (int64, error) { + return c.messageRetentionPeriodSeconds() + }) +} + +func (c *mqlAwsSqsQueue) GetReceiveMessageWaitTimeSeconds() *plugin.TValue[int64] { + return plugin.GetOrCompute[int64](&c.ReceiveMessageWaitTimeSeconds, func() (int64, error) { + return c.receiveMessageWaitTimeSeconds() + }) +} + +func (c *mqlAwsSqsQueue) GetRegion() *plugin.TValue[string] { + return &c.Region +} + +func (c *mqlAwsSqsQueue) GetSqsManagedSseEnabled() *plugin.TValue[bool] { + return plugin.GetOrCompute[bool](&c.SqsManagedSseEnabled, func() (bool, error) { + return c.sqsManagedSseEnabled() + }) +} + +func (c *mqlAwsSqsQueue) GetQueueType() *plugin.TValue[string] { + return plugin.GetOrCompute[string](&c.QueueType, func() (string, error) { + return c.queueType() + }) +} + +func (c *mqlAwsSqsQueue) GetUrl() *plugin.TValue[string] { + return &c.Url +} + +func (c *mqlAwsSqsQueue) GetVisibilityTimeoutSeconds() *plugin.TValue[int64] { + return plugin.GetOrCompute[int64](&c.VisibilityTimeoutSeconds, func() (int64, error) { + return c.visibilityTimeoutSeconds() + }) +} + // mqlAwsRds for the aws.rds resource type mqlAwsRds struct { MqlRuntime *plugin.Runtime diff --git a/providers/aws/resources/aws.lr.manifest.yaml b/providers/aws/resources/aws.lr.manifest.yaml index 06aa93e275..8e877ab2df 100755 --- a/providers/aws/resources/aws.lr.manifest.yaml +++ b/providers/aws/resources/aws.lr.manifest.yaml @@ -2617,6 +2617,38 @@ resources: platform: name: - aws + aws.sqs: + fields: + queues: {} + min_mondoo_version: 9.0.0 + platform: + name: + - aws + aws.sqs.queue: + fields: + arn: {} + createdAt: {} + deadLetterQueue: {} + deliveryDelaySeconds: {} + kmsKey: {} + lastModified: {} + maxReceiveCount: {} + maximumMessageSize: {} + messageRetentionPeriodSeconds: {} + queueType: {} + receiveMessageWaitTimeSeconds: {} + region: {} + sqsManagedSseEnabled: {} + sseEnabled: {} + tags: {} + type: {} + url: {} + visibilityTimeoutSeconds: {} + is_private: true + min_mondoo_version: 9.0.0 + platform: + name: + - aws aws.ssm: fields: instances: {} diff --git a/providers/aws/resources/aws_sqs.go b/providers/aws/resources/aws_sqs.go new file mode 100644 index 0000000000..1149e62350 --- /dev/null +++ b/providers/aws/resources/aws_sqs.go @@ -0,0 +1,306 @@ +// Copyright (c) Mondoo, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package resources + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/arn" + "github.com/rs/zerolog/log" + "go.mondoo.com/cnquery/v11/llx" + "go.mondoo.com/cnquery/v11/providers-sdk/v1/plugin" + "go.mondoo.com/cnquery/v11/providers-sdk/v1/util/jobpool" + "go.mondoo.com/cnquery/v11/providers/aws/connection" +) + +func (a *mqlAwsSqs) id() (string, error) { + return "aws.sqs", nil +} + +func (a *mqlAwsSqsQueue) id() (string, error) { + return a.Url.Data, nil +} + +func (a *mqlAwsSqs) queues() ([]interface{}, error) { + conn := a.MqlRuntime.Connection.(*connection.AwsConnection) + res := []interface{}{} + poolOfJobs := jobpool.CreatePool(a.getQueues(conn), 5) + poolOfJobs.Run() + + // check for errors + if poolOfJobs.HasErrors() { + return nil, poolOfJobs.GetErrors() + } + // get all the results + for i := range poolOfJobs.Jobs { + res = append(res, poolOfJobs.Jobs[i].Result.([]interface{})...) + } + + return res, nil +} + +func (a *mqlAwsSqs) getQueues(conn *connection.AwsConnection) []*jobpool.Job { + tasks := make([]*jobpool.Job, 0) + regions, err := conn.Regions() + if err != nil { + return []*jobpool.Job{{Err: err}} + } + + for _, region := range regions { + regionVal := region + f := func() (jobpool.JobResult, error) { + svc := conn.Sqs(regionVal) + ctx := context.Background() + res := []interface{}{} + + nextToken := aws.String("no_token_to_start_with") + params := &sqs.ListQueuesInput{} + for nextToken != nil { + qs, err := svc.ListQueues(ctx, params) + if err != nil { + if Is400AccessDeniedError(err) { + log.Warn().Str("region", regionVal).Msg("error accessing region for AWS API") + return res, nil + } + return nil, err + } + for _, q := range qs.QueueUrls { + mqlTopic, err := CreateResource(a.MqlRuntime, "aws.sqs.queue", + map[string]*llx.RawData{ + "url": llx.StringData(q), + "region": llx.StringData(regionVal), + }, + ) + if err != nil { + return nil, err + } + res = append(res, mqlTopic) + } + nextToken = qs.NextToken + if qs.NextToken != nil { + params.NextToken = nextToken + } + } + return jobpool.JobResult(res), nil + } + tasks = append(tasks, jobpool.NewJob(f)) + } + return tasks +} + +type mqlAwsSqsQueueInternal struct { + fetched bool + queueAtts map[string]string + lock sync.Mutex +} + +func (a *mqlAwsSqsQueue) fetchAttributes() (map[string]string, error) { + if a.fetched { + return a.queueAtts, nil + } + a.lock.Lock() + defer a.lock.Unlock() + conn := a.MqlRuntime.Connection.(*connection.AwsConnection) + ctx := context.Background() + svc := conn.Sqs(a.Region.Data) + desc, err := svc.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{QueueUrl: aws.String(a.Url.Data), AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll}}) + if err != nil { + return nil, err + } + a.fetched = true + a.queueAtts = desc.Attributes + return desc.Attributes, nil +} + +func (a *mqlAwsSqsQueue) kmsKey() (*mqlAwsKmsKey, error) { + atts, err := a.fetchAttributes() + if err != nil { + return nil, err + } + if atts["KmsMasterKeyId"] == "" { + a.KmsKey.State = plugin.StateIsNull | plugin.StateIsSet + return nil, nil + } + id := atts["KmsMasterKeyId"] + conn := a.MqlRuntime.Connection.(*connection.AwsConnection) + mqlKey, err := NewResource(a.MqlRuntime, "aws.kms.key", + map[string]*llx.RawData{ + "arn": llx.StringData(fmt.Sprintf(kmsKeyArnPattern, a.Region.Data, conn.AccountId(), id)), + }) + if err != nil { + return nil, err + } + return mqlKey.(*mqlAwsKmsKey), nil +} + +func (a *mqlAwsSqsQueue) deadLetterQueue() (*mqlAwsSqsQueue, error) { + atts, err := a.fetchAttributes() + if err != nil { + return nil, err + } + c := atts["RedrivePolicy"] + if c == "" { + a.DeadLetterQueue.State = plugin.StateIsSet | plugin.StateIsNull + return nil, nil + } + var r redrivePolicy + err = json.Unmarshal([]byte(c), &r) + if err != nil { + return nil, err + } + parsedArn, err := arn.Parse(r.DeadLetterTargetArn) + if err != nil { + return nil, err + } + // "https://sqs.us-east-1.amazonaws.com/921877552404/Test-Preslav-Queue" + url := fmt.Sprintf("https://sqs.%s.amazonaws.com/%s/%s", a.Region.Data, parsedArn.AccountID, parsedArn.Resource) + q, err := NewResource(a.MqlRuntime, "aws.sqs.queue", + map[string]*llx.RawData{ + "arn": llx.StringData(r.DeadLetterTargetArn), + "url": llx.StringData(url), + "region": llx.StringData(a.Region.Data), + }) + if err != nil { + return nil, err + } + return q.(*mqlAwsSqsQueue), nil +} + +func (a *mqlAwsSqsQueue) arn() (string, error) { + atts, err := a.fetchAttributes() + if err != nil { + return "", err + } + return atts["QueueArn"], nil +} + +func (a *mqlAwsSqsQueue) createdAt() (*time.Time, error) { + atts, err := a.fetchAttributes() + if err != nil { + return nil, err + } + i, err := strconv.ParseInt(atts["CreatedTimestamp"], 10, 64) + t := time.Unix(i, 0) + return &t, nil +} + +func (a *mqlAwsSqsQueue) deliveryDelaySeconds() (int64, error) { + atts, err := a.fetchAttributes() + if err != nil { + return 0, err + } + c, err := strconv.Atoi(atts["DelaySeconds"]) + if err != nil { + return 0, err + } + return int64(c), nil +} + +func (a *mqlAwsSqsQueue) lastModified() (*time.Time, error) { + atts, err := a.fetchAttributes() + if err != nil { + return nil, err + } + i, err := strconv.ParseInt(atts["LastModifiedTimestamp"], 10, 64) + t := time.Unix(i, 0) + return &t, nil +} + +type redrivePolicy struct { + DeadLetterTargetArn string `json:"deadLetterTargetArn,omitempty"` + MaxReceiveCount int `json:"maxReceiveCount,omitempty"` +} + +func (a *mqlAwsSqsQueue) maxReceiveCount() (int64, error) { + atts, err := a.fetchAttributes() + if err != nil { + return 0, err + } + c := atts["RedrivePolicy"] + if c == "" { + return 0, nil + } + log.Info().Msgf("redrive %v", c) + var r redrivePolicy + err = json.Unmarshal([]byte(c), &r) + if err != nil { + return 0, err + } + return int64(r.MaxReceiveCount), nil +} + +func (a *mqlAwsSqsQueue) maximumMessageSize() (int64, error) { + atts, err := a.fetchAttributes() + if err != nil { + return 0, err + } + c, err := strconv.Atoi(atts["MaximumMessageSize"]) + if err != nil { + return 0, err + } + return int64(c), nil +} + +func (a *mqlAwsSqsQueue) messageRetentionPeriodSeconds() (int64, error) { + atts, err := a.fetchAttributes() + if err != nil { + return 0, err + } + c, err := strconv.Atoi(atts["MessageRetentionPeriod"]) + if err != nil { + return 0, err + } + return int64(c), nil +} + +func (a *mqlAwsSqsQueue) receiveMessageWaitTimeSeconds() (int64, error) { + atts, err := a.fetchAttributes() + if err != nil { + return 0, err + } + c, err := strconv.Atoi(atts["ReceiveMessageWaitTimeSeconds"]) + if err != nil { + return 0, err + } + return int64(c), nil +} + +func (a *mqlAwsSqsQueue) sqsManagedSseEnabled() (bool, error) { + atts, err := a.fetchAttributes() + if err != nil { + return false, err + } + return strconv.ParseBool(atts["SqsManagedSseEnabled"]) +} + +func (a *mqlAwsSqsQueue) queueType() (string, error) { + atts, err := a.fetchAttributes() + if err != nil { + return "", err + } + if atts["FifoQueue"] == "true" { + return "fifo", nil + } + return "standard", nil +} + +func (a *mqlAwsSqsQueue) visibilityTimeoutSeconds() (int64, error) { + atts, err := a.fetchAttributes() + if err != nil { + return 0, err + } + c, err := strconv.Atoi(atts["VisibilityTimeout"]) + if err != nil { + return 0, err + } + return int64(c), nil +}