multi: Consolidate waitgroup logic.
This switches the various subsystems over to use a new pattern that
consolidates the waitgroup logic in a single location.

This pattern is easier to reason about and less error-prone, since it is
trivial to see at a glance that the calls to Done happen as intended,
rather than having to chase them down all over the code.
davecgh committed Oct 25, 2023
1 parent ae64e51 commit 03846ae
Showing 8 changed files with 175 additions and 144 deletions.
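The shape of the change is the same in every subsystem. Before, each type carried a sync.WaitGroup field and every handler goroutine had to remember to call Done on each of its exit paths. After, the wait group is a local variable in Run, and each handler is wrapped in a closure that calls Done when the handler returns, so every Add is visibly paired with its Done. A minimal, runnable sketch of the consolidated shape (the subsystem type and handler are hypothetical stand-ins, not code from this commit):

package main

import (
	"context"
	"sync"
)

type subsystem struct {
	quit chan struct{}
}

// handler is a stand-in for an event loop such as connHandler or
// eventHandler; note that it no longer touches any waitgroup.
func (s *subsystem) handler(ctx context.Context) {
	<-ctx.Done()
}

// Run owns the waitgroup: every Add is paired with a Done in the
// closure right next to it.
func (s *subsystem) Run(ctx context.Context) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		s.handler(ctx)
		wg.Done()
	}()

	<-ctx.Done()
	close(s.quit)
	wg.Wait()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := &subsystem{quit: make(chan struct{})}
	cancel() // cancel immediately so the sketch terminates
	s.Run(ctx)
}

Previously the equivalent code kept wg as a field on the type and each handler called Done itself on every exit path; that is the shape being removed in the hunks below.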
24 changes: 13 additions & 11 deletions connmgr/connmanager.go
@@ -219,9 +219,7 @@ type ConnManager struct {
 	// with overall connection request count above.
 	assignIDMtx sync.Mutex

-	// The following fields are used for lifecycle management of the connection
-	// manager.
-	wg sync.WaitGroup
+	// quit is used for lifecycle management of the connection manager.
 	quit chan struct{}

 	// cfg specifies the configuration of the connection manager and is set at
@@ -458,7 +456,6 @@ out:
 		}
 	}

-	cm.wg.Done()
 	log.Trace("Connection handler done")
 }

@@ -685,7 +682,6 @@ func (cm *ConnManager) listenHandler(ctx context.Context, listener net.Listener)
 		go cm.cfg.OnAccept(conn)
 	}

-	cm.wg.Done()
 	log.Tracef("Listener handler done for %s", listener.Addr())
 }

@@ -696,8 +692,12 @@ func (cm *ConnManager) Run(ctx context.Context) {
 	log.Trace("Starting connection manager")

 	// Start the connection handler goroutine.
-	cm.wg.Add(1)
-	go cm.connHandler(ctx)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		cm.connHandler(ctx)
+		wg.Done()
+	}()

 	// Start all the listeners so long as the caller requested them and provided
 	// a callback to be invoked when connections are accepted.
@@ -706,8 +706,11 @@ func (cm *ConnManager) Run(ctx context.Context) {
 		listeners = cm.cfg.Listeners
 	}
 	for _, listener := range cm.cfg.Listeners {
-		cm.wg.Add(1)
-		go cm.listenHandler(ctx, listener)
+		wg.Add(1)
+		go func(listener net.Listener) {
+			cm.listenHandler(ctx, listener)
+			wg.Done()
+		}(listener)
 	}

 	// Start enough outbound connections to reach the target number when not
@@ -729,8 +732,7 @@ func (cm *ConnManager) Run(ctx context.Context) {
 		// to recover anyways.
 		_ = listener.Close()
 	}
-
-	cm.wg.Wait()
+	wg.Wait()
 	log.Trace("Connection manager stopped")
 }
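One detail in the listener loop above deserves a note: the closure takes the listener as a parameter instead of capturing the loop variable. At the time of this commit (before Go 1.22 changed loop-variable scoping), all iterations shared a single listener variable, so a direct capture could leave every goroutine reading the last listener. Passing the value pins it per iteration; roughly (handleListener is a hypothetical stand-in):

// Buggy under Go versions before 1.22: every goroutine may observe
// the same, final value of listener.
for _, listener := range listeners {
	go func() {
		handleListener(listener)
	}()
}

// Correct under any Go version: each goroutine gets its own copy.
for _, listener := range listeners {
	go func(l net.Listener) {
		handleListener(l)
	}(listener)
}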

18 changes: 11 additions & 7 deletions internal/blockchain/indexers/indexsubscriber.go
@@ -128,7 +128,6 @@ type IndexSubscriber struct {
 	mtx    sync.Mutex
 	ctx    context.Context
 	cancel context.CancelFunc
-	wg     sync.WaitGroup
 	quit   chan struct{}
 }

@@ -345,7 +344,6 @@ func (s *IndexSubscriber) handleSyncSubscribers(ctx context.Context) {
 	for {
 		select {
 		case <-ctx.Done():
-			s.wg.Done()
 			return

 		case <-ticker.C:
@@ -367,7 +365,6 @@ func (s *IndexSubscriber) handleIndexUpdates(ctx context.Context) {
 	for {
 		select {
 		case <-ctx.Done():
-			s.wg.Done()
 			return

 		case ntfn := <-s.c:
@@ -394,9 +391,16 @@ func (s *IndexSubscriber) handleIndexUpdates(ctx context.Context) {
 //
 // This should be run as a goroutine.
 func (s *IndexSubscriber) Run(ctx context.Context) {
-	s.wg.Add(2)
-	go s.handleIndexUpdates(ctx)
-	go s.handleSyncSubscribers(ctx)
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go func() {
+		s.handleIndexUpdates(ctx)
+		wg.Done()
+	}()
+	go func() {
+		s.handleSyncSubscribers(ctx)
+		wg.Done()
+	}()

 	// Stop all the subscriptions and shutdown the subscriber when the context
 	// is cancelled.
@@ -411,6 +415,6 @@ func (s *IndexSubscriber) Run(ctx context.Context) {
 	}
 	s.mtx.Unlock()
 	s.cancel()
-	s.wg.Wait()
+	wg.Wait()
 	log.Tracef("Index subscriber stopped")
 }
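Because each Run method now blocks until its internal goroutines finish, callers can apply the same pattern one level up: cancel the context and wait for Run itself to return. A minimal driver sketch (the subscriber value and its construction are elided stand-ins):

ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
wg.Add(1)
go func() {
	subscriber.Run(ctx) // blocks until both internal handlers stop
	wg.Done()
}()

// ... normal operation ...

cancel()  // request shutdown
wg.Wait() // Run has returned; the subscriber is fully stopped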
31 changes: 19 additions & 12 deletions internal/mining/bgblktmplgenerator.go
@@ -223,7 +223,6 @@ func (wg *waitGroup) Wait() {
 //  - Block direct access while generating new templates that will make the
 //    current template stale (e.g. new parent or new votes)
 type BgBlkTmplGenerator struct {
-	wg   sync.WaitGroup
 	quit chan struct{}

 	// These fields are provided by the caller when the generator is created and
@@ -484,7 +483,6 @@ func (g *BgBlkTmplGenerator) notifySubscribersHandler(ctx context.Context) {
 			g.subscriptionMtx.Unlock()

 		case <-ctx.Done():
-			g.wg.Done()
 			return
 		}
 	}
@@ -559,7 +557,6 @@ func (g *BgBlkTmplGenerator) regenQueueHandler(ctx context.Context) {
 			}

 		case <-ctx.Done():
-			g.wg.Done()
 			return
 		}
 	}
@@ -1402,7 +1399,6 @@ func (g *BgBlkTmplGenerator) regenHandler(ctx context.Context) {
 			g.genTemplateAsync(ctx, TURNewParent)

 		case <-ctx.Done():
-			g.wg.Done()
 			return
 		}
 	}
Expand Down Expand Up @@ -1477,8 +1473,6 @@ func (g *BgBlkTmplGenerator) ForceRegen() {
//
// This must be run as a goroutine.
func (g *BgBlkTmplGenerator) initialStartupHandler(ctx context.Context) {
defer g.wg.Done()

// Wait until the chain is synced when unsynced mining is not allowed.
if !g.cfg.AllowUnsyncedMining && !g.cfg.IsCurrent() {
ticker := time.NewTicker(time.Second)
Expand Down Expand Up @@ -1515,14 +1509,27 @@ func (g *BgBlkTmplGenerator) initialStartupHandler(ctx context.Context) {
// necessary for it to function properly and blocks until the provided context
// is cancelled.
func (g *BgBlkTmplGenerator) Run(ctx context.Context) {
g.wg.Add(4)
go g.regenQueueHandler(ctx)
go g.regenHandler(ctx)
go g.notifySubscribersHandler(ctx)
go g.initialStartupHandler(ctx)
var wg sync.WaitGroup
wg.Add(4)
go func() {
g.regenQueueHandler(ctx)
wg.Done()
}()
go func() {
g.regenHandler(ctx)
wg.Done()
}()
go func() {
g.notifySubscribersHandler(ctx)
wg.Done()
}()
go func() {
g.initialStartupHandler(ctx)
wg.Done()
}()

// Shutdown the generator when the context is cancelled.
<-ctx.Done()
close(g.quit)
g.wg.Wait()
wg.Wait()
}
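All four wrappers in Run above have the same shape, which invites a small generalization. An equivalent variant defers the Done call, which additionally releases the wait group should a handler panic. A sketch of that alternative (this is not what the commit does, just an equivalent formulation):

handlers := []func(context.Context){
	g.regenQueueHandler,
	g.regenHandler,
	g.notifySubscribersHandler,
	g.initialStartupHandler,
}
var wg sync.WaitGroup
wg.Add(len(handlers))
for _, handler := range handlers {
	go func(h func(context.Context)) {
		defer wg.Done() // runs even if h panics
		h(ctx)
	}(handler)
}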
57 changes: 34 additions & 23 deletions internal/mining/cpuminer/cpuminer.go
@@ -127,8 +127,6 @@ type CPUMiner struct {
 	normalMining      bool
 	discreteMining    bool
 	submitBlockLock   sync.Mutex
-	wg                sync.WaitGroup
-	workerWg          sync.WaitGroup
 	updateNumWorkers  chan struct{}
 	queryHashesPerSec chan float64
 	speedStats        map[uint64]*speedStats
@@ -194,7 +192,6 @@ out:
 		}
 	}

-	m.wg.Done()
 	log.Trace("CPU miner speed monitor done")
 }

@@ -380,8 +377,6 @@ func (m *CPUMiner) solveBlock(ctx context.Context, header *wire.BlockHeader,
 func (m *CPUMiner) solver(ctx context.Context, template *mining.BlockTemplate,
 	speedStats *speedStats, isBlake3PowActive bool) {

-	defer m.workerWg.Done()
-
 	for {
 		if ctx.Err() != nil {
 			return
@@ -464,10 +459,12 @@ func (m *CPUMiner) solver(ctx context.Context, template *mining.BlockTemplate,
 // It must be run as a goroutine.
 func (m *CPUMiner) generateBlocks(ctx context.Context, workerID uint64) {
 	log.Trace("Starting generate blocks worker")
-	defer func() {
-		m.workerWg.Done()
-		log.Trace("Generate blocks worker done")
-	}()
+	defer log.Trace("Generate blocks worker done")
+
+	// Separate waitgroup for solvers to ensure they are stopped prior to
+	// terminating the goroutine.
+	var solverWg sync.WaitGroup
+	defer solverWg.Wait()

 	// Subscribe for block template updates and ensure the subscription is
 	// stopped along with the worker.
@@ -488,7 +485,8 @@ func (m *CPUMiner) generateBlocks(ctx context.Context, workerID uint64) {
 		case templateNtfn := <-templateSub.C():
 			// Clean up the map that tracks the number of blocks mined on a
 			// given parent whenever a template is received due to a new parent.
-			prevHash := templateNtfn.Template.Block.Header.PrevBlock
+			template := templateNtfn.Template
+			prevHash := template.Block.Header.PrevBlock
 			if m.cfg.PermitConnectionlessMining {
 				if templateNtfn.Reason == mining.TURNewParent {
 					m.Lock()
@@ -516,9 +514,11 @@ func (m *CPUMiner) generateBlocks(ctx context.Context, workerID uint64) {

 			// Start another goroutine for the new template.
 			solverCtx, solverCancel = context.WithCancel(ctx)
-			m.workerWg.Add(1)
-			go m.solver(solverCtx, templateNtfn.Template, &speedStats,
-				isBlake3PowActive)
+			solverWg.Add(1)
+			go func() {
+				m.solver(solverCtx, template, &speedStats, isBlake3PowActive)
+				solverWg.Done()
+			}()

 		case <-ctx.Done():
 			// Ensure resources associated with the solver goroutine context are
@@ -541,6 +541,11 @@ func (m *CPUMiner) generateBlocks(ctx context.Context, workerID uint64) {
 //
 // It must be run as a goroutine.
 func (m *CPUMiner) miningWorkerController(ctx context.Context) {
+	// Separate waitgroup for workers to ensure they are stopped prior to
+	// terminating the goroutine.
+	var workerWg sync.WaitGroup
+	defer workerWg.Wait()
+
 	// launchWorker groups common code to launch a worker for subscribing for
 	// template updates and solving blocks.
 	type workerState struct {
@@ -554,8 +559,11 @@ func (m *CPUMiner) miningWorkerController(ctx context.Context) {
 			cancel: wCancel,
 		})

-		m.workerWg.Add(1)
-		go m.generateBlocks(wCtx, curWorkerID)
+		workerWg.Add(1)
+		go func() {
+			m.generateBlocks(wCtx, curWorkerID)
+			workerWg.Done()
+		}()
 		curWorkerID++
 	}
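The CPU miner is the most involved case because its goroutines are layered: Run owns the speed monitor and the worker controller, the controller owns the generateBlocks workers through workerWg, and each worker owns its current solver through solverWg. Scoping each wait group to the launching function, with a deferred Wait, produces a strictly inside-out shutdown: every solver exits before its worker returns, and every worker exits before the controller returns. Schematically (a simplified sketch with one worker and one solver, not the miner's actual code):

func controller(ctx context.Context) {
	var workerWg sync.WaitGroup
	defer workerWg.Wait() // all workers gone before controller returns

	workerWg.Add(1)
	go func() {
		worker(ctx)
		workerWg.Done()
	}()
	<-ctx.Done()
}

func worker(ctx context.Context) {
	var solverWg sync.WaitGroup
	defer solverWg.Wait() // the solver is gone before worker returns

	solverWg.Add(1)
	go func() {
		<-ctx.Done() // stand-in for the solving loop
		solverWg.Done()
	}()
	<-ctx.Done()
}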

@@ -603,10 +611,6 @@ out:
 			break out
 		}
 	}
-
-	// Wait until all workers shut down.
-	m.workerWg.Wait()
-	m.wg.Done()
 }

 // Run starts the CPU miner with zero workers which means it will be idle. It
@@ -617,14 +621,21 @@ out:
 func (m *CPUMiner) Run(ctx context.Context) {
 	log.Trace("Starting CPU miner in idle state")

-	m.wg.Add(2)
-	go m.speedMonitor(ctx)
-	go m.miningWorkerController(ctx)
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go func() {
+		m.speedMonitor(ctx)
+		wg.Done()
+	}()
+	go func() {
+		m.miningWorkerController(ctx)
+		wg.Done()
+	}()

 	// Shutdown the miner when the context is cancelled.
 	<-ctx.Done()
 	close(m.quit)
-	m.wg.Wait()
+	wg.Wait()
 	log.Trace("CPU miner stopped")
 }

15 changes: 8 additions & 7 deletions internal/netsync/manager.go
@@ -264,9 +264,7 @@ func (state *headerSyncState) resetStallTimeout() {
 // SyncManager provides a concurrency safe sync manager for handling all
 // incoming blocks.
 type SyncManager struct {
-	// The following fields are used for lifecycle management of the sync
-	// manager.
-	wg sync.WaitGroup
+	// quit is used for lifecycle management of the sync manager.
 	quit chan struct{}

 	// cfg specifies the configuration of the sync manager and is set at
@@ -1527,7 +1525,6 @@ out:
 		}
 	}

-	m.wg.Done()
 	log.Trace("Sync manager event handler done")
 }

@@ -1792,13 +1789,17 @@ func (m *SyncManager) Run(ctx context.Context) {
 	log.Trace("Starting sync manager")

 	// Start the event handler goroutine.
-	m.wg.Add(1)
-	go m.eventHandler(ctx)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		m.eventHandler(ctx)
+		wg.Done()
+	}()

 	// Shutdown the sync manager when the context is cancelled.
 	<-ctx.Done()
 	close(m.quit)
-	m.wg.Wait()
+	wg.Wait()
 	log.Trace("Sync manager stopped")
 }
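Note that each Run method still closes a quit channel after the context is cancelled, even though the goroutines it launches already watch the context. The two signals serve different audiences: the context stops the goroutines owned by Run, while quit lets methods invoked from other subsystems, which may block on internal channels and do not receive a context, observe shutdown. A typical select in such a method looks roughly like this (msgChan is an illustrative stand-in, not a field from this diff):

select {
case m.msgChan <- msg:
case <-m.quit:
	// Shutting down; give up rather than block forever.
}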

[3 additional changed files not shown]
