Skip to content

Commit

Permalink
WIP: Manage rework; handle reuse, use Queue for initial
Browse files Browse the repository at this point in the history
  • Loading branch information
earthboundkid committed Sep 30, 2024
1 parent 9cecbaf commit 14af29a
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 25 deletions.
26 changes: 16 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,20 +261,29 @@ task := func(u string) ([]string, error) {
return getLinks(page), nil
}

// Process the tasks with as many workers as GOMAXPROCS
manager := flowmatic.ManageTasks(flowmatic.MaxProcs, task)

// Map from page to links
// Doesn't need a lock because only the manager touches it
results := map[string][]string{}
var managerErr error

// Manager keeps track of which pages have been visited and the results graph
manager := func(req string, links []string, err error) ([]string, bool) {
// Prime the initial queue
manager.Queue("http://example.com/")

// Start execution and keep track of which pages have been visited
// and the results graph
for range manager.Exec() {
// Halt execution after the first error
if err != nil {
managerErr = err
return nil, false
if manager.HasErr() {
managerErr = manager.Error()
break
}
links := manager.Output()

// Save final results in map
results[req] = urls
results[manager.Input()] = links

// Check for new pages to scrape
var newpages []string
Expand All @@ -288,18 +297,15 @@ manager := func(req string, links []string, err error) ([]string, bool) {
// Add placeholder to map to prevent double scraping
results[link] = nil
}
return newpages, true
}

// Process the tasks with as many workers as GOMAXPROCS
flowmatic.ManageTasks(flowmatic.MaxProcs, task, manager, "http://example.com/")
// Check if anything went wrong
if managerErr != nil {
fmt.Println("error", managerErr)
}
```

Normally, it is very difficult to keep track of concurrent code because any combination of events could occur in any order or simultaneously, and each combination has to be accounted for by the programmer. `flowmatic.ManageTasks` makes it simple to write concurrent code because everything follows a simple rule: **tasks happen concurrently; the manager runs serially**.
Normally, it is very difficult to keep track of concurrent code because any combination of events could occur in any order or simultaneously, and each combination has to be accounted for by the programmer. `flowmatic.Manage` makes it simple to write concurrent code because everything follows a simple rule: **tasks happen concurrently; the manager runs serially**.

Centralizing control in the manager makes reasoning about the code radically simpler. When writing locking code, if you have M states and N methods, you need to think about all M states in each of the N methods, giving you an M × N code explosion. By centralizing the logic, the M states only need to be considered in one location: the manager.

Expand Down
6 changes: 3 additions & 3 deletions manage_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,10 @@ func manageTasks[Input, Output any](numWorkers int, task Task[Input, Output], ma

// Manage creates a Manager to run tasks concurrently
// using numWorkers concurrent workers (or GOMAXPROCS workers if numWorkers < 1).
func Manage[Input, Output any](numWorkers int, task Task[Input, Output], initial ...Input) *Manager[Input, Output] {
func Manage[Input, Output any](numWorkers int, task Task[Input, Output]) *Manager[Input, Output] {
return &Manager[Input, Output]{
numWorkers: numWorkers,
task: task,
newItems: initial,
}
}

Expand All @@ -76,7 +75,7 @@ type Manager[Input, Output any] struct {
executing bool
}

func (m *Manager[Input, Output]) Start() func(func() bool) {
func (m *Manager[Input, Output]) Exec() func(func() bool) {
if m.executing {
panic("already executing")
}
Expand All @@ -92,6 +91,7 @@ func (m *Manager[Input, Output]) Start() func(func() bool) {
}

manageTasks(m.numWorkers, m.task, manager, m.newItems...)
m.executing = false
}
}

Expand Down
7 changes: 4 additions & 3 deletions manage_tasks_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/earthboundkid/flowmatic/v2"
)

func ExampleTasks() {
func ExampleManager() {
// Example site to crawl with recursive links
srv := httptest.NewServer(http.FileServer(http.FS(fstest.MapFS{
"index.html": &fstest.MapFile{
Expand Down Expand Up @@ -59,8 +59,9 @@ func ExampleTasks() {
tried := map[string]int{}

// Manage the tasks with as many workers as GOMAXPROCS
m := flowmatic.Manage(flowmatic.MaxProcs, task, "/")
for range m.Start() {
m := flowmatic.Manage(flowmatic.MaxProcs, task)
m.Queue("/")
for range m.Exec() {
req := m.Input()
if m.HasErr() {
// If there's a problem fetching a page, try three times
Expand Down
10 changes: 6 additions & 4 deletions manage_tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ func TestManageTasks_drainage(t *testing.T) {
int
error
}{}
manager := flowmatic.Manage(2, task, 0, 1)
for range manager.Start() {
manager := flowmatic.Manage(2, task)
manager.Queue(0, 1)
for range manager.Exec() {
if time.Since(start) > sleepTime {
t.Fatal("slept too much")
}
Expand Down Expand Up @@ -66,8 +67,9 @@ func TestManageTasks_drainage2(t *testing.T) {
int
error
}{}
manager := flowmatic.Manage(2, task, 0, 1)
for range manager.Start() {
manager := flowmatic.Manage(2, task)
manager.Queue(0, 1)
for range manager.Exec() {
if time.Since(start) > sleepTime {
t.Fatal("slept too much")
}
Expand Down
49 changes: 44 additions & 5 deletions panic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func try(f func()) (r any) {
return
}

func TestTasks_panic(t *testing.T) {
func TestManage_panic(t *testing.T) {
task := func(n int) (int, error) {
if n == 3 {
panic("3!!")
Expand All @@ -27,10 +27,10 @@ func TestTasks_panic(t *testing.T) {
}
var triples []int
r := try(func() {
m := flowmatic.Manage(1, task, 1, 2, 3, 4, 5, 6, 7)
for range m.Start() {
_, out, _ := m.Values()
triples = append(triples, out)
m := flowmatic.Manage(1, task)
m.Queue(1, 2, 3, 4, 5, 6, 7)
for range m.Exec() {
triples = append(triples, m.Output())
}
})
if r == nil {
Expand All @@ -42,6 +42,45 @@ func TestTasks_panic(t *testing.T) {
if fmt.Sprint(triples) != "[3 6]" {
t.Fatal(triples)
}
task2 := func(n int) (int, error) {
return n * 3, nil
}
triples = nil
r2 := try(func() {
m := flowmatic.Manage(flowmatic.MaxProcs, task2)
m.Queue(1, 2, 3, 4, 5, 6, 7)
for range m.Exec() {
for range m.Exec() {
triples = append(triples, m.Output())
}
}
})
if r2 != "already executing" {
t.Fatal("should have panicked")
}
var triples2 []int
r3 := try(func() {
m := flowmatic.Manage(flowmatic.MaxProcs, task2)
m.Queue(1, 2, 3, 4)
for range m.Exec() {
triples = append(triples, m.Output())
}
slices.Sort(triples)
m.Queue(5, 6, 7, 8)
for range m.Exec() {
triples2 = append(triples2, m.Output())
}
slices.Sort(triples2)
})
if r3 != nil {
t.Fatal("should not have panicked")
}
if fmt.Sprint(triples) != "[3 6 9 12]" {
t.Fatal(triples)
}
if fmt.Sprint(triples2) != "[15 18 21 24]" {
t.Fatal(triples2)
}
}

func TestEach_panic(t *testing.T) {
Expand Down

0 comments on commit 14af29a

Please sign in to comment.