From cd25de85ce7e22e22a2d0cfc2f0f255366366e1c Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Mon, 25 Nov 2024 12:36:56 -1000 Subject: [PATCH] APIs take chan argument and return error --- client/rpc/pin.go | 2 +- client/rpc/unixfs.go | 2 +- core/commands/ls.go | 9 ++-- core/commands/pin/pin.go | 8 +-- core/commands/pin/remotepin.go | 53 ++++++++++++-------- core/coreapi/unixfs.go | 90 +++++++++++++--------------------- core/coreiface/tests/unixfs.go | 20 ++++++-- core/coreiface/unixfs.go | 30 +++++++----- 8 files changed, 115 insertions(+), 99 deletions(-) diff --git a/client/rpc/pin.go b/client/rpc/pin.go index c63ca515ff9..bceb5498cea 100644 --- a/client/rpc/pin.go +++ b/client/rpc/pin.go @@ -87,7 +87,7 @@ func (api *PinAPI) Ls(ctx context.Context, pins chan<- iface.Pin, opts ...caopts if err != io.EOF { return err } - return nil + break } c, err := cid.Parse(out.Cid) diff --git a/client/rpc/unixfs.go b/client/rpc/unixfs.go index 70eb92a3a1d..faa6086c240 100644 --- a/client/rpc/unixfs.go +++ b/client/rpc/unixfs.go @@ -173,7 +173,7 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- iface.DirE if err != io.EOF { return err } - return nil + break } if len(link.Objects) != 1 { diff --git a/core/commands/ls.go b/core/commands/ls.go index da1d9ebc530..bdd475d96cb 100644 --- a/core/commands/ls.go +++ b/core/commands/ls.go @@ -1,6 +1,7 @@ package commands import ( + "context" "fmt" "io" "os" @@ -143,9 +144,9 @@ The JSON output contains type information. } results := make(chan iface.DirEntry) - var lsErr error + lsErr := make(chan error, 1) go func() { - lsErr = api.Unixfs().Ls(lsCtx, pth, results, + lsErr <- api.Unixfs().Ls(lsCtx, pth, results, options.Unixfs.ResolveChildren(resolveSize || resolveType)) }() @@ -175,8 +176,8 @@ The JSON output contains type information. return err } } - if lsErr != nil { - return lsErr + if err = <-lsErr; err != nil { + return err } dirDone(i) } diff --git a/core/commands/pin/pin.go b/core/commands/pin/pin.go index a73e352e268..428a75b695d 100644 --- a/core/commands/pin/pin.go +++ b/core/commands/pin/pin.go @@ -557,13 +557,13 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api panic("unhandled pin type") } - var lsErr error - pins := make(chan iface.Pin) + pins := make(chan coreiface.Pin) + lsErr := make(chan error, 1) lsCtx, cancel := context.WithCancel(req.Context) defer cancel() go func() { - lsErr = api.Pin().Ls(lsCtx, opt, pins, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name)) + lsErr <- api.Pin().Ls(lsCtx, pins, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name)) }() for p := range pins { @@ -578,7 +578,7 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api return err } } - return lsErr + return <-lsErr } const ( diff --git a/core/commands/pin/remotepin.go b/core/commands/pin/remotepin.go index 7319bcd9644..fc852fbbfc6 100644 --- a/core/commands/pin/remotepin.go +++ b/core/commands/pin/remotepin.go @@ -293,14 +293,18 @@ Pass '--status=queued,pinning,pinned,failed' to list pins in all states. ctx, cancel := context.WithCancel(req.Context) defer cancel() - psCh, errCh := lsRemote(ctx, req, c) + psCh := make(chan pinclient.PinStatusGetter) + lsErr := make(chan error, 1) + go func() { + lsErr <- lsRemote(ctx, req, c, psCh) + }() for ps := range psCh { if err := res.Emit(toRemotePinOutput(ps)); err != nil { return err } } - return <-errCh + return <-lsErr }, Type: RemotePinOutput{}, Encoders: cmds.EncoderMap{ @@ -347,7 +351,12 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client, out c opts = append(opts, pinclient.PinOpts.FilterStatus(parsedStatuses...)) } - return c.Ls(ctx, out, opts...) + rmtOut, rmtErr := c.Ls(ctx, opts...) + for p := range rmtOut { + out <- p + } + return <-rmtErr + //return c.Ls2(ctx, out, opts...) } var rmRemotePinCmd = &cmds.Command{ @@ -389,33 +398,37 @@ To list and then remove all pending pin requests, pass an explicit status list: cmds.BoolOption(pinForceOptionName, "Allow removal of multiple pins matching the query without additional confirmation.").WithDefault(false), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - ctx, cancel := context.WithCancel(req.Context) - defer cancel() - c, err := getRemotePinServiceFromRequest(req, env) if err != nil { return err } rmIDs := []string{} - if len(req.Arguments) == 0 { - psCh, errCh := lsRemote(ctx, req, c) - for ps := range psCh { - rmIDs = append(rmIDs, ps.GetRequestId()) - } - if err = <-errCh; err != nil { - return fmt.Errorf("error while listing remote pins: %v", err) - } - - if len(rmIDs) > 1 && !req.Options[pinForceOptionName].(bool) { - return fmt.Errorf("multiple remote pins are matching this query, add --force to confirm the bulk removal") - } - } else { + if len(req.Arguments) != 0 { return fmt.Errorf("unexpected argument %q", req.Arguments[0]) } + psCh := make(chan pinclient.PinStatusGetter) + errCh := make(chan error, 1) + ctx, cancel := context.WithCancel(req.Context) + defer cancel() + + go func() { + errCh <- lsRemote(ctx, req, c, psCh) + }() + for ps := range psCh { + rmIDs = append(rmIDs, ps.GetRequestId()) + } + if err = <-errCh; err != nil { + return fmt.Errorf("error while listing remote pins: %v", err) + } + + if len(rmIDs) > 1 && !req.Options[pinForceOptionName].(bool) { + return fmt.Errorf("multiple remote pins are matching this query, add --force to confirm the bulk removal") + } + for _, rmID := range rmIDs { - if err := c.DeleteByID(ctx, rmID); err != nil { + if err = c.DeleteByID(ctx, rmID); err != nil { return fmt.Errorf("removing pin identified by requestid=%q failed: %v", rmID, err) } } diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go index 8689834e577..d52ed75ccd9 100644 --- a/core/coreapi/unixfs.go +++ b/core/coreapi/unixfs.go @@ -2,6 +2,7 @@ package coreapi import ( "context" + "errors" "fmt" blockservice "github.com/ipfs/boxo/blockservice" @@ -197,18 +198,15 @@ func (api *UnixfsAPI) Get(ctx context.Context, p path.Path) (files.Node, error) // Ls returns the contents of an IPFS or IPNS object(s) at path p, with the format: // ` ` -func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.UnixfsLsOption) (<-chan coreiface.DirEntry, <-chan error) { +func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- coreiface.DirEntry, opts ...options.UnixfsLsOption) error { ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "Ls", trace.WithAttributes(attribute.String("path", p.String()))) defer span.End() + defer close(out) + settings, err := options.UnixfsLsOptions(opts...) if err != nil { - errOut := make(chan error, 1) - errOut <- err - close(errOut) - out := make(chan coreiface.DirEntry) - close(out) - return out, errOut + return err } span.SetAttributes(attribute.Bool("resolvechildren", settings.ResolveChildren)) @@ -218,28 +216,18 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.Unixf dagnode, err := ses.ResolveNode(ctx, p) if err != nil { - errOut := make(chan error, 1) - errOut <- err - close(errOut) - out := make(chan coreiface.DirEntry) - close(out) - return out, errOut + return err } dir, err := uio.NewDirectoryFromNode(ses.dag, dagnode) if err != nil { - if err == uio.ErrNotADir { - return uses.lsFromLinks(ctx, dagnode.Links(), settings) + if errors.Is(err, uio.ErrNotADir) { + return uses.lsFromLinks(ctx, dagnode.Links(), settings, out) } - errOut := make(chan error, 1) - errOut <- err - close(errOut) - out := make(chan coreiface.DirEntry) - close(out) - return out, errOut + return err } - return uses.lsFromLinksAsync(ctx, dir, settings) + return uses.lsFromDirLinks(ctx, dir, settings, out) } func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, settings *options.UnixfsLsSettings) (coreiface.DirEntry, error) { @@ -300,55 +288,47 @@ func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, se return lnk, nil } -func (api *UnixfsAPI) lsFromLinksAsync(ctx context.Context, dir uio.Directory, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, <-chan error) { - out := make(chan coreiface.DirEntry, uio.DefaultShardWidth) - errOut := make(chan error, 1) - - go func() { - defer close(out) - defer close(errOut) - - for l := range dir.EnumLinksAsync(ctx) { - dirEnt, err := api.processLink(ctx, l, settings) // TODO: perf: processing can be done in background and in parallel - if err != nil { - errOut <- err - return - } - select { - case out <- dirEnt: - case <-ctx.Done(): - return - } +func (api *UnixfsAPI) lsFromDirLinks(ctx context.Context, dir uio.Directory, settings *options.UnixfsLsSettings, out chan<- coreiface.DirEntry) error { + for l := range dir.EnumLinksAsync(ctx) { + dirEnt, err := api.processLink(ctx, l, settings) // TODO: perf: processing can be done in background and in parallel + if err != nil { + return err } - }() - - return out, errOut + select { + case out <- dirEnt: + case <-ctx.Done(): + return nil + } + } + return nil } -func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, <-chan error) { - out := make(chan coreiface.DirEntry, uio.DefaultShardWidth) - errOut := make(chan error, 1) - +func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, settings *options.UnixfsLsSettings, out chan<- coreiface.DirEntry) error { + // Create links channel large enough to not block when writing to out is slower. + links := make(chan coreiface.DirEntry, len(ndlinks)) + errs := make(chan error, 1) go func() { - defer close(out) - defer close(errOut) - + defer close(links) + defer close(errs) for _, l := range ndlinks { lr := ft.LinkResult{Link: &ipld.Link{Name: l.Name, Size: l.Size, Cid: l.Cid}} - dirEnt, err := api.processLink(ctx, lr, settings) // TODO: perf: processing can be done in background and in parallel + lnk, err := api.processLink(ctx, lr, settings) // TODO: can be parallel if settings.Async if err != nil { - errOut <- err + errs <- err return } select { - case out <- dirEnt: + case links <- lnk: case <-ctx.Done(): return } } }() - return out, errOut + for lnk := range links { + out <- lnk + } + return <-errs } func (api *UnixfsAPI) core() *CoreAPI { diff --git a/core/coreiface/tests/unixfs.go b/core/coreiface/tests/unixfs.go index e806ae9baef..987d39b2620 100644 --- a/core/coreiface/tests/unixfs.go +++ b/core/coreiface/tests/unixfs.go @@ -681,7 +681,11 @@ func (tp *TestSuite) TestLs(t *testing.T) { t.Fatal(err) } - entries, errCh := api.Unixfs().Ls(ctx, p) + errCh := make(chan error, 1) + entries := make(chan coreiface.DirEntry) + go func() { + errCh <- api.Unixfs().Ls(ctx, p, entries) + }() entry, ok := <-entries if !ok { @@ -777,7 +781,12 @@ func (tp *TestSuite) TestLsEmptyDir(t *testing.T) { t.Fatal(err) } - links, errCh := api.Unixfs().Ls(ctx, p) + errCh := make(chan error, 1) + links := make(chan coreiface.DirEntry) + go func() { + errCh <- api.Unixfs().Ls(ctx, p, links) + }() + var count int for range links { count++ @@ -810,7 +819,12 @@ func (tp *TestSuite) TestLsNonUnixfs(t *testing.T) { t.Fatal(err) } - links, errCh := api.Unixfs().Ls(ctx, path.FromCid(nd.Cid())) + errCh := make(chan error, 1) + links := make(chan coreiface.DirEntry) + go func() { + errCh <- api.Unixfs().Ls(ctx, path.FromCid(nd.Cid()), links) + }() + var count int for range links { count++ diff --git a/core/coreiface/unixfs.go b/core/coreiface/unixfs.go index 6bdcdfa5045..10371998c20 100644 --- a/core/coreiface/unixfs.go +++ b/core/coreiface/unixfs.go @@ -80,22 +80,25 @@ type UnixfsAPI interface { // to operations performed on the returned file Get(context.Context, path.Path) (files.Node, error) - // Ls returns the list of links in a directory. Links aren't guaranteed to - // be returned in order. If an error occurs, the DirEntry channel is closed - // and an error is output on the error channel. Both channels are closed if - // the context is canceled. + // Ls writes the links in a directory to the DirEntry channel. Links aren't + // guaranteed to be returned in order. If an error occurs or the context is + // canceled, the DirEntry channel is closed and an error is returned. // // Example: // - // dirs, errs := Ls(ctx, p) + // dirs := make(chan DirEntry) + // lsErr := make(chan error, 1) + // go func() { + // lsErr <- Ls(ctx, p, dirs) + // }() // for dirEnt := range dirs { // fmt.Println("Dir name:", dirEnt.Name) // } - // err := <-errs + // err := <-lsErr // if err != nil { // return fmt.Errorf("error listing directory: %w", err) // } - Ls(context.Context, path.Path, ...options.UnixfsLsOption) (<-chan DirEntry, <-chan error) + Ls(context.Context, path.Path, chan<- DirEntry, ...options.UnixfsLsOption) error } // LsIter returns a go iterator that allows ranging over DirEntry results. @@ -114,13 +117,18 @@ func LsIter(ctx context.Context, api UnixfsAPI, p path.Path, opts ...options.Uni return func(yield func(DirEntry, error) bool) { ctx, cancel := context.WithCancel(ctx) defer cancel() // cancel Ls if done iterating early - results, asyncErr := api.Ls(ctx, p, opts...) - for result := range results { - if !yield(result, nil) { + + dirs := make(chan DirEntry) + lsErr := make(chan error, 1) + go func() { + lsErr <- api.Ls(ctx, p, dirs, opts...) + }() + for dirEnt := range dirs { + if !yield(dirEnt, nil) { return } } - if err := <-asyncErr; err != nil { + if err := <-lsErr; err != nil { yield(DirEntry{}, err) } }