Skip to content

Commit

Permalink
fix: cronjob not running
Browse files Browse the repository at this point in the history
  • Loading branch information
KagChi committed May 22, 2023
1 parent b480fe9 commit 10eae83
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 47 deletions.
8 changes: 4 additions & 4 deletions broker/cron_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func CronJob(client redis.UniversalClient, broker Broker, message Message) ([]by
})
}

client.Set(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId), value, 0).Err()
client.Set(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, *message.D.Name), value, 0).Err()
client.Set(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_VALUE, taskId), value, 0).Err()
client.Set(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, *message.D.Name), timeVal, 0).Err()

if message.D.Route != nil {
err = client.Set(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId), *message.D.Route, 0).Err()
Expand All @@ -64,14 +64,14 @@ func CronJob(client redis.UniversalClient, broker Broker, message Message) ([]by
}
}

client.SAdd(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_SETS, *message.D.Name), fmt.Sprintf("%s:%s", taskId, *message.D.Name)).Err()
client.SAdd(context.Background(), constants.TASK_REDIS_CRON_SETS, fmt.Sprintf("%s:%s", taskId, *message.D.Name)).Err()

log.Infof("Added task with ID %s to run every %v.\n", taskId, timeVal)

processor.ProcessCronJob(client, *broker.Channel, *message.D.Name, taskId)

return json.Marshal(map[string]interface{}{
"t": constants.TASK_DELAY,
"t": constants.TASK_CRON,
"d": map[string]interface{}{
"taskId": taskId,
},
Expand Down
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ require (
github.com/redis/go-redis/v9 v9.0.4
)

require github.com/robfig/cron v1.2.0
require github.com/go-co-op/gocron v1.27.1

require (
github.com/robfig/cron/v3 v3.0.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand Down
26 changes: 22 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,53 @@ github.com/caarlos0/env/v8 v8.0.0 h1:POhxHhSpuxrLMIdvTGARuZqR4Jjm8AYmoi/JKlcScs0
github.com/caarlos0/env/v8 v8.0.0/go.mod h1:7K4wMY9bH0esiXSSHlfHLX5xKGQMnkH5Fk4TDSSSzfo=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/disgoorg/log v1.2.0 h1:sqlXnu/ZKAlIlHV9IO+dbMto7/hCQ474vlIdMWk8QKo=
github.com/disgoorg/log v1.2.0/go.mod h1:3x1KDG6DI1CE2pDwi3qlwT3wlXpeHW/5rVay+1qDqOo=
github.com/disgoorg/snowflake/v2 v2.0.1 h1:CuUxGLwggUxEswZOmZ+mZ5i0xSumQdXW9tXW7uGqe+0=
github.com/disgoorg/snowflake/v2 v2.0.1/go.mod h1:SPU9c2CNn5DSyb86QcKtdZgix9osEtKrHLW4rMhfLCs=
github.com/go-co-op/gocron v1.27.1 h1:fYmK6COvF3rdFBbB4yQGWaf6TKIMjPv+1oaFrVx9bl8=
github.com/go-co-op/gocron v1.27.1/go.mod h1:39f6KNSGVOU1LO/ZOoZfcSxwlsJDQOKSu8erN0SH48Y=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.8.0 h1:GBFy5PpLQ5jSVVSYv8ecHGqeX7UTLYR4ItQbDCss9MM=
github.com/rabbitmq/amqp091-go v1.8.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA=
github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/redis/go-redis/v9 v9.0.4 h1:FC82T+CHJ/Q/PdyLW++GeCO+Ol59Y4T7R4jbgjvktgc=
github.com/redis/go-redis/v9 v9.0.4/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
24 changes: 17 additions & 7 deletions lib/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,25 @@ func InitTask(conf *config.Config) *Task {
log.Fatalf("Unable to declare exchange due to: %v", err)
}

processor.ProcessJob(task.Redis, *task.Broker.Channel); broker.HandleReceive(task.Redis, *task.Broker)
go func () {
Delays := task.Redis.ZCard(context.Background(), constants.TASK_REDIS_KEY).Val()

Members := task.Redis.SMembers(context.Background(), constants.TASK_REDIS_CRON_SETS).Val()
log.Infof("Found %d delayed jobs", Delays)
processor.ProcessJob(task.Redis, *task.Broker.Channel); broker.HandleReceive(task.Redis, *task.Broker)
}()

for _, member := range Members {
taskId := strings.Split(member, ":")[0]
name := strings.Split(member, ":")[1]
go func () {
Members := task.Redis.SMembers(context.Background(), constants.TASK_REDIS_CRON_SETS).Val()

log.Infof("Found %d cron jobs", len(Members))

for _, member := range Members {
taskId := strings.Split(member, ":")[0]
name := strings.Split(member, ":")[1]

processor.ProcessCronJob(task.Redis, *task.Broker.Channel, name, taskId)
}
}()

processor.ProcessCronJob(task.Redis, *task.Broker.Channel, name, taskId)
}
return &task
}
68 changes: 37 additions & 31 deletions processor/cron_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package processor
import (
"context"
"fmt"
"time"

"github.com/disgoorg/log"
"github.com/go-co-op/gocron"
"github.com/nezuchan/scheduled-tasks/constants"
"github.com/rabbitmq/amqp091-go"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron"
)

func ProcessCronJob(client redis.UniversalClient, broker amqp091.Channel, name string, taskId string) {
Expand All @@ -17,36 +19,40 @@ func ProcessCronJob(client redis.UniversalClient, broker amqp091.Channel, name s
if CronValue == 1 && TaskKey == 1 {
Cron := client.Get(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, name)).Val()

c := cron.New()

c.AddFunc(Cron, func() {
Value := client.Get(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId)).Val()

if Value == "" {
c.Stop()
client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId))
client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, name))
client.SRem(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_SETS, fmt.Sprintf("%s:%s", taskId, name))).Err()
return
}

Route := client.Get(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId)).Val()

if Route != "" {
broker.PublishWithContext(context.Background(), constants.TASKER_EXCHANGE, Route, false, false, amqp091.Publishing{
ContentType: "text/plain",
Body: []byte(Value),
})
} else {
broker.PublishWithContext(context.Background(), constants.TASKER_EXCHANGE, "*", false, false, amqp091.Publishing{
ContentType: "text/plain",
Body: []byte(Value),
})
}
})

c.Start()

c := gocron.NewScheduler(time.UTC)

c.Cron(Cron).Do(
func(){
log.Infof("Sending cron job %s to client", name)
Value := client.Get(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_VALUE, taskId)).Val()

if Value == "" {
c.Stop()
client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_VALUE, taskId))
client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId))
client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, name))
client.SRem(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_SETS, fmt.Sprintf("%s:%s", taskId, name))).Err()
c.Clear()
return
}

Route := client.Get(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId)).Val()

if Route != "" {
broker.PublishWithContext(context.Background(), constants.TASKER_EXCHANGE, Route, false, false, amqp091.Publishing{
ContentType: "text/plain",
Body: []byte(Value),
})
} else {
broker.PublishWithContext(context.Background(), constants.TASKER_EXCHANGE, "*", false, false, amqp091.Publishing{
ContentType: "text/plain",
Body: []byte(Value),
})
}
},
)

c.StartAsync()
} else {
client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId))
client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, name))
Expand Down

0 comments on commit 10eae83

Please sign in to comment.