Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix repeatability, deadlocks, and reduced mixing for > 1 chain #30

Closed
wants to merge 9 commits into from
15 changes: 14 additions & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
The MIT License (MIT)

Copyright (c) 2015 Justin Ellis
*** Copyright Notice ***
PTMCMCSampler Copyright (c) 2015 to 2021, Justin Ellis
All rights reserved.

PTMCMCSampler Copyright (c) 2022, Mark Forrer on behalf of
National Technology & Engineering Solutions of Sandia, LLC.
All rights reserved.

NOTICE. This Software was partially developed under funding from the U.S. Department
of Energy and the U.S. Government consequently retains certain rights. As such, the U.S.
Government has been granted for itself and others acting on its behalf a paid-up, nonexclusive,
irrevocable, worldwide license in the Software to reproduce, distribute copies to the public,
prepare derivative works, and perform publicly and display publicly, and to permit others to do so.
********

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
247 changes: 114 additions & 133 deletions PTMCMCSampler/PTMCMCSampler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import os
import sys
import time
Expand All @@ -21,6 +22,8 @@
)
pass

logger = logging.getLogger(__name__)


class PTSampler(object):

Expand Down Expand Up @@ -410,12 +413,9 @@ def sample(
lp = self.logp(p0)

if lp == float(-np.inf):

lnprob0 = -np.inf
lnlike0 = -np.inf

else:

lnlike0 = self.logl(p0)
lnprob0 = 1 / self.temp * lnlike0 + lp

Expand All @@ -435,40 +435,43 @@ def sample(
# call PTMCMCOneStep
p0, lnlike0, lnprob0 = self.PTMCMCOneStep(p0, lnlike0, lnprob0, iter)

# compute effective number of samples
if iter % 1000 == 0 and iter > 2 * self.burn and self.MPIrank == 0:
try:
Neff = iter / max(
1,
np.nanmax(
[acor.acor(self._AMbuffer[self.burn : (iter - 1), ii])[0] for ii in range(self.ndim)]
),
)
# print('\n {0} effective samples'.format(Neff))
except NameError:
Neff = 0
pass

# stop if reached maximum number of iterations
if self.MPIrank == 0 and iter >= self.Niter - 1:
if self.verbose:
print("\nRun Complete")
runComplete = True

# stop if reached effective number of samples
if self.MPIrank == 0 and int(Neff) > self.neff:
if self.verbose:
print("\nRun Complete with {0} effective samples".format(int(Neff)))
runComplete = True

if self.MPIrank == 0 and runComplete:
for jj in range(1, self.nchain):
self.comm.send(runComplete, dest=jj, tag=55)

# check for other chains
if self.MPIrank > 0:
runComplete = self.comm.Iprobe(source=0, tag=55)
time.sleep(0.000001) # trick to get around
# for T=0: compute effective number of samples
if self.MPIrank == 0:
if iter % 1000 == 0 and iter > 2 * self.burn:
try:
Neff = iter / max(
1,
np.nanmax(
[acor.acor(self._AMbuffer[self.burn : (iter - 1), ii])[0] for ii in range(self.ndim)]
),
)
# print('\n {0} effective samples'.format(Neff))
except NameError:
Neff = 0
pass

# stop if reached maximum number of iterations
if iter >= self.Niter - 1:
if self.verbose:
# Print a message when the T=0 process
# decides overall completion.
elapsed = time.time() - self.tstart
print(f"\nRun Complete in {elapsed:.2f} s")
runComplete = True

# stop if reached effective number of samples
if int(Neff) > self.neff:
if self.verbose:
print(f"\nRun Complete with {int(Neff)} effective samples")
runComplete = True

# always inform other chains whether the run is complete,
# even if that's not the case
runComplete = self.comm.bcast(runComplete, root=0)

# Log status of the current chain.
elapsed = time.time() - self.tstart
logger.debug(f"Chain done in {elapsed:.2f} s")

def PTMCMCOneStep(self, p0, lnlike0, lnprob0, iter):
"""
Expand All @@ -484,57 +487,55 @@ def PTMCMCOneStep(self, p0, lnlike0, lnprob0, iter):
@return lnprob0: next value of posterior after one MCMC step

"""
# update covariance matrix
if (iter - 1) % self.covUpdate == 0 and (iter - 1) != 0 and self.MPIrank == 0:
self._updateRecursive(iter - 1, self.covUpdate)

# broadcast to other chains
[self.comm.send(self.cov, dest=rank + 1, tag=111) for rank in range(self.nchain - 1)]

# check for sent covariance matrix from T = 0 chain
getCovariance = self.comm.Iprobe(source=0, tag=111)
time.sleep(0.000001)
if getCovariance and self.MPIrank > 0:
self.cov[:, :] = self.comm.recv(source=0, tag=111)
for ct, group in enumerate(self.groups):
covgroup = np.zeros((len(group), len(group)))
for ii in range(len(group)):
for jj in range(len(group)):
covgroup[ii, jj] = self.cov[group[ii], group[jj]]

self.U[ct], self.S[ct], v = np.linalg.svd(covgroup)
getCovariance = 0
# Update covariance matrix
# Synchronous checks of runComplete in sample() guarantee that chains
# are executing the same step in parallel.
cov_update_step = (iter - 1) % self.covUpdate == 0 and (iter - 1) != 0
if cov_update_step:
cov = None
if self.MPIrank == 0:
self._updateRecursive(iter - 1, self.covUpdate)
cov = self.cov
cov = self.comm.bcast(cov, root=0)

if self.MPIrank != 0 and cov is not None:
self.cov[:, :] = cov
for ct, group in enumerate(self.groups):
covgroup = np.zeros((len(group), len(group)))
for ii in range(len(group)):
for jj in range(len(group)):
covgroup[ii, jj] = self.cov[group[ii], group[jj]]
self.U[ct], self.S[ct], v = np.linalg.svd(covgroup)

# update DE buffer
if (iter - 1) % self.burn == 0 and (iter - 1) != 0 and self.MPIrank == 0:
self._updateDEbuffer(iter - 1, self.burn)
# Synchronous checks of runComplete in sample() guarantee that chains
# are executing the same step in parallel.
burn_step = (iter - 1) % self.burn == 0 and (iter - 1) != 0
if burn_step:
buffer = None
if self.MPIrank == 0:
self._updateDEbuffer(iter - 1, self.burn)
buffer = self._DEbuffer

# broadcast to other chains
[self.comm.send(self._DEbuffer, dest=rank + 1, tag=222) for rank in range(self.nchain - 1)]
buffer = self.comm.bcast(buffer, root=0)

# check for sent DE buffer from T = 0 chain
getDEbuf = self.comm.Iprobe(source=0, tag=222)
time.sleep(0.000001)
if self.MPIrank > 0:
self._DEbuffer = buffer

if getDEbuf and self.MPIrank > 0:
self._DEbuffer = self.comm.recv(source=0, tag=222)
# randomize cycle
if self.DEJump not in self.propCycle:
self.addProposalToCycle(self.DEJump, self.DEweight)
self.randomizeProposalCycle()

# randomize cycle
if self.DEJump not in self.propCycle:
# after burn in, add DE jumps;
if self.MPIrank == 0 and (iter - 1) == self.burn:
if self.verbose:
print("Adding DE jump with weight {0}".format(self.DEweight))
self.addProposalToCycle(self.DEJump, self.DEweight)
self.randomizeProposalCycle()

# reset
getDEbuf = 0

# after burn in, add DE jumps
if (iter - 1) == self.burn and self.MPIrank == 0:
if self.verbose:
print("Adding DE jump with weight {0}".format(self.DEweight))
self.addProposalToCycle(self.DEJump, self.DEweight)

# randomize cycle
self.randomizeProposalCycle()
# randomize cycle
self.randomizeProposalCycle()

# jump proposal ###

Expand All @@ -552,11 +553,8 @@ def PTMCMCOneStep(self, p0, lnlike0, lnprob0, iter):
lp = self.logp(y)

if lp == -np.inf:

newlnprob = -np.inf

else:

newlnlike = self.logl(y)
newlnprob = 1 / self.temp * newlnlike + lp

Expand All @@ -571,8 +569,9 @@ def PTMCMCOneStep(self, p0, lnlike0, lnprob0, iter):
self.naccepted += 1
self.jumpDict[jump_name][1] += 1

# temperature swap
swapReturn, p0, lnlike0, lnprob0 = self.PTswap(p0, lnlike0, lnprob0, iter)
# temperature swap.
# note PTswap() assumes iter is 0-indexed, but it's 1-indexed here
swapReturn, p0, lnlike0, lnprob0 = self.PTswap(p0, lnlike0, lnprob0, iter - 1)

# check return value
if swapReturn != 0:
Expand Down Expand Up @@ -604,73 +603,55 @@ def PTswap(self, p0, lnlike0, lnprob0, iter):
"""

# initialize variables
readyToSwap = 0
swapAccepted = 0
swapProposed = 0

# if Tskip is reached, block until next chain in ladder is ready for
# swap proposal
if iter % self.Tskip == 0 and self.MPIrank < self.nchain - 1:
swapProposed = 1

# send current likelihood for swap proposal
self.comm.send(lnlike0, dest=self.MPIrank + 1, tag=18)

# determine if swap was accepted
swapAccepted = self.comm.recv(source=self.MPIrank + 1, tag=888)

# perform swap
if swapAccepted:

# exchange likelihood
lnlike0 = self.comm.recv(source=self.MPIrank + 1, tag=18)
swapProposed = (iter % self.Tskip) == 0 and iter != 0

# exchange parameters
pnew = np.empty(self.ndim)
self.comm.Sendrecv(
p0, dest=self.MPIrank + 1, sendtag=19, recvbuf=pnew, source=self.MPIrank + 1, recvtag=19
)
p0 = pnew
# Only exchange swap messages when a swap will be proposed.
# Synchronous checks of runComplete in sample() guarantee that chains
# are executing the same step in parallel.
if swapProposed:
if self.MPIrank < self.nchain - 1:
hotter_chain = self.MPIrank + 1
# send current likelihood for swap proposal
self.comm.send(lnlike0, dest=hotter_chain, tag=18)

# calculate new posterior values
lnprob0 = 1 / self.temp * lnlike0 + self.logp(p0)
# determine if swap was accepted
swapAccepted = self.comm.recv(source=hotter_chain, tag=888)

# check if next lowest temperature is ready to swap
elif self.MPIrank > 0:
# perform swap
if swapAccepted:
# exchange likelihood
lnlike0 = self.comm.recv(source=hotter_chain, tag=18)

readyToSwap = self.comm.Iprobe(source=self.MPIrank - 1, tag=18)
# trick to get around processor using 100% cpu while waiting
time.sleep(0.000001)
# exchange parameters
pnew = np.empty(self.ndim)
self.comm.Sendrecv(p0, dest=hotter_chain, sendtag=19, recvbuf=pnew, source=hotter_chain, recvtag=19)
p0 = pnew

# hotter chain decides acceptance
if readyToSwap:
newlnlike = self.comm.recv(source=self.MPIrank - 1, tag=18)
# calculate new posterior values
lnprob0 = 1 / self.temp * lnlike0 + self.logp(p0)

# determine if swap is accepted and tell other chain
logChainSwap = (1 / self.ladder[self.MPIrank - 1] - 1 / self.ladder[self.MPIrank]) * (
lnlike0 - newlnlike
)
# check if swap is accepted and inform lower temp chain
# (hotter chain decides acceptance)
if self.MPIrank > 0:
cooler_chain = self.MPIrank - 1
newlnlike = self.comm.recv(source=cooler_chain, tag=18)

if logChainSwap > np.log(np.random.rand()):
swapAccepted = 1
else:
swapAccepted = 0
logChainSwap = (1 / self.ladder[cooler_chain] - 1 / self.ladder[self.MPIrank]) * (lnlike0 - newlnlike)
swapAccepted = logChainSwap > np.log(np.random.rand())

# send out result
self.comm.send(swapAccepted, dest=self.MPIrank - 1, tag=888)
self.comm.send(swapAccepted, dest=cooler_chain, tag=888)

# perform swap
if swapAccepted:

# exchange likelihood
self.comm.send(lnlike0, dest=self.MPIrank - 1, tag=18)
self.comm.send(lnlike0, dest=cooler_chain, tag=18)
lnlike0 = newlnlike

# exchange parameters
pnew = np.empty(self.ndim)
self.comm.Sendrecv(
p0, dest=self.MPIrank - 1, sendtag=19, recvbuf=pnew, source=self.MPIrank - 1, recvtag=19
)
self.comm.Sendrecv(p0, dest=cooler_chain, sendtag=19, recvbuf=pnew, source=cooler_chain, recvtag=19)
p0 = pnew

# calculate new posterior values
Expand Down
Loading