Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: gsoc subscribe #4727

Open
wants to merge 32 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
08e4572
feat: gsoc
nugaon Jul 16, 2024
de66d61
feat: add gsoc listener to pull and pushsync
nugaon Jul 16, 2024
4e5d93c
feat: gsoc subscribe api
nugaon Jul 16, 2024
340c233
fix: gsoc address path parsing
nugaon Jul 17, 2024
9d5b641
test: gsoc as param for testServer
nugaon Jul 17, 2024
84831ac
test: gsoc api
nugaon Jul 17, 2024
bbe5d00
test: add empty function for so clisten in pushsync t
nugaon Jul 18, 2024
d800e9a
refactor: remove unused pusher
nugaon Jul 18, 2024
0f36ac7
docs: gsoc openapi
nugaon Jul 18, 2024
1b77027
test: unit
nugaon Jul 18, 2024
d53a776
docs: fix yaml indentation
nugaon Jul 18, 2024
66e44b8
feat: add new error handling
nugaon Jul 25, 2024
e5e79c0
feat: logger in gsoc listener
nugaon Jul 26, 2024
5c5bb7f
refactor: handle instead of handler
nugaon Sep 6, 2024
4f60796
docs: copypastes
nugaon Sep 6, 2024
f868608
refactor: rename register to subscribe
nugaon Sep 6, 2024
29af22d
refactor: unnecessary go call on gsoc handler
nugaon Sep 6, 2024
f429c01
feat: identity address in pull sync
nugaon Sep 11, 2024
1d5af44
test: multiple payload push
nugaon Sep 11, 2024
385a529
test: gsoc listener
nugaon Sep 11, 2024
6e28bb2
fix: param mismatch after rebasing
nugaon Sep 11, 2024
83c9309
fix: idAddress in pushsync where it is needed
nugaon Sep 12, 2024
615955b
test: working signature for pushsync
nugaon Sep 12, 2024
eefbfd7
refactor: log id_address on push failiure
nugaon Sep 25, 2024
18702c6
feat: id address usage on pusher and its inflight handling
nugaon Sep 25, 2024
7d42efd
fix: remove unnecessary stamp higher condition
nugaon Sep 25, 2024
b69dcc7
fix: reserve put
nugaon Sep 26, 2024
4c40e9a
fix: important reserve changes same ps index
nugaon Sep 30, 2024
e6e98f7
fix: lock by batch id and stamp
nugaon Sep 30, 2024
a3faffd
fix: new multex to lock reserve put
nugaon Oct 1, 2024
450d973
fix: remove ChunkTypeUnspecified check
nugaon Oct 1, 2024
7134e02
Merge remote-tracking branch 'origin/master' into feat/gsoc-subscribe
nugaon Oct 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,28 @@ paths:
default:
description: Default response

"/gsoc/subscribe/{address}":
get:
summary: Subscribe to GSOC payloads
tags:
- GSOC
- Subscribe
- Websocket
parameters:
- in: path
name: reference
schema:
$ref: "SwarmCommon.yaml#/components/schemas/SwarmReference"
required: true
description: "Single Owner Chunk address (which may have multiple payloads)"
responses:
"200":
description: Returns a WebSocket with a subscription for incoming message data on the requested SOC address.
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
description: Default response

