Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node.Cron implementation #205

Open
wants to merge 4 commits into
base: v310
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 3 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ To inspect the node, network stack, running applications, and processes, you can
To install the Observer tool, you need to have the Go compiler version 1.20 or higher. Run the following command:

```
$ go install ergo.services/tools/observer@latest
$ go install ergo.tools/observer@latest
```

You can also embed the [Observer application](https://docs.ergo.services/extra-library/applications/observer) into your node. To see it in action, see example `demo` at https://github.com/ergo-services/examples. For more information https://docs.ergo.services/tools/observer
Expand All @@ -61,7 +61,7 @@ For a quick start, use the [`ergo`](https://docs.ergo.services/tools/ergo) tool
To install use the following command:

```
$ go install ergo.services/tools/ergo@latest
$ go install ergo.tools/ergo@latest
```

Now, you can create your project with just one command. Here is example:
Expand Down Expand Up @@ -131,23 +131,8 @@ Starting from version 3.0.0, support for the Erlang network stack has been moved

Fully detailed changelog see in the [ChangeLog](CHANGELOG.md) file.

#### [v3.0.0](https://github.com/ergo-services/ergo/releases/tag/v1.999.300) 2024-09-04 [tag version v1.999.300] ####
#### [v3.1.0](https://github.com/ergo-services/ergo/releases/tag/v1.999.310) 2025-00-00 [tag version v1.999.310] ####

This version marks a significant milestone in the evolution of the Ergo Framework. The framework's design has been completely overhauled, and this version was built from the ground up. It includes:

- Significant API Improvements: The `gen.Process`, `gen.Node`, and `gen.Network` interfaces have been enhanced with numerous convenient methods.
- A New Network Stack: This version introduces a completely new network stack for improved performance and flexibility. See https://github.com/ergo-services/benchmarks for the details

Alongside the release of Ergo Framework 3.0.0, new tools and an additional components library are also introduced:

- Tools (observer, saturn) https://github.com/ergo-services/tools
- Loggers (rotate, colored) - https://github.com/ergo-services/logger
- Meta (websocket) - https://github.com/ergo-services/meta
- Application (observer) - https://github.com/ergo-services/application
- Registrar (client Saturn) - https://github.com/ergo-services/registrar
- Proto (erlang23) - https://github.com/ergo-services/proto

Finally, we've published comprehensive documentation for the framework, providing detailed guides to assist you in leveraging all the capabilities of Ergo Framework effectively. Its available at https://docs.ergo.services.

### Development and debugging ###

Expand Down
76 changes: 76 additions & 0 deletions gen/cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package gen

import (
"time"
)

type CronOptions struct {
Jobs []CronJob
}

type Cron interface {
AddJob(job CronJob) error
RemoveJob(name Atom) error
EnableJob(name Atom) error
DisableJob(name Atom) error
// Jobs() []Job
// Job(name Atom) (Job, error)
Info() CronInfo
}

type CronJob struct {
// Name job name
Name Atom
// Spec time spec in "crontab" format
Spec string
// Location defines timezone
Location *time.Location
// Action can be either CronActionMessage, CronActionSpawn, CronActionRemoteSpawn
Action any
}

type CronActionMessage struct {
// Process defines where to send MessageCron. Can be local or remote one.
Process ProcessID

// Fallback process name if Process isn't reachable.
// MessageCronFallback will be sent with the details.
Fallback ProcessFallback
}

type CronActionSpawn struct {
// Register use registered name for the spawned process
Register Atom
// ProcessFactory
ProcessFactory ProcessFactory
// ProcessOptions
ProcessOptions ProcessOptions
// Args
Args []any

// Fallback process name if the spawning process has failed.
// MessageCronFallback will be sent with the details.
Fallback ProcessFallback
}
type CronActionRemoteSpawn struct {
// Node remote node name
Node Atom
// Name of the remote process factory
Name Atom

// Register use registered name for the spawned process
Register Atom
ProcessOptions ProcessOptions
Args []any

// Fallback process name if the spawning process has failed.
// MessageCronFallback will be sent with the details.
Fallback ProcessFallback
}

type CronInfo struct {
Jobs []CronJobInfo
}

type CronJobInfo struct {
}
13 changes: 13 additions & 0 deletions gen/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,16 @@ type MessageLogNetwork struct {
Peer Atom
Creation int64
}

type MessageCron struct {
Node Atom
Job Atom
Time time.Time
}

type MessageCronFallback struct {
Job Atom
Tag string
Time time.Time
Err error
}
4 changes: 4 additions & 0 deletions gen/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ type Node interface {
NetworkStop() error
Network() Network

Cron() Cron

CertManager() CertManager

Security() SecurityOptions
Expand Down Expand Up @@ -220,6 +222,8 @@ type NodeOptions struct {
Env map[Env]any
// Network
Network NetworkOptions
// Cron
Cron CronOptions
// CertManager
CertManager CertManager
// Security options
Expand Down
236 changes: 236 additions & 0 deletions node/cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package node

import (
"ergo.services/ergo/gen"
"ergo.services/ergo/lib"
"fmt"
"sync"
"time"
)

type cronNode interface {
Name() gen.Atom
Log() gen.Log
IsAlive() bool

Send(to any, message any) error
Spawn(factory gen.ProcessFactory, options gen.ProcessOptions, args ...any) (gen.PID, error)
SpawnRegister(register gen.Atom, factory gen.ProcessFactory, options gen.ProcessOptions, args ...any) (gen.PID, error)
}

type cron struct {
cronNode

sync.RWMutex

jobs map[gen.Atom]*cronJob
spool lib.QueueMPSC

timer *time.Timer
}

func createCron(node cronNode) *cron {
c := &cron{
cronNode: node,
jobs: make(map[gen.Atom]*cronJob),
spool: lib.NewQueueMPSC(),
}

// run every minute
now := time.Now()
next := now.Add(time.Minute).Truncate(time.Minute)
in := next.Sub(now)

c.timer = time.AfterFunc(in, func() {
if node.IsAlive() == false {
// node terminated
return
}
actionTime := time.Now().Truncate(time.Minute)
for {

item, ok := c.spool.Pop()
if ok == false {
break
}
cj := item.(*cronJob)
if cj.disable == true {
continue
}

// DO the job
go cj.do(actionTime)
}

now := time.Now()
next := now.Add(time.Minute).Truncate(time.Minute)
in := next.Sub(now)
c.timer.Reset(in)
c.schedule()
})

return c
}

func (c *cron) AddJob(job gen.CronJob) error {
if job.Name == "" {
return fmt.Errorf("empty job name")
}

switch ac := job.Action.(type) {
case gen.CronActionMessage:
if ac.Process.Name == "" {
return fmt.Errorf("incorrect value in gen.CronActionMessage.Process")
}

case gen.CronActionSpawn:
if ac.ProcessFactory == nil {
return fmt.Errorf("nil value in gen.CronActionSpawn.ProcessFactory")
}
case gen.CronActionRemoteSpawn:
if ac.Node == "" {
return fmt.Errorf("empty Node value in gen.CronActionRemoteSpawn")
}
if ac.Name == "" {
return fmt.Errorf("empty Name value in gen.CronActionRemoteSpawn")
}
default:
return fmt.Errorf("unknown action type %T", job.Action)
}

if job.Location == nil {
job.Location = time.Local
}

mask, err := cronParseSpec(job)
if err != nil {
return err
}

cj := &cronJob{
job: job,
cronNode: c.cronNode,
mask: mask,
}

c.Lock()
if _, exist := c.jobs[job.Name]; exist {
c.Unlock()
return gen.ErrTaken
}

c.jobs[job.Name] = cj
c.Unlock()

c.scheduleJob(cj)
return nil
}

func (c *cron) RemoveJob(name gen.Atom) error {

return nil
}

func (c *cron) EnableJob(name gen.Atom) error {

return nil
}

func (c *cron) DisableJob(name gen.Atom) error {
return nil
}

func (c *cron) Info() gen.CronInfo {
var info gen.CronInfo

return info
}

func (c *cron) terminate() {
c.timer.Stop()
}

func (c *cron) schedule() {
c.RLock()
defer c.RUnlock()
for _, cj := range c.jobs {
c.scheduleJob(cj)
}
}

func (c *cron) scheduleJob(cj *cronJob) {
if cj.disable == true {
return
}
spoolNextRun := time.Now().Add(time.Minute).Truncate(time.Minute)
if cj.mask.IsRunAt(spoolNextRun) == false {
return
}
c.spool.Push(cj)
}

// internal job

type cronJob struct {
cronNode

disable bool

job gen.CronJob
mask cronSpecMask

last time.Time
lastErr error
}

func (cj *cronJob) do(actionTime time.Time) {
if cj.disable {
return
}

// check if actionTime is actually now
// no time adjustment happened,
// no Day Light Saving happened
now := time.Now().In(cj.job.Location).Truncate(time.Minute)
if now != actionTime {
// do nothing
cj.Log().Debug("ignore job %s action time != now", cj.job.Name)
return
}

switch action := cj.job.Action.(type) {
case gen.CronActionMessage:
message := gen.MessageCron{
Node: cj.Name(),
Job: cj.job.Name,
Time: actionTime,
}

err := cj.Send(action.Process, message)
cj.last = actionTime
if err == nil {
cj.lastErr = nil
cj.Log().Info("(cron) %q has completed (sent message to: %s)",
message.Job, action.Process)
return
}
cj.lastErr = fmt.Errorf("unable to send cron message: %w", err)

if action.Fallback.Enable == false {
return
}
messageFallback := gen.MessageCronFallback{
Job: message.Job,
Tag: action.Fallback.Tag,
Time: message.Time,
Err: cj.lastErr,
}
cj.Send(action.Fallback.Name, messageFallback)
return

case gen.CronActionSpawn:
// TODO
case gen.CronActionRemoteSpawn:
// TODO
}
}
Loading