Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gsoc): subscribe #4727

Closed
wants to merge 58 commits into from
Closed
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
08e4572
feat: gsoc
nugaon Jul 16, 2024
de66d61
feat: add gsoc listener to pull and pushsync
nugaon Jul 16, 2024
4e5d93c
feat: gsoc subscribe api
nugaon Jul 16, 2024
340c233
fix: gsoc address path parsing
nugaon Jul 17, 2024
9d5b641
test: gsoc as param for testServer
nugaon Jul 17, 2024
84831ac
test: gsoc api
nugaon Jul 17, 2024
bbe5d00
test: add empty function for so clisten in pushsync t
nugaon Jul 18, 2024
d800e9a
refactor: remove unused pusher
nugaon Jul 18, 2024
0f36ac7
docs: gsoc openapi
nugaon Jul 18, 2024
1b77027
test: unit
nugaon Jul 18, 2024
d53a776
docs: fix yaml indentation
nugaon Jul 18, 2024
66e44b8
feat: add new error handling
nugaon Jul 25, 2024
e5e79c0
feat: logger in gsoc listener
nugaon Jul 26, 2024
5c5bb7f
refactor: handle instead of handler
nugaon Sep 6, 2024
4f60796
docs: copypastes
nugaon Sep 6, 2024
f868608
refactor: rename register to subscribe
nugaon Sep 6, 2024
29af22d
refactor: unnecessary go call on gsoc handler
nugaon Sep 6, 2024
f429c01
feat: identity address in pull sync
nugaon Sep 11, 2024
1d5af44
test: multiple payload push
nugaon Sep 11, 2024
385a529
test: gsoc listener
nugaon Sep 11, 2024
6e28bb2
fix: param mismatch after rebasing
nugaon Sep 11, 2024
83c9309
fix: idAddress in pushsync where it is needed
nugaon Sep 12, 2024
615955b
test: working signature for pushsync
nugaon Sep 12, 2024
eefbfd7
refactor: log id_address on push failiure
nugaon Sep 25, 2024
18702c6
feat: id address usage on pusher and its inflight handling
nugaon Sep 25, 2024
7d42efd
fix: remove unnecessary stamp higher condition
nugaon Sep 25, 2024
b69dcc7
fix: reserve put
nugaon Sep 26, 2024
4c40e9a
fix: important reserve changes same ps index
nugaon Sep 30, 2024
e6e98f7
fix: lock by batch id and stamp
nugaon Sep 30, 2024
a3faffd
fix: new multex to lock reserve put
nugaon Oct 1, 2024
450d973
fix: remove ChunkTypeUnspecified check
nugaon Oct 1, 2024
7134e02
Merge remote-tracking branch 'origin/master' into feat/gsoc-subscribe
nugaon Oct 22, 2024
68f97ce
feat: postage stamping for gsoc
nugaon Oct 15, 2024
a50e693
Merge remote-tracking branch 'origin/master' into feat/gsoc-subscribe
nugaon Oct 29, 2024
074602b
fix: use resenje multex (#4883)
acha-bill Oct 30, 2024
cbec217
refactor: remove waitgroup in hook function calls
nugaon Oct 31, 2024
8abb1ad
refactor: add closer function for gsoc sub
nugaon Oct 31, 2024
ec01cf9
refactor: wrong parameter name ordering
nugaon Oct 31, 2024
d7e49e7
refactor: remove duplicated api route def
nugaon Oct 31, 2024
8e4cb2e
docs: added comments
nugaon Oct 31, 2024
160f258
test: add reserve case (#4886)
acha-bill Oct 31, 2024
c9098fd
test: correcting fata prompt message
nugaon Oct 31, 2024
4b27d65
refactor: remove shouldDecrReserveSize
nugaon Oct 31, 2024
691a43d
docs: verbosing
nugaon Oct 31, 2024
7432ed3
fix: eviction locking
nugaon Nov 4, 2024
0ce6407
test: identity address unit
nugaon Nov 5, 2024
d54dff5
refactor: gsoc handler param soc as reference
nugaon Nov 5, 2024
c03ddeb
docs: update put desc
nugaon Nov 5, 2024
55e7af1
refactor: export handler
nugaon Nov 5, 2024
a39e9d6
Merge remote-tracking branch 'origin/master' into feat/gsoc-subscribe
nugaon Nov 6, 2024
9e78194
feat: remove pinning and uploadstore usage
nugaon Nov 7, 2024
a9ba53b
feat: always save newer payload of soc
nugaon Nov 7, 2024
4893749
test: always save newer payload of soc
nugaon Nov 7, 2024
b0d6912
test: move testcase away
nugaon Nov 7, 2024
230c071
Merge remote-tracking branch 'origin/master' into feat/gsoc-subscribe
nugaon Nov 7, 2024
4d8661f
fix: replace gsoc payload
nugaon Nov 7, 2024
515b519
revert: remove pinning and uploadstore usage
nugaon Nov 7, 2024
8c416af
fix: replace gsoc payload
nugaon Nov 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,28 @@ paths:
default:
description: Default response

"/gsoc/subscribe/{address}":
get:
summary: Subscribe to GSOC payloads
tags:
- GSOC
- Subscribe
- Websocket
parameters:
- in: path
name: reference
schema:
$ref: "SwarmCommon.yaml#/components/schemas/SwarmReference"
required: true
description: "Single Owner Chunk address (which may have multiple payloads)"
responses:
"200":
description: Returns a WebSocket with a subscription for incoming message data on the requested SOC address.
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
description: Default response

"/soc/{owner}/{id}":
post:
summary: Upload single owner chunk
Expand Down
12 changes: 11 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/file/pipeline"
"github.com/ethersphere/bee/v2/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/gsoc"
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/p2p"
Expand All @@ -47,6 +48,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/settlement/swap"
"github.com/ethersphere/bee/v2/pkg/settlement/swap/chequebook"
"github.com/ethersphere/bee/v2/pkg/settlement/swap/erc20"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/status"
"github.com/ethersphere/bee/v2/pkg/steward"
storage "github.com/ethersphere/bee/v2/pkg/storage"
Expand Down Expand Up @@ -151,6 +153,7 @@ type Service struct {
storer Storer
resolver resolver.Interface
pss pss.Interface
gsoc gsoc.Listener
steward steward.Interface
logger log.Logger
loggerV1 log.Logger
Expand Down Expand Up @@ -254,6 +257,7 @@ type ExtraOptions struct {
Storer Storer
Resolver resolver.Interface
Pss pss.Interface
Gsoc gsoc.Listener
FeedFactory feeds.Factory
Post postage.Service
AccessControl accesscontrol.Controller
Expand Down Expand Up @@ -337,6 +341,7 @@ func (s *Service) Configure(signer crypto.Signer, tracer *tracing.Tracer, o Opti
s.storer = e.Storer
s.resolver = e.Resolver
s.pss = e.Pss
s.gsoc = e.Gsoc
s.feedFactory = e.FeedFactory
s.post = e.Post
s.accesscontrol = e.AccessControl
Expand Down Expand Up @@ -682,7 +687,12 @@ type putterSessionWrapper struct {
}

func (p *putterSessionWrapper) Put(ctx context.Context, chunk swarm.Chunk) error {
stamp, err := p.stamper.Stamp(chunk.Address())
idAddress, err := soc.IdentityAddress(chunk)
if err != nil {
return err
}

stamp, err := p.stamper.Stamp(chunk.Address(), idAddress)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/file/pipeline"
"github.com/ethersphere/bee/v2/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/gsoc"
"github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/v2/pkg/log"
p2pmock "github.com/ethersphere/bee/v2/pkg/p2p/mock"
Expand Down Expand Up @@ -93,6 +94,7 @@ type testServerOptions struct {
StateStorer storage.StateStorer
Resolver resolver.Interface
Pss pss.Interface
Gsoc gsoc.Listener
WsPath string
WsPingPeriod time.Duration
Logger log.Logger
Expand Down Expand Up @@ -194,6 +196,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
Storer: o.Storer,
Resolver: o.Resolver,
Pss: o.Pss,
Gsoc: o.Gsoc,
FeedFactory: o.Feeds,
Post: o.Post,
AccessControl: o.AccessControl,
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *Service) envelopePostHandler(w http.ResponseWriter, r *http.Request) {
return
}

stamp, err := stamper.Stamp(paths.Address)
stamp, err := stamper.Stamp(paths.Address, paths.Address)
if err != nil {
logger.Debug("split write all failed", "error", err)
logger.Error(nil, "split write all failed")
Expand Down
119 changes: 119 additions & 0 deletions pkg/api/gsoc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2024 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api

import (
"net/http"
"time"

"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)

func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("gsoc_subscribe").Build()

paths := struct {
Address []byte `map:"address" validate:"required"`
}{}
if response := s.mapStructure(mux.Vars(r), &paths); response != nil {
response("invalid path params", logger, w)
return
}

upgrader := websocket.Upgrader{
ReadBufferSize: swarm.ChunkSize,
WriteBufferSize: swarm.ChunkSize,
CheckOrigin: s.checkOrigin,
}

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logger.Debug("upgrade failed", "error", err)
logger.Error(nil, "upgrade failed")
jsonhttp.InternalServerError(w, "upgrade failed")
return
}

s.wsWg.Add(1)
go s.gsocListeningWs(conn, paths.Address)
}

func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress []byte) {
defer s.wsWg.Done()

var (
dataC = make(chan []byte)
gone = make(chan struct{})
ticker = time.NewTicker(s.WsPingPeriod)
err error
)
defer func() {
ticker.Stop()
_ = conn.Close()
}()
cleanup := s.gsoc.Subscribe([32]byte(socAddress), func(m []byte) {
select {
case dataC <- m:
case <-gone:
return
case <-s.quit:
return
}
})

defer cleanup()

conn.SetCloseHandler(func(code int, text string) error {
s.logger.Debug("gsoc ws: client gone", "code", code, "message", text)
close(gone)
return nil
})

for {
select {
case b := <-dataC:
err = conn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err != nil {
s.logger.Debug("gsoc ws: set write deadline failed", "error", err)
return
}

err = conn.WriteMessage(websocket.BinaryMessage, b)
if err != nil {
s.logger.Debug("gsoc ws: write message failed", "error", err)
return
}

case <-s.quit:
// shutdown
err = conn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err != nil {
s.logger.Debug("gsoc ws: set write deadline failed", "error", err)
return
}
err = conn.WriteMessage(websocket.CloseMessage, []byte{})
if err != nil {
s.logger.Debug("gsoc ws: write close message failed", "error", err)
}
return
case <-gone:
// client gone
return
case <-ticker.C:
err = conn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err != nil {
s.logger.Debug("gsoc ws: set write deadline failed", "error", err)
return
}
if err = conn.WriteMessage(websocket.PingMessage, nil); err != nil {
// error encountered while pinging client. client probably gone
acha-bill marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
}
}
Loading
Loading