fix(redistribution): reveal with correct depth, swip21 (#4865)
istae authored Oct 15, 2024
1 parent 74db3d4 commit e7bc895
Showing 10 changed files with 88 additions and 71 deletions.
1 change: 1 addition & 0 deletions pkg/api/api_test.go
@@ -709,6 +709,7 @@ func createRedistributionAgentService(
tranService,
&mockHealth{},
log.Noop,
0,
)
}

3 changes: 2 additions & 1 deletion pkg/node/node.go
@@ -906,7 +906,7 @@ func NewBee(
return nil, fmt.Errorf("status service: %w", err)
}

saludService := salud.New(nodeStatus, kad, localStore, logger, warmupTime, api.FullMode.String(), salud.DefaultMinPeersPerBin, salud.DefaultDurPercentile, salud.DefaultConnsPercentile)
saludService := salud.New(nodeStatus, kad, localStore, logger, warmupTime, api.FullMode.String(), salud.DefaultMinPeersPerBin, salud.DefaultDurPercentile, salud.DefaultConnsPercentile, uint8(o.ReserveCapacityDoubling))
b.saludCloser = saludService

rC, unsub := saludService.SubscribeNetworkStorageRadius()
@@ -1091,6 +1091,7 @@ func NewBee(
transactionService,
saludService,
logger,
uint8(o.ReserveCapacityDoubling),
)
if err != nil {
return nil, fmt.Errorf("storage incentives agent: %w", err)
29 changes: 14 additions & 15 deletions pkg/salud/salud.go
@@ -40,11 +40,6 @@ type peerStatus interface {
PeerSnapshot(ctx context.Context, peer swarm.Address) (*status.Snapshot, error)
}

type reserve interface {
storer.RadiusChecker
ReserveCapacityDoubling() int
}

type service struct {
wg sync.WaitGroup
quit chan struct{}
@@ -53,34 +48,38 @@ type service struct {
status peerStatus
metrics metrics
isSelfHealthy *atomic.Bool
reserve reserve
reserve storer.RadiusChecker

radiusSubsMtx sync.Mutex
radiusC []chan uint8

capacityDoubling uint8
}

func New(
status peerStatus,
topology topologyDriver,
reserve reserve,
reserve storer.RadiusChecker,
logger log.Logger,
warmup time.Duration,
mode string,
minPeersPerbin int,
durPercentile float64,
connsPercentile float64,
capacityDoubling uint8,
) *service {

metrics := newMetrics()

s := &service{
quit: make(chan struct{}),
logger: logger.WithName(loggerName).Register(),
status: status,
topology: topology,
metrics: metrics,
isSelfHealthy: atomic.NewBool(true),
reserve: reserve,
quit: make(chan struct{}),
logger: logger.WithName(loggerName).Register(),
status: status,
topology: topology,
metrics: metrics,
isSelfHealthy: atomic.NewBool(true),
reserve: reserve,
capacityDoubling: capacityDoubling,
}

s.wg.Add(1)
@@ -221,7 +220,7 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
}
}

networkRadiusEstimation := s.reserve.StorageRadius() + uint8(s.reserve.ReserveCapacityDoubling())
networkRadiusEstimation := s.reserve.StorageRadius() + s.capacityDoubling

selfHealth := true
if nHoodRadius == networkRadius && networkRadiusEstimation != networkRadius {
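
In salud, the reserve dependency narrows from a custom `reserve` interface to `storer.RadiusChecker`, and the doubling factor is injected once at construction instead of being queried from the store. A minimal sketch of the resulting estimate, assuming illustrative names for everything except `StorageRadius` and `capacityDoubling`:

```go
package main

import "fmt"

// radiusChecker stands in for storer.RadiusChecker; only the call used here.
type radiusChecker interface {
	StorageRadius() uint8
}

type fixedReserve struct{ radius uint8 }

func (r fixedReserve) StorageRadius() uint8 { return r.radius }

// networkRadiusEstimation mirrors the updated line in salud:
// s.reserve.StorageRadius() + s.capacityDoubling.
func networkRadiusEstimation(reserve radiusChecker, capacityDoubling uint8) uint8 {
	return reserve.StorageRadius() + capacityDoubling
}

func main() {
	// With a storage radius of 6 and a doubling of 2 (as in
	// TestSelfHealthyCapacityDoubling below), the estimate is 8.
	fmt.Println(networkRadiusEstimation(fixedReserve{radius: 6}, 2))
}
```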
11 changes: 5 additions & 6 deletions pkg/salud/salud_test.go
@@ -70,7 +70,7 @@ func TestSalud(t *testing.T) {
mockstorer.WithReserveSize(100),
)

service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)
service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8, 0)

err := spinlock.Wait(time.Minute, func() bool {
return len(topM.PeersHealth()) == len(peers)
@@ -116,7 +116,7 @@ func TestSelfUnhealthyRadius(t *testing.T) {
mockstorer.WithReserveSize(100),
)

service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)
service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8, 0)
testutil.CleanupCloser(t, service)

err := spinlock.Wait(time.Minute, func() bool {
@@ -151,10 +151,9 @@ func TestSelfHealthyCapacityDoubling(t *testing.T) {
reserve := mockstorer.NewReserve(
mockstorer.WithRadius(6),
mockstorer.WithReserveSize(100),
mockstorer.WithCapacityDoubling(2),
)

service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)
service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8, 2)
testutil.CleanupCloser(t, service)

err := spinlock.Wait(time.Minute, func() bool {
@@ -184,7 +183,7 @@ func TestSubToRadius(t *testing.T) {

topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8)
service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8, 0)

c, unsub := service.SubscribeNetworkStorageRadius()
t.Cleanup(unsub)
@@ -217,7 +216,7 @@ func TestUnsub(t *testing.T) {

topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8)
service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8, 0)
testutil.CleanupCloser(t, service)

c, unsub := service.SubscribeNetworkStorageRadius()
18 changes: 11 additions & 7 deletions pkg/storageincentives/agent.go
@@ -70,6 +70,7 @@ type Agent struct {
chainStateGetter postage.ChainStateGetter
commitLock sync.Mutex
health Health
capacityDoubling uint8
}

func New(overlay swarm.Address,
@@ -89,6 +90,7 @@ func New(overlay swarm.Address,
tranService transaction.Service,
health Health,
logger log.Logger,
capacityDoubling uint8,
) (*Agent, error) {
a := &Agent{
overlay: overlay,
@@ -104,6 +106,7 @@ func New(overlay swarm.Address,
redistributionStatuser: redistributionStatuser,
health: health,
chainStateGetter: chainStateGetter,
capacityDoubling: capacityDoubling,
}

state, err := NewRedistributionState(logger, ethAddress, stateStore, erc20Service, tranService)
@@ -389,14 +392,15 @@ func (a *Agent) handleClaim(ctx context.Context, round uint64) error {
}

func (a *Agent) handleSample(ctx context.Context, round uint64) (bool, error) {
storageRadius := a.store.StorageRadius()
// minimum proximity between the anchor and the stored chunks
commitedDepth := a.store.StorageRadius() + a.capacityDoubling

if a.state.IsFrozen() {
a.logger.Info("skipping round because node is frozen")
return false, nil
}

isPlaying, err := a.contract.IsPlaying(ctx, storageRadius)
isPlaying, err := a.contract.IsPlaying(ctx, commitedDepth)
if err != nil {
a.metrics.ErrCheckIsPlaying.Inc()
return false, err
@@ -429,21 +433,21 @@ func (a *Agent) handleSample(ctx context.Context, round uint64) (bool, error) {
}

now := time.Now()
sample, err := a.makeSample(ctx, storageRadius)
sample, err := a.makeSample(ctx, commitedDepth)
if err != nil {
return false, err
}
dur := time.Since(now)
a.metrics.SampleDuration.Set(dur.Seconds())

a.logger.Info("produced sample", "hash", sample.ReserveSampleHash, "radius", sample.StorageRadius, "round", round)
a.logger.Info("produced sample", "hash", sample.ReserveSampleHash, "radius", commitedDepth, "round", round)

a.state.SetSampleData(round, sample, dur)

return true, nil
}

func (a *Agent) makeSample(ctx context.Context, storageRadius uint8) (SampleData, error) {
func (a *Agent) makeSample(ctx context.Context, commitedDepth uint8) (SampleData, error) {
salt, err := a.contract.ReserveSalt(ctx)
if err != nil {
return SampleData{}, err
@@ -454,7 +458,7 @@ func (a *Agent) makeSample(ctx context.Context, storageRadius uint8) (SampleData
return SampleData{}, err
}

rSample, err := a.store.ReserveSample(ctx, salt, storageRadius, uint64(timeLimiter), a.minBatchBalance())
rSample, err := a.store.ReserveSample(ctx, salt, commitedDepth, uint64(timeLimiter), a.minBatchBalance())
if err != nil {
return SampleData{}, err
}
@@ -468,7 +472,7 @@ func (a *Agent) makeSample(ctx context.Context, storageRadius uint8) (SampleData
Anchor1: salt,
ReserveSampleItems: rSample.Items,
ReserveSampleHash: sampleHash,
StorageRadius: storageRadius,
StorageRadius: commitedDepth,
}

return sample, nil
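
The agent follows the same pattern: the depth it plays, samples, and later reveals with is the storage radius plus the configured capacity doubling, computed once at the top of `handleSample`. A rough, self-contained sketch of that flow, with the store and contract reduced to the single call each that matters here (assumed shapes, not the real interfaces):

```go
package main

import (
	"context"
	"fmt"
)

// store and contract are trimmed to the calls used in this sketch.
type store interface {
	StorageRadius() uint8
}

type contract interface {
	IsPlaying(ctx context.Context, depth uint8) (bool, error)
}

type agent struct {
	store            store
	contract         contract
	capacityDoubling uint8
}

// committedDepth mirrors the diff: storage radius + capacity doubling is the
// depth used for IsPlaying, the reserve sample, and the revealed sample data.
func (a *agent) committedDepth() uint8 {
	return a.store.StorageRadius() + a.capacityDoubling
}

func (a *agent) shouldPlay(ctx context.Context) (bool, error) {
	return a.contract.IsPlaying(ctx, a.committedDepth())
}

type fixedStore struct{ radius uint8 }

func (s fixedStore) StorageRadius() uint8 { return s.radius }

type alwaysPlaying struct{}

func (alwaysPlaying) IsPlaying(context.Context, uint8) (bool, error) { return true, nil }

func main() {
	// Radius 8 with doubling 1, as in the updated TestAgent cases below.
	a := &agent{store: fixedStore{radius: 8}, contract: alwaysPlaying{}, capacityDoubling: 1}
	ok, _ := a.shouldPlay(context.Background())
	fmt.Println(ok, a.committedDepth()) // true 9
}
```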
39 changes: 31 additions & 8 deletions pkg/storageincentives/agent_test.go
@@ -41,6 +41,7 @@ func TestAgent(t *testing.T) {
limit uint64
expectedCalls bool
balance *big.Int
doubling uint8
}{{
name: "3 blocks per phase, same block number returns twice",
blocksPerRound: 9,
@@ -49,6 +50,7 @@
expectedCalls: true,
limit: 108, // computed with blocksPerRound * (expectedCalls + 2)
balance: bigBalance,
doubling: 1,
}, {
name: "3 blocks per phase, block number returns every block",
blocksPerRound: 9,
@@ -57,6 +59,7 @@
expectedCalls: true,
limit: 108,
balance: bigBalance,
doubling: 0,
}, {
name: "no expected calls - block number returns late after each phase",
blocksPerRound: 9,
@@ -65,6 +68,7 @@
expectedCalls: false,
limit: 108,
balance: bigBalance,
doubling: 0,
}, {
name: "4 blocks per phase, block number returns every other block",
blocksPerRound: 12,
@@ -73,6 +77,7 @@
expectedCalls: true,
limit: 144,
balance: bigBalance,
doubling: 1,
}, {
// This test case is based on previous, but this time agent will not have enough
// balance to participate in the game so no calls are going to be made.
@@ -83,6 +88,7 @@
expectedCalls: false,
limit: 144,
balance: big.NewInt(0),
doubling: 1,
},
}

@@ -106,9 +112,12 @@
block: tc.blocksPerRound,
balance: tc.balance,
}
contract := &mockContract{}

service, _ := createService(t, addr, backend, contract, tc.blocksPerRound, tc.blocksPerPhase)
var radius uint8 = 8

contract := &mockContract{t: t, expectedRadius: radius + tc.doubling}

service, _ := createService(t, addr, backend, contract, tc.blocksPerRound, tc.blocksPerPhase, radius, tc.doubling)
testutil.CleanupCloser(t, service)

<-wait
@@ -156,7 +165,10 @@ func createService(
backend storageincentives.ChainBackend,
contract redistribution.Contract,
blocksPerRound uint64,
blocksPerPhase uint64) (*storageincentives.Agent, error) {
blocksPerPhase uint64,
radius uint8,
doubling uint8,
) (*storageincentives.Agent, error) {
t.Helper()

postageContract := contractMock.New(contractMock.WithExpiresBatchesFunc(func(context.Context) error {
@@ -168,7 +180,7 @@
}))

reserve := resMock.NewReserve(
resMock.WithRadius(0),
resMock.WithRadius(radius),
resMock.WithSample(storer.RandSample(t, nil)),
)

@@ -189,6 +201,7 @@
transactionmock.New(),
&mockHealth{},
log.Noop,
doubling,
)
}

@@ -257,15 +270,20 @@ const (
)

type mockContract struct {
callsList []contractCall
mtx sync.Mutex
callsList []contractCall
mtx sync.Mutex
expectedRadius uint8
t *testing.T
}

func (m *mockContract) ReserveSalt(context.Context) ([]byte, error) {
return nil, nil
}

func (m *mockContract) IsPlaying(context.Context, uint8) (bool, error) {
func (m *mockContract) IsPlaying(_ context.Context, r uint8) (bool, error) {
if r != m.expectedRadius {
m.t.Fatalf("isPlaying: expected radius %d, got %d", m.expectedRadius, r)
}
return true, nil
}

@@ -290,9 +308,14 @@ func (m *mockContract) Commit(context.Context, []byte, uint64) (common.Hash, err
return common.Hash{}, nil
}

func (m *mockContract) Reveal(context.Context, uint8, []byte, []byte) (common.Hash, error) {
func (m *mockContract) Reveal(_ context.Context, r uint8, _ []byte, _ []byte) (common.Hash, error) {
m.mtx.Lock()
defer m.mtx.Unlock()

if r != m.expectedRadius {
m.t.Fatalf("reveal: expected radius %d, got %d", m.expectedRadius, r)
}

m.callsList = append(m.callsList, revealCall)
return common.Hash{}, nil
}
4 changes: 0 additions & 4 deletions pkg/storer/mock/mockreserve.go
@@ -178,10 +178,6 @@ func (s *ReserveStore) ReserveSize() int {
return s.reservesize
}

func (s *ReserveStore) ReserveCapacityDoubling() int {
return s.capacityDoubling
}

func (s *ReserveStore) ReserveLastBinIDs() (curs []uint64, epoch uint64, err error) {
return s.cursors, s.epoch, s.cursorsErr
}
4 changes: 0 additions & 4 deletions pkg/storer/reserve.go
@@ -412,10 +412,6 @@ func (db *DB) StorageRadius() uint8 {
return db.reserve.Radius()
}

func (db *DB) ReserveCapacityDoubling() int {
return db.reserveOptions.capacityDoubling
}

func (db *DB) ReserveSize() int {
if db.reserve == nil {
return 0