Skip to content

Commit

Permalink
helper function to execute jobs with tracing and sentry monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Pascal-Delange committed May 24, 2024
1 parent 9388096 commit 8b2e5ba
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 162 deletions.
69 changes: 12 additions & 57 deletions jobs/execute_all_scheduled_scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,24 @@ package jobs

import (
"context"
"fmt"
"strings"

"github.com/checkmarble/marble-backend/tracing"
"github.com/checkmarble/marble-backend/usecases"
"github.com/checkmarble/marble-backend/utils"
"github.com/getsentry/sentry-go"
)

// Runs every minute
func ExecuteAllScheduledScenarios(ctx context.Context, usecases usecases.Usecases, config tracing.Configuration) error {
telemetryRessources, err := tracing.Init(config)
if err != nil {
return fmt.Errorf("error initializing tracing: %w", err)
}
ctx = utils.StoreOpenTelemetryTracerInContext(ctx, telemetryRessources.Tracer)

logger := utils.LoggerFromContext(ctx)
logger.InfoContext(ctx, "Start pending scheduled executions")

checkinId := sentry.CaptureCheckIn(
&sentry.CheckIn{
MonitorSlug: "scheduled-execution",
Status: sentry.CheckInStatusInProgress,
func ExecuteAllScheduledScenarios(ctx context.Context, uc usecases.Usecases, config tracing.Configuration) error {
return executeWithMonitoring(
ctx,
uc,
config,
"scheduled-execution",
func(
ctx context.Context, usecases usecases.Usecases,
) error {
usecasesWithCreds := GenerateUsecaseWithCredForMarbleAdmin(ctx, usecases)
runScheduledExecution := usecasesWithCreds.NewRunScheduledExecution()
return runScheduledExecution.ExecuteAllScheduledScenarios(ctx)
},
nil,
)
fmt.Println("checkinId: ", *checkinId)

usecasesWithCreds := GenerateUsecaseWithCredForMarbleAdmin(ctx, usecases)
runScheduledExecution := usecasesWithCreds.NewRunScheduledExecution()
err = runScheduledExecution.ExecuteAllScheduledScenarios(ctx)
if err != nil {
// Known issue where Cloud Run will sometimes fail to create the unix socket to connect to CloudSQL. In this case, we don't log the error in Sentry.
if strings.Contains(err.Error(), "failed to connect to `host=/cloudsql/") {
logger.WarnContext(ctx, "Failed to create unix socket to connect to CloudSQL. Wait for the next execution of the job.")
return nil
}
sentry.CaptureCheckIn(
&sentry.CheckIn{
ID: *checkinId,
MonitorSlug: "scheduled-execution",
Status: sentry.CheckInStatusError,
},
nil,
)
if hub := sentry.GetHubFromContext(ctx); hub != nil {
hub.CaptureException(err)
} else {
sentry.CaptureException(err)
}
return fmt.Errorf("error executing scheduled scenarios: %w", err)
}

sentry.CaptureCheckIn(
&sentry.CheckIn{
ID: *checkinId,
MonitorSlug: "scheduled-execution",
Status: sentry.CheckInStatusOK,
},
nil,
)

logger.InfoContext(ctx, "Done executing all due scenarios")
return nil
}
72 changes: 72 additions & 0 deletions jobs/execute_with_monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package jobs

import (
"context"
"fmt"
"strings"

"github.com/checkmarble/marble-backend/tracing"
"github.com/checkmarble/marble-backend/usecases"
"github.com/checkmarble/marble-backend/utils"
"github.com/getsentry/sentry-go"
)

func executeWithMonitoring(
ctx context.Context,
uc usecases.Usecases,
config tracing.Configuration,
jobName string,
fn func(context.Context, usecases.Usecases) error,
) error {
telemetryRessources, err := tracing.Init(config)
if err != nil {
return fmt.Errorf("error initializing tracing: %w", err)
}
ctx = utils.StoreOpenTelemetryTracerInContext(ctx, telemetryRessources.Tracer)

logger := utils.LoggerFromContext(ctx)
logger.InfoContext(ctx, fmt.Sprintf("Start job %s", jobName))

checkinId := sentry.CaptureCheckIn(
&sentry.CheckIn{
MonitorSlug: jobName,
Status: sentry.CheckInStatusInProgress,
},
nil,
)

err = fn(ctx, uc)
if err != nil {
// Known issue where Cloud Run will sometimes fail to create the unix socket to connect to CloudSQL. In this case, we don't log the error in Sentry.
if strings.Contains(err.Error(), "failed to connect to `host=/cloudsql/") {
logger.WarnContext(ctx, "Failed to create unix socket to connect to CloudSQL. Wait for the next execution of the job.")
return nil
}
sentry.CaptureCheckIn(
&sentry.CheckIn{
ID: *checkinId,
MonitorSlug: jobName,
Status: sentry.CheckInStatusError,
},
nil,
)
if hub := sentry.GetHubFromContext(ctx); hub != nil {
hub.CaptureException(err)
} else {
sentry.CaptureException(err)
}
return fmt.Errorf("error executing job %s: %w", jobName, err)
}

sentry.CaptureCheckIn(
&sentry.CheckIn{
ID: *checkinId,
MonitorSlug: jobName,
Status: sentry.CheckInStatusOK,
},
nil,
)

logger.InfoContext(ctx, fmt.Sprintf("Done executing job %s", jobName))
return nil
}
68 changes: 14 additions & 54 deletions jobs/ingest_data_from_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,66 +2,26 @@ package jobs

import (
"context"
"fmt"
"strings"

"github.com/checkmarble/marble-backend/tracing"
"github.com/checkmarble/marble-backend/usecases"
"github.com/checkmarble/marble-backend/utils"
"github.com/getsentry/sentry-go"
)

func IngestDataFromCsv(ctx context.Context, usecases usecases.Usecases, config tracing.Configuration) error {
telemetryRessources, err := tracing.Init(config)
if err != nil {
return fmt.Errorf("error initializing tracing: %w", err)
}
ctx = utils.StoreOpenTelemetryTracerInContext(ctx, telemetryRessources.Tracer)

checkinId := sentry.CaptureCheckIn(
&sentry.CheckIn{
MonitorSlug: "batch-ingestion",
Status: sentry.CheckInStatusInProgress,
},
nil,
)

usecasesWithCreds := GenerateUsecaseWithCredForMarbleAdmin(ctx, usecases)
usecase := usecasesWithCreds.NewIngestionUseCase()
logger := utils.LoggerFromContext(ctx)
logger.InfoContext(ctx, "Start ingesting data from upload logs")

err = usecase.IngestDataFromCsv(ctx, logger)
if err != nil {
// Known issue where Cloud Run will sometimes fail to create the unix socket to connect to CloudSQL. In this case, we don't log the error in Sentry.
if strings.Contains(err.Error(), "failed to connect to `host=/cloudsql/") {
logger.WarnContext(ctx, "Failed to create unix socket to connect to CloudSQL. Wait for the next execution of the job.")
return nil
}
sentry.CaptureCheckIn(
&sentry.CheckIn{
ID: *checkinId,
MonitorSlug: "batch-ingestion",
Status: sentry.CheckInStatusError,
},
nil,
)
if hub := sentry.GetHubFromContext(ctx); hub != nil {
hub.CaptureException(err)
} else {
sentry.CaptureException(err)
}
return fmt.Errorf("failed to ingest data from upload logs: %w", err)
}
logger.InfoContext(ctx, "Completed ingesting data from upload logs")
sentry.CaptureCheckIn(
&sentry.CheckIn{
ID: *checkinId,
MonitorSlug: "batch-ingestion",
Status: sentry.CheckInStatusOK,
func IngestDataFromCsv(ctx context.Context, uc usecases.Usecases, config tracing.Configuration) error {
return executeWithMonitoring(
ctx,
uc,
config,
"batch-ingestion",
func(
ctx context.Context, usecases usecases.Usecases,
) error {
usecasesWithCreds := GenerateUsecaseWithCredForMarbleAdmin(ctx, usecases)
usecase := usecasesWithCreds.NewIngestionUseCase()
logger := utils.LoggerFromContext(ctx)
logger.InfoContext(ctx, "Start ingesting data from upload logs")
return usecase.IngestDataFromCsv(ctx, logger)
},
nil,
)

return nil
}
61 changes: 10 additions & 51 deletions jobs/schedule_due_scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package jobs

import (
"context"
"fmt"
"log/slog"
"strings"

"github.com/cockroachdb/errors"
"github.com/getsentry/sentry-go"

"github.com/checkmarble/marble-backend/models"
"github.com/checkmarble/marble-backend/tracing"
Expand All @@ -16,56 +13,18 @@ import (
)

// Runs every hour at past 10min
func ScheduleDueScenarios(ctx context.Context, usecases usecases.Usecases, config tracing.Configuration) error {
logger := utils.LoggerFromContext(ctx)
telemetryRessources, err := tracing.Init(config)
if err != nil {
return fmt.Errorf("error initializing tracing: %w", err)
}
ctx = utils.StoreOpenTelemetryTracerInContext(ctx, telemetryRessources.Tracer)

checkinId := sentry.CaptureCheckIn(
&sentry.CheckIn{
MonitorSlug: "scenario-scheduler",
Status: sentry.CheckInStatusInProgress,
},
nil,
)

logger.InfoContext(ctx, "Start scheduling scenarios")

err = scheduledScenarios(ctx, usecases)
if err != nil {
// Known issue where Cloud Run will sometimes fail to create the unix socket to connect to CloudSQL. In this case, we don't log the error in Sentry.
if strings.Contains(err.Error(), "failed to connect to `host=/cloudsql/") {
logger.WarnContext(ctx, "Failed to create unix socket to connect to CloudSQL. Wait for the next execution of the job.")
return nil
}
sentry.CaptureCheckIn(
&sentry.CheckIn{
ID: *checkinId,
MonitorSlug: "scenario-scheduler",
Status: sentry.CheckInStatusError,
},
nil,
)
if hub := sentry.GetHubFromContext(ctx); hub != nil {
hub.CaptureException(err)
} else {
sentry.CaptureException(err)
}
return fmt.Errorf("failed to schedule due scenarios: %w", err)
}

sentry.CaptureCheckIn(
&sentry.CheckIn{
ID: *checkinId,
MonitorSlug: "scenario-scheduler",
Status: sentry.CheckInStatusOK,
func ScheduleDueScenarios(ctx context.Context, uc usecases.Usecases, config tracing.Configuration) error {
return executeWithMonitoring(
ctx,
uc,
config,
"scenario-scheduler",
func(
ctx context.Context, usecases usecases.Usecases,
) error {
return scheduledScenarios(ctx, usecases)
},
nil,
)
return nil
}

func scheduledScenarios(ctx context.Context, usecases usecases.Usecases) error {
Expand Down

0 comments on commit 8b2e5ba

Please sign in to comment.