"/soc/{owner}/{id}":
post:
summary: Upload single owner chunk
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/file/pipeline"
"github.com/ethersphere/bee/v2/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/gsoc"
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/p2p"
Expand Down Expand Up @@ -151,6 +152,7 @@ type Service struct {
storer Storer
resolver resolver.Interface
pss pss.Interface
gsoc gsoc.Listener
steward steward.Interface
logger log.Logger
loggerV1 log.Logger
Expand Down Expand Up @@ -253,6 +255,7 @@ type ExtraOptions struct {
Storer Storer
Resolver resolver.Interface
Pss pss.Interface
Gsoc gsoc.Listener
FeedFactory feeds.Factory
Post postage.Service
AccessControl accesscontrol.Controller
Expand Down Expand Up @@ -336,6 +339,7 @@ func (s *Service) Configure(signer crypto.Signer, tracer *tracing.Tracer, o Opti
s.storer = e.Storer
s.resolver = e.Resolver
s.pss = e.Pss
s.gsoc = e.Gsoc
s.feedFactory = e.FeedFactory
s.post = e.Post
s.accesscontrol = e.AccessControl
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/file/pipeline"
"github.com/ethersphere/bee/v2/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/gsoc"
"github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/v2/pkg/log"
p2pmock "github.com/ethersphere/bee/v2/pkg/p2p/mock"
Expand Down Expand Up @@ -93,6 +94,7 @@ type testServerOptions struct {
StateStorer storage.StateStorer
Resolver resolver.Interface
Pss pss.Interface
Gsoc gsoc.Listener
WsPath string
WsPingPeriod time.Duration
Logger log.Logger
Expand Down Expand Up @@ -191,6 +193,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
Storer: o.Storer,
Resolver: o.Resolver,
Pss: o.Pss,
Gsoc: o.Gsoc,
FeedFactory: o.Feeds,
Post: o.Post,
AccessControl: o.AccessControl,
Expand Down
119 changes: 119 additions & 0 deletions pkg/api/gsoc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2024 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api

import (
"net/http"
"time"

"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)

func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("gsoc_subscribe").Build()

paths := struct {
Address []byte `map:"address" validate:"required"`
}{}
if response := s.mapStructure(mux.Vars(r), &paths); response != nil {
response("invalid path params", logger, w)
return
}

upgrader := websocket.Upgrader{
ReadBufferSize: swarm.ChunkSize,
WriteBufferSize: swarm.ChunkSize,
CheckOrigin: s.checkOrigin,
}

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logger.Debug("upgrade failed", "error", err)
logger.Error(nil, "upgrade failed")
jsonhttp.InternalServerError(w, "upgrade failed")
return
}

s.wsWg.Add(1)
go s.gsocListeningWs(conn, paths.Address)
}

func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress []byte) {
defer s.wsWg.Done()

var (
dataC = make(chan []byte)
gone = make(chan struct{})
ticker = time.NewTicker(s.WsPingPeriod)
err error
)
defer func() {
ticker.Stop()
_ = conn.Close()
}()
cleanup := s.gsoc.Subscribe([32]byte(socAddress), func(m []byte) {
select {
case dataC <- m:
case <-gone:
return
case <-s.quit:
return
}
})

defer cleanup()

conn.SetCloseHandler(func(code int, text string) error {
s.logger.Debug("gsoc ws: client gone", "code", code, "message", text)
close(gone)
return nil
})

for {
select {
case b := <-dataC:
err = conn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err != nil {
s.logger.Debug("gsoc ws: set write deadline failed", "error", err)
return
}

err = conn.WriteMessage(websocket.BinaryMessage, b)
if err != nil {
s.logger.Debug("gsoc ws: write message failed", "error", err)
return
}

case <-s.quit:
// shutdown
err = conn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err != nil {
s.logger.Debug("gsoc ws: set write deadline failed", "error", err)
return
}
err = conn.WriteMessage(websocket.CloseMessage, []byte{})
if err != nil {
s.logger.Debug("gsoc ws: write close message failed", "error", err)
}
return
case <-gone:
// client gone
return
case <-ticker.C:
err = conn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err != nil {
s.logger.Debug("gsoc ws: set write deadline failed", "error", err)
return
}
if err = conn.WriteMessage(websocket.PingMessage, nil); err != nil {
// error encountered while pinging client. client probably gone
return
}
}
}
}
171 changes: 171 additions & 0 deletions pkg/api/gsoc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2024 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api_test

import (
"encoding/hex"
"fmt"
"net/url"
"strings"
"testing"
"time"

"github.com/ethersphere/bee/v2/pkg/cac"
"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/gsoc"
"github.com/ethersphere/bee/v2/pkg/log"
mockbatchstore "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock"
"github.com/ethersphere/bee/v2/pkg/soc"
mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/bee/v2/pkg/util/testutil"
"github.com/gorilla/websocket"
)

// TestGsocWebsocketSingleHandler creates a single websocket handler on a chunk address, and receives a message
func TestGsocWebsocketSingleHandler(t *testing.T) {
t.Parallel()

var (
id = make([]byte, 32)
g, cl, signer, _ = newGsocTest(t, id, 0)
respC = make(chan error, 1)
payload = []byte("hello there!")
)

err := cl.SetReadDeadline(time.Now().Add(2 * time.Second))
if err != nil {
t.Fatal(err)
}
cl.SetReadLimit(swarm.ChunkSize)

ch, _ := cac.New(payload)
socCh := soc.New(id, ch)
ch, _ = socCh.Sign(signer)
socCh, _ = soc.FromChunk(ch)
g.Handle(*socCh)

go expectMessage(t, cl, respC, payload)
if err := <-respC; err != nil {
t.Fatal(err)
}
}

func TestGsocWebsocketMultiHandler(t *testing.T) {
t.Parallel()

var (
id = make([]byte, 32)
g, cl, signer, listener = newGsocTest(t, make([]byte, 32), 0)
owner, _ = signer.EthereumAddress()
chunkAddr, _ = soc.CreateAddress(id, owner.Bytes())
u = url.URL{Scheme: "ws", Host: listener, Path: fmt.Sprintf("/gsoc/subscribe/%s", hex.EncodeToString(chunkAddr.Bytes()))}
cl2, _, err = websocket.DefaultDialer.Dial(u.String(), nil)
respC = make(chan error, 2)
)
if err != nil {
t.Fatalf("dial: %v. url %v", err, u.String())
}
testutil.CleanupCloser(t, cl2)

err = cl.SetReadDeadline(time.Now().Add(2 * time.Second))
if err != nil {
t.Fatal(err)
}
cl.SetReadLimit(swarm.ChunkSize)

ch, _ := cac.New(payload)
socCh := soc.New(id, ch)
ch, _ = socCh.Sign(signer)
socCh, _ = soc.FromChunk(ch)

// close the websocket before calling GSOC with the message
err = cl.WriteMessage(websocket.CloseMessage, []byte{})
if err != nil {
t.Fatal(err)
}

g.Handle(*socCh)

go expectMessage(t, cl, respC, payload)
go expectMessage(t, cl2, respC, payload)
if err := <-respC; err != nil {
t.Fatal(err)
}
if err := <-respC; err != nil {
t.Fatal(err)
}
}

// TestGsocPong tests that the websocket api adheres to the websocket standard
// and sends ping-pong messages to keep the connection alive.
// The test opens a websocket, keeps it alive for 500ms, then receives a GSOC message.
func TestGsocPong(t *testing.T) {
t.Parallel()
id := make([]byte, 32)

var (
g, cl, signer, _ = newGsocTest(t, id, 90*time.Millisecond)

respC = make(chan error, 1)
pongWait = 1 * time.Millisecond
)

cl.SetReadLimit(swarm.ChunkSize)
err := cl.SetReadDeadline(time.Now().Add(pongWait))
if err != nil {
t.Fatal(err)
}

time.Sleep(500 * time.Millisecond) // wait to see that the websocket is kept alive
ch, _ := cac.New([]byte("hello there!"))
socCh := soc.New(id, ch)
ch, _ = socCh.Sign(signer)
socCh, _ = soc.FromChunk(ch)

g.Handle(*socCh)

go expectMessage(t, cl, respC, nil)
if err := <-respC; err == nil || !strings.Contains(err.Error(), "i/o timeout") {
// note: error has *websocket.netError type so we need to check error by checking message
t.Fatal("want timeout error")
}
}

func newGsocTest(t *testing.T, socId []byte, pingPeriod time.Duration) (gsoc.Listener, *websocket.Conn, crypto.Signer, string) {
t.Helper()
if pingPeriod == 0 {
pingPeriod = 10 * time.Second
}
var (
batchStore = mockbatchstore.New()
storer = mockstorer.New()
)

privKey, err := crypto.GenerateSecp256k1Key()
if err != nil {
t.Fatal(err)
}
signer := crypto.NewDefaultSigner(privKey)
owner, err := signer.EthereumAddress()
if err != nil {
t.Fatal(err)
}
chunkAddr, _ := soc.CreateAddress(socId, owner.Bytes())

gsoc := gsoc.New(log.NewLogger("test"))
testutil.CleanupCloser(t, gsoc)

_, cl, listener, _ := newTestServer(t, testServerOptions{
Gsoc: gsoc,
WsPath: fmt.Sprintf("/gsoc/subscribe/%s", hex.EncodeToString(chunkAddr.Bytes())),
Storer: storer,
BatchStore: batchStore,
Logger: log.Noop,
WsPingPeriod: pingPeriod,
})

return gsoc, cl, signer, listener
}
4 changes: 4 additions & 0 deletions pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@ func (s *Service) mountAPI() {
web.FinalHandlerFunc(s.pssWsHandler),
))

handle("/gsoc/subscribe/{address}", web.ChainHandlers(
web.FinalHandlerFunc(s.gsocWsHandler),
))

handle("/tags", web.ChainHandlers(
web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.listTagsHandler),
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/soc.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.NotFound(w, "batch with id not found")
case errors.Is(err, errInvalidPostageBatch):
jsonhttp.BadRequest(w, "invalid batch id")
case errors.Is(err, errUnsupportedDevNodeOperation):
jsonhttp.NotImplemented(w, "operation is not supported in dev mode")
default:
jsonhttp.BadRequest(w, nil)
}
Expand Down
Loading
Loading