diff --git a/brokers/sqlite/broker.go b/brokers/sqlite/broker.go new file mode 100644 index 0000000..67a4b17 --- /dev/null +++ b/brokers/sqlite/broker.go @@ -0,0 +1,175 @@ +package sqlite + +import ( + "context" + "database/sql" + "errors" + "fmt" + "log/slog" + "time" + + _ "github.com/mattn/go-sqlite3" + "github.com/vmihailenco/msgpack/v5" + + "github.com/kalbhor/tasqueue/v2" +) + +type JobStatus string + +const ( + QUEUED JobStatus = "QUEUED" + PROCESSING JobStatus = "PROCESSING" + FAILED JobStatus = "FAILED" + COMPLETED JobStatus = "COMPLETED" +) + +type Broker struct { + db *sql.DB + log *slog.Logger +} + +type Options struct { + DataSource string +} + +func New(options Options, lo *slog.Logger) (*Broker, error) { + db, err := sql.Open("sqlite3", options.DataSource) + if err != nil { + return nil, err + } + + _, err = db.Exec(` + CREATE TABLE IF NOT EXISTS jobs( + id VARCHAR PRIMARY KEY, + queue VARCHAR NOT NULL, + msg TEXT NOT NULL, + status VARCHAT NOT NULL, + timestamp DATE DEFAULT(datetime('now', 'localtime')) + ); + `) + if err != nil { + return nil, err + } + + return &Broker{ + db: db, + log: lo, + }, nil +} + +func (broker *Broker) Enqueue(ctx context.Context, msg []byte, queue string) error { + var job_msg tasqueue.JobMessage + if err := msgpack.Unmarshal(msg, &job_msg); err != nil { + return err + } + tx, err := broker.db.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + tx.Rollback() + broker.log.Debug("failed to begin the transaction", err) + return err + } + result, err := tx.Exec(`REPLACE INTO jobs(id,queue,msg,status) VALUES(?,?,?,?);`, job_msg.ID, queue, msg, job_msg.Status) + if err != nil { + tx.Rollback() + broker.log.Debug("failed to begin ..", "error", err) + return err + } + affected, err := result.RowsAffected() + if err != nil { + tx.Rollback() + broker.log.Debug("failed get affected rows jobs", err) + return err + } + if affected == 0 { + tx.Rollback() + broker.log.Debug("no rows affected") + return fmt.Errorf("failed to replace the job %s, %s", job_msg.ID, msg) + } + if err = tx.Commit(); err != nil { + broker.log.Debug("failed commit the transaction", err) + return err + } + return nil +} + +func (broker *Broker) Consume(ctx context.Context, work chan []byte, queue string) { + for { + select { + case <-ctx.Done(): + broker.log.Debug("shutting down the consumer") + return + default: + broker.log.Debug("receiving from consumer..") + msg, err := broker.popJob(ctx, queue) + if err == nil { + work <- []byte(msg) + } else { + if err == NoJobToProcess { + broker.log.Error("error consuming from sqlite queue", "error", err) + } else { + broker.log.Error("error parsing response from sqlite", "error", err) + return + } + } + } + } +} + +var NoJobToProcess = errors.New("no jobs to process") + +func (broker *Broker) popJob(ctx context.Context, queue string) (string, error) { + var msg string + tx, err := broker.db.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + return msg, err + } + var id string + if err = tx.QueryRowContext(ctx, "SELECT id, msg FROM jobs WHERE queue = ? AND status = ?;", queue, tasqueue.StatusStarted).Scan(&id, &msg); err != nil { + tx.Rollback() + if err == sql.ErrNoRows { + return msg, NoJobToProcess + } + return msg, err + } + + result, err := tx.ExecContext(ctx, `UPDATE jobs SET status = ? WHERE id = ?;`, tasqueue.StatusProcessing, id) + if err != nil { + tx.Rollback() + return msg, err + } + affected, err := result.RowsAffected() + if err != nil { + tx.Rollback() + return msg, err + } + if affected == 0 { + tx.Rollback() + return msg, NoJobToProcess + } + + if err = tx.Commit(); err != nil { + return msg, err + } + + return msg, nil +} + +func (broker *Broker) GetPending(ctx context.Context, queue string) ([]string, error) { + messages := make([]string, 0) + rows, err := broker.db.QueryContext(ctx, `SELECT msg FROM jobs WHERE queue = ? AND status = ?`, queue, tasqueue.StatusStarted) + if err != nil { + return messages, err + } + for rows.Next() { + var msg string + if err = rows.Scan(&msg); err != nil { + return messages, err + } + messages = append(messages, msg) + } + return messages, nil +} + +func (broker *Broker) EnqueueScheduled(ctx context.Context, msg []byte, queue string, ts time.Time) error { + return fmt.Errorf("EnqueueScheduled: not implemeted") +} diff --git a/examples/sqlite/main.go b/examples/sqlite/main.go new file mode 100644 index 0000000..c231a41 --- /dev/null +++ b/examples/sqlite/main.go @@ -0,0 +1,83 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "log/slog" + "os" + "os/signal" + "time" + + "github.com/kalbhor/tasqueue/v2" + sqb "github.com/kalbhor/tasqueue/v2/brokers/sqlite" + "github.com/kalbhor/tasqueue/v2/examples/tasks" + sqr "github.com/kalbhor/tasqueue/v2/results/sqlite" +) + +func main() { + ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + lo := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelDebug, + })) + broker, err := sqb.New(sqb.Options{ + DataSource: "jobs.db", + }, lo) + if err != nil { + log.Fatal(err) + } + results, err := sqr.New(sqr.Options{ + DataSource: "jobs.db", + }, lo) + if err != nil { + log.Fatal(err) + } + srv, err := tasqueue.NewServer(tasqueue.ServerOpts{ + Broker: broker, + Results: results, + Logger: lo.Handler(), + }) + if err != nil { + log.Fatal(err) + } + + err = srv.RegisterTask("add", tasks.SumProcessor, tasqueue.TaskOpts{Concurrency: 5}) + if err != nil { + log.Fatal(err) + } + + go srv.Start(ctx) + + b, _ := json.Marshal(tasks.SumPayload{Arg1: 40, Arg2: 40}) + task, err := tasqueue.NewJob("add", b, tasqueue.JobOpts{}) + if err != nil { + log.Fatal(err) + } + _, err = srv.Enqueue(ctx, task) + if err != nil { + log.Fatal(err) + } + + b, _ = json.Marshal(tasks.SumPayload{Arg1: 2, Arg2: 2}) + task, err = tasqueue.NewJob("add", b, tasqueue.JobOpts{}) + if err != nil { + log.Fatal(err) + } + _, err = srv.Enqueue(ctx, task) + if err != nil { + log.Fatal(err) + } + fmt.Println("exit..") + for { + select { + case <-time.NewTicker(time.Second * 1).C: + ids, err := srv.GetSuccess(ctx) + if err != nil { + log.Fatal(err) + } + + log.Println(ids) + } + } +} diff --git a/go.mod b/go.mod index 185c53f..d87a2f6 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/go-redis/redis v6.15.9+incompatible github.com/go-redis/redis/v8 v8.11.5 github.com/google/uuid v1.3.0 + github.com/mattn/go-sqlite3 v1.14.17 github.com/nats-io/nats.go v1.28.0 github.com/robfig/cron/v3 v3.0.1 github.com/vmihailenco/msgpack/v5 v5.3.5 diff --git a/go.sum b/go.sum index 54e1818..a7c45c0 100644 --- a/go.sum +++ b/go.sum @@ -26,6 +26,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM= +github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= diff --git a/results/sqlite/results.go b/results/sqlite/results.go new file mode 100644 index 0000000..05520a6 --- /dev/null +++ b/results/sqlite/results.go @@ -0,0 +1,157 @@ +package sqlite + +import ( + "context" + "database/sql" + "fmt" + "log/slog" + + _ "github.com/mattn/go-sqlite3" + "github.com/vmihailenco/msgpack/v5" + + "github.com/kalbhor/tasqueue/v2" +) + +type ResultStatus string + +const ( + QUEUED ResultStatus = "QUEUED" + PROCESSING ResultStatus = "PROCESSING" + FAILED ResultStatus = "FAILED" + COMPLETED ResultStatus = "COMPLETED" +) + +type Results struct { + db *sql.DB + log *slog.Logger +} + +type Options struct { + DataSource string +} + +func New(options Options, lo *slog.Logger) (*Results, error) { + db, err := sql.Open("sqlite3", options.DataSource) + if err != nil { + return nil, err + } + + _, err = db.Exec(` + CREATE TABLE IF NOT EXISTS results( + id VARCHAR PRIMARY KEY, + msg TEXT NOT NULL, + status VARCHAR NOT NULL + ); + `) + + return &Results{ + db: db, + log: lo, + }, nil +} + +func (results *Results) Get(ctx context.Context, id string) ([]byte, error) { + return nil, fmt.Errorf("Get: not implemented") +} + +func (results *Results) NilError() error { + return fmt.Errorf("NilError: not implemented") +} + +func (results *Results) Set(ctx context.Context, id string, b []byte) error { + var msg tasqueue.JobMessage + if err := msgpack.Unmarshal(b, &msg); err != nil { + return err + } + tx, err := results.db.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + return err + } + result, err := tx.Exec(`REPLACE INTO results(id,msg,status) VALUES(?,?,?);`, id, b, msg.Status) + if err != nil { + tx.Rollback() + return nil + } + affected, err := result.RowsAffected() + if err != nil { + tx.Rollback() + return nil + } + if affected == 0 { + tx.Rollback() + return fmt.Errorf("failed to replace into table %s, %s", id, b) + } + if err = tx.Commit(); err != nil { + return err + } + + return nil +} + +func (results *Results) DeleteJob(ctx context.Context, id string) error { + return fmt.Errorf("DeleteJob: not implemented") +} + +func (results *Results) GetFailed(ctx context.Context) ([]string, error) { + ids := make([]string, 0) + rows, err := results.db.QueryContext(ctx, `SELECT id FROM results WHERE status = ?;`, tasqueue.StatusFailed) + if err != nil { + return ids, err + } + for rows.Next() { + var id string + if err = rows.Scan(&id); err != nil { + return ids, err + } + ids = append(ids, id) + } + return ids, nil +} + +func (results *Results) GetSuccess(ctx context.Context) ([]string, error) { + ids := make([]string, 0) + rows, err := results.db.QueryContext(ctx, `SELECT id FROM results WHERE status = ?;`, tasqueue.StatusDone) + if err != nil { + return ids, err + } + for rows.Next() { + var id string + if err = rows.Scan(&id); err != nil { + return ids, err + } + ids = append(ids, id) + } + return ids, nil +} + +func (results *Results) SetFailed(ctx context.Context, id string) error { + tx, err := results.db.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + return err + } + _, err = tx.Exec(`UPDATE results SET status = ? WHERE id = ?;`, id, tasqueue.StatusFailed) + if err != nil { + tx.Rollback() + return err + } + if err = tx.Commit(); err != nil { + return err + } + return nil +} + +func (results *Results) SetSuccess(ctx context.Context, id string) error { + tx, err := results.db.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + return err + } + _, err = tx.Exec(`UPDATE results SET status = ? WHERE id = ?;`, id, tasqueue.StatusDone) + if err != nil { + tx.Rollback() + return err + } + if err = tx.Commit(); err != nil { + return err + } + return nil +}