From 3036328d0b044610229afd58f3d84d3667dd9d4f Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Tue, 9 Jan 2024 18:20:09 +0200 Subject: [PATCH 1/4] async-tasks-improvements --- db/db.go | 118 ++++++++++++++++++++++--- models/async_tasks.go | 2 + server/background_tasks.go | 13 ++- server/memphis_handlers_async_tasks.go | 4 +- server/memphis_handlers_stations.go | 29 +++--- 5 files changed, 139 insertions(+), 27 deletions(-) diff --git a/db/db.go b/db/db.go index 9852b4cf5..b3863582e 100644 --- a/db/db.go +++ b/db/db.go @@ -580,6 +580,8 @@ func createTables(MetadataDbClient MetadataStorage) error { tenant_name VARCHAR NOT NULL DEFAULT '$memphis', station_id INT NOT NULL, created_by VARCHAR NOT NULL, + status VARCHAR NOT NULL, + failure_reason VARCHAR NOT NULL DEFAULT '', PRIMARY KEY (id) );` @@ -603,6 +605,8 @@ func createTables(MetadataDbClient MetadataStorage) error { SELECT 1 FROM information_schema.tables WHERE table_name = 'async_tasks' AND table_schema = 'public' ) THEN ALTER TABLE async_tasks ADD COLUMN IF NOT EXISTS created_by VARCHAR NOT NULL; + ALTER TABLE async_tasks ADD COLUMN IF NOT EXISTS status VARCHAR NOT NULL; + ALTER TABLE async_tasks ADD COLUMN IF NOT EXISTS failure_reason VARCHAR NOT NULL DEFAULT ''; END IF; IF EXISTS ( SELECT 1 FROM information_schema.table_constraints @@ -7245,7 +7249,7 @@ func UpsertAsyncTask(task, brokerInCharge string, createdAt time.Time, tenantNam } defer conn.Release() - query := `INSERT INTO async_tasks (name, broker_in_charge, created_at, updated_at, tenant_name, station_id, created_by) VALUES($1, $2, $3, $4, $5, $6, $7) RETURNING *` + query := `INSERT INTO async_tasks (name, broker_in_charge, created_at, updated_at, tenant_name, station_id, created_by, status) VALUES($1, $2, $3, $4, $5, $6, $7, $8) RETURNING *` stmt, err := conn.Conn().Prepare(ctx, "upsert_async_task", query) if err != nil { return models.AsyncTask{}, err @@ -7253,7 +7257,7 @@ func UpsertAsyncTask(task, brokerInCharge string, createdAt time.Time, tenantNam updatedAt := createdAt var asyncTask models.AsyncTask - err = conn.Conn().QueryRow(ctx, stmt.Name, task, brokerInCharge, createdAt, updatedAt, tenantName, stationId, username).Scan( + err = conn.Conn().QueryRow(ctx, stmt.Name, task, brokerInCharge, createdAt, updatedAt, tenantName, stationId, username, "running").Scan( &asyncTask.ID, &asyncTask.Name, &asyncTask.BrokrInCharge, @@ -7263,6 +7267,8 @@ func UpsertAsyncTask(task, brokerInCharge string, createdAt time.Time, tenantNam &asyncTask.TenantName, &asyncTask.StationId, &asyncTask.CreatedBy, + &asyncTask.Status, + &asyncTask.InvalidReason, ) if err != nil { @@ -7370,7 +7376,7 @@ func GetAsyncTaskByNameAndBrokerName(task, brokerName string) (bool, []models.As return true, asyncTask, nil } -func GetAllAsyncTasks(tenantName string) ([]models.AsyncTaskRes, error) { +func GetActiveAsyncTasks(tenantName string) ([]models.AsyncTaskRes, error) { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() @@ -7379,12 +7385,31 @@ func GetAllAsyncTasks(tenantName string) ([]models.AsyncTaskRes, error) { return []models.AsyncTaskRes{}, err } defer conn.Release() - query := `SELECT a.id, a.name, a.created_at, a.created_by, s.name, a.meta_data - FROM async_tasks AS a - LEFT JOIN stations AS s ON a.station_id = s.id - WHERE a.tenant_name = $1 - ORDER BY a.created_at DESC;` - stmt, err := conn.Conn().Prepare(ctx, "get_all_async_tasks", query) + query := `SELECT + a.id, + a.name, + a.created_at, + a.created_by, + s.name, + a.meta_data + FROM + async_tasks AS a +LEFT JOIN stations AS s ON a.station_id = s.id + WHERE + a.tenant_name = $1 + AND (a.status = 'running' OR a.id IN ( + SELECT + a.id + FROM + async_tasks + WHERE + a.tenant_name = $1 + ORDER BY + a.updated_at DESC + LIMIT 10 + )); + ` + stmt, err := conn.Conn().Prepare(ctx, "get_active_async_tasks", query) if err != nil { return []models.AsyncTaskRes{}, err } @@ -7453,7 +7478,68 @@ func UpdateAsyncTask(task, tenantName string, updatedAt time.Time, metaData inte return nil } -func RemoveAsyncTask(task, tenantName string, stationId int) error { +func UpdateStatusAsyncTask(task, tenantName, status string, stationId int) error { + ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) + defer cancelfunc() + conn, err := MetadataDbClient.Client.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + query := `UPDATE async_tasks SET status = $1 WHERE name = $3 AND tenant_name=$4 AND station_id = $5` + stmt, err := conn.Conn().Prepare(ctx, "edit_status_async_task_by_task_and_tenant_name_and_station_id", query) + if err != nil { + return err + } + if tenantName != conf.GlobalAccount { + tenantName = strings.ToLower(tenantName) + } + _, err = conn.Conn().Query(ctx, stmt.Name, status, task, tenantName, stationId) + if err != nil { + return err + } + return nil +} + +func UpdateReasonAsyncTask(task, tenantName, status string, stationId int, reason, functionName string) error { + ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) + defer cancelfunc() + conn, err := MetadataDbClient.Client.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + var query string + var preparedStmt string + if stationId == -1 { + query = `UPDATE async_tasks SET status = $1, failure_reason = $2 WHERE name = $3 AND tenant_name=$4 AND station_id = $5 AND meta_data->>'name' = $6` + preparedStmt = "edit_status_functions_async_task_by_task_and_tenant_name_and_station_id" + } else { + query = `UPDATE async_tasks SET status = $1, failure_reason = $2 WHERE name = $3 AND tenant_name=$4 AND station_id = $5` + preparedStmt = "edit_status_resend_async_task_by_task_and_tenant_name_and_station_id" + + } + stmt, err := conn.Conn().Prepare(ctx, preparedStmt, query) + if err != nil { + return err + } + tenantName = strings.ToLower(tenantName) + if stationId == -1 { + _, err = conn.Conn().Query(ctx, stmt.Name, status, reason, task, tenantName, stationId, functionName) + if err != nil { + return err + } + } else { + _, err = conn.Conn().Query(ctx, stmt.Name, status, reason, task, tenantName, stationId) + if err != nil { + return err + } + } + + return nil +} + +func RemoveAsyncTasks() error { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() conn, err := MetadataDbClient.Client.Acquire(ctx) @@ -7461,12 +7547,18 @@ func RemoveAsyncTask(task, tenantName string, stationId int) error { return err } defer conn.Release() - query := `DELETE FROM async_tasks WHERE name = $1 AND tenant_name=$2 AND station_id = $3` - stmt, err := conn.Conn().Prepare(ctx, "remove_async_task_by_name_and_tenant_name_and_station_id", query) + query := `DELETE FROM async_tasks + WHERE status != 'running' AND id NOT IN ( + SELECT id + FROM async_tasks + ORDER BY updated_at DESC + LIMIT 10 + );` + stmt, err := conn.Conn().Prepare(ctx, "remove_async_task", query) if err != nil { return err } - _, err = conn.Conn().Exec(ctx, stmt.Name, task, tenantName, stationId) + _, err = conn.Conn().Exec(ctx, stmt.Name) if err != nil { return err } diff --git a/models/async_tasks.go b/models/async_tasks.go index 409e98691..4b90b4b1c 100644 --- a/models/async_tasks.go +++ b/models/async_tasks.go @@ -12,6 +12,8 @@ type AsyncTask struct { TenantName string `json:"tenant_name"` StationId int `json:"station_id"` CreatedBy string `json:"created_by"` + Status string `json:"status"` + InvalidReason string `json:"invalid_reason"` } type AsyncTaskRes struct { diff --git a/server/background_tasks.go b/server/background_tasks.go index c1944c23c..caeffbf96 100644 --- a/server/background_tasks.go +++ b/server/background_tasks.go @@ -354,8 +354,9 @@ func (s *Server) StartBackgroundTasks() error { go s.ConsumeFunctionTasks() go s.ScaleFunctionWorkers() go s.ConnectorsDeadPodsRescheduler() + go s.RemoveOldAndCompletedAsyncTasks() - return nil + return nil } func (s *Server) uploadMsgsToTier2Storage() { @@ -853,3 +854,13 @@ func (s *Server) ReleaseStuckLocks() { } } + +func (s *Server) RemoveOldAndCompletedAsyncTasks() { + ticker := time.NewTicker(15 * time.Minute) + for range ticker.C { + err := db.RemoveAsyncTasks() + if err != nil { + serv.Errorf("RemoveOldAndCompletedAsyncTasks at RemoveAsyncTasks : %v", err.Error()) + } + } +} diff --git a/server/memphis_handlers_async_tasks.go b/server/memphis_handlers_async_tasks.go index cb1f0cd1d..76c3698ea 100644 --- a/server/memphis_handlers_async_tasks.go +++ b/server/memphis_handlers_async_tasks.go @@ -103,9 +103,9 @@ func (ash AsyncTasksHandler) GetAsyncTasks(c *gin.Context) { } func (ash AsyncTasksHandler) GetAllAsyncTasks(tenantName string) ([]models.AsyncTaskRes, error) { - asyncTasks, err := db.GetAllAsyncTasks(tenantName) + asyncTasks, err := db.GetActiveAsyncTasks(tenantName) if err != nil { - serv.Errorf("GetAllAsyncTasks at GetAllAsyncTasks: %v", err.Error()) + serv.Errorf("GetAllAsyncTasks at GetActiveAsyncTasks: %v", err.Error()) return []models.AsyncTaskRes{}, err } for i, task := range asyncTasks { diff --git a/server/memphis_handlers_stations.go b/server/memphis_handlers_stations.go index 28b9956bf..7f425e8f5 100644 --- a/server/memphis_handlers_stations.go +++ b/server/memphis_handlers_stations.go @@ -2627,13 +2627,13 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName err := db.UpdateResendDisabledInStations(true, []int{stationId}) if err != nil { serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateResendDisabledInStations at station %v : %v", tenantName, username, stationName, err.Error()) - s.handleResendAllFailure(user, stationId, tenantName, stationName) + s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName) return } task, err := db.UpsertAsyncTask("resend_all_dls_msgs", s.opts.ServerName, createdAt, tenantName, stationId, user.Username) if err != nil { serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpsertAsyncTask at station %v : %v", tenantName, username, stationName, err.Error()) - s.handleResendAllFailure(user, stationId, tenantName, stationName) + s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName) return } @@ -2645,14 +2645,14 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName _, maxId, err = db.GetMinMaxIdsOfDlsMsgsByUpdatedAt(tenantName, createdAt, stationId) if err != nil { serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetMinMaxIdsOfDlsMsgsByUpdatedAt at station %v : %v", tenantName, username, stationName, err.Error()) - s.handleResendAllFailure(user, stationId, tenantName, stationName) + s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName) return } } else { minId, maxId, err = db.GetMinMaxIdsOfDlsMsgsByUpdatedAt(tenantName, createdAt, stationId) if err != nil { serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetMinMaxIdsOfDlsMsgsByUpdatedAt at station %v : %v", tenantName, username, stationName, err.Error()) - s.handleResendAllFailure(user, stationId, tenantName, stationName) + s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName) return } // -1 in order to prevent skipping the first element @@ -2663,7 +2663,7 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName _, dlsMsgs, err := db.GetDlsMsgsBatch(tenantName, minId, maxId, stationId) if err != nil { serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetDlsMsgsBatch at station %v : %v", tenantName, username, stationName, err.Error()) - s.handleResendAllFailure(user, stationId, tenantName, stationName) + s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName) return } @@ -2687,17 +2687,17 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName } minId = offset if len(dlsMsgs) == 0 || offset == maxId { - err = db.RemoveAsyncTask(task.Name, tenantName, stationId) + err = db.UpdateStatusAsyncTask(task.Name, tenantName, "completed", stationId) if err != nil { - serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at RemoveAsyncTask at station %v : %v ", tenantName, username, stationName, err.Error()) - s.handleResendAllFailure(user, stationId, tenantName, stationName) + serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateStatusAsyncTask at station %v : %v ", tenantName, username, stationName, err.Error()) + s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName) return } err = db.UpdateResendDisabledInStations(false, []int{stationId}) if err != nil { serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateResendDisabledInStations at station %v : %v", tenantName, username, stationName, err.Error()) - s.handleResendAllFailure(user, stationId, tenantName, stationName) + s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName) return } @@ -2717,10 +2717,11 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName }() } -func (s *Server) handleResendAllFailure(user models.User, stationId int, tenantName, stationName string) { +func (s *Server) handleResendAllFailure(taskName string, user models.User, stationId int, tenantName, stationName string) { + msgErr := fmt.Sprintf("Resend all unacked messages operation in station %s, triggered by user %s has failed due to an internal error:", stationName, user.Username) systemMessage := SystemMessage{ MessageType: "error", - MessagePayload: fmt.Sprintf("Resend all unacked messages operation in station %s, triggered by user %s has failed due to an internal error:", stationName, user.Username), + MessagePayload: msgErr, } err := serv.sendSystemMessageOnWS(user, systemMessage) if err != nil { @@ -2732,4 +2733,10 @@ func (s *Server) handleResendAllFailure(user models.User, stationId int, tenantN serv.Errorf("[tenant: %v][user: %v]handleResendAllFailure at UpdateResendDisabledInStations at station %v : %v", tenantName, user.Username, stationName, err.Error()) return } + + err = db.UpdateReasonAsyncTask(taskName, tenantName, "failed", stationId, msgErr, "") + if err != nil { + serv.Errorf("[tenant: %v][user: %v]handleResendAllFailure at UpdateReasonAsyncTask at station %v : %v", tenantName, user.Username, stationName, err.Error()) + return + } } From c05b31df5f8618ad8611a50ac29d3d512e9ab1e0 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Tue, 9 Jan 2024 19:20:35 +0200 Subject: [PATCH 2/4] copy cahnges from cloud --- db/db.go | 14 +++++++++----- models/async_tasks.go | 14 ++++++++------ 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/db/db.go b/db/db.go index b3863582e..a85e65d07 100644 --- a/db/db.go +++ b/db/db.go @@ -7391,11 +7391,13 @@ func GetActiveAsyncTasks(tenantName string) ([]models.AsyncTaskRes, error) { a.created_at, a.created_by, s.name, - a.meta_data - FROM - async_tasks AS a -LEFT JOIN stations AS s ON a.station_id = s.id - WHERE + a.meta_data, + a.status, + a.failure_reason + FROM + async_tasks AS a + LEFT JOIN stations AS s ON a.station_id = s.id + WHERE a.tenant_name = $1 AND (a.status = 'running' OR a.id IN ( SELECT @@ -7432,6 +7434,8 @@ LEFT JOIN stations AS s ON a.station_id = s.id &task.CreatedBy, &sName, &task.Data, + &task.Status, + &task.InvalidReason, ) if err != nil { return []models.AsyncTaskRes{}, err diff --git a/models/async_tasks.go b/models/async_tasks.go index 4b90b4b1c..b00428d47 100644 --- a/models/async_tasks.go +++ b/models/async_tasks.go @@ -17,12 +17,14 @@ type AsyncTask struct { } type AsyncTaskRes struct { - ID int `json:"id"` - Name string `json:"name"` - CreatedAt time.Time `json:"created_at"` - CreatedBy string `json:"created_by"` - StationName string `json:"station_name"` - Data interface{} `json:"data"` + ID int `json:"id"` + Name string `json:"name"` + CreatedAt time.Time `json:"created_at"` + CreatedBy string `json:"created_by"` + StationName string `json:"station_name"` + Data interface{} `json:"data"` + Status string `json:"status"` + InvalidReason string `json:"invalid_reason"` } type MetaData struct { From 2e97b436e658c8315fdb1f05ca5a359e54f924a2 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Wed, 10 Jan 2024 10:56:55 +0200 Subject: [PATCH 3/4] add return --- server/memphis_handlers_monitoring.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/memphis_handlers_monitoring.go b/server/memphis_handlers_monitoring.go index 796db1012..9f9542b41 100644 --- a/server/memphis_handlers_monitoring.go +++ b/server/memphis_handlers_monitoring.go @@ -1385,6 +1385,7 @@ func (mh MonitoringHandler) GetGraphOverview(c *gin.Context) { if err != nil { serv.Errorf("[tenant: %v][user: %v]%v", user.TenantName, user.Username, err.Error()) c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + return } c.IndentedJSON(200, res) From bfc2870eab6f003b6f168d12928465c6bc6eddda Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Wed, 10 Jan 2024 15:15:09 +0200 Subject: [PATCH 4/4] changes --- db/db.go | 81 +++++++++++--------------- models/async_tasks.go | 4 +- server/background_tasks.go | 8 +-- server/memphis_handlers_async_tasks.go | 14 ++--- server/memphis_handlers_stations.go | 52 ++++++++--------- server/memphis_handlers_ws.go | 8 +-- 6 files changed, 76 insertions(+), 91 deletions(-) diff --git a/db/db.go b/db/db.go index a85e65d07..19d18250a 100644 --- a/db/db.go +++ b/db/db.go @@ -580,7 +580,7 @@ func createTables(MetadataDbClient MetadataStorage) error { tenant_name VARCHAR NOT NULL DEFAULT '$memphis', station_id INT NOT NULL, created_by VARCHAR NOT NULL, - status VARCHAR NOT NULL, + status VARCHAR NOT NULL DEFAULT 'running', failure_reason VARCHAR NOT NULL DEFAULT '', PRIMARY KEY (id) );` @@ -605,7 +605,7 @@ func createTables(MetadataDbClient MetadataStorage) error { SELECT 1 FROM information_schema.tables WHERE table_name = 'async_tasks' AND table_schema = 'public' ) THEN ALTER TABLE async_tasks ADD COLUMN IF NOT EXISTS created_by VARCHAR NOT NULL; - ALTER TABLE async_tasks ADD COLUMN IF NOT EXISTS status VARCHAR NOT NULL; + ALTER TABLE async_tasks ADD COLUMN IF NOT EXISTS status VARCHAR NOT NULL DEFAULT 'running'; ALTER TABLE async_tasks ADD COLUMN IF NOT EXISTS failure_reason VARCHAR NOT NULL DEFAULT ''; END IF; IF EXISTS ( @@ -7268,7 +7268,7 @@ func UpsertAsyncTask(task, brokerInCharge string, createdAt time.Time, tenantNam &asyncTask.StationId, &asyncTask.CreatedBy, &asyncTask.Status, - &asyncTask.InvalidReason, + &asyncTask.FailureReason, ) if err != nil { @@ -7376,7 +7376,7 @@ func GetAsyncTaskByNameAndBrokerName(task, brokerName string) (bool, []models.As return true, asyncTask, nil } -func GetActiveAsyncTasks(tenantName string) ([]models.AsyncTaskRes, error) { +func GetActiveAndUpdatedAsyncTasks(tenantName string) ([]models.AsyncTaskRes, error) { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() @@ -7390,28 +7390,35 @@ func GetActiveAsyncTasks(tenantName string) ([]models.AsyncTaskRes, error) { a.name, a.created_at, a.created_by, - s.name, + s.name AS station_name, a.meta_data, a.status, a.failure_reason FROM - async_tasks AS a - LEFT JOIN stations AS s ON a.station_id = s.id + async_tasks AS a + LEFT JOIN + stations AS s ON a.station_id = s.id WHERE a.tenant_name = $1 AND (a.status = 'running' OR a.id IN ( - SELECT - a.id - FROM - async_tasks - WHERE - a.tenant_name = $1 - ORDER BY - a.updated_at DESC - LIMIT 10 - )); - ` - stmt, err := conn.Conn().Prepare(ctx, "get_active_async_tasks", query) + SELECT + id + FROM + async_tasks + WHERE + tenant_name = $1 + AND status <> 'running' + ORDER BY + created_at DESC + LIMIT 10 + )) + ORDER BY + CASE + WHEN a.status = 'running' THEN 0 + ELSE 1 + END, + created_at DESC;` + stmt, err := conn.Conn().Prepare(ctx, "get_active_and_updated_async_tasks", query) if err != nil { return []models.AsyncTaskRes{}, err } @@ -7435,7 +7442,7 @@ func GetActiveAsyncTasks(tenantName string) ([]models.AsyncTaskRes, error) { &sName, &task.Data, &task.Status, - &task.InvalidReason, + &task.FailureReason, ) if err != nil { return []models.AsyncTaskRes{}, err @@ -7482,30 +7489,7 @@ func UpdateAsyncTask(task, tenantName string, updatedAt time.Time, metaData inte return nil } -func UpdateStatusAsyncTask(task, tenantName, status string, stationId int) error { - ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) - defer cancelfunc() - conn, err := MetadataDbClient.Client.Acquire(ctx) - if err != nil { - return err - } - defer conn.Release() - query := `UPDATE async_tasks SET status = $1 WHERE name = $3 AND tenant_name=$4 AND station_id = $5` - stmt, err := conn.Conn().Prepare(ctx, "edit_status_async_task_by_task_and_tenant_name_and_station_id", query) - if err != nil { - return err - } - if tenantName != conf.GlobalAccount { - tenantName = strings.ToLower(tenantName) - } - _, err = conn.Conn().Query(ctx, stmt.Name, status, task, tenantName, stationId) - if err != nil { - return err - } - return nil -} - -func UpdateReasonAsyncTask(task, tenantName, status string, stationId int, reason, functionName string) error { +func UpdateStatusAsyncTask(task, tenantName, status string, stationId int, failureReason, functionName string) error { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() conn, err := MetadataDbClient.Client.Acquire(ctx) @@ -7529,12 +7513,12 @@ func UpdateReasonAsyncTask(task, tenantName, status string, stationId int, reaso } tenantName = strings.ToLower(tenantName) if stationId == -1 { - _, err = conn.Conn().Query(ctx, stmt.Name, status, reason, task, tenantName, stationId, functionName) + _, err = conn.Conn().Query(ctx, stmt.Name, status, failureReason, task, tenantName, stationId, functionName) if err != nil { return err } } else { - _, err = conn.Conn().Query(ctx, stmt.Name, status, reason, task, tenantName, stationId) + _, err = conn.Conn().Query(ctx, stmt.Name, status, failureReason, task, tenantName, stationId) if err != nil { return err } @@ -7543,7 +7527,7 @@ func UpdateReasonAsyncTask(task, tenantName, status string, stationId int, reaso return nil } -func RemoveAsyncTasks() error { +func RemoveOldAsyncTasks() error { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() conn, err := MetadataDbClient.Client.Acquire(ctx) @@ -7555,10 +7539,11 @@ func RemoveAsyncTasks() error { WHERE status != 'running' AND id NOT IN ( SELECT id FROM async_tasks + WHERE status != 'running' ORDER BY updated_at DESC LIMIT 10 );` - stmt, err := conn.Conn().Prepare(ctx, "remove_async_task", query) + stmt, err := conn.Conn().Prepare(ctx, "remove_old_async_task", query) if err != nil { return err } diff --git a/models/async_tasks.go b/models/async_tasks.go index b00428d47..01d578b16 100644 --- a/models/async_tasks.go +++ b/models/async_tasks.go @@ -13,7 +13,7 @@ type AsyncTask struct { StationId int `json:"station_id"` CreatedBy string `json:"created_by"` Status string `json:"status"` - InvalidReason string `json:"invalid_reason"` + FailureReason string `json:"failure_reason"` } type AsyncTaskRes struct { @@ -24,7 +24,7 @@ type AsyncTaskRes struct { StationName string `json:"station_name"` Data interface{} `json:"data"` Status string `json:"status"` - InvalidReason string `json:"invalid_reason"` + FailureReason string `json:"failure_reason"` } type MetaData struct { diff --git a/server/background_tasks.go b/server/background_tasks.go index caeffbf96..1fb308264 100644 --- a/server/background_tasks.go +++ b/server/background_tasks.go @@ -354,7 +354,7 @@ func (s *Server) StartBackgroundTasks() error { go s.ConsumeFunctionTasks() go s.ScaleFunctionWorkers() go s.ConnectorsDeadPodsRescheduler() - go s.RemoveOldAndCompletedAsyncTasks() + go s.removeOldAsyncTasks() return nil } @@ -855,12 +855,12 @@ func (s *Server) ReleaseStuckLocks() { } } -func (s *Server) RemoveOldAndCompletedAsyncTasks() { +func (s *Server) removeOldAsyncTasks() { ticker := time.NewTicker(15 * time.Minute) for range ticker.C { - err := db.RemoveAsyncTasks() + err := db.RemoveOldAsyncTasks() if err != nil { - serv.Errorf("RemoveOldAndCompletedAsyncTasks at RemoveAsyncTasks : %v", err.Error()) + serv.Errorf("RemoveOldAsyncTasks at db.RemoveOldAsyncTasks : %v", err.Error()) } } } diff --git a/server/memphis_handlers_async_tasks.go b/server/memphis_handlers_async_tasks.go index 76c3698ea..9ea7478c4 100644 --- a/server/memphis_handlers_async_tasks.go +++ b/server/memphis_handlers_async_tasks.go @@ -28,7 +28,7 @@ type AsyncTasksHandler struct{} func (s *Server) CompleteRelevantStuckAsyncTasks() { exist, asyncTasks, err := db.GetAsyncTaskByNameAndBrokerName("resend_all_dls_msgs", s.opts.ServerName) if err != nil { - serv.Errorf("CompleteRelevantStuckAsyncTasks: failed to get async tasks resend_all_dls_msgs: %v", err.Error()) + s.Errorf("CompleteRelevantStuckAsyncTasks: failed to get async tasks resend_all_dls_msgs: %v", err.Error()) return } if !exist { @@ -38,22 +38,22 @@ func (s *Server) CompleteRelevantStuckAsyncTasks() { for _, asyncTask := range asyncTasks { exist, station, err := db.GetStationById(asyncTask.StationId, asyncTask.TenantName) if err != nil { - serv.Errorf("[tenant: %v]CompleteRelevantStuckAsyncTasks at GetStationById: %v", asyncTask.TenantName, err.Error()) + s.Errorf("[tenant: %v]CompleteRelevantStuckAsyncTasks at GetStationById: %v", asyncTask.TenantName, err.Error()) return } if !exist { errMsg := fmt.Sprintf("Station %v does not exist", station.Name) - serv.Warnf("[tenant: %v][user: %v]CompleteRelevantStuckAsyncTasks at GetStationById: %v", asyncTask.TenantName, station.CreatedByUsername, errMsg) + s.Warnf("[tenant: %v][user: %v]CompleteRelevantStuckAsyncTasks at GetStationById: %v", asyncTask.TenantName, station.CreatedByUsername, errMsg) continue } exist, user, err := memphis_cache.GetUser(station.CreatedByUsername, asyncTask.TenantName, false) if err != nil { - serv.Errorf("[tenant:%v][user: %v] CompleteRelevantStuckAsyncTasks could not retrive user model from cache or db error: %v", asyncTask.TenantName, user.Username, err) + s.Errorf("[tenant:%v][user: %v] CompleteRelevantStuckAsyncTasks could not retrive user model from cache or db error: %v", asyncTask.TenantName, user.Username, err) continue } if !exist { - serv.Warnf("[tenant:%v][user: %v] CompleteRelevantStuckAsyncTasks user does not exist", asyncTask.TenantName, user.Username) + s.Warnf("[tenant:%v][user: %v] CompleteRelevantStuckAsyncTasks user does not exist", asyncTask.TenantName, user.Username) continue } @@ -103,9 +103,9 @@ func (ash AsyncTasksHandler) GetAsyncTasks(c *gin.Context) { } func (ash AsyncTasksHandler) GetAllAsyncTasks(tenantName string) ([]models.AsyncTaskRes, error) { - asyncTasks, err := db.GetActiveAsyncTasks(tenantName) + asyncTasks, err := db.GetActiveAndUpdatedAsyncTasks(tenantName) if err != nil { - serv.Errorf("GetAllAsyncTasks at GetActiveAsyncTasks: %v", err.Error()) + serv.Errorf("GetAllAsyncTasks at GetActiveAndUpdatedAsyncTasks: %v", err.Error()) return []models.AsyncTaskRes{}, err } for i, task := range asyncTasks { diff --git a/server/memphis_handlers_stations.go b/server/memphis_handlers_stations.go index 7f425e8f5..d70689a50 100644 --- a/server/memphis_handlers_stations.go +++ b/server/memphis_handlers_stations.go @@ -2626,14 +2626,14 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName err := db.UpdateResendDisabledInStations(true, []int{stationId}) if err != nil { - serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateResendDisabledInStations at station %v : %v", tenantName, username, stationName, err.Error()) - s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName) + s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateResendDisabledInStations at station %v : %v", tenantName, username, stationName, err.Error()) + s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName, err.Error()) return } task, err := db.UpsertAsyncTask("resend_all_dls_msgs", s.opts.ServerName, createdAt, tenantName, stationId, user.Username) if err != nil { - serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpsertAsyncTask at station %v : %v", tenantName, username, stationName, err.Error()) - s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName) + s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpsertAsyncTask at station %v : %v", tenantName, username, stationName, err.Error()) + s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName, err.Error()) return } @@ -2644,15 +2644,15 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName minId = int(data) _, maxId, err = db.GetMinMaxIdsOfDlsMsgsByUpdatedAt(tenantName, createdAt, stationId) if err != nil { - serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetMinMaxIdsOfDlsMsgsByUpdatedAt at station %v : %v", tenantName, username, stationName, err.Error()) - s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName) + s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetMinMaxIdsOfDlsMsgsByUpdatedAt at station %v : %v", tenantName, username, stationName, err.Error()) + s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName, err.Error()) return } } else { minId, maxId, err = db.GetMinMaxIdsOfDlsMsgsByUpdatedAt(tenantName, createdAt, stationId) if err != nil { - serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetMinMaxIdsOfDlsMsgsByUpdatedAt at station %v : %v", tenantName, username, stationName, err.Error()) - s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName) + s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetMinMaxIdsOfDlsMsgsByUpdatedAt at station %v : %v", tenantName, username, stationName, err.Error()) + s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName, err.Error()) return } // -1 in order to prevent skipping the first element @@ -2662,8 +2662,8 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName for { _, dlsMsgs, err := db.GetDlsMsgsBatch(tenantName, minId, maxId, stationId) if err != nil { - serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetDlsMsgsBatch at station %v : %v", tenantName, username, stationName, err.Error()) - s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName) + s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetDlsMsgsBatch at station %v : %v", tenantName, username, stationName, err.Error()) + s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName, err.Error()) return } @@ -2673,31 +2673,31 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName data = models.MetaData{ Offset: offset, } - _, err = serv.ResendUnackedMsg(dlsMsg, user, stationName) + _, err = s.ResendUnackedMsg(dlsMsg, user, stationName) if err != nil { - serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at ResendUnackedMsg at station %v : %v", tenantName, username, stationName, err.Error()) + s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at ResendUnackedMsg at station %v : %v", tenantName, username, stationName, err.Error()) continue } } err = db.UpdateAsyncTask(task.Name, tenantName, time.Now(), data, stationId) if err != nil { - serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateAsyncTask at station %v : %v ", tenantName, username, stationName, err.Error()) + s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateAsyncTask at station %v : %v ", tenantName, username, stationName, err.Error()) continue } minId = offset if len(dlsMsgs) == 0 || offset == maxId { - err = db.UpdateStatusAsyncTask(task.Name, tenantName, "completed", stationId) + err = db.UpdateStatusAsyncTask(task.Name, tenantName, "completed", stationId, "", "") if err != nil { - serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateStatusAsyncTask at station %v : %v ", tenantName, username, stationName, err.Error()) - s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName) + s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateStatusAsyncTask at station %v : %v ", tenantName, username, stationName, err.Error()) + s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName, err.Error()) return } err = db.UpdateResendDisabledInStations(false, []int{stationId}) if err != nil { - serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateResendDisabledInStations at station %v : %v", tenantName, username, stationName, err.Error()) - s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName) + s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateResendDisabledInStations at station %v : %v", tenantName, username, stationName, err.Error()) + s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName, err.Error()) return } @@ -2706,9 +2706,9 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName MessagePayload: fmt.Sprintf("Resend all unacked messages operation at station %s, triggered by user %s has been completed successfully", stationName, username), } - err = serv.sendSystemMessageOnWS(user, systemMessage) + err = s.sendSystemMessageOnWS(user, systemMessage) if err != nil { - serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at sendSystemMessageOnWS at station %v : %v", tenantName, username, stationName, err.Error()) + s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at sendSystemMessageOnWS at station %v : %v", tenantName, username, stationName, err.Error()) return } break @@ -2717,26 +2717,26 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName }() } -func (s *Server) handleResendAllFailure(taskName string, user models.User, stationId int, tenantName, stationName string) { - msgErr := fmt.Sprintf("Resend all unacked messages operation in station %s, triggered by user %s has failed due to an internal error:", stationName, user.Username) +func (s *Server) handleResendAllFailure(taskName string, user models.User, stationId int, tenantName, stationName string, errMsg string) { + msgErr := fmt.Sprintf("Resend all unacked messages operation in station %s, triggered by user %s has failed due to an internal error", stationName, user.Username) systemMessage := SystemMessage{ MessageType: "error", MessagePayload: msgErr, } - err := serv.sendSystemMessageOnWS(user, systemMessage) + err := s.sendSystemMessageOnWS(user, systemMessage) if err != nil { serv.Errorf("[tenant: %v][user: %v]handleResendAllFailure at sendSystemMessageOnWS at station %v : %v", tenantName, user.Username, stationName, err.Error()) return } err = db.UpdateResendDisabledInStations(false, []int{stationId}) if err != nil { - serv.Errorf("[tenant: %v][user: %v]handleResendAllFailure at UpdateResendDisabledInStations at station %v : %v", tenantName, user.Username, stationName, err.Error()) + s.Errorf("[tenant: %v][user: %v]handleResendAllFailure at UpdateResendDisabledInStations at station %v : %v", tenantName, user.Username, stationName, err.Error()) return } - err = db.UpdateReasonAsyncTask(taskName, tenantName, "failed", stationId, msgErr, "") + err = db.UpdateStatusAsyncTask(taskName, tenantName, "failed", stationId, errMsg, "") if err != nil { - serv.Errorf("[tenant: %v][user: %v]handleResendAllFailure at UpdateReasonAsyncTask at station %v : %v", tenantName, user.Username, stationName, err.Error()) + s.Errorf("[tenant: %v][user: %v]handleResendAllFailure at UpdateReasonAsyncTask at station %v : %v", tenantName, user.Username, stationName, err.Error()) return } } diff --git a/server/memphis_handlers_ws.go b/server/memphis_handlers_ws.go index adfb22760..5aa62d156 100644 --- a/server/memphis_handlers_ws.go +++ b/server/memphis_handlers_ws.go @@ -529,9 +529,9 @@ func memphisWSGetStationOverviewData(s *Server, h *Handlers, stationName string, } func (s *Server) sendSystemMessageOnWS(user models.User, systemMessage SystemMessage) error { - v, err := serv.Varz(nil) + v, err := s.Varz(nil) if err != nil { - serv.Errorf("[tenant: %v][user: %v]sendSystemMessageOnWS: %v", user.TenantName, user.Username, err.Error()) + s.Errorf("[tenant: %v][user: %v]sendSystemMessageOnWS: %v", user.TenantName, user.Username, err.Error()) return err } var serverNames []string @@ -542,7 +542,7 @@ func (s *Server) sendSystemMessageOnWS(user models.User, systemMessage SystemMes serverNames = append(serverNames, "memphis-"+strconv.Itoa(i)) } - acc, err := serv.lookupAccount(user.TenantName) + acc, err := s.lookupAccount(user.TenantName) if err != nil { err = fmt.Errorf("sendSystemMessageOnWS at lookupAccount: %v", err.Error()) return err @@ -566,7 +566,7 @@ func (s *Server) sendSystemMessageOnWS(user models.User, systemMessage SystemMes for _, serverName := range serverNames { replySubj := fmt.Sprintf(memphisWS_TemplSubj_Publish, memphisWS_Subj_GetSystemMessages+"."+serverName) - serv.sendInternalAccountMsgWithEcho(acc, replySubj, updateRaw) + s.sendInternalAccountMsgWithEcho(acc, replySubj, updateRaw) } return nil