wip: add sqlite backend for the taskqueue #42
base: main
Conversation
Hey, a sqlite results store would be quite useful. I see you're interested, so do let me know if any help is needed.
@kalbhor First of all, sorry for the late response. We are surely interested in finishing this up, and we would certainly need help with it (we are just starting out with Go). We will put our ideas out here so that we can brainstorm and clear our doubts. Also, I say "we" because our team will be working on this as well; they will chime in soon.
@kalbhor done with the initial implementation of enqueuing and consuming with sqlite3. Check it and let me know your thoughts. If everything seems okay, I will move forward with the rest of the implementation.
hey, sorry for the delay. I glanced at this and it looks good. Please give me a few days to do a detailed review.
Left some review comments. Overall, the implementation looks good; we just need to modify certain things to match what the library expects. We can have another review at the end to go into particulars like Go best practices, etc. Cheers 🎉
```go
type JobStatus string

const (
```
We don't require setting the job status inside the broker implementation. The status is set by the library using the results store. Check line 453 in ff4f99d:

```go
func (s *Server) statusStarted(ctx context.Context, t JobMessage) error {
```
```go
	return nil, err
}

_, err = db.Exec(`
```
The idea of having separate queues is to provide some isolation in how the data is stored/queued. I think we may be better off creating separate tables for separate queues: you can use a name like `jobs_%s` and interpolate the queue name into the create query. I am open to discussion on whether this is the correct way to isolate queues, but it is what first came to mind.
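For illustration, a per-queue table scheme might look like the sketch below. The `jobs_%s` naming and the minimal `(id, data)` columns are assumptions for discussion, not a final design; a real implementation should also validate the queue name before interpolating it into SQL.

```go
package main

import "fmt"

// queueTable maps a queue name to its hypothetical table, e.g. "jobs_default".
// In a real implementation the queue name must be validated (e.g. restricted
// to [a-z0-9_]) before being interpolated into a query.
func queueTable(queue string) string {
	return fmt.Sprintf("jobs_%s", queue)
}

// createQueueTableSQL builds the per-queue CREATE TABLE statement, using a
// minimal schema: an auto-incrementing row id plus the raw message bytes.
func createQueueTableSQL(queue string) string {
	return fmt.Sprintf(
		"CREATE TABLE IF NOT EXISTS %s (id INTEGER PRIMARY KEY AUTOINCREMENT, data BLOB NOT NULL);",
		queueTable(queue),
	)
}

func main() {
	fmt.Println(createQueueTableSQL("default"))
}
```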
Also, the table could be much simpler. We don't store the jobID, status, etc. in the broker. The broker simply pushes/stores `[]byte` data in some queue. All the parsing to check for jobID, etc. happens in the library's code. Check line 256 in ff4f99d:

```go
func (s *Server) process(ctx context.Context, w chan []byte) {
```
```go
}

func (broker *Broker) Enqueue(ctx context.Context, msg []byte, queue string) error {
	var job_msg tasqueue.JobMessage
```
Due to the above changes, this method should simply accept the `[]byte` data and insert it into the relevant table. There is no need to unmarshal it and set its fields structurally in the table.
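Under that design, `Enqueue` reduces to a single INSERT of the raw bytes. A small sketch of the query it would run, assuming the hypothetical `jobs_<queue>` per-table naming from the earlier comment:

```go
package main

import "fmt"

// enqueueQuery builds the statement a simplified Enqueue would execute:
// the broker inserts the raw msg bytes as-is, with no unmarshalling step.
func enqueueQuery(queue string) string {
	return fmt.Sprintf("INSERT INTO jobs_%s (data) VALUES (?);", queue)
}

func main() {
	fmt.Println(enqueueQuery("default"))
	// The method body would then be roughly (driver assumed):
	//   _, err := broker.db.ExecContext(ctx, enqueueQuery(queue), msg)
	//   return err
}
```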
brokers/sqlite/broker.go
Outdated
```go
result, err := tx.Exec(`REPLACE INTO jobs(id,queue,msg,status) VALUES(?,?,?,?);`, job_msg.ID, queue, msg, job_msg.Status)
if err != nil {
	tx.Rollback()
	broker.log.Debug("failed to replace in jobs", err)
```
Also, note that the logging methods are to be used as `broker.log.Debug("failed to begin....", "error", err)`. Basically, the first argument is the message, and the remaining arguments are `key` followed by `value` pairs. So in the example above we would have `error` as the key with `err` as its value.
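This message-then-key/value shape is the same convention the standard library's `log/slog` uses, so it can serve as a mental model. A runnable sketch using `slog` (not tasqueue's own logger; the timestamp is stripped only to make the output deterministic):

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"log/slog"
)

// logLine runs one Debug call through a slog.TextHandler and returns the
// rendered line, with the timestamp dropped for deterministic output.
func logLine() string {
	var buf bytes.Buffer
	opts := &slog.HandlerOptions{
		Level: slog.LevelDebug,
		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
			if a.Key == slog.TimeKey {
				return slog.Attr{} // returning a zero Attr discards it
			}
			return a
		},
	}
	log := slog.New(slog.NewTextHandler(&buf, opts))
	// Message first, then alternating key/value pairs, mirroring the
	// broker.log.Debug("failed to replace in jobs", "error", err) convention.
	log.Debug("failed to replace in jobs", "error", errors.New("database is locked"))
	return buf.String()
}

func main() {
	fmt.Print(logLine())
}
```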
brokers/sqlite/broker.go
Outdated
```go
for {
	select {
	case <-ctx.Done():
		broker.log.Debug("stopping the consumer")
```
Just for consistency, we can use the same message as in the other brokers: `shutting down consumer..`
Maybe you can add a PR later on to change each broker's message to `shutting down consumer`. The trailing `..` is unnecessary; not sure why I added it.
```go
var NoJobToProcess = errors.New("no jobs to process")

func (broker *Broker) popJob(ctx context.Context, queue string) (string, error) {
```
Since we've decided to change the table into something simple with just an ID and a data column, I think it may be better to pop via a transaction that reads up to N rows at a time (N can be defined in the broker's config) and then sends them one by one to the channel in `Consume`. I am unsure (I haven't benchmarked), but running a transaction to pop a single element inside an infinite `for {}` loop is likely not ideal. Maybe we can discuss this further and you can share some benchmarks (if you're unsure how to benchmark in Go, I can do this).
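To make the batching idea concrete, here is a runnable sketch of the consume-side loop. `fetchBatch` stands in for the transactional SELECT-then-DELETE against SQLite (it drains an in-memory slice so the sketch runs without a driver); the batch size would come from the broker's config:

```go
package main

import "fmt"

// batchSize is the N discussed above; in the real broker it would be a
// config option rather than a constant.
const batchSize = 3

// fetchBatch stands in for one transaction that reads and removes up to
// batchSize rows from the queue table.
func fetchBatch(rows *[][]byte) [][]byte {
	n := batchSize
	if len(*rows) < n {
		n = len(*rows)
	}
	batch := (*rows)[:n]
	*rows = (*rows)[n:]
	return batch
}

func main() {
	rows := [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e")}
	work := make(chan []byte, len(rows))

	// Consume-style loop: one transaction per batch, then fan the messages
	// out to the worker channel one by one.
	for len(rows) > 0 {
		for _, msg := range fetchBatch(&rows) {
			work <- msg
		}
	}
	close(work)

	for msg := range work {
		fmt.Println(string(msg))
	}
}
```

Compared to popping one row per transaction, this amortizes the transaction overhead over N messages at the cost of holding fetched rows in memory until they are handed to workers.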
```go
	return msg, nil
}

func (broker *Broker) GetPending(ctx context.Context, queue string) ([]string, error) {
```
This should just dump all the contents of the particular queue's table.
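With per-queue tables, that dump is a single SELECT. A sketch of the query (the `jobs_<queue>` naming is the hypothetical scheme from the earlier comment):

```go
package main

import "fmt"

// getPendingQuery builds the dump query for a queue's table; GetPending
// would run it and collect every data row into the returned slice.
func getPendingQuery(queue string) string {
	return fmt.Sprintf("SELECT data FROM jobs_%s ORDER BY id;", queue)
}

func main() {
	fmt.Println(getPendingQuery("default"))
}
```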
```go
	return nil, err
}

_, err = db.Exec(`
```
The table here should have the columns `id` (string), `content`, and `expiry`. We don't need any status-specific columns: the library will use this generic results store to save job statuses, and library clients will use it to save job data as well.
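A sketch of what that generic table might look like; the column types and table name are assumptions for discussion, not a final design:

```go
package main

import "fmt"

// resultsSchema sketches the simplified results-store table suggested above:
// a generic (id, content, expiry) layout with no status-specific columns.
const resultsSchema = `CREATE TABLE IF NOT EXISTS results (
	id      TEXT PRIMARY KEY,
	content BLOB NOT NULL,
	expiry  TIMESTAMP
);`

func main() {
	fmt.Println(resultsSchema)
}
```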
```go
	return fmt.Errorf("NilError: not implemented")
}

func (results *Results) Set(ctx context.Context, id string, b []byte) error {
```
No need to marshal/unmarshal here. Just save the `[]byte` into `content` under the particular `id`.
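To emphasize the "no marshalling" point, here is a runnable map-backed stand-in. The real `Set` would issue something like `REPLACE INTO results (id, content) VALUES (?, ?)` against the hypothetical results table; the map only keeps the sketch driver-free:

```go
package main

import "fmt"

// Results is a map-backed stand-in for the SQLite results store, used here
// only so the sketch runs without a driver.
type Results struct{ content map[string][]byte }

// Set stores the raw bytes verbatim under id; no marshalling step.
func (r *Results) Set(id string, b []byte) error {
	r.content[id] = b
	return nil
}

func main() {
	r := &Results{content: map[string][]byte{}}
	_ = r.Set("job-1", []byte(`{"status":"done"}`))
	fmt.Println(string(r.content["job-1"]))
}
```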
```go
	return fmt.Errorf("DeleteJob: not implemented")
}

func (results *Results) GetFailed(ctx context.Context) ([]string, error) {
```
We can have separate tables for failed and successful jobs, each with simple `id` and `content` columns.
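A sketch of those terminal-state tables; the names and columns are assumptions for illustration:

```go
package main

import "fmt"

// terminalTables sketches one minimal table per terminal state, each holding
// only the job id and its raw content bytes.
var terminalTables = []string{
	"CREATE TABLE IF NOT EXISTS failed  (id TEXT PRIMARY KEY, content BLOB NOT NULL);",
	"CREATE TABLE IF NOT EXISTS success (id TEXT PRIMARY KEY, content BLOB NOT NULL);",
}

func main() {
	for _, q := range terminalTables {
		fmt.Println(q)
	}
}
```

`GetFailed` would then reduce to a full dump of the `failed` table, mirroring how `GetPending` dumps a queue's table.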