-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
wip: add sqlite backend for the taskqueue #42
base: main
Are you sure you want to change the base?
Changes from 22 commits
c71cfc1
09c49e9
399d2f4
1128fd1
ce5b82f
6f94777
6b921bf
cb732f8
ca01a15
67f7db7
9b31f5f
ad3767c
58e75e3
a206589
04f1b05
08e4a5c
eb933eb
13b0351
e5e22a6
c794b89
ecbea38
fc55421
cfafac7
d54a1e5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -0,0 +1,174 @@ | ||||
package sqlite | ||||
|
||||
import ( | ||||
"context" | ||||
"database/sql" | ||||
"errors" | ||||
"fmt" | ||||
"log/slog" | ||||
"time" | ||||
|
||||
_ "github.com/mattn/go-sqlite3" | ||||
"github.com/vmihailenco/msgpack/v5" | ||||
|
||||
"github.com/kalbhor/tasqueue/v2" | ||||
) | ||||
|
||||
type JobStatus string | ||||
|
||||
const ( | ||||
QUEUED JobStatus = "QUEUED" | ||||
PROCESSING JobStatus = "PROCESSING" | ||||
FAILED JobStatus = "FAILED" | ||||
COMPLETED JobStatus = "COMPLETED" | ||||
) | ||||
|
||||
type Broker struct { | ||||
db *sql.DB | ||||
log *slog.Logger | ||||
} | ||||
|
||||
type Options struct { | ||||
DataSource string | ||||
} | ||||
|
||||
func New(options Options, lo *slog.Logger) (*Broker, error) { | ||||
db, err := sql.Open("sqlite3", options.DataSource) | ||||
if err != nil { | ||||
return nil, err | ||||
} | ||||
|
||||
_, err = db.Exec(` | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea of having separate queues is to have some sort of isolation in the way the data is "stored/queued". I think we may be better off with creating separate tables for separate queues. You can do something like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, the table could be much simpler. We don't store the jobID, status, etc in the broker. The broker simply pushes/stores Line 256 in ff4f99d
|
||||
CREATE TABLE IF NOT EXISTS jobs( | ||||
id VARCHAR PRIMARY KEY, | ||||
queue VARCHAR NOT NULL, | ||||
msg TEXT NOT NULL, | ||||
status VARCHAT NOT NULL, | ||||
timestamp DATE DEFAULT(datetime('now', 'localtime')) | ||||
); | ||||
`) | ||||
if err != nil { | ||||
return nil, err | ||||
} | ||||
|
||||
return &Broker{ | ||||
db: db, | ||||
log: lo, | ||||
}, nil | ||||
} | ||||
|
||||
func (broker *Broker) Enqueue(ctx context.Context, msg []byte, queue string) error { | ||||
var job_msg tasqueue.JobMessage | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Due to the above changes, this method should simply accept |
||||
if err := msgpack.Unmarshal(msg, &job_msg); err != nil { | ||||
return err | ||||
} | ||||
tx, err := broker.db.BeginTx(ctx, &sql.TxOptions{}) | ||||
if err != nil { | ||||
tx.Rollback() | ||||
broker.log.Debug("failed to begin the transaction", err) | ||||
return err | ||||
} | ||||
result, err := tx.Exec(`REPLACE INTO jobs(id,queue,msg,status) VALUES(?,?,?,?);`, job_msg.ID, queue, msg, job_msg.Status) | ||||
if err != nil { | ||||
tx.Rollback() | ||||
broker.log.Debug("failed to replace in jobs", err) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, to note the logging methods are to be used as |
||||
return err | ||||
} | ||||
affected, err := result.RowsAffected() | ||||
if err != nil { | ||||
tx.Rollback() | ||||
broker.log.Debug("failed get affected rows jobs", err) | ||||
return err | ||||
} | ||||
if affected == 0 { | ||||
tx.Rollback() | ||||
broker.log.Debug("no rows affected") | ||||
return fmt.Errorf("failed to replace the job %s, %s", job_msg.ID, msg) | ||||
} | ||||
if err = tx.Commit(); err != nil { | ||||
broker.log.Debug("failed commit the transaction", err) | ||||
return err | ||||
} | ||||
return nil | ||||
} | ||||
|
||||
func (broker *Broker) Consume(ctx context.Context, work chan []byte, queue string) { | ||||
for { | ||||
select { | ||||
case <-ctx.Done(): | ||||
broker.log.Debug("stopping the consumer") | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just for consistency, we can use the same message as in other brokers. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe you can add a PR later on to change each broker's message to |
||||
return | ||||
default: | ||||
msg, err := broker.popJob(ctx, queue) | ||||
if err == nil { | ||||
work <- []byte(msg) | ||||
} else { | ||||
if err == NoJobToProcess { | ||||
broker.log.Debug("no jobs to process") | ||||
} else { | ||||
broker.log.Debug("failed to pop job", msg, err) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be an error log instead of debug. General note is, if we are not returning the error from a method, it should log at |
||||
return | ||||
} | ||||
} | ||||
} | ||||
} | ||||
} | ||||
|
||||
var NoJobToProcess = errors.New("no jobs to process") | ||||
|
||||
func (broker *Broker) popJob(ctx context.Context, queue string) (string, error) { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we've decided to change the table into something simple with just an ID and data column. I think it maybe better if we pop by doing a transaction that tries to read N max rows (This N can be defined in the broker's config) each time and then sends them one by one to the channel in Maybe we can discuss further on this and you can share some benchmarks (if you're unaware of how to benchmark in Go, I can do this). |
||||
var msg string | ||||
tx, err := broker.db.BeginTx(ctx, &sql.TxOptions{}) | ||||
if err != nil { | ||||
return msg, err | ||||
} | ||||
var id string | ||||
if err = tx.QueryRowContext(ctx, "SELECT id, msg FROM jobs WHERE queue = ? AND status = ?;", queue, tasqueue.StatusStarted).Scan(&id, &msg); err != nil { | ||||
tx.Rollback() | ||||
if err == sql.ErrNoRows { | ||||
return msg, NoJobToProcess | ||||
} | ||||
return msg, err | ||||
} | ||||
|
||||
result, err := tx.ExecContext(ctx, `UPDATE jobs SET status = ? WHERE id = ?;`, tasqueue.StatusProcessing, id) | ||||
if err != nil { | ||||
tx.Rollback() | ||||
return msg, err | ||||
} | ||||
affected, err := result.RowsAffected() | ||||
if err != nil { | ||||
tx.Rollback() | ||||
return msg, err | ||||
} | ||||
if affected == 0 { | ||||
tx.Rollback() | ||||
return msg, NoJobToProcess | ||||
} | ||||
|
||||
if err = tx.Commit(); err != nil { | ||||
return msg, err | ||||
} | ||||
|
||||
return msg, nil | ||||
} | ||||
|
||||
func (broker *Broker) GetPending(ctx context.Context, queue string) ([]string, error) { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should just dump all the contents in the particular queue's table |
||||
messages := make([]string, 0) | ||||
rows, err := broker.db.QueryContext(ctx, `SELECT msg FROM jobs WHERE queue = ? AND status = ?`, queue, tasqueue.StatusStarted) | ||||
if err != nil { | ||||
return messages, err | ||||
} | ||||
for rows.Next() { | ||||
var msg string | ||||
if err = rows.Scan(&msg); err != nil { | ||||
return messages, err | ||||
} | ||||
messages = append(messages, msg) | ||||
} | ||||
return messages, nil | ||||
} | ||||
|
||||
func (broker *Broker) EnqueueScheduled(ctx context.Context, msg []byte, queue string, ts time.Time) error { | ||||
return fmt.Errorf("EnqueueScheduled: not implemeted") | ||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"log" | ||
"log/slog" | ||
"os" | ||
"os/signal" | ||
"time" | ||
|
||
"github.com/kalbhor/tasqueue/v2" | ||
sqb "github.com/kalbhor/tasqueue/v2/brokers/sqlite" | ||
"github.com/kalbhor/tasqueue/v2/examples/tasks" | ||
sqr "github.com/kalbhor/tasqueue/v2/results/sqlite" | ||
) | ||
|
||
func main() { | ||
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) | ||
lo := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ | ||
Level: slog.LevelDebug, | ||
})) | ||
broker, err := sqb.New(sqb.Options{ | ||
DataSource: "jobs.db", | ||
}, lo) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
results, err := sqr.New(sqr.Options{ | ||
DataSource: "jobs.db", | ||
}, lo) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
srv, err := tasqueue.NewServer(tasqueue.ServerOpts{ | ||
Broker: broker, | ||
Results: results, | ||
Logger: lo.Handler(), | ||
}) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
|
||
err = srv.RegisterTask("add", tasks.SumProcessor, tasqueue.TaskOpts{Concurrency: 5}) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
|
||
go srv.Start(ctx) | ||
|
||
b, _ := json.Marshal(tasks.SumPayload{Arg1: 40, Arg2: 40}) | ||
task, err := tasqueue.NewJob("add", b, tasqueue.JobOpts{}) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
_, err = srv.Enqueue(ctx, task) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
|
||
b, _ = json.Marshal(tasks.SumPayload{Arg1: 2, Arg2: 2}) | ||
task, err = tasqueue.NewJob("add", b, tasqueue.JobOpts{}) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
_, err = srv.Enqueue(ctx, task) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
fmt.Println("exit..") | ||
for { | ||
select { | ||
case <-time.NewTicker(time.Second * 1).C: | ||
ids, err := srv.GetSuccess(ctx) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
|
||
log.Println(ids) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't require setting job status inside the broker implementation. The status is set by the library using the results store. Check:
Tasqueue/server.go
Line 453 in ff4f99d