diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index 98b3578502..7bd20c07e8 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -709,6 +709,7 @@ func createRedistributionAgentService( tranService, &mockHealth{}, log.Noop, + 0, ) } diff --git a/pkg/node/node.go b/pkg/node/node.go index d0b6dd84c3..b8498aa868 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -906,7 +906,7 @@ func NewBee( return nil, fmt.Errorf("status service: %w", err) } - saludService := salud.New(nodeStatus, kad, localStore, logger, warmupTime, api.FullMode.String(), salud.DefaultMinPeersPerBin, salud.DefaultDurPercentile, salud.DefaultConnsPercentile) + saludService := salud.New(nodeStatus, kad, localStore, logger, warmupTime, api.FullMode.String(), salud.DefaultMinPeersPerBin, salud.DefaultDurPercentile, salud.DefaultConnsPercentile, uint8(o.ReserveCapacityDoubling)) b.saludCloser = saludService rC, unsub := saludService.SubscribeNetworkStorageRadius() @@ -1091,6 +1091,7 @@ func NewBee( transactionService, saludService, logger, + uint8(o.ReserveCapacityDoubling), ) if err != nil { return nil, fmt.Errorf("storage incentives agent: %w", err) diff --git a/pkg/salud/salud.go b/pkg/salud/salud.go index d63e813975..397362499b 100644 --- a/pkg/salud/salud.go +++ b/pkg/salud/salud.go @@ -40,11 +40,6 @@ type peerStatus interface { PeerSnapshot(ctx context.Context, peer swarm.Address) (*status.Snapshot, error) } -type reserve interface { - storer.RadiusChecker - ReserveCapacityDoubling() int -} - type service struct { wg sync.WaitGroup quit chan struct{} @@ -53,34 +48,38 @@ type service struct { status peerStatus metrics metrics isSelfHealthy *atomic.Bool - reserve reserve + reserve storer.RadiusChecker radiusSubsMtx sync.Mutex radiusC []chan uint8 + + capacityDoubling uint8 } func New( status peerStatus, topology topologyDriver, - reserve reserve, + reserve storer.RadiusChecker, logger log.Logger, warmup time.Duration, mode string, minPeersPerbin int, durPercentile float64, connsPercentile float64, + capacityDoubling uint8, ) *service { metrics := newMetrics() s := &service{ - quit: make(chan struct{}), - logger: logger.WithName(loggerName).Register(), - status: status, - topology: topology, - metrics: metrics, - isSelfHealthy: atomic.NewBool(true), - reserve: reserve, + quit: make(chan struct{}), + logger: logger.WithName(loggerName).Register(), + status: status, + topology: topology, + metrics: metrics, + isSelfHealthy: atomic.NewBool(true), + reserve: reserve, + capacityDoubling: capacityDoubling, } s.wg.Add(1) @@ -221,7 +220,7 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64, } } - networkRadiusEstimation := s.reserve.StorageRadius() + uint8(s.reserve.ReserveCapacityDoubling()) + networkRadiusEstimation := s.reserve.StorageRadius() + s.capacityDoubling selfHealth := true if nHoodRadius == networkRadius && networkRadiusEstimation != networkRadius { diff --git a/pkg/salud/salud_test.go b/pkg/salud/salud_test.go index 4aa7926af9..5fc4dda733 100644 --- a/pkg/salud/salud_test.go +++ b/pkg/salud/salud_test.go @@ -70,7 +70,7 @@ func TestSalud(t *testing.T) { mockstorer.WithReserveSize(100), ) - service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8) + service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8, 0) err := spinlock.Wait(time.Minute, func() bool { return len(topM.PeersHealth()) == len(peers) @@ -116,7 +116,7 @@ func TestSelfUnhealthyRadius(t *testing.T) { mockstorer.WithReserveSize(100), ) - service := 
salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8) + service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8, 0) testutil.CleanupCloser(t, service) err := spinlock.Wait(time.Minute, func() bool { @@ -151,10 +151,9 @@ func TestSelfHealthyCapacityDoubling(t *testing.T) { reserve := mockstorer.NewReserve( mockstorer.WithRadius(6), mockstorer.WithReserveSize(100), - mockstorer.WithCapacityDoubling(2), ) - service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8) + service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8, 2) testutil.CleanupCloser(t, service) err := spinlock.Wait(time.Minute, func() bool { @@ -184,7 +183,7 @@ func TestSubToRadius(t *testing.T) { topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...)) - service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8) + service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8, 0) c, unsub := service.SubscribeNetworkStorageRadius() t.Cleanup(unsub) @@ -217,7 +216,7 @@ func TestUnsub(t *testing.T) { topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...)) - service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8) + service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8, 0) testutil.CleanupCloser(t, service) c, unsub := service.SubscribeNetworkStorageRadius() diff --git a/pkg/storageincentives/agent.go b/pkg/storageincentives/agent.go index b71a77c3df..afd4614b4e 100644 --- a/pkg/storageincentives/agent.go +++ b/pkg/storageincentives/agent.go @@ -70,6 +70,7 @@ type Agent struct { chainStateGetter postage.ChainStateGetter commitLock sync.Mutex health Health + capacityDoubling uint8 } func New(overlay swarm.Address, @@ -89,6 +90,7 @@ func New(overlay swarm.Address, tranService transaction.Service, health Health, logger log.Logger, + capacityDoubling uint8, ) (*Agent, error) { a := &Agent{ overlay: overlay, @@ -104,6 +106,7 @@ func New(overlay swarm.Address, redistributionStatuser: redistributionStatuser, health: health, chainStateGetter: chainStateGetter, + capacityDoubling: capacityDoubling, } state, err := NewRedistributionState(logger, ethAddress, stateStore, erc20Service, tranService) @@ -389,14 +392,15 @@ func (a *Agent) handleClaim(ctx context.Context, round uint64) error { } func (a *Agent) handleSample(ctx context.Context, round uint64) (bool, error) { - storageRadius := a.store.StorageRadius() + // minimum proximity between the anchor and the stored chunks + committedDepth := a.store.StorageRadius() + a.capacityDoubling if a.state.IsFrozen() { a.logger.Info("skipping round because node is frozen") return false, nil } - isPlaying, err := a.contract.IsPlaying(ctx, storageRadius) + isPlaying, err := a.contract.IsPlaying(ctx, committedDepth) if err != nil { a.metrics.ErrCheckIsPlaying.Inc() return false, err @@ -429,21 +433,21 @@ func (a *Agent) handleSample(ctx context.Context, round uint64) (bool, error) { } now := time.Now() - sample, err := a.makeSample(ctx, storageRadius) + sample, err := a.makeSample(ctx, committedDepth) if err != nil { return false, err } dur := time.Since(now) a.metrics.SampleDuration.Set(dur.Seconds()) - a.logger.Info("produced sample", "hash", sample.ReserveSampleHash, "radius", sample.StorageRadius, "round", round) + a.logger.Info("produced sample", "hash", 
sample.ReserveSampleHash, "radius", committedDepth, "round", round) a.state.SetSampleData(round, sample, dur) return true, nil } -func (a *Agent) makeSample(ctx context.Context, storageRadius uint8) (SampleData, error) { +func (a *Agent) makeSample(ctx context.Context, committedDepth uint8) (SampleData, error) { salt, err := a.contract.ReserveSalt(ctx) if err != nil { return SampleData{}, err } @@ -454,7 +458,7 @@ func (a *Agent) makeSample(ctx context.Context, storageRadius uint8) (SampleData return SampleData{}, err } - rSample, err := a.store.ReserveSample(ctx, salt, storageRadius, uint64(timeLimiter), a.minBatchBalance()) + rSample, err := a.store.ReserveSample(ctx, salt, committedDepth, uint64(timeLimiter), a.minBatchBalance()) if err != nil { return SampleData{}, err } @@ -468,7 +472,7 @@ func (a *Agent) makeSample(ctx context.Context, storageRadius uint8) (SampleData Anchor1: salt, ReserveSampleItems: rSample.Items, ReserveSampleHash: sampleHash, - StorageRadius: storageRadius, + StorageRadius: committedDepth, } return sample, nil diff --git a/pkg/storageincentives/agent_test.go b/pkg/storageincentives/agent_test.go index ae078f1a87..0ae0eda22f 100644 --- a/pkg/storageincentives/agent_test.go +++ b/pkg/storageincentives/agent_test.go @@ -41,6 +41,7 @@ func TestAgent(t *testing.T) { limit uint64 expectedCalls bool balance *big.Int + doubling uint8 }{{ name: "3 blocks per phase, same block number returns twice", blocksPerRound: 9, blocksPerPhase: 3, incrementBy: 1, expectedCalls: true, limit: 108, // computed with blocksPerRound * (exptectedCalls + 2) balance: bigBalance, + doubling: 1, }, { name: "3 blocks per phase, block number returns every block", blocksPerRound: 9, blocksPerPhase: 3, incrementBy: 1, expectedCalls: true, limit: 108, balance: bigBalance, + doubling: 0, }, { name: "no expected calls - block number returns late after each phase", blocksPerRound: 9, blocksPerPhase: 3, incrementBy: 6, expectedCalls: false, limit: 108, balance: bigBalance, + doubling: 0, }, { name: "4 blocks per phase, block number returns every other block", blocksPerRound: 12, blocksPerPhase: 4, incrementBy: 2, expectedCalls: true, limit: 144, balance: bigBalance, + doubling: 1, }, { // This test case is based on previous, but this time agent will not have enough // balance to participate in the game so no calls are going to be made. 
@@ -83,6 +88,7 @@ func TestAgent(t *testing.T) { expectedCalls: false, limit: 144, balance: big.NewInt(0), + doubling: 1, }, } @@ -106,9 +112,12 @@ func TestAgent(t *testing.T) { block: tc.blocksPerRound, balance: tc.balance, } - contract := &mockContract{} - service, _ := createService(t, addr, backend, contract, tc.blocksPerRound, tc.blocksPerPhase) + var radius uint8 = 8 + + contract := &mockContract{t: t, expectedRadius: radius + tc.doubling} + + service, _ := createService(t, addr, backend, contract, tc.blocksPerRound, tc.blocksPerPhase, radius, tc.doubling) testutil.CleanupCloser(t, service) <-wait @@ -156,7 +165,10 @@ func createService( backend storageincentives.ChainBackend, contract redistribution.Contract, blocksPerRound uint64, - blocksPerPhase uint64) (*storageincentives.Agent, error) { + blocksPerPhase uint64, + radius uint8, + doubling uint8, +) (*storageincentives.Agent, error) { t.Helper() postageContract := contractMock.New(contractMock.WithExpiresBatchesFunc(func(context.Context) error { return nil })) reserve := resMock.NewReserve( - resMock.WithRadius(0), + resMock.WithRadius(radius), resMock.WithSample(storer.RandSample(t, nil)), ) @@ -189,6 +201,7 @@ func createService( transactionmock.New(), &mockHealth{}, log.Noop, + doubling, ) } @@ -257,15 +270,20 @@ const ( ) type mockContract struct { - callsList []contractCall - mtx sync.Mutex + callsList []contractCall + mtx sync.Mutex + expectedRadius uint8 + t *testing.T } func (m *mockContract) ReserveSalt(context.Context) ([]byte, error) { return nil, nil } -func (m *mockContract) IsPlaying(context.Context, uint8) (bool, error) { +func (m *mockContract) IsPlaying(_ context.Context, r uint8) (bool, error) { + if r != m.expectedRadius { + m.t.Fatalf("isPlaying: expected radius %d, got %d", m.expectedRadius, r) + } return true, nil } @@ -290,9 +308,14 @@ func (m *mockContract) Commit(context.Context, []byte, uint64) (common.Hash, error) { return common.Hash{}, nil } -func (m *mockContract) Reveal(context.Context, uint8, []byte, []byte) (common.Hash, error) { +func (m *mockContract) Reveal(_ context.Context, r uint8, _ []byte, _ []byte) (common.Hash, error) { m.mtx.Lock() defer m.mtx.Unlock() + + if r != m.expectedRadius { + m.t.Fatalf("reveal: expected radius %d, got %d", m.expectedRadius, r) + } + m.callsList = append(m.callsList, revealCall) return common.Hash{}, nil } diff --git a/pkg/storer/mock/mockreserve.go b/pkg/storer/mock/mockreserve.go index 05e164c678..897403fe4c 100644 --- a/pkg/storer/mock/mockreserve.go +++ b/pkg/storer/mock/mockreserve.go @@ -178,10 +178,6 @@ func (s *ReserveStore) ReserveSize() int { return s.reservesize } -func (s *ReserveStore) ReserveCapacityDoubling() int { - return s.capacityDoubling -} - func (s *ReserveStore) ReserveLastBinIDs() (curs []uint64, epoch uint64, err error) { return s.cursors, s.epoch, s.cursorsErr } diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go index 1a4ec6f21e..7f765ef5c0 100644 --- a/pkg/storer/reserve.go +++ b/pkg/storer/reserve.go @@ -412,10 +412,6 @@ func (db *DB) StorageRadius() uint8 { return db.reserve.Radius() } -func (db *DB) ReserveCapacityDoubling() int { - return db.reserveOptions.capacityDoubling -} - func (db *DB) ReserveSize() int { if db.reserve == nil { return 0 diff --git a/pkg/storer/sample.go b/pkg/storer/sample.go index b58328ea5f..409d4b3109 100644 --- a/pkg/storer/sample.go +++ b/pkg/storer/sample.go @@ -113,10 +113,15 @@ func getChunkType(chunk swarm.Chunk) swarm.ChunkType { // calculation within the round 
limits. // In order to optimize this we use a simple pipeline pattern: // Iterate chunk addresses -> Get the chunk data and calculate transformed hash -> Assemble the sample +// If the node has doubled its capacity by some factor, the sampling process needs to pertain only to the +// chunks of the selected neighborhood as determined by the anchor and the "committed depth" and NOT the whole reserve. +// The committed depth is the sum of the radius and the doubling factor. +// For example, if the committed depth is 11 and the local node has a doubling factor of 3, the +// local radius will eventually drop to 8. The sampling must only consider chunks with proximity of at least 11 to the anchor. func (db *DB) ReserveSample( ctx context.Context, anchor []byte, - storageRadius uint8, + committedDepth uint8, consensusTime uint64, minBatchBalance *big.Int, ) (Sample, error) { @@ -139,13 +144,6 @@ func (db *DB) ReserveSample( allStats.BatchesBelowValueDuration = time.Since(t) - // If the node has doubled their capacity by some factor, sampling process need to only pertain to the - // chunks of the selected neighborhood as determined by the anchor and the "network" radius and NOT the whole reseve. - // The regular network storage radius of the network is the sum of the local radius and the doubling factor. - // For example, the regular radius is 11, but the local node has a doubling factor of 3, so the local radius will eventually drop to 8. - // So the sampling must only consider chunks with proximity 11 to the anchor. - neighborhoodProximity := storageRadius + uint8(db.reserveOptions.capacityDoubling) - // Phase 1: Iterate chunk addresses g.Go(func() error { start := time.Now() @@ -156,8 +154,8 @@ func (db *DB) ReserveSample( addStats(stats) }() - err := db.reserve.IterateChunksItems(storageRadius, func(ch *reserve.ChunkBinItem) (bool, error) { - if swarm.Proximity(ch.Address.Bytes(), anchor) < neighborhoodProximity { + err := db.reserve.IterateChunksItems(db.StorageRadius(), func(ch *reserve.ChunkBinItem) (bool, error) { + if swarm.Proximity(ch.Address.Bytes(), anchor) < committedDepth { return false, nil } select { @@ -318,12 +316,12 @@ func (db *DB) ReserveSample( allStats.TotalDuration = time.Since(t) if err := g.Wait(); err != nil { - db.logger.Info("reserve sampler finished with error", "err", err, "duration", time.Since(t), "storage_radius", storageRadius, "consensus_time_ns", consensusTime, "stats", fmt.Sprintf("%+v", allStats)) + db.logger.Info("reserve sampler finished with error", "err", err, "duration", time.Since(t), "storage_radius", committedDepth, "consensus_time_ns", consensusTime, "stats", fmt.Sprintf("%+v", allStats)) return Sample{}, fmt.Errorf("sampler: failed creating sample: %w", err) } - db.logger.Info("reserve sampler finished", "duration", time.Since(t), "storage_radius", storageRadius, "consensus_time_ns", consensusTime, "stats", fmt.Sprintf("%+v", allStats)) + db.logger.Info("reserve sampler finished", "duration", time.Since(t), "storage_radius", committedDepth, "consensus_time_ns", consensusTime, "stats", fmt.Sprintf("%+v", allStats)) return Sample{Stats: *allStats, Items: sampleItems}, nil } diff --git a/pkg/storer/sample_test.go b/pkg/storer/sample_test.go index 28761f76a3..2f97aaab13 100644 --- a/pkg/storer/sample_test.go +++ b/pkg/storer/sample_test.go @@ -145,11 +145,11 @@ func TestReserveSamplerSisterNeighborhood(t *testing.T) { t.Parallel() const ( - chunkCountPerPO = 64 - maxPO = 6 - networkRadius uint8 = 5 - doublingFactor uint8 = 2 - localRadius uint8 = networkRadius - 
doublingFactor + chunkCountPerPO = 64 + maxPO = 6 + committedDepth uint8 = 5 + doubling uint8 = 2 + depthOfResponsibility uint8 = committedDepth - doubling ) randChunks := func(baseAddr swarm.Address, startingRadius int, timeVar uint64) []swarm.Chunk { @@ -175,7 +175,7 @@ func TestReserveSamplerSisterNeighborhood(t *testing.T) { count := 0 // local neighborhood timeVar := uint64(time.Now().UnixNano()) - chs := randChunks(baseAddr, int(networkRadius), timeVar) + chs := randChunks(baseAddr, int(committedDepth), timeVar) putter := st.ReservePutter() for _, ch := range chs { err := putter.Put(context.Background(), ch) @@ -185,10 +185,10 @@ func TestReserveSamplerSisterNeighborhood(t *testing.T) { } count += len(chs) - sisterAnchor := swarm.RandAddressAt(t, baseAddr, int(localRadius)) + sisterAnchor := swarm.RandAddressAt(t, baseAddr, int(depthOfResponsibility)) // chunks belonging to the sister neighborhood - chs = randChunks(sisterAnchor, int(networkRadius), timeVar) + chs = randChunks(sisterAnchor, int(committedDepth), timeVar) putter = st.ReservePutter() for _, ch := range chs { err := putter.Put(context.Background(), ch) @@ -201,12 +201,12 @@ func TestReserveSamplerSisterNeighborhood(t *testing.T) { t.Run("reserve size", reserveSizeTest(st.Reserve(), count)) t.Run("reserve sample", func(t *testing.T) { - sample, err := st.ReserveSample(context.TODO(), sisterAnchor.Bytes(), 0, timeVar, nil) + sample, err := st.ReserveSample(context.TODO(), sisterAnchor.Bytes(), doubling, timeVar, nil) if err != nil { t.Fatal(err) } - assertValidSample(t, sample, doublingFactor, baseAddr.Bytes()) + assertValidSample(t, sample, doubling, baseAddr.Bytes()) assertSampleNoErrors(t, sample) if sample.Stats.NewIgnored != 0 { @@ -215,17 +215,17 @@ func TestReserveSamplerSisterNeighborhood(t *testing.T) { }) t.Run("reserve sample 2", func(t *testing.T) { - sample, err := st.ReserveSample(context.TODO(), sisterAnchor.Bytes(), localRadius, timeVar, nil) + sample, err := st.ReserveSample(context.TODO(), sisterAnchor.Bytes(), committedDepth, timeVar, nil) if err != nil { t.Fatal(err) } - assertValidSample(t, sample, localRadius, baseAddr.Bytes()) + assertValidSample(t, sample, depthOfResponsibility, baseAddr.Bytes()) assertSampleNoErrors(t, sample) for _, s := range sample.Items { - if got := swarm.Proximity(s.ChunkAddress.Bytes(), baseAddr.Bytes()); got != localRadius { - t.Fatalf("promixity must be exactly %d, got %d", localRadius, got) + if got := swarm.Proximity(s.ChunkAddress.Bytes(), baseAddr.Bytes()); got != depthOfResponsibility { + t.Fatalf("proximity must be exactly %d, got %d", depthOfResponsibility, got) } }
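The common thread of these hunks is that a node which has doubled its reserve capacity no longer plays the redistribution game at its local storage radius: the agent commits to a depth equal to the storage radius plus the capacity-doubling exponent, and the sampler only admits chunks whose proximity to the anchor is at least that committed depth. Below is a minimal, standalone sketch of that relationship; the single-byte proximity helper and the constant values are illustrative assumptions only (the real code uses swarm.Proximity over 32-byte addresses, db.StorageRadius(), and o.ReserveCapacityDoubling), not part of the patch.

package main

import "fmt"

// proximity is a toy stand-in for swarm.Proximity: it returns the number of
// leading bits shared by two single-byte "addresses" (0..8).
func proximity(a, b byte) uint8 {
	for i := uint8(0); i < 8; i++ {
		mask := byte(0x80) >> i
		if a&mask != b&mask {
			return i
		}
	}
	return 8
}

func main() {
	// Illustrative values only; the agent derives these from the reserve
	// radius and the node's capacity-doubling option.
	const (
		storageRadius    uint8 = 3 // depth the local reserve settles at
		capacityDoubling uint8 = 2 // node stores 2^2 = 4 neighborhoods
	)

	// The depth the node commits to when checking IsPlaying, revealing,
	// and building the reserve sample.
	committedDepth := storageRadius + capacityDoubling

	anchor := byte(0b1011_0000)
	chunks := []byte{0b1011_0001, 0b1011_1000, 0b1010_0000}

	// Mirrors the sampler's filter: chunks closer than committedDepth to the
	// anchor are skipped, so only the anchored neighborhood is sampled.
	for _, ch := range chunks {
		po := proximity(ch, anchor)
		fmt.Printf("chunk %08b, proximity %d, sampled: %v\n", ch, po, po >= committedDepth)
	}
}

With the values above only the first chunk enters the sample; the same committedDepth value is what handleSample passes to IsPlaying and what ends up in the sample's StorageRadius field, so the contract and the sampler agree on the depth being played.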