Skip to content

Commit

Permalink
APIs take chan argument and return error
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Nov 25, 2024
1 parent 89890b4 commit cd25de8
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 99 deletions.
2 changes: 1 addition & 1 deletion client/rpc/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (api *PinAPI) Ls(ctx context.Context, pins chan<- iface.Pin, opts ...caopts
if err != io.EOF {
return err
}
return nil
break
}

c, err := cid.Parse(out.Cid)
Expand Down
2 changes: 1 addition & 1 deletion client/rpc/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- iface.DirE
if err != io.EOF {
return err
}
return nil
break
}

if len(link.Objects) != 1 {
Expand Down
9 changes: 5 additions & 4 deletions core/commands/ls.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package commands

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -143,9 +144,9 @@ The JSON output contains type information.
}

results := make(chan iface.DirEntry)
var lsErr error
lsErr := make(chan error, 1)
go func() {
lsErr = api.Unixfs().Ls(lsCtx, pth, results,
lsErr <- api.Unixfs().Ls(lsCtx, pth, results,
options.Unixfs.ResolveChildren(resolveSize || resolveType))
}()

Expand Down Expand Up @@ -175,8 +176,8 @@ The JSON output contains type information.
return err
}
}
if lsErr != nil {
return lsErr
if err = <-lsErr; err != nil {
return err
}
dirDone(i)
}
Expand Down
8 changes: 4 additions & 4 deletions core/commands/pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,13 +557,13 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
panic("unhandled pin type")
}

var lsErr error
pins := make(chan iface.Pin)
pins := make(chan coreiface.Pin)
lsErr := make(chan error, 1)
lsCtx, cancel := context.WithCancel(req.Context)
defer cancel()

go func() {
lsErr = api.Pin().Ls(lsCtx, opt, pins, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
lsErr <- api.Pin().Ls(lsCtx, pins, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
}()

for p := range pins {
Expand All @@ -578,7 +578,7 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
return err
}
}
return lsErr
return <-lsErr
}

const (
Expand Down
53 changes: 33 additions & 20 deletions core/commands/pin/remotepin.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,14 +293,18 @@ Pass '--status=queued,pinning,pinned,failed' to list pins in all states.
ctx, cancel := context.WithCancel(req.Context)
defer cancel()

psCh, errCh := lsRemote(ctx, req, c)
psCh := make(chan pinclient.PinStatusGetter)
lsErr := make(chan error, 1)
go func() {
lsErr <- lsRemote(ctx, req, c, psCh)
}()
for ps := range psCh {
if err := res.Emit(toRemotePinOutput(ps)); err != nil {
return err
}
}

return <-errCh
return <-lsErr
},
Type: RemotePinOutput{},
Encoders: cmds.EncoderMap{
Expand Down Expand Up @@ -347,7 +351,12 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client, out c
opts = append(opts, pinclient.PinOpts.FilterStatus(parsedStatuses...))
}

return c.Ls(ctx, out, opts...)
rmtOut, rmtErr := c.Ls(ctx, opts...)
for p := range rmtOut {
out <- p
}
return <-rmtErr
//return c.Ls2(ctx, out, opts...)
}

