diff --git a/broker/cron_job.go b/broker/cron_job.go index 7f6771a..9847860 100644 --- a/broker/cron_job.go +++ b/broker/cron_job.go @@ -53,8 +53,8 @@ func CronJob(client redis.UniversalClient, broker Broker, message Message) ([]by }) } - client.Set(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId), value, 0).Err() - client.Set(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, *message.D.Name), value, 0).Err() + client.Set(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_VALUE, taskId), value, 0).Err() + client.Set(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, *message.D.Name), timeVal, 0).Err() if message.D.Route != nil { err = client.Set(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId), *message.D.Route, 0).Err() @@ -64,14 +64,14 @@ func CronJob(client redis.UniversalClient, broker Broker, message Message) ([]by } } - client.SAdd(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_SETS, *message.D.Name), fmt.Sprintf("%s:%s", taskId, *message.D.Name)).Err() + client.SAdd(context.Background(), constants.TASK_REDIS_CRON_SETS, fmt.Sprintf("%s:%s", taskId, *message.D.Name)).Err() log.Infof("Added task with ID %s to run every %v.\n", taskId, timeVal) processor.ProcessCronJob(client, *broker.Channel, *message.D.Name, taskId) return json.Marshal(map[string]interface{}{ - "t": constants.TASK_DELAY, + "t": constants.TASK_CRON, "d": map[string]interface{}{ "taskId": taskId, }, diff --git a/go.mod b/go.mod index 4053e64..d09cdb0 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,12 @@ require ( github.com/redis/go-redis/v9 v9.0.4 ) -require github.com/robfig/cron v1.2.0 +require github.com/go-co-op/gocron v1.27.1 + +require ( + github.com/robfig/cron/v3 v3.0.1 // indirect + go.uber.org/atomic v1.9.0 // indirect +) require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect diff --git a/go.sum b/go.sum index 667f3cc..0157037 100644 --- a/go.sum +++ b/go.sum @@ -4,7 +4,9 @@ github.com/caarlos0/env/v8 v8.0.0 h1:POhxHhSpuxrLMIdvTGARuZqR4Jjm8AYmoi/JKlcScs0 github.com/caarlos0/env/v8 v8.0.0/go.mod h1:7K4wMY9bH0esiXSSHlfHLX5xKGQMnkH5Fk4TDSSSzfo= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= @@ -12,27 +14,43 @@ github.com/disgoorg/log v1.2.0 h1:sqlXnu/ZKAlIlHV9IO+dbMto7/hCQ474vlIdMWk8QKo= github.com/disgoorg/log v1.2.0/go.mod h1:3x1KDG6DI1CE2pDwi3qlwT3wlXpeHW/5rVay+1qDqOo= github.com/disgoorg/snowflake/v2 v2.0.1 h1:CuUxGLwggUxEswZOmZ+mZ5i0xSumQdXW9tXW7uGqe+0= github.com/disgoorg/snowflake/v2 v2.0.1/go.mod h1:SPU9c2CNn5DSyb86QcKtdZgix9osEtKrHLW4rMhfLCs= +github.com/go-co-op/gocron v1.27.1 h1:fYmK6COvF3rdFBbB4yQGWaf6TKIMjPv+1oaFrVx9bl8= +github.com/go-co-op/gocron v1.27.1/go.mod h1:39f6KNSGVOU1LO/ZOoZfcSxwlsJDQOKSu8erN0SH48Y= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rabbitmq/amqp091-go v1.8.0 h1:GBFy5PpLQ5jSVVSYv8ecHGqeX7UTLYR4ItQbDCss9MM= -github.com/rabbitmq/amqp091-go v1.8.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA= github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/redis/go-redis/v9 v9.0.4 h1:FC82T+CHJ/Q/PdyLW++GeCO+Ol59Y4T7R4jbgjvktgc= github.com/redis/go-redis/v9 v9.0.4/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= -github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= -github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/lib/task.go b/lib/task.go index da8f6fe..b0c77c5 100644 --- a/lib/task.go +++ b/lib/task.go @@ -37,15 +37,25 @@ func InitTask(conf *config.Config) *Task { log.Fatalf("Unable to declare exchange due to: %v", err) } - processor.ProcessJob(task.Redis, *task.Broker.Channel); broker.HandleReceive(task.Redis, *task.Broker) + go func () { + Delays := task.Redis.ZCard(context.Background(), constants.TASK_REDIS_KEY).Val() - Members := task.Redis.SMembers(context.Background(), constants.TASK_REDIS_CRON_SETS).Val() + log.Infof("Found %d delayed jobs", Delays) + processor.ProcessJob(task.Redis, *task.Broker.Channel); broker.HandleReceive(task.Redis, *task.Broker) + }() - for _, member := range Members { - taskId := strings.Split(member, ":")[0] - name := strings.Split(member, ":")[1] + go func () { + Members := task.Redis.SMembers(context.Background(), constants.TASK_REDIS_CRON_SETS).Val() + + log.Infof("Found %d cron jobs", len(Members)) + + for _, member := range Members { + taskId := strings.Split(member, ":")[0] + name := strings.Split(member, ":")[1] + + processor.ProcessCronJob(task.Redis, *task.Broker.Channel, name, taskId) + } + }() - processor.ProcessCronJob(task.Redis, *task.Broker.Channel, name, taskId) - } return &task } diff --git a/processor/cron_processor.go b/processor/cron_processor.go index ebefe60..2688845 100644 --- a/processor/cron_processor.go +++ b/processor/cron_processor.go @@ -3,11 +3,13 @@ package processor import ( "context" "fmt" + "time" + "github.com/disgoorg/log" + "github.com/go-co-op/gocron" "github.com/nezuchan/scheduled-tasks/constants" "github.com/rabbitmq/amqp091-go" "github.com/redis/go-redis/v9" - "github.com/robfig/cron" ) func ProcessCronJob(client redis.UniversalClient, broker amqp091.Channel, name string, taskId string) { @@ -17,36 +19,40 @@ func ProcessCronJob(client redis.UniversalClient, broker amqp091.Channel, name s if CronValue == 1 && TaskKey == 1 { Cron := client.Get(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, name)).Val() - c := cron.New() - - c.AddFunc(Cron, func() { - Value := client.Get(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId)).Val() - - if Value == "" { - c.Stop() - client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId)) - client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, name)) - client.SRem(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_SETS, fmt.Sprintf("%s:%s", taskId, name))).Err() - return - } - - Route := client.Get(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId)).Val() - - if Route != "" { - broker.PublishWithContext(context.Background(), constants.TASKER_EXCHANGE, Route, false, false, amqp091.Publishing{ - ContentType: "text/plain", - Body: []byte(Value), - }) - } else { - broker.PublishWithContext(context.Background(), constants.TASKER_EXCHANGE, "*", false, false, amqp091.Publishing{ - ContentType: "text/plain", - Body: []byte(Value), - }) - } - }) - - c.Start() - + c := gocron.NewScheduler(time.UTC) + + c.Cron(Cron).Do( + func(){ + log.Infof("Sending cron job %s to client", name) + Value := client.Get(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_VALUE, taskId)).Val() + + if Value == "" { + c.Stop() + client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_VALUE, taskId)) + client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId)) + client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, name)) + client.SRem(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_SETS, fmt.Sprintf("%s:%s", taskId, name))).Err() + c.Clear() + return + } + + Route := client.Get(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId)).Val() + + if Route != "" { + broker.PublishWithContext(context.Background(), constants.TASKER_EXCHANGE, Route, false, false, amqp091.Publishing{ + ContentType: "text/plain", + Body: []byte(Value), + }) + } else { + broker.PublishWithContext(context.Background(), constants.TASKER_EXCHANGE, "*", false, false, amqp091.Publishing{ + ContentType: "text/plain", + Body: []byte(Value), + }) + } + }, + ) + + c.StartAsync() } else { client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId)) client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, name))