
wip: add sqlite backend for the taskqueue #42

Open
wants to merge 24 commits into
base: main

Conversation

dineshsalunke

No description provided.

@kalbhor
Owner

kalbhor commented Sep 25, 2023

Hey, a SQLite results store would be quite useful. I see you're interested; do let me know if you need any help.

@dineshsalunke
Author

dineshsalunke commented Sep 29, 2023

@kalbhor First of all, sorry for the late response. We are definitely interested in finishing this up, and we will surely need help on it (we are just starting out with Go).

We will put our ideas here so that we can brainstorm and clear our doubts.

Also, I am saying "we" because our team will be working on this as well; they will chime in soon.

@dineshsalunke
Author

@kalbhor Done with the initial implementation for enqueuing and consuming with sqlite3. Check it once and let me know your thoughts. If everything seems OK, I will move forward with the rest of the implementation.

@kalbhor
Owner

kalbhor commented Oct 3, 2023

Hey, sorry for the delay. I glanced at this and it looks good. Please give me a few days to do a detailed review.

@kalbhor kalbhor marked this pull request as ready for review October 10, 2023 15:08

@kalbhor kalbhor left a comment


Left some review comments. Overall, the implementation looks good; we just need to modify certain things to match what the library expects. We can have another review at the end to go into particulars like Go best practices, etc. Cheers 🎉


type JobStatus string

const (

We don't require setting the job status inside the broker implementation. The status is set by the library using the results store. Check:

func (s *Server) statusStarted(ctx context.Context, t JobMessage) error {

return nil, err
}

_, err = db.Exec(`

The idea of having separate queues is to provide some isolation in the way the data is stored/queued. I think we may be better off creating separate tables for separate queues. You could do something like jobs_%s and add the queue name inside this CREATE query. I am open to discussion on whether this is the correct way to isolate queues, but this is what first came to mind.
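A minimal sketch of the per-queue-table idea: the jobs_%s naming comes from the comment above, while the helper names and the exact column layout are my own assumptions.

```go
package main

import (
	"database/sql"
	"fmt"
)

// queueTable derives a per-queue table name, e.g. "jobs_default".
// Queue names end up interpolated into DDL, so real code should validate
// them (alphanumeric/underscore only) before use.
func queueTable(queue string) string {
	return fmt.Sprintf("jobs_%s", queue)
}

// createQueueTable creates the table backing one queue. With a simplified
// design, only an auto-increment id and the raw message bytes are needed.
func createQueueTable(db *sql.DB, queue string) error {
	q := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
		id  INTEGER PRIMARY KEY AUTOINCREMENT,
		msg BLOB NOT NULL
	);`, queueTable(queue))
	_, err := db.Exec(q)
	return err
}

func main() {
	fmt.Println(queueTable("default"))
}
```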


Also, the table could be much simpler. We don't store the jobID, status, etc. in the broker. The broker simply pushes/stores []byte data in some queue. All the parsing to check for jobID, etc. happens in the library's code.
Check:

func (s *Server) process(ctx context.Context, w chan []byte) {

}

func (broker *Broker) Enqueue(ctx context.Context, msg []byte, queue string) error {
var job_msg tasqueue.JobMessage

Due to the above changes, this method should simply accept the []byte data and insert it into the relevant table. There is no need to unmarshal it and set it structurally in the table.
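A sketch of what Enqueue could look like after this change; the Broker struct shape and the per-queue jobs_%s table are assumptions carried over from the comments above.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// Broker is a sketch of the sqlite broker; only the db handle matters here.
type Broker struct {
	db *sql.DB
}

// enqueueQuery builds the insert statement for a given queue's table.
func enqueueQuery(queue string) string {
	return fmt.Sprintf(`INSERT INTO jobs_%s (msg) VALUES (?);`, queue)
}

// Enqueue stores the raw message bytes as-is, without unmarshalling; all
// parsing of job IDs, status, etc. is left to the library.
func (b *Broker) Enqueue(ctx context.Context, msg []byte, queue string) error {
	_, err := b.db.ExecContext(ctx, enqueueQuery(queue), msg)
	return err
}

func main() {
	fmt.Println(enqueueQuery("default"))
}
```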

result, err := tx.Exec(`REPLACE INTO jobs(id,queue,msg,status) VALUES(?,?,?,?);`, job_msg.ID, queue, msg, job_msg.Status)
if err != nil {
tx.Rollback()
broker.log.Debug("failed to replace in jobs", err)

Also, note that the logging methods are to be used as broker.log.Debug("failed to begin....", "error", err). Basically, the first argument is the message, and the remaining arguments are key/value pairs. So in the example above, "error" is the key and err is the value.

for {
select {
case <-ctx.Done():
broker.log.Debug("stopping the consumer")

Just for consistency, we can use the same message as in the other brokers: shutting down consumer..


Maybe you can add a PR later on to change each broker's message to shutting down consumer. The .. is unnecessary; not sure why I added it.


var NoJobToProcess = errors.New("no jobs to process")

func (broker *Broker) popJob(ctx context.Context, queue string) (string, error) {

Since we've decided to change the table into something simple with just an id and a data column, I think it may be better to pop by doing a transaction that reads up to N rows each time (this N can be defined in the broker's config) and then sends them one by one to the channel in Consume. I am unsure (I haven't benchmarked), but doing a transaction that pops 1 element inside an infinite for{} loop may not be ideal.

Maybe we can discuss this further and you can share some benchmarks (if you're unaware of how to benchmark in Go, I can do this).
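The batched pop could be sketched roughly like this; the function and table names are assumptions, and error handling is simplified for brevity.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
)

// placeholders returns "?,?,...,?" for n SQL parameters.
func placeholders(n int) string {
	return strings.TrimSuffix(strings.Repeat("?,", n), ",")
}

// popJobs reads up to n messages from the queue's table inside a single
// transaction, deletes them, and returns the raw payloads. The caller
// (Consume) can then feed them one by one into the work channel.
func popJobs(ctx context.Context, db *sql.DB, queue string, n int) ([][]byte, error) {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback() // no-op once the transaction is committed

	rows, err := tx.QueryContext(ctx,
		fmt.Sprintf(`SELECT id, msg FROM jobs_%s ORDER BY id LIMIT ?;`, queue), n)
	if err != nil {
		return nil, err
	}

	var ids []any
	var msgs [][]byte
	for rows.Next() {
		var id int64
		var msg []byte
		if err := rows.Scan(&id, &msg); err != nil {
			rows.Close()
			return nil, err
		}
		ids = append(ids, id)
		msgs = append(msgs, msg)
	}
	rows.Close()

	// Delete exactly the rows we read, then commit, so each message is
	// handed out at most once.
	if len(ids) > 0 {
		del := fmt.Sprintf(`DELETE FROM jobs_%s WHERE id IN (%s);`,
			queue, placeholders(len(ids)))
		if _, err := tx.ExecContext(ctx, del, ids...); err != nil {
			return nil, err
		}
	}
	return msgs, tx.Commit()
}

func main() {
	fmt.Println(placeholders(3))
}
```

This amortizes the transaction overhead over N messages instead of paying it once per message, which is the concern with the pop-one-in-a-loop approach.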

return msg, nil
}

func (broker *Broker) GetPending(ctx context.Context, queue string) ([]string, error) {

This should just dump all the contents of the particular queue's table.

return nil, err
}

_, err = db.Exec(`

The table here should have columns id (string), content, and expiry. We don't need any specific columns for status. The library will use this generic results store for storing job statuses, and library clients will use it to save job data as well.
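For reference, a sketch of the schema this describes; the SQLite column types are my guesses, only the three column names come from the comment above.

```go
package main

import (
	"fmt"
	"strings"
)

// createResultsTable is a sketch of the generic results-store schema:
// an id, opaque content bytes, and an expiry for cleanup.
const createResultsTable = `CREATE TABLE IF NOT EXISTS results (
	id      TEXT PRIMARY KEY,
	content BLOB,
	expiry  TIMESTAMP
);`

// hasColumn reports whether the schema mentions the given column name.
func hasColumn(col string) bool {
	return strings.Contains(createResultsTable, col)
}

func main() {
	fmt.Print(createResultsTable)
}
```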

return fmt.Errorf("NilError: not implemented")
}

func (results *Results) Set(ctx context.Context, id string, b []byte) error {

No need to marshal/unmarshal here. Just save the []byte into content under the given id.
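A sketch of Set under that approach; the Results struct and the results table name are assumptions. REPLACE makes the write idempotent per id, overwriting any existing row.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// Results is a sketch of the sqlite results store.
type Results struct {
	db *sql.DB
}

// setQuery is the upsert used by Set; REPLACE overwrites any existing row
// with the same id.
const setQuery = `REPLACE INTO results (id, content) VALUES (?, ?);`

// Set stores the raw bytes as-is under id, with no marshalling.
func (r *Results) Set(ctx context.Context, id string, b []byte) error {
	_, err := r.db.ExecContext(ctx, setQuery, id, b)
	return err
}

func main() {
	fmt.Println(setQuery)
}
```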

return fmt.Errorf("DeleteJob: not implemented")
}

func (results *Results) GetFailed(ctx context.Context) ([]string, error) {

We can have separate tables for failed and success, each with simple id and content columns.
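Under that design, GetFailed and GetSuccess would differ only in the table they read. A sketch, with table and helper names assumed:

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// Results is a sketch of the sqlite results store.
type Results struct {
	db *sql.DB
}

// idsQuery builds the dump query for one of the simple id/content tables.
func idsQuery(table string) string {
	return fmt.Sprintf(`SELECT id FROM %s;`, table)
}

// ids lists every id stored in the given table.
func (r *Results) ids(ctx context.Context, table string) ([]string, error) {
	rows, err := r.db.QueryContext(ctx, idsQuery(table))
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var out []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			return nil, err
		}
		out = append(out, id)
	}
	return out, rows.Err()
}

// GetFailed and GetSuccess read from their dedicated tables.
func (r *Results) GetFailed(ctx context.Context) ([]string, error) {
	return r.ids(ctx, "failed")
}

func (r *Results) GetSuccess(ctx context.Context) ([]string, error) {
	return r.ids(ctx, "success")
}

func main() {
	fmt.Println(idsQuery("failed"))
}
```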
