Skip to content

Commit

Permalink
Save WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
earthboundkid committed Sep 5, 2024
1 parent 7f30500 commit 8fcede2
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 37 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module github.com/earthboundkid/flowmatic

go 1.22
go 1.23

require github.com/carlmjohnson/deque v0.23.1
require github.com/earthboundkid/deque/v2 v2.24.2
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
github.com/carlmjohnson/deque v0.23.1 h1:X2HOJM9xcglY03deMZ0oZ1V2xtbqYV7dJDnZiSZN4Ak=
github.com/carlmjohnson/deque v0.23.1/go.mod h1:LF5NJjICBrEOPx84pxPL4nCimy5n9NQjxKi5cXkh+8U=
github.com/earthboundkid/deque/v2 v2.24.2 h1:U0vh6utzBx922tezr53ryt2tOIJ1GUKMABmYpxCgV48=
github.com/earthboundkid/deque/v2 v2.24.2/go.mod h1:k/HnjdCUwuMdqNzbS2exS37GEXJBzHSpNBVppL/nEHg=
30 changes: 16 additions & 14 deletions manage_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package flowmatic
import (
"iter"

"github.com/carlmjohnson/deque"
"github.com/earthboundkid/deque/v2"
)

// manager is a function that serially examines Task results to see if it produced any new Inputs.
Expand Down Expand Up @@ -53,37 +53,39 @@ func manageTasks[Input, Output any](numWorkers int, task Task[Input, Output], ma
}
}

type TaskOutput[Input, Output any] struct {
type TaskResult[Input, Output any] struct {
In Input
Out Output
Err error
pushtask func(Input)
newitems []Input
}

func (to *TaskOutput[Input, Output]) HasErr() bool {
func (to *TaskResult[Input, Output]) HasErr() bool {
return to.Err != nil
}

func (to *TaskOutput[Input, Output]) AddTask(in Input) {
to.pushtask(in)
func (to *TaskResult[Input, Output]) AddTask(in Input) {
to.newitems = append(to.newitems, in)
}

func Tasks[Input, Output any](numWorkers int, task Task[Input, Output], initial ...Input) iter.Seq[*TaskOutput[Input, Output]] {
return func(yield func(*TaskOutput[Input, Output]) bool) {
// Tasks runs tasks concurrently
// using numWorkers concurrent workers (or GOMAXPROCS workers if numWorkers < 1)
// with a sequence of TaskResults yielded serially.
// To add more jobs, call AddTask on the TaskResult.
// If a task panics during execution,
// the panic will be caught and rethrown.
func Tasks[Input, Output any](numWorkers int, task Task[Input, Output], initial ...Input) iter.Seq[*TaskResult[Input, Output]] {
return func(yield func(*TaskResult[Input, Output]) bool) {
manager := func(in Input, out Output, err error) ([]Input, bool) {
var newitems []Input
to := TaskOutput[Input, Output]{
to := TaskResult[Input, Output]{
In: in,
Out: out,
Err: err,
pushtask: func(newin Input) {
newitems = append(newitems, newin)
},
}
if !yield(&to) {
return nil, false
}
return newitems, true
return to.newitems, true
}

manageTasks(numWorkers, task, manager, initial...)
Expand Down
35 changes: 16 additions & 19 deletions manage_tasks_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package flowmatic_test
import (
"fmt"
"io"
"maps"
"net/http"
"net/http/httptest"
"slices"
Expand All @@ -12,7 +13,7 @@ import (
"github.com/earthboundkid/flowmatic"
)

func ExampleAllTasks() {
func ExampleTasks() {
// Example site to crawl with recursive links
srv := httptest.NewServer(http.FileServer(http.FS(fstest.MapFS{
"index.html": &fstest.MapFile{
Expand All @@ -28,7 +29,7 @@ func ExampleAllTasks() {
Data: []byte("/c.html"),
},
"c.html": &fstest.MapFile{
Data: []byte("/"),
Data: []byte("/\n/x.html"),
},
})))
defer srv.Close()
Expand All @@ -41,6 +42,9 @@ func ExampleAllTasks() {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("bad response: %q", u)
}
body, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
Expand All @@ -49,39 +53,32 @@ func ExampleAllTasks() {
return strings.Split(string(body), "\n"), nil
}

// Manager keeps track of which pages have been visited and the results graph
tried := map[string]int{}
// results tracks which urls a url links to
results := map[string][]string{}
// tried tracks how many times a url has been queued to be fetched
tried := map[string]int{}

// Process the tasks with as many workers as GOMAXPROCS
it := flowmatic.Tasks(flowmatic.MaxProcs, task, "/")
for r := range it {
req := r.In
urls := r.Out
if r.HasErr() {
for task := range flowmatic.Tasks(flowmatic.MaxProcs, task, "/") {
req, urls := task.In, task.Out
if task.HasErr() {
// If there's a problem fetching a page, try three times
if tried[req] < 3 {
tried[req]++
r.AddTask(req)
continue
task.AddTask(req)
}
break
continue
}
results[req] = urls
for _, u := range urls {
if tried[u] == 0 {
r.AddTask(u)
task.AddTask(u)
tried[u]++
}
}
}

keys := make([]string, 0, len(results))
for key := range results {
keys = append(keys, key)
}
slices.Sort(keys)
for _, key := range keys {
for _, key := range slices.Sorted(maps.Keys(results)) {
fmt.Println(key, "links to:")
for _, v := range results[key] {
fmt.Println("- ", v)
Expand Down

0 comments on commit 8fcede2

Please sign in to comment.