Skip to content

Commit

Permalink
WIP: Use Manage and Manager instead of TaskResult
Browse files Browse the repository at this point in the history
  • Loading branch information
earthboundkid committed Sep 25, 2024
1 parent c67632b commit 9cecbaf
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 44 deletions.
87 changes: 59 additions & 28 deletions manage_tasks.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package flowmatic

import (
"iter"

"github.com/earthboundkid/deque/v2"
)

Expand Down Expand Up @@ -53,41 +51,74 @@ func manageTasks[Input, Output any](numWorkers int, task Task[Input, Output], ma
}
}

type TaskResult[Input, Output any] struct {
In Input
Out Output
Err error
newitems []Input
}

func (to *TaskResult[Input, Output]) HasErr() bool {
return to.Err != nil
}

func (to *TaskResult[Input, Output]) AddTask(in Input) {
to.newitems = append(to.newitems, in)
// Manage creates a Manager to run tasks concurrently
// using numWorkers concurrent workers (or GOMAXPROCS workers if numWorkers < 1).
func Manage[Input, Output any](numWorkers int, task Task[Input, Output], initial ...Input) *Manager[Input, Output] {
return &Manager[Input, Output]{
numWorkers: numWorkers,
task: task,
newItems: initial,
}
}

// Tasks runs tasks concurrently
// using numWorkers concurrent workers (or GOMAXPROCS workers if numWorkers < 1)
// Manager is TKTK TODO
// which a sequence of TaskResults yielded serially.
// To add more jobs to call AddTask on the TaskResult.
// To add more jobs to call Queue on the Manager.
// If a task panics during execution,
// the panic will be caught and rethrown.
func Tasks[Input, Output any](numWorkers int, task Task[Input, Output], initial ...Input) iter.Seq[*TaskResult[Input, Output]] {
return func(yield func(*TaskResult[Input, Output]) bool) {
type Manager[Input, Output any] struct {
numWorkers int
task Task[Input, Output]
newItems []Input
in Input
out Output
err error
executing bool
}

func (m *Manager[Input, Output]) Start() func(func() bool) {
if m.executing {
panic("already executing")
}
return func(yield func() bool) {
m.executing = true
manager := func(in Input, out Output, err error) ([]Input, bool) {
to := TaskResult[Input, Output]{
In: in,
Out: out,
Err: err,
}
if !yield(&to) {
m.in, m.out, m.err = in, out, err
m.newItems = nil
if !yield() {
return nil, false
}
return to.newitems, true
return m.newItems, true
}

manageTasks(numWorkers, task, manager, initial...)
manageTasks(m.numWorkers, m.task, manager, m.newItems...)
}
}

func (m *Manager[Input, Output]) Queue(in ...Input) {
m.newItems = append(m.newItems, in...)
}

func (m *Manager[Input, Output]) Input() Input {
return m.in
}

func (m *Manager[Input, Output]) Output() Output {
return m.out
}

func (m *Manager[Input, Output]) Error() error {
return m.err
}

func (m *Manager[Input, Output]) Result() (Output, error) {
return m.out, m.err
}

func (m *Manager[Input, Output]) Values() (Input, Output, error) {
return m.in, m.out, m.err
}

func (m *Manager[Input, Output]) HasErr() bool {
return m.err != nil
}
14 changes: 8 additions & 6 deletions manage_tasks_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,23 @@ func ExampleTasks() {
// tried tracks how many times a url has been queued to be fetched
tried := map[string]int{}

// Process the tasks with as many workers as GOMAXPROCS
for task := range flowmatic.Tasks(flowmatic.MaxProcs, task, "/") {
req, urls := task.In, task.Out
if task.HasErr() {
// Manage the tasks with as many workers as GOMAXPROCS
m := flowmatic.Manage(flowmatic.MaxProcs, task, "/")
for range m.Start() {
req := m.Input()
if m.HasErr() {
// If there's a problem fetching a page, try three times
if tried[req] < 3 {
tried[req]++
task.AddTask(req)
m.Queue(req)
}
continue
}
urls := m.Output()
results[req] = urls
for _, u := range urls {
if tried[u] == 0 {
task.AddTask(u)
m.Queue(u)
tried[u]++
}
}
Expand Down
20 changes: 12 additions & 8 deletions manage_tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ func TestManageTasks_drainage(t *testing.T) {
int
error
}{}
for r := range flowmatic.Tasks(2, task, 0, 1) {
manager := flowmatic.Manage(2, task, 0, 1)
for range manager.Start() {
if time.Since(start) > sleepTime {
t.Fatal("sleep too much!")
t.Fatal("slept too much")
}
m[r.In] = struct {
in, out, err := manager.Values()
m[in] = struct {
int
error
}{r.Out, r.Err}
if r.HasErr() {
}{out, err}
if manager.HasErr() {
break
}
}
Expand Down Expand Up @@ -64,14 +66,16 @@ func TestManageTasks_drainage2(t *testing.T) {
int
error
}{}
for r := range flowmatic.Tasks(2, task, 0, 1) {
manager := flowmatic.Manage(2, task, 0, 1)
for range manager.Start() {
if time.Since(start) > sleepTime {
t.Fatal("slept too much")
}
m[r.In] = struct {
in, out, err := manager.Values()
m[in] = struct {
int
error
}{r.Out, r.Err}
}{out, err}
break
}
if s := fmt.Sprint(m); s != "map[1:-]" {
Expand Down
6 changes: 4 additions & 2 deletions panic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ func TestTasks_panic(t *testing.T) {
}
var triples []int
r := try(func() {
for r := range flowmatic.Tasks(1, task, 1, 2, 3, 4, 5, 6, 7) {
triples = append(triples, r.Out)
m := flowmatic.Manage(1, task, 1, 2, 3, 4, 5, 6, 7)
for range m.Start() {
_, out, _ := m.Values()
triples = append(triples, out)
}
})
if r == nil {
Expand Down

0 comments on commit 9cecbaf

Please sign in to comment.