wip: add sqlite backend for the taskqueue #42

Status: Open. Wants to merge 24 commits into base: main (changes shown from 22 commits).

Commits:
c71cfc1 refactor: add boilerplate skeleton code for broker and results (dineshsalunke, Sep 22, 2023)
09c49e9 refactor: add constructors to create broker and results (dineshsalunke, Sep 30, 2023)
399d2f4 chore: add sqlite3 package (dineshsalunke, Sep 30, 2023)
1128fd1 refactor: add example for sqlite (dineshsalunke, Sep 30, 2023)
ce5b82f refactor: use the correct file name (dineshsalunke, Sep 30, 2023)
6f94777 refactor: add missing extension (dineshsalunke, Sep 30, 2023)
6b921bf refactor: log fatal if enqueuing failed (dineshsalunke, Sep 30, 2023)
cb732f8 refactor: add the sqlite3 import (dineshsalunke, Sep 30, 2023)
ca01a15 refactor: add the sqlite package (dineshsalunke, Sep 30, 2023)
67f7db7 refactor: add enum for job status (dineshsalunke, Sep 30, 2023)
9b31f5f refactor: add result status enum (dineshsalunke, Sep 30, 2023)
ad3767c refactor: prefix the errors so they are better to debug (dineshsalunke, Oct 1, 2023)
58e75e3 refactor: use json handler for log (dineshsalunke, Oct 1, 2023)
a206589 refactor: use the same db for results as well (dineshsalunke, Oct 1, 2023)
04f1b05 refactor: add variation to task argument (dineshsalunke, Oct 1, 2023)
08e4a5c refactor: add status property for results (dineshsalunke, Oct 1, 2023)
eb933eb fix: implement the Set method for results (dineshsalunke, Oct 1, 2023)
13b0351 refactor: implement get and set for failed in results (dineshsalunke, Oct 1, 2023)
e5e22a6 refactor: implement get and set for success in results (dineshsalunke, Oct 1, 2023)
c794b89 refactor: add status property in the db for results (dineshsalunke, Oct 1, 2023)
ecbea38 fix: implement the Enqueue for the broker (dineshsalunke, Oct 1, 2023)
fc55421 feat: implement the consume functionality in broker (dineshsalunke, Oct 1, 2023)
cfafac7 refactor: minor logging changes (dineshsalunke, Oct 10, 2023)
d54a1e5 fix: use Error in place of Debug (dineshsalunke, Oct 10, 2023)
brokers/sqlite/broker.go (174 additions, 0 deletions)
@@ -0,0 +1,174 @@
package sqlite

import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
"time"

_ "github.com/mattn/go-sqlite3"
"github.com/vmihailenco/msgpack/v5"

"github.com/kalbhor/tasqueue/v2"
)

type JobStatus string

const (
Review comment (Owner): We don't require setting job status inside the broker implementation. The status is set by the library using the results store. Check:

func (s *Server) statusStarted(ctx context.Context, t JobMessage) error {

QUEUED JobStatus = "QUEUED"
PROCESSING JobStatus = "PROCESSING"
FAILED JobStatus = "FAILED"
COMPLETED JobStatus = "COMPLETED"
)

type Broker struct {
db *sql.DB
log *slog.Logger
}

type Options struct {
DataSource string
}

func New(options Options, lo *slog.Logger) (*Broker, error) {
db, err := sql.Open("sqlite3", options.DataSource)
if err != nil {
return nil, err
}

_, err = db.Exec(`
Review comment (Owner): The idea of having separate queues is to have some sort of isolation in the way the data is "stored/queued". I think we may be better off creating separate tables for separate queues. You can do something like jobs_%s and add the queue name inside this create query. I am open to discussion on whether this is the correct way to isolate queues, but this is what first came to mind.

Review comment (Owner): Also, the table could be much simpler. We don't store the jobID, status, etc. in the broker. The broker simply pushes/stores []byte data in some queue. All the parsing to check for jobID, etc. happens in the library's code. Check:

func (s *Server) process(ctx context.Context, w chan []byte) {

CREATE TABLE IF NOT EXISTS jobs(
id VARCHAR PRIMARY KEY,
queue VARCHAR NOT NULL,
msg TEXT NOT NULL,
status VARCHAR NOT NULL,
timestamp DATE DEFAULT(datetime('now', 'localtime'))
);
`)
if err != nil {
return nil, err
}

return &Broker{
db: db,
log: lo,
}, nil
}
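The per-queue-table idea from the review above could be sketched roughly as follows. This is not part of the PR; the `queueTableDDL` helper, table layout, and column names are illustrative assumptions, and a real version should also sanitize the queue name before interpolating it into SQL:

```go
package main

import "fmt"

// queueTableDDL builds the CREATE TABLE statement for one queue, following
// the reviewer's suggestion: a jobs_<queue> table per queue that stores only
// opaque payloads. Job IDs and status stay out of the broker; the library
// parses those from the payload itself.
func queueTableDDL(queue string) string {
	return fmt.Sprintf(`CREATE TABLE IF NOT EXISTS jobs_%s(
	id INTEGER PRIMARY KEY AUTOINCREMENT,
	msg BLOB NOT NULL
);`, queue)
}

func main() {
	fmt.Println(queueTableDDL("default"))
}
```

Isolation then comes from the table name itself: Enqueue inserts into jobs_<queue> and Consume reads only from that table, so no queue column or status filter is needed.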

func (broker *Broker) Enqueue(ctx context.Context, msg []byte, queue string) error {
var job_msg tasqueue.JobMessage
Review comment (Owner): Due to the above changes, this method should simply accept []byte data and insert it into the relevant table. There is no need to unmarshal it and structurally set it in the table.

if err := msgpack.Unmarshal(msg, &job_msg); err != nil {
return err
}
tx, err := broker.db.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
broker.log.Debug("failed to begin the transaction", err)
return err
}
result, err := tx.Exec(`REPLACE INTO jobs(id,queue,msg,status) VALUES(?,?,?,?);`, job_msg.ID, queue, msg, job_msg.Status)
if err != nil {
tx.Rollback()
broker.log.Debug("failed to replace in jobs", err)
Review comment (Owner): Also, note that the logging methods are to be used as broker.log.Debug("failed to begin....", "error", err). The first argument is the message, and the remaining arguments are alternating keys and values. So in the example above, "error" is the key and err is the value.

return err
}
affected, err := result.RowsAffected()
if err != nil {
tx.Rollback()
broker.log.Debug("failed to get affected rows for jobs", err)
return err
}
if affected == 0 {
tx.Rollback()
broker.log.Debug("no rows affected")
return fmt.Errorf("failed to replace the job %s, %s", job_msg.ID, msg)
}
if err = tx.Commit(); err != nil {
broker.log.Debug("failed to commit the transaction", err)
return err
}
return nil
}

func (broker *Broker) Consume(ctx context.Context, work chan []byte, queue string) {
for {
select {
case <-ctx.Done():
broker.log.Debug("stopping the consumer")
Review comment (Owner): Just for consistency, we can use the same message as in the other brokers: shutting down consumer..

Review comment (Owner): Maybe you can add a PR later on to change each broker's message to shutting down consumer. The .. are unnecessary; not sure why I added them.

return
default:
msg, err := broker.popJob(ctx, queue)
if err == nil {
work <- []byte(msg)
} else {
if err == NoJobToProcess {
broker.log.Debug("no jobs to process")
} else {
broker.log.Debug("failed to pop job", msg, err)
Review comment (Owner): This should be an error log instead of debug. The general rule: if a method does not return the error, it should log at error level. If it returns the error to its caller, a Debug log for extra context is fine, since the caller will deal with the error.

return
}
}
}
}
}

var NoJobToProcess = errors.New("no jobs to process")

func (broker *Broker) popJob(ctx context.Context, queue string) (string, error) {
Review comment (Owner): Since we've decided to simplify the table to just an ID and a data column, it may be better to pop inside a transaction that reads up to N rows at a time (N can be defined in the broker's config) and then sends them one by one to the channel in Consume. I am unsure (I haven't benchmarked), but running a transaction that pops a single element inside an infinite for{} loop may not be ideal. Maybe we can discuss this further and you can share some benchmarks (if you're unfamiliar with benchmarking in Go, I can do this).

var msg string
tx, err := broker.db.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
return msg, err
}
var id string
if err = tx.QueryRowContext(ctx, "SELECT id, msg FROM jobs WHERE queue = ? AND status = ?;", queue, tasqueue.StatusStarted).Scan(&id, &msg); err != nil {
tx.Rollback()
if err == sql.ErrNoRows {
return msg, NoJobToProcess
}
return msg, err
}

result, err := tx.ExecContext(ctx, `UPDATE jobs SET status = ? WHERE id = ?;`, tasqueue.StatusProcessing, id)
if err != nil {
tx.Rollback()
return msg, err
}
affected, err := result.RowsAffected()
if err != nil {
tx.Rollback()
return msg, err
}
if affected == 0 {
tx.Rollback()
return msg, NoJobToProcess
}

if err = tx.Commit(); err != nil {
return msg, err
}

return msg, nil
}
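The batched-pop idea from the review above could be sketched like this, with an in-memory slice standing in for the jobs table. The `popBatch` name and the slice stand-in are illustrative; a real implementation would SELECT and delete up to N rows inside one SQL transaction:

```go
package main

import "fmt"

// popBatch removes up to n pending jobs from the front of the queue and
// returns them. In the real broker this would be one transaction that reads
// and deletes up to n rows, instead of one transaction per job inside the
// infinite Consume loop.
func popBatch(table *[][]byte, n int) [][]byte {
	if n > len(*table) {
		n = len(*table)
	}
	batch := (*table)[:n]
	*table = (*table)[n:]
	return batch
}

func main() {
	work := make(chan []byte, 10)
	table := [][]byte{[]byte("job1"), []byte("job2"), []byte("job3")}

	// Drain in batches of 2 and feed jobs one by one to the worker channel,
	// mirroring what Consume would do between context checks.
	for len(table) > 0 {
		for _, msg := range popBatch(&table, 2) {
			work <- msg
		}
	}
	close(work)
	for msg := range work {
		fmt.Println(string(msg))
	}
}
```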

func (broker *Broker) GetPending(ctx context.Context, queue string) ([]string, error) {
Review comment (Owner): This should just dump all the contents of the particular queue's table.

messages := make([]string, 0)
rows, err := broker.db.QueryContext(ctx, `SELECT msg FROM jobs WHERE queue = ? AND status = ?`, queue, tasqueue.StatusStarted)
if err != nil {
return messages, err
}
for rows.Next() {
var msg string
if err = rows.Scan(&msg); err != nil {
return messages, err
}
messages = append(messages, msg)
}
return messages, nil
}

func (broker *Broker) EnqueueScheduled(ctx context.Context, msg []byte, queue string, ts time.Time) error {
return fmt.Errorf("EnqueueScheduled: not implemented")
}
examples/sqlite/main.go (83 additions, 0 deletions)
@@ -0,0 +1,83 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"
"log/slog"
"os"
"os/signal"
"time"

"github.com/kalbhor/tasqueue/v2"
sqb "github.com/kalbhor/tasqueue/v2/brokers/sqlite"
"github.com/kalbhor/tasqueue/v2/examples/tasks"
sqr "github.com/kalbhor/tasqueue/v2/results/sqlite"
)

func main() {
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
lo := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug,
}))
broker, err := sqb.New(sqb.Options{
DataSource: "jobs.db",
}, lo)
if err != nil {
log.Fatal(err)
}
results, err := sqr.New(sqr.Options{
DataSource: "jobs.db",
}, lo)
if err != nil {
log.Fatal(err)
}
srv, err := tasqueue.NewServer(tasqueue.ServerOpts{
Broker: broker,
Results: results,
Logger: lo.Handler(),
})
if err != nil {
log.Fatal(err)
}

err = srv.RegisterTask("add", tasks.SumProcessor, tasqueue.TaskOpts{Concurrency: 5})
if err != nil {
log.Fatal(err)
}

go srv.Start(ctx)

b, _ := json.Marshal(tasks.SumPayload{Arg1: 40, Arg2: 40})
task, err := tasqueue.NewJob("add", b, tasqueue.JobOpts{})
if err != nil {
log.Fatal(err)
}
_, err = srv.Enqueue(ctx, task)
if err != nil {
log.Fatal(err)
}

b, _ = json.Marshal(tasks.SumPayload{Arg1: 2, Arg2: 2})
task, err = tasqueue.NewJob("add", b, tasqueue.JobOpts{})
if err != nil {
log.Fatal(err)
}
_, err = srv.Enqueue(ctx, task)
if err != nil {
log.Fatal(err)
}
fmt.Println("exit..")
// Create the ticker once; the original code allocated a new ticker on every
// loop iteration, which leaks tickers.
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
ids, err := srv.GetSuccess(ctx)
if err != nil {
log.Fatal(err)
}

log.Println(ids)
}
}
go.mod (1 addition, 0 deletions)
@@ -6,6 +6,7 @@ require (
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-redis/redis/v8 v8.11.5
github.com/google/uuid v1.3.0
github.com/mattn/go-sqlite3 v1.14.17
github.com/nats-io/nats.go v1.28.0
github.com/robfig/cron/v3 v3.0.1
github.com/vmihailenco/msgpack/v5 v5.3.5
go.sum (2 additions, 0 deletions)
@@ -26,6 +26,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM=
github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=