Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: gsoc subscribe #4901

Merged
merged 61 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
d6836dc
feat: gsoc
nugaon Jul 16, 2024
7378e32
feat: add gsoc listener to pull and pushsync
nugaon Jul 16, 2024
8baba8d
feat: gsoc subscribe api
nugaon Jul 16, 2024
3f80697
fix: gsoc address path parsing
nugaon Jul 17, 2024
e95fbd7
test: gsoc as param for testServer
nugaon Jul 17, 2024
4951d6f
test: gsoc api
nugaon Jul 17, 2024
455cf73
test: add empty function for so clisten in pushsync t
nugaon Jul 18, 2024
b970494
refactor: remove unused pusher
nugaon Jul 18, 2024
e07b696
docs: gsoc openapi
nugaon Jul 18, 2024
ee12d03
test: unit
nugaon Jul 18, 2024
858ff26
docs: fix yaml indentation
nugaon Jul 18, 2024
d979ee5
feat: add new error handling
nugaon Jul 25, 2024
e58b7e7
feat: logger in gsoc listener
nugaon Jul 26, 2024
713a985
refactor: handle instead of handler
nugaon Sep 6, 2024
3008299
docs: copypastes
nugaon Sep 6, 2024
36624ef
refactor: rename register to subscribe
nugaon Sep 6, 2024
99b5e1f
refactor: unnecessary go call on gsoc handler
nugaon Sep 6, 2024
6744b5f
feat: identity address in pull sync
nugaon Sep 11, 2024
bb0909a
test: multiple payload push
nugaon Sep 11, 2024
fca83e6
test: gsoc listener
nugaon Sep 11, 2024
3284476
fix: param mismatch after rebasing
nugaon Sep 11, 2024
37378e5
fix: idAddress in pushsync where it is needed
nugaon Sep 12, 2024
25bb9bd
test: working signature for pushsync
nugaon Sep 12, 2024
1178a90
refactor: log id_address on push failiure
nugaon Sep 25, 2024
837563d
feat: id address usage on pusher and its inflight handling
nugaon Sep 25, 2024
f5ad179
fix: remove unnecessary stamp higher condition
nugaon Sep 25, 2024
80858f4
fix: reserve put
nugaon Sep 26, 2024
de65176
fix: important reserve changes same ps index
nugaon Sep 30, 2024
009b827
fix: lock by batch id and stamp
nugaon Sep 30, 2024
73203b5
fix: new multex to lock reserve put
nugaon Oct 1, 2024
1ddacbf
fix: remove ChunkTypeUnspecified check
nugaon Oct 1, 2024
ecc639f
feat: postage stamping for gsoc
nugaon Oct 15, 2024
dde6e53
fix: use resenje multex (#4883)
acha-bill Oct 30, 2024
2ecf155
refactor: remove waitgroup in hook function calls
nugaon Oct 31, 2024
892f285
refactor: add closer function for gsoc sub
nugaon Oct 31, 2024
a725b2f
refactor: wrong parameter name ordering
nugaon Oct 31, 2024
8b420ab
docs: added comments
nugaon Oct 31, 2024
0a06031
test: add reserve case (#4886)
acha-bill Oct 31, 2024
f378d29
test: correcting fata prompt message
nugaon Oct 31, 2024
3121d3d
refactor: remove shouldDecrReserveSize
nugaon Oct 31, 2024
de26b73
docs: verbosing
nugaon Oct 31, 2024
f00055a
fix: eviction locking
nugaon Nov 4, 2024
a25e874
test: identity address unit
nugaon Nov 5, 2024
2a19241
refactor: gsoc handler param soc as reference
nugaon Nov 5, 2024
e336d8d
docs: update put desc
nugaon Nov 5, 2024
19c3272
refactor: export handler
nugaon Nov 5, 2024
e0dcc95
feat: remove pinning and uploadstore usage
nugaon Nov 7, 2024
d9b6935
feat: always save newer payload of soc
nugaon Nov 7, 2024
02dcc38
test: always save newer payload of soc
nugaon Nov 7, 2024
ccc7516
test: move testcase away
nugaon Nov 7, 2024
e0e914e
fix: replace gsoc payload
nugaon Nov 7, 2024
5988539
revert: remove pinning and uploadstore usage
nugaon Nov 7, 2024
2c00152
fix: replace gsoc payload
nugaon Nov 7, 2024
4cf5ab6
fix(gsoc): improvements (#4899)
istae Nov 11, 2024
a21c988
fix: node pushsync
istae Nov 11, 2024
24658d5
fix: chunkstamp loadWithHash, replace to increase refCnt, new soc evi…
istae Nov 11, 2024
5f9aefb
fix: reserve repair to use LoadWithStampHash
istae Nov 11, 2024
e1034fc
fix: log
istae Nov 11, 2024
927f6f2
fix: comment
istae Nov 13, 2024
c0e2922
feat: chunkstore replace unit test & soc pinning (#4902)
acha-bill Nov 13, 2024
0b52eff
test: gsoc integration test (#4904)
acha-bill Nov 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,28 @@ paths:
default:
description: Default response

"/gsoc/subscribe/{address}":
get:
summary: Subscribe to GSOC payloads
tags:
- GSOC
- Subscribe
- Websocket
parameters:
- in: path
name: reference
schema:
$ref: "SwarmCommon.yaml#/components/schemas/SwarmReference"
required: true
description: "Single Owner Chunk address (which may have multiple payloads)"
responses:
"200":
description: Returns a WebSocket with a subscription for incoming message data on the requested SOC address.
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
description: Default response

"/soc/{owner}/{id}":
post:
summary: Upload single owner chunk
Expand Down Expand Up @@ -847,8 +869,6 @@ paths:
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
required: true
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter"
required: false
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress"
Expand Down
15 changes: 14 additions & 1 deletion pkg/api/accesscontrol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) {
resp struct {
Reference swarm.Address `json:"reference"`
}
direct bool
}{
{
name: "bzz",
Expand Down Expand Up @@ -159,6 +160,7 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) {
data: bytes.NewReader(sch.WrappedChunk.Data()),
expdata: sch.Chunk().Data(),
contenttype: "binary/octet-stream",
direct: true,
},
}

Expand All @@ -183,13 +185,24 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) {
upTestOpts = append(upTestOpts, jsonhttptest.WithRequestHeader(api.SwarmCollectionHeader, "True"))
}
t.Run(v.name, func(t *testing.T) {
client, _, _, _ := newTestServer(t, testServerOptions{
client, _, _, chanStore := newTestServer(t, testServerOptions{
Storer: storerMock,
Logger: logger,
Post: mockpost.New(mockpost.WithAcceptAll()),
PublicKey: pk.PublicKey,
AccessControl: mockac.New(),
DirectUpload: v.direct,
})

if chanStore != nil {
chanStore.Subscribe(func(chunk swarm.Chunk) {
err := storerMock.Put(context.Background(), chunk)
if err != nil {
t.Fatal(err)
}
})
}

header := jsonhttptest.Request(t, client, http.MethodPost, v.upurl, http.StatusCreated,
upTestOpts...,
)
Expand Down
11 changes: 10 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/file/pipeline"
"github.com/ethersphere/bee/v2/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/gsoc"
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/p2p"
Expand Down Expand Up @@ -151,6 +152,7 @@ type Service struct {
storer Storer
resolver resolver.Interface
pss pss.Interface
gsoc gsoc.Listener
steward steward.Interface
logger log.Logger
loggerV1 log.Logger
Expand Down Expand Up @@ -254,6 +256,7 @@ type ExtraOptions struct {
Storer Storer
Resolver resolver.Interface
Pss pss.Interface
Gsoc gsoc.Listener
FeedFactory feeds.Factory
Post postage.Service
AccessControl accesscontrol.Controller
Expand Down Expand Up @@ -337,6 +340,7 @@ func (s *Service) Configure(signer crypto.Signer, tracer *tracing.Tracer, o Opti
s.storer = e.Storer
s.resolver = e.Resolver
s.pss = e.Pss
s.gsoc = e.Gsoc
s.feedFactory = e.FeedFactory
s.post = e.Post
s.accesscontrol = e.AccessControl
Expand Down Expand Up @@ -682,7 +686,12 @@ type putterSessionWrapper struct {
}

func (p *putterSessionWrapper) Put(ctx context.Context, chunk swarm.Chunk) error {
stamp, err := p.stamper.Stamp(chunk.Address())
idAddress, err := storage.IdentityAddress(chunk)
if err != nil {
return err
}

stamp, err := p.stamper.Stamp(chunk.Address(), idAddress)
if err != nil {
return err
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/file/pipeline"
"github.com/ethersphere/bee/v2/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/gsoc"
"github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/v2/pkg/log"
p2pmock "github.com/ethersphere/bee/v2/pkg/p2p/mock"
Expand Down Expand Up @@ -93,6 +94,7 @@ type testServerOptions struct {
StateStorer storage.StateStorer
Resolver resolver.Interface
Pss pss.Interface
Gsoc gsoc.Listener
WsPath string
WsPingPeriod time.Duration
Logger log.Logger
Expand Down Expand Up @@ -194,6 +196,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
Storer: o.Storer,
Resolver: o.Resolver,
Pss: o.Pss,
Gsoc: o.Gsoc,
FeedFactory: o.Feeds,
Post: o.Post,
AccessControl: o.AccessControl,
Expand Down Expand Up @@ -630,6 +633,7 @@ type chanStorer struct {
lock sync.Mutex
chunks map[string]struct{}
quit chan struct{}
subs []func(chunk swarm.Chunk)
}

func newChanStore(cc <-chan *pusher.Op) *chanStorer {
Expand All @@ -647,6 +651,9 @@ func (c *chanStorer) drain(cc <-chan *pusher.Op) {
case op := <-cc:
c.lock.Lock()
c.chunks[op.Chunk.Address().ByteString()] = struct{}{}
for _, h := range c.subs {
h(op.Chunk)
}
c.lock.Unlock()
op.Err <- nil
case <-c.quit:
Expand All @@ -667,6 +674,12 @@ func (c *chanStorer) Has(addr swarm.Address) bool {
return ok
}

func (c *chanStorer) Subscribe(f func(chunk swarm.Chunk)) {
c.lock.Lock()
defer c.lock.Unlock()
c.subs = append(c.subs, f)
}

func createRedistributionAgentService(
t *testing.T,
addr swarm.Address,
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *Service) envelopePostHandler(w http.ResponseWriter, r *http.Request) {
return
}

stamp, err := stamper.Stamp(paths.Address)
stamp, err := stamper.Stamp(paths.Address, paths.Address)
if err != nil {
logger.Debug("split write all failed", "error", err)
logger.Error(nil, "split write all failed")
Expand Down
120 changes: 120 additions & 0 deletions pkg/api/gsoc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2024 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api

import (
"net/http"
"time"

"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)

func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("gsoc_subscribe").Build()

paths := struct {
Address swarm.Address `map:"address,resolve" validate:"required"`
}{}

if response := s.mapStructure(mux.Vars(r), &paths); response != nil {
response("invalid path params", logger, w)
return
}

upgrader := websocket.Upgrader{
ReadBufferSize: swarm.ChunkSize,
WriteBufferSize: swarm.ChunkSize,
CheckOrigin: s.checkOrigin,
}

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logger.Debug("upgrade failed", "error", err)
logger.Error(nil, "upgrade failed")
jsonhttp.InternalServerError(w, "upgrade failed")
return
}

s.wsWg.Add(1)
go s.gsocListeningWs(conn, paths.Address)
}

func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress swarm.Address) {
defer s.wsWg.Done()

var (
dataC = make(chan []byte)
gone = make(chan struct{})
ticker = time.NewTicker(s.WsPingPeriod)
err error
)
defer func() {
ticker.Stop()
_ = conn.Close()
}()
cleanup := s.gsoc.Subscribe(socAddress, func(m []byte) {
select {
case dataC <- m:
case <-gone:
return
case <-s.quit:
return
}
})

defer cleanup()

conn.SetCloseHandler(func(code int, text string) error {
s.logger.Debug("gsoc ws: client gone", "code", code, "message", text)
close(gone)
return nil
})

for {
select {
case b := <-dataC:
err = conn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err != nil {
s.logger.Debug("gsoc ws: set write deadline failed", "error", err)
return
}

err = conn.WriteMessage(websocket.BinaryMessage, b)
if err != nil {
s.logger.Debug("gsoc ws: write message failed", "error", err)
return
}

case <-s.quit:
// shutdown
err = conn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err != nil {
s.logger.Debug("gsoc ws: set write deadline failed", "error", err)
return
}
err = conn.WriteMessage(websocket.CloseMessage, []byte{})
if err != nil {
s.logger.Debug("gsoc ws: write close message failed", "error", err)
}
return
case <-gone:
// client gone
return
case <-ticker.C:
err = conn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err != nil {
s.logger.Debug("gsoc ws: set write deadline failed", "error", err)
return
}
if err = conn.WriteMessage(websocket.PingMessage, nil); err != nil {
// error encountered while pinging client. client probably gone
return
}
}
}
}
Loading
Loading