Skip to content

Commit

Permalink
pangpanglabs#107 Add job to cronjob & Add default http api
Browse files Browse the repository at this point in the history
  • Loading branch information
Jang Jaehue committed Jan 16, 2020
1 parent 826ca8e commit 111171a
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 78 deletions.
55 changes: 0 additions & 55 deletions cronjob/behaviorlog.go
Original file line number Diff line number Diff line change
@@ -1,56 +1 @@
package cronjob

import (
"context"
"time"

"github.com/labstack/gommon/random"
"github.com/pangpanglabs/goutils/behaviorlog"
"github.com/pangpanglabs/goutils/kafka"
"github.com/sirupsen/logrus"
)

func BehaviorLogger(serviceName string, config kafka.Config) Middleware {
var producer *kafka.Producer
if config.Brokers != nil && config.Topic != "" {
if p, err := kafka.NewProducer(config.Brokers, config.Topic,
kafka.WithDefault(),
kafka.WithTLS(config.SSL)); err != nil {
logrus.Error("Create Kafka Producer Error", err)
} else {
producer = p
}
}

return func(next HandlerFunc) HandlerFunc {
return func(ctx context.Context) (err error) {
behaviorLogContext := behaviorlog.NewNopContext()
behaviorLogContext.Producer = producer
behaviorLogContext.Service = serviceName
behaviorLogContext.RequestID = random.String(32)
behaviorLogContext.ActionID = random.String(32)
behaviorLogContext.Timestamp = time.Now()
behaviorLogContext.RemoteIP = "127.0.0.1"
behaviorLogContext.Host = "127.0.0.1"

behaviorLogContext.Uri = ""
behaviorLogContext.Method = ""
behaviorLogContext.Path = ""
behaviorLogContext.Params = map[string]interface{}{}
behaviorLogContext.Referer = ""
behaviorLogContext.UserAgent = ""
behaviorLogContext.RequestLength = 0
behaviorLogContext.BizAttr = map[string]interface{}{}
behaviorLogContext.AuthToken = ""

err = next(context.WithValue(ctx, behaviorlog.LogContextName, behaviorLogContext))
if err != nil {
behaviorLogContext.Err = err.Error()
}

behaviorLogContext.Write()

return err
}
}
}
2 changes: 1 addition & 1 deletion cronjob/context_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func ContextDB(service string, xormEngine *xorm.Engine, kafkaConfig kafka.Config) Middleware {
ctxdb := ctxdb.New(xormEngine, service, kafkaConfig)

return func(next HandlerFunc) HandlerFunc {
return func(job, action string, next HandlerFunc) HandlerFunc {
return func(ctx context.Context) error {
session := ctxdb.NewSession(ctx)
defer session.Close()
Expand Down
64 changes: 46 additions & 18 deletions cronjob/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,83 @@ package cronjob

import (
"context"
"fmt"
"os"
"os/signal"
"log"
"net/http"

"github.com/pangpanglabs/goutils/echomiddleware"
"github.com/labstack/echo"
"github.com/labstack/echo/middleware"
"github.com/robfig/cron"
"github.com/sirupsen/logrus"
)

type Cron struct {
name string
cron *cron.Cron
chain middlewareChain
}

func New() *Cron {
return &Cron{
cron: cron.New(),
}
type Job struct {
Name string
c *Cron
}
func Default(serviceName string, behaviorlogKafkaConfig echomiddleware.KafkaConfig) *Cron {

func New(name string) *Cron {
c := Cron{
name: name,
cron: cron.New(),
}

c.chain.append(
BehaviorLogger(serviceName, behaviorlogKafkaConfig),
Recover(),
)

return &c
}

func (c *Cron) Use(middlewares ...Middleware) {
c.chain.append(middlewares...)
}
func (c *Cron) AddFunc(spec string, f HandlerFunc) {

func (c *Cron) AddJob(job string) *Job {
return &Job{
Name: job,
}
}
func (j *Job) AddAction(action, spec string, f HandlerFunc) *Job {
j.c.cron.AddFunc(spec, func() {
ctx := context.Background()
if err := j.c.chain.run(j.Name, action, f)(ctx); err != nil {
logrus.WithError(err).Error("")
}
})
return j
}
func (c *Cron) AddAction(action, spec string, f HandlerFunc) {
c.cron.AddFunc(spec, func() {
ctx := context.Background()

if err := c.chain.run(f)(ctx); err != nil {
if err := c.chain.run(c.name, action, f)(ctx); err != nil {
logrus.WithError(err).Error("")
}
})
}

func (c *Cron) Start() error {
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
func (c *Cron) Start(address string) {
go c.cron.Start()

c.cron.Start()
e := echo.New()
e.GET("/ping", func(ctx echo.Context) error {
return ctx.String(http.StatusOK, "pong")
})
e.GET("/entries", func(ctx echo.Context) error {
return ctx.JSON(http.StatusOK, c.cron.Entries())
})

return fmt.Errorf("Got signal: %v", <-quit)
e.Pre(middleware.RemoveTrailingSlash())
e.Use(middleware.Recover())
e.Use(middleware.CORS())

if err := e.Start(address); err != nil {
log.Println(err)
}
c.cron.Stop()
}
6 changes: 3 additions & 3 deletions cronjob/middleware_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package cronjob
import "context"

type HandlerFunc func(ctx context.Context) error
type Middleware func(next HandlerFunc) HandlerFunc
type Middleware func(job, action string, next HandlerFunc) HandlerFunc
type middlewareChain struct {
middlewares []Middleware
}

func (c middlewareChain) run(f HandlerFunc) HandlerFunc {
func (c middlewareChain) run(job, action string, f HandlerFunc) HandlerFunc {
for i := range c.middlewares {
f = c.middlewares[len(c.middlewares)-1-i](f)
f = c.middlewares[len(c.middlewares)-1-i](job, action, f)
}

return f
Expand Down
2 changes: 1 addition & 1 deletion cronjob/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func Recover() Middleware {
disableStackAll := false
disablePrintStack := false

return func(next HandlerFunc) HandlerFunc {
return func(job, action string, next HandlerFunc) HandlerFunc {
return func(ctx context.Context) error {

defer func() {
Expand Down

0 comments on commit 111171a

Please sign in to comment.