var rmRemotePinCmd = &cmds.Command{
Expand Down Expand Up @@ -389,33 +398,37 @@ To list and then remove all pending pin requests, pass an explicit status list:
cmds.BoolOption(pinForceOptionName, "Allow removal of multiple pins matching the query without additional confirmation.").WithDefault(false),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
ctx, cancel := context.WithCancel(req.Context)
defer cancel()

c, err := getRemotePinServiceFromRequest(req, env)
if err != nil {
return err
}

rmIDs := []string{}
if len(req.Arguments) == 0 {
psCh, errCh := lsRemote(ctx, req, c)
for ps := range psCh {
rmIDs = append(rmIDs, ps.GetRequestId())
}
if err = <-errCh; err != nil {
return fmt.Errorf("error while listing remote pins: %v", err)
}

if len(rmIDs) > 1 && !req.Options[pinForceOptionName].(bool) {
return fmt.Errorf("multiple remote pins are matching this query, add --force to confirm the bulk removal")
}
} else {
if len(req.Arguments) != 0 {
return fmt.Errorf("unexpected argument %q", req.Arguments[0])
}

psCh := make(chan pinclient.PinStatusGetter)
errCh := make(chan error, 1)
ctx, cancel := context.WithCancel(req.Context)
defer cancel()

go func() {
errCh <- lsRemote(ctx, req, c, psCh)
}()
for ps := range psCh {
rmIDs = append(rmIDs, ps.GetRequestId())
}
if err = <-errCh; err != nil {
return fmt.Errorf("error while listing remote pins: %v", err)
}

if len(rmIDs) > 1 && !req.Options[pinForceOptionName].(bool) {
return fmt.Errorf("multiple remote pins are matching this query, add --force to confirm the bulk removal")
}

for _, rmID := range rmIDs {
if err := c.DeleteByID(ctx, rmID); err != nil {
if err = c.DeleteByID(ctx, rmID); err != nil {
return fmt.Errorf("removing pin identified by requestid=%q failed: %v", rmID, err)
}
}
Expand Down
90 changes: 35 additions & 55 deletions core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package coreapi

import (
"context"
"errors"
"fmt"

blockservice "github.com/ipfs/boxo/blockservice"
Expand Down Expand Up @@ -197,18 +198,15 @@ func (api *UnixfsAPI) Get(ctx context.Context, p path.Path) (files.Node, error)

// Ls returns the contents of an IPFS or IPNS object(s) at path p, with the format:
// `<link base58 hash> <link size in bytes> <link name>`
func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.UnixfsLsOption) (<-chan coreiface.DirEntry, <-chan error) {
func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- coreiface.DirEntry, opts ...options.UnixfsLsOption) error {
ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "Ls", trace.WithAttributes(attribute.String("path", p.String())))
defer span.End()

defer close(out)

settings, err := options.UnixfsLsOptions(opts...)
if err != nil {
errOut := make(chan error, 1)
errOut <- err
close(errOut)
out := make(chan coreiface.DirEntry)
close(out)
return out, errOut
return err
}

span.SetAttributes(attribute.Bool("resolvechildren", settings.ResolveChildren))
Expand All @@ -218,28 +216,18 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.Unixf

dagnode, err := ses.ResolveNode(ctx, p)
if err != nil {
errOut := make(chan error, 1)
errOut <- err
close(errOut)
out := make(chan coreiface.DirEntry)
close(out)
return out, errOut
return err
}

dir, err := uio.NewDirectoryFromNode(ses.dag, dagnode)
if err != nil {
if err == uio.ErrNotADir {
return uses.lsFromLinks(ctx, dagnode.Links(), settings)
if errors.Is(err, uio.ErrNotADir) {
return uses.lsFromLinks(ctx, dagnode.Links(), settings, out)
}
errOut := make(chan error, 1)
errOut <- err
close(errOut)
out := make(chan coreiface.DirEntry)
close(out)
return out, errOut
return err
}

return uses.lsFromLinksAsync(ctx, dir, settings)
return uses.lsFromDirLinks(ctx, dir, settings, out)
}

func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, settings *options.UnixfsLsSettings) (coreiface.DirEntry, error) {
Expand Down Expand Up @@ -300,55 +288,47 @@ func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, se
return lnk, nil
}

func (api *UnixfsAPI) lsFromLinksAsync(ctx context.Context, dir uio.Directory, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, <-chan error) {
out := make(chan coreiface.DirEntry, uio.DefaultShardWidth)
errOut := make(chan error, 1)

go func() {
defer close(out)
defer close(errOut)

for l := range dir.EnumLinksAsync(ctx) {
dirEnt, err := api.processLink(ctx, l, settings) // TODO: perf: processing can be done in background and in parallel
if err != nil {
errOut <- err
return
}
select {
case out <- dirEnt:
case <-ctx.Done():
return
}
func (api *UnixfsAPI) lsFromDirLinks(ctx context.Context, dir uio.Directory, settings *options.UnixfsLsSettings, out chan<- coreiface.DirEntry) error {
for l := range dir.EnumLinksAsync(ctx) {
dirEnt, err := api.processLink(ctx, l, settings) // TODO: perf: processing can be done in background and in parallel
if err != nil {
return err
}
}()

return out, errOut
select {
case out <- dirEnt:
case <-ctx.Done():
return nil
}
}
return nil
}

func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, <-chan error) {
out := make(chan coreiface.DirEntry, uio.DefaultShardWidth)
errOut := make(chan error, 1)

func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, settings *options.UnixfsLsSettings, out chan<- coreiface.DirEntry) error {
// Create links channel large enough to not block when writing to out is slower.
links := make(chan coreiface.DirEntry, len(ndlinks))
errs := make(chan error, 1)
go func() {
defer close(out)
defer close(errOut)

defer close(links)
defer close(errs)
for _, l := range ndlinks {
lr := ft.LinkResult{Link: &ipld.Link{Name: l.Name, Size: l.Size, Cid: l.Cid}}
dirEnt, err := api.processLink(ctx, lr, settings) // TODO: perf: processing can be done in background and in parallel
lnk, err := api.processLink(ctx, lr, settings) // TODO: can be parallel if settings.Async
if err != nil {
errOut <- err
errs <- err
return
}
select {
case out <- dirEnt:
case links <- lnk:
case <-ctx.Done():
return
}
}
}()

return out, errOut
for lnk := range links {
out <- lnk
}
return <-errs
}

func (api *UnixfsAPI) core() *CoreAPI {
Expand Down
20 changes: 17 additions & 3 deletions core/coreiface/tests/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,11 @@ func (tp *TestSuite) TestLs(t *testing.T) {
t.Fatal(err)
}

entries, errCh := api.Unixfs().Ls(ctx, p)
errCh := make(chan error, 1)
entries := make(chan coreiface.DirEntry)
go func() {
errCh <- api.Unixfs().Ls(ctx, p, entries)
}()

entry, ok := <-entries
if !ok {
Expand Down Expand Up @@ -777,7 +781,12 @@ func (tp *TestSuite) TestLsEmptyDir(t *testing.T) {
t.Fatal(err)
}

links, errCh := api.Unixfs().Ls(ctx, p)
errCh := make(chan error, 1)
links := make(chan coreiface.DirEntry)
go func() {
errCh <- api.Unixfs().Ls(ctx, p, links)
}()

var count int
for range links {
count++
Expand Down Expand Up @@ -810,7 +819,12 @@ func (tp *TestSuite) TestLsNonUnixfs(t *testing.T) {
t.Fatal(err)
}

links, errCh := api.Unixfs().Ls(ctx, path.FromCid(nd.Cid()))
errCh := make(chan error, 1)
links := make(chan coreiface.DirEntry)
go func() {
errCh <- api.Unixfs().Ls(ctx, path.FromCid(nd.Cid()), links)
}()

var count int
for range links {
count++
Expand Down
Loading

0 comments on commit cd25de8

Please sign in to comment.