diff --git a/cmd/fdsn-holdings-consumer/main.go b/cmd/fdsn-holdings-consumer/main.go index 20a2005d..fc36d5dc 100644 --- a/cmd/fdsn-holdings-consumer/main.go +++ b/cmd/fdsn-holdings-consumer/main.go @@ -10,19 +10,33 @@ package main import ( + "context" "database/sql" "encoding/json" "errors" "fmt" "log" + "log/slog" "os" + "os/signal" "strings" + "syscall" "time" "github.com/GeoNet/kit/aws/s3" "github.com/GeoNet/kit/aws/sqs" "github.com/GeoNet/kit/cfg" + "github.com/GeoNet/kit/health" "github.com/GeoNet/kit/metrics" + "github.com/GeoNet/kit/slogger" +) + +const ( + healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time (depends on tilde-bundle) + healthCheckStartup = 5 * time.Minute //ignore heartbeat messages for this time after starting + healthCheckTimeout = 30 * time.Second //health check timeout + healthCheckService = ":7777" //end point to listen to for SOH checks + healthCheckPath = "/soh" ) var ( @@ -31,6 +45,8 @@ var ( sqsClient sqs.SQS s3Client *s3.S3 saveHoldings *sql.Stmt + + sLogger = slogger.NewSmartLogger(10*time.Minute, "") // log repeated error messages ) type event struct { @@ -38,6 +54,12 @@ type event struct { } func main() { + //check health + if health.RunningHealthCheck() { + healthCheck() + } + + //run as normal service p, err := cfg.PostgresEnv() if err != nil { log.Fatalf("error reading DB config from the environment vars: %s", err) @@ -80,11 +102,15 @@ func main() { db.SetMaxIdleConns(p.MaxIdle) db.SetMaxOpenConns(p.MaxOpen) + // provide a soh heartbeat + health := health.New(healthCheckService, healthCheckAged, healthCheckStartup) + ping: for { err = db.Ping() if err != nil { log.Println("problem pinging DB sleeping and retrying") + health.Ok() //send heartbeat time.Sleep(time.Second * 30) continue ping } @@ -107,17 +133,36 @@ ping: var r sqs.Raw var e event + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + +loop1: for { - r, err = sqsClient.Receive(queueURL, 600) + r, err = sqsClient.ReceiveWithContext(ctx, queueURL, 600) if err != nil { - log.Printf("problem receiving message, backing off: %s", err) - time.Sleep(time.Second * 20) + switch { + case sqs.Cancelled(err): //stoped + log.Println("##1 system stop... ") + break loop1 + case sqs.IsNoMessagesError(err): + n := sLogger.Log(err) + if n%100 == 0 { //don't log all repeated error messages + log.Printf("no message received for %d times ", n) + } + default: + slog.Warn("problem receiving message, backing off", "err", err) + time.Sleep(time.Second * 20) + } + // update soh + health.Ok() continue } err = metrics.DoProcess(&e, []byte(r.Body)) if err != nil { log.Printf("problem processing message, skipping deletion for redelivery: %s", err) + // update soh + health.Ok() continue } @@ -125,7 +170,24 @@ ping: if err != nil { log.Printf("problem deleting message, continuing: %s", err) } + // update soh + health.Ok() + } +} + +// check health by calling the http soh endpoint +// cmd: ./fdsn-holdings-consumer -check +func healthCheck() { + ctx, cancel := context.WithTimeout(context.Background(), healthCheckTimeout) + defer cancel() + + msg, err := health.Check(ctx, healthCheckService+healthCheckPath, healthCheckTimeout) + if err != nil { + log.Printf("status: %v", err) + os.Exit(1) } + log.Printf("status: %s", string(msg)) + os.Exit(0) } // Process implements msg.Processor for event. diff --git a/go.mod b/go.mod index 3d3895fa..87342e01 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/GeoNet/fdsn go 1.21 require ( - github.com/GeoNet/kit v0.0.0-20240512234353-4d4493144f60 + github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268 github.com/gorilla/schema v1.4.1 github.com/joho/godotenv v1.5.1 github.com/lib/pq v1.10.3 diff --git a/go.sum b/go.sum index 8912fc22..a5c407fb 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/GeoNet/kit v0.0.0-20240512234353-4d4493144f60 h1:BgAWCVg+WxU28mXiy/3le7H9nZUo37QS/+GfXSFWYgo= -github.com/GeoNet/kit v0.0.0-20240512234353-4d4493144f60/go.mod h1:O5T12WrCE1SOD52A9Ye//Wjl3HX7BFZv3dXzDz3adMo= +github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268 h1:SeKMshwK+xOgKLKrMSPhYTQImmLop5tXXei/wOmgO80= +github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268/go.mod h1:O5T12WrCE1SOD52A9Ye//Wjl3HX7BFZv3dXzDz3adMo= github.com/aws/aws-sdk-go-v2 v1.25.3 h1:xYiLpZTQs1mzvz5PaI6uR0Wh57ippuEthxS4iK5v0n0= github.com/aws/aws-sdk-go-v2 v1.25.3/go.mod h1:35hUlJVYd+M++iLI3ALmVwMOyRYMmRqUXpTtRGW+K9I= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU= diff --git a/vendor/github.com/GeoNet/kit/aws/s3/s3.go b/vendor/github.com/GeoNet/kit/aws/s3/s3.go index 4432d3b8..1cd6bafe 100644 --- a/vendor/github.com/GeoNet/kit/aws/s3/s3.go +++ b/vendor/github.com/GeoNet/kit/aws/s3/s3.go @@ -94,7 +94,7 @@ func getConfig() (aws.Config, error) { var cfg aws.Config var err error - if awsEndpoint := os.Getenv("CUSTOM_AWS_ENDPOINT_URL"); awsEndpoint != "" { + if awsEndpoint := os.Getenv("AWS_ENDPOINT_URL"); awsEndpoint != "" { customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { return aws.Endpoint{ PartitionID: "aws", diff --git a/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go b/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go index 8b9778a3..bd7f21c1 100644 --- a/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go +++ b/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go @@ -27,6 +27,9 @@ type SQS struct { client *sqs.Client } +// specific error to return when no messages are received from the queue +var ErrNoMessages = errors.New("no messages received from queue") + // New returns an SQS struct which wraps an SQS client using the default AWS credentials chain. // This consults (in order) environment vars, config files, EC2 and ECS roles. // It is an error if the AWS_REGION environment variable is not set. @@ -62,7 +65,7 @@ func getConfig() (aws.Config, error) { var cfg aws.Config var err error - if awsEndpoint := os.Getenv("CUSTOM_AWS_ENDPOINT_URL"); awsEndpoint != "" { + if awsEndpoint := os.Getenv("AWS_ENDPOINT_URL"); awsEndpoint != "" { customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { return aws.Endpoint{ PartitionID: "aws", @@ -125,29 +128,28 @@ func (s *SQS) ReceiveWithContextAttributes(ctx context.Context, queueURL string, // receiveMessage is the common code used internally to receive an SQS message based // on the provided input. func (s *SQS) receiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput) (Raw, error) { + r, err := s.client.ReceiveMessage(ctx, input) + if err != nil { + return Raw{}, err + } - for { - r, err := s.client.ReceiveMessage(ctx, input) - if err != nil { - return Raw{}, err - } + switch { + case r == nil || len(r.Messages) == 0: + // no message received + return Raw{}, ErrNoMessages + + case len(r.Messages) == 1: + raw := r.Messages[0] - switch { - case r == nil || len(r.Messages) == 0: - // no message received - continue - case len(r.Messages) == 1: - raw := r.Messages[0] - - m := Raw{ - Body: aws.ToString(raw.Body), - ReceiptHandle: aws.ToString(raw.ReceiptHandle), - Attributes: raw.Attributes, - } - return m, nil - case len(r.Messages) > 1: - return Raw{}, fmt.Errorf("received more than 1 message: %d", len(r.Messages)) + m := Raw{ + Body: aws.ToString(raw.Body), + ReceiptHandle: aws.ToString(raw.ReceiptHandle), + Attributes: raw.Attributes, } + return m, nil + + default: + return Raw{}, fmt.Errorf("received unexpected messages: %d", len(r.Messages)) } } @@ -279,6 +281,53 @@ func (s *SQS) GetQueueUrl(name string) (string, error) { return "", nil } +func (s *SQS) GetQueueARN(url string) (string, error) { + + params := sqs.GetQueueAttributesInput{ + QueueUrl: aws.String(url), + AttributeNames: []types.QueueAttributeName{ + types.QueueAttributeNameQueueArn, + }, + } + + output, err := s.client.GetQueueAttributes(context.TODO(), ¶ms) + if err != nil { + return "", err + } + arn := output.Attributes[string(types.QueueAttributeNameQueueArn)] + if arn == "" { + return "", errors.New("ARN attribute not found") + } + return arn, nil +} + +// CreateQueue creates an Amazon SQS queue with the specified name. You can specify +// whether the queue is created as a FIFO queue. Returns the queue URL. +func (s *SQS) CreateQueue(queueName string, isFifoQueue bool) (string, error) { + + queueAttributes := map[string]string{} + if isFifoQueue { + queueAttributes["FifoQueue"] = "true" + } + queue, err := s.client.CreateQueue(context.TODO(), &sqs.CreateQueueInput{ + QueueName: aws.String(queueName), + Attributes: queueAttributes, + }) + if err != nil { + return "", err + } + + return aws.ToString(queue.QueueUrl), err +} + +// DeleteQueue deletes an Amazon SQS queue. +func (s *SQS) DeleteQueue(queueUrl string) error { + _, err := s.client.DeleteQueue(context.TODO(), &sqs.DeleteQueueInput{ + QueueUrl: aws.String(queueUrl)}) + + return err +} + func Cancelled(err error) bool { var opErr *smithy.OperationError if errors.As(err, &opErr) { @@ -286,3 +335,7 @@ func Cancelled(err error) bool { } return false } + +func IsNoMessagesError(err error) bool { + return errors.Is(err, ErrNoMessages) +} diff --git a/vendor/github.com/GeoNet/kit/health/check.go b/vendor/github.com/GeoNet/kit/health/check.go new file mode 100644 index 00000000..a770ae60 --- /dev/null +++ b/vendor/github.com/GeoNet/kit/health/check.go @@ -0,0 +1,80 @@ +package health + +import ( + "context" + "flag" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" +) + +// Check calls the given service endpoint with a given context and timeout. +// An error will be returned if the connection fails, or the response status +// is not 200 (i.e. StatusOK). A successful check will return only the check message reply. +func Check(ctx context.Context, servicePath string, timeout time.Duration) ([]byte, error) { + checkUrl := servicePath + if !strings.HasPrefix(checkUrl, "http") { + checkUrl = "http://" + servicePath + } + req, err := url.Parse(checkUrl) + if err != nil { + return nil, err + } + + client := &http.Client{ + Timeout: timeout, + } + + request, err := http.NewRequestWithContext(ctx, http.MethodGet, req.String(), nil) + if err != nil { + return nil, err + } + + resp, err := client.Do(request) + if resp == nil || err != nil { + return nil, err + } + defer func() { + _, _ = io.Copy(io.Discard, resp.Body) + resp.Body.Close() + }() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("%s (%s)", string(body), http.StatusText(resp.StatusCode)) + } + + return body, nil +} + +// CheckStatus runs a Check on the given service and returns zero for a healthy service, and one otherwise. +// +// @param {string} servicePat: service address and path to check e.g. 8080/soh +func CheckStatus(servicePath string, timeout time.Duration) int { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + if _, err := Check(ctx, servicePath, timeout); err != nil { + return 1 + } + + return 0 +} + +// RunningHealthCheck returns whether the -check flag was used when starting the program. +// This flag indicates that the program is being used to run a health check on another program. +func RunningHealthCheck() bool { + + var isHealthCheck bool + flag.BoolVar(&isHealthCheck, "check", false, "Whether the program is being used to run a health check") + flag.Parse() + + return isHealthCheck +} diff --git a/vendor/github.com/GeoNet/kit/health/service.go b/vendor/github.com/GeoNet/kit/health/service.go new file mode 100644 index 00000000..d808c5f2 --- /dev/null +++ b/vendor/github.com/GeoNet/kit/health/service.go @@ -0,0 +1,159 @@ +package health + +import ( + "context" + "log" + "net/http" + "sync" + "time" +) + +// CheckPath is the baked in SOH endpoint path. +const CheckPath = "/soh" + +// Service provides a mechanism to update a service SOH status. +type Service struct { + mu sync.Mutex + + // status is used to indicate whether the service is running + status bool + // last stores the time of the last update. + last time.Time + + // start stores when the service was started. + start time.Time + // aged is the time if no updates have happened indicates the service is no longer running. + // Default zero value means no age check required. + aged time.Duration + // startup is the time after the start which the check is assumed to be successful. + startup time.Duration +} + +// New returns a health Service which provides running SOH capabilities. +func New(endpoint string, aged, startup time.Duration) *Service { + service := &Service{ + aged: aged, + last: time.Now(), + start: time.Now(), + startup: startup, + } + + router := http.NewServeMux() + router.HandleFunc(CheckPath, service.handler) + + srv := &http.Server{ + Addr: endpoint, + Handler: router, + ReadHeaderTimeout: 2 * time.Second, + } + + go func() { + if err := srv.ListenAndServe(); err != nil { + log.Println("error starting health check service", err) + } + }() + + return service +} + +// state returns the current application state, this is likely to +// be expanded as new checks are added. +func (s *Service) state() bool { + s.mu.Lock() + defer s.mu.Unlock() + + return s.status +} + +func (s *Service) handler(w http.ResponseWriter, r *http.Request) { + ok := s.state() + switch { + case time.Since(s.start) < s.startup: + // Avoid terminating before initial check period + w.WriteHeader(http.StatusOK) + if _, err := w.Write([]byte("warn")); err != nil { + log.Println("error writing response", err) + } + case ok && (s.aged == 0 || time.Since(s.last) < s.aged): + // Service is OK and actively updating + w.WriteHeader(http.StatusOK) + if _, err := w.Write([]byte("ok")); err != nil { + log.Println("error writing response", err) + } + default: + // Service is not OK or has stopped updating + w.WriteHeader(http.StatusInternalServerError) + if _, err := w.Write([]byte("fail")); err != nil { + log.Println("error writing response", err) + } + } +} + +// Ok updates the Service to indicate the service is running as expected. +func (s *Service) Ok() { + s.Update(true) +} + +// Fail updates the Service to indicate the service is not running as expected. +func (s *Service) Fail() { + s.Update(false) +} + +// Update sets the Service to the given state, and stores the time since the last update. +func (s *Service) Update(status bool) { + s.mu.Lock() + defer s.mu.Unlock() + + s.status = status + s.last = time.Now() +} + +// Alive allows an application to perform a complex task while still sending hearbeats. +func (s *Service) Alive(ctx context.Context, heartbeat time.Duration) context.CancelFunc { + ctx, cancel := context.WithCancel(ctx) + + go func() { + defer cancel() + + ticker := time.NewTicker(heartbeat) + defer ticker.Stop() + + s.Ok() + + for { + select { + case <-ticker.C: + s.Ok() + case <-ctx.Done(): + return + } + } + }() + + return cancel +} + +// Pause allows an application to stall for a set period of time while still sending hearbeats. +func (s *Service) Pause(ctx context.Context, deadline, heartbeat time.Duration) context.CancelFunc { + ctx, cancel := context.WithTimeout(ctx, deadline) + + go func() { + defer cancel() + + ticker := time.NewTicker(heartbeat) + defer ticker.Stop() + + s.Ok() + + for { + select { + case <-ticker.C: + s.Ok() + case <-ctx.Done(): + return + } + } + }() + + return cancel +} diff --git a/vendor/github.com/GeoNet/kit/seis/ms/doc.go b/vendor/github.com/GeoNet/kit/seis/ms/doc.go index f58ca47b..24f283b5 100644 --- a/vendor/github.com/GeoNet/kit/seis/ms/doc.go +++ b/vendor/github.com/GeoNet/kit/seis/ms/doc.go @@ -1,3 +1,2 @@ // The ms module has been writen as a lightweight replacement for some parts of the libmseed C library. -// package ms diff --git a/vendor/github.com/GeoNet/kit/seis/ms/header.go b/vendor/github.com/GeoNet/kit/seis/ms/header.go index d22163eb..55c3b193 100644 --- a/vendor/github.com/GeoNet/kit/seis/ms/header.go +++ b/vendor/github.com/GeoNet/kit/seis/ms/header.go @@ -3,6 +3,7 @@ package ms import ( "encoding/binary" "fmt" + //TODO: needs v 1.15 "hash/maphash" "strconv" "strings" diff --git a/vendor/github.com/GeoNet/kit/seis/ms/record.go b/vendor/github.com/GeoNet/kit/seis/ms/record.go index c4e8e5e7..e4fe13c4 100644 --- a/vendor/github.com/GeoNet/kit/seis/ms/record.go +++ b/vendor/github.com/GeoNet/kit/seis/ms/record.go @@ -44,7 +44,7 @@ type Record struct { } // NewMSRecord decodes and unpacks the record samples from a byte slice and returns a Record pointer, -//or an empty pointer and an error if it could not be decoded. +// or an empty pointer and an error if it could not be decoded. func NewRecord(buf []byte) (*Record, error) { var r Record diff --git a/vendor/github.com/GeoNet/kit/seis/ms/steim.go b/vendor/github.com/GeoNet/kit/seis/ms/steim.go index f0ee66a1..6abafc11 100644 --- a/vendor/github.com/GeoNet/kit/seis/ms/steim.go +++ b/vendor/github.com/GeoNet/kit/seis/ms/steim.go @@ -14,7 +14,7 @@ func getNibble(word []byte, index int) uint8 { return res } -//value must be 0, 1, 2 or 3, the nibble must not have been previously set +// value must be 0, 1, 2 or 3, the nibble must not have been previously set func writeNibble(word []byte, index int, value uint8) { b := word[index/4] i := index % 4 @@ -23,7 +23,7 @@ func writeNibble(word []byte, index int, value uint8) { } /* - Takes v: an integer where only the first numbits bits are used to represent the number and returns an int32 +Takes v: an integer where only the first numbits bits are used to represent the number and returns an int32 */ func uintVarToInt32(v uint32, numbits uint8) int32 { neg := (v & (0x1 << (numbits - 1))) != 0 //check positive/negative diff --git a/vendor/github.com/GeoNet/kit/slogger/logger.go b/vendor/github.com/GeoNet/kit/slogger/logger.go new file mode 100644 index 00000000..5f45a8f3 --- /dev/null +++ b/vendor/github.com/GeoNet/kit/slogger/logger.go @@ -0,0 +1,82 @@ +package slogger + +import ( + "fmt" + "log" + "strings" + "sync" + "time" +) + +/* +SmartLogger wraps around the standard logger and adds functionality to avoid repeated logs. +usage: +logger := NewSmartLogger(2*time.Second, "error connecting to:") +logger.Log("error connecting to AVLN") +logger.Log("error connecting to AUCK") +*/ +type SmartLogger struct { + window time.Duration //time window to calculate repeated messages + repeatedPrefix string //message prefix to evaluate, compare whole message if not specified + + mu sync.Mutex + lastMessage string + lastLogTime time.Time + repeatCount int +} + +// NewSmartLogger creates a new SmartLogger with the given time window for detecting repeated messages. +// and an optional predefined message prefix +func NewSmartLogger(window time.Duration, repeatedPrefix string) *SmartLogger { + sl := &SmartLogger{ + window: window, + repeatedPrefix: repeatedPrefix, + } + return sl +} + +// Log logs a message, checking if it is repeated within the time window +// return the repeatCount +func (sl *SmartLogger) Log(message ...any) int { + sl.mu.Lock() + defer sl.mu.Unlock() + now := time.Now() + msgString := fmt.Sprintln(message...) + repeated := sl.checkRepeated(msgString) + if repeated && now.Sub(sl.lastLogTime) <= sl.window { + sl.repeatCount++ + } else { + if sl.repeatedPrefix != "" && !repeated { //this is a random message + log.Println(msgString) + sl.lastMessage = "" // Reset lastMessage to avoid tracking it as a repeated message + sl.lastLogTime = time.Time{} // Reset the time + return 0 + } + sl.flush() + sl.lastMessage = msgString + sl.lastLogTime = now + sl.repeatCount = 1 + } + return sl.repeatCount +} + +// flush writes out the summary of repeated messages +func (sl *SmartLogger) flush() { + if sl.repeatCount > 1 { + if sl.repeatedPrefix != "" { + log.Printf("message with prefix \"%s\" repeated %d times", sl.repeatedPrefix, sl.repeatCount) + } else { + log.Printf("message \"%s\" repeated %d times", sl.lastMessage, sl.repeatCount) + } + sl.repeatCount = 0 + } +} + +// checks if message is repeated +func (sl *SmartLogger) checkRepeated(message string) bool { + if sl.repeatedPrefix != "" { + return strings.HasPrefix(message, sl.repeatedPrefix) + } else { + return message == sl.lastMessage + } +} diff --git a/vendor/github.com/GeoNet/kit/weft/weft.go b/vendor/github.com/GeoNet/kit/weft/weft.go index 4f27089c..71dd8195 100644 --- a/vendor/github.com/GeoNet/kit/weft/weft.go +++ b/vendor/github.com/GeoNet/kit/weft/weft.go @@ -73,9 +73,9 @@ func (s StatusError) Status() int { // Status returns the HTTP status code appropriate for err. // It returns: -// * http.StatusOk if err is nil -// * err.Code if err is a StatusErr and Code is set -// * otherwise http.StatusServiceUnavailable +// - http.StatusOk if err is nil +// - err.Code if err is a StatusErr and Code is set +// - otherwise http.StatusServiceUnavailable func Status(err error) int { if err == nil { return http.StatusOK diff --git a/vendor/modules.txt b/vendor/modules.txt index fd891b87..5dbd5322 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,11 +1,13 @@ -# github.com/GeoNet/kit v0.0.0-20240512234353-4d4493144f60 +# github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268 ## explicit; go 1.21 github.com/GeoNet/kit/aws/s3 github.com/GeoNet/kit/aws/sqs github.com/GeoNet/kit/cfg +github.com/GeoNet/kit/health github.com/GeoNet/kit/metrics github.com/GeoNet/kit/sc3ml github.com/GeoNet/kit/seis/ms +github.com/GeoNet/kit/slogger github.com/GeoNet/kit/weft github.com/GeoNet/kit/weft/wefttest github.com/GeoNet/kit/wgs84