diff --git a/LICENSE b/LICENSE
index 4da85a2..eceaa94 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,6 +1,19 @@
 The MIT License (MIT)
 
-Copyright (c) 2015 Justin Ellis
+*** Copyright Notice ***
+PTMCMCSampler Copyright (c) 2015 to 2021, Justin Ellis
+All rights reserved.
+
+PTMCMCSampler Copyright (c) 2022, Mark Forrer on behalf of
+National Technology & Engineering Solutions of Sandia, LLC.
+All rights reserved.
+
+NOTICE. This Software was partially developed under funding from the U.S. Department
+of Energy and the U.S. Government consequently retains certain rights. As such, the U.S.
+Government has been granted for itself and others acting on its behalf a paid-up, nonexclusive,
+irrevocable, worldwide license in the Software to reproduce, distribute copies to the public,
+prepare derivative works, and perform publicly and display publicly, and to permit others to do so.
+********
 
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
diff --git a/PTMCMCSampler/PTMCMCSampler.py b/PTMCMCSampler/PTMCMCSampler.py
index ef8a705..b50cba3 100755
--- a/PTMCMCSampler/PTMCMCSampler.py
+++ b/PTMCMCSampler/PTMCMCSampler.py
@@ -1,3 +1,4 @@
+import logging
 import os
 import sys
 import time
@@ -21,6 +22,8 @@
     )
     pass
 
+logger = logging.getLogger(__name__)
+
 
 class PTSampler(object):
 
@@ -410,12 +413,9 @@ def sample(
         lp = self.logp(p0)
 
         if lp == float(-np.inf):
-
             lnprob0 = -np.inf
             lnlike0 = -np.inf
-
         else:
-
             lnlike0 = self.logl(p0)
             lnprob0 = 1 / self.temp * lnlike0 + lp
 
@@ -435,40 +435,43 @@ def sample(
             # call PTMCMCOneStep
             p0, lnlike0, lnprob0 = self.PTMCMCOneStep(p0, lnlike0, lnprob0, iter)
 
-            # compute effective number of samples
-            if iter % 1000 == 0 and iter > 2 * self.burn and self.MPIrank == 0:
-                try:
-                    Neff = iter / max(
-                        1,
-                        np.nanmax(
-                            [acor.acor(self._AMbuffer[self.burn : (iter - 1), ii])[0] for ii in range(self.ndim)]
-                        ),
-                    )
-                    # print('\n {0} effective samples'.format(Neff))
-                except NameError:
-                    Neff = 0
-                    pass
-
-            # stop if reached maximum number of iterations
-            if self.MPIrank == 0 and iter >= self.Niter - 1:
-                if self.verbose:
-                    print("\nRun Complete")
-                runComplete = True
-
-            # stop if reached effective number of samples
-            if self.MPIrank == 0 and int(Neff) > self.neff:
-                if self.verbose:
-                    print("\nRun Complete with {0} effective samples".format(int(Neff)))
-                runComplete = True
-
-            if self.MPIrank == 0 and runComplete:
-                for jj in range(1, self.nchain):
-                    self.comm.send(runComplete, dest=jj, tag=55)
-
-            # check for other chains
-            if self.MPIrank > 0:
-                runComplete = self.comm.Iprobe(source=0, tag=55)
-                time.sleep(0.000001)  # trick to get around
+            # for T=0: compute effective number of samples
+            if self.MPIrank == 0:
+                if iter % 1000 == 0 and iter > 2 * self.burn:
+                    try:
+                        Neff = iter / max(
+                            1,
+                            np.nanmax(
+                                [acor.acor(self._AMbuffer[self.burn : (iter - 1), ii])[0] for ii in range(self.ndim)]
+                            ),
+                        )
+                        # print('\n {0} effective samples'.format(Neff))
+                    except NameError:
+                        Neff = 0
+                        pass
+
+                # stop if reached maximum number of iterations
+                if iter >= self.Niter - 1:
+                    if self.verbose:
+                        # Print a message when the T=0 process
+                        # decides overall completion.
+                        elapsed = time.time() - self.tstart
+                        print(f"\nRun Complete in {elapsed:.2f} s")
+                    runComplete = True
+
+                # stop if reached effective number of samples
+                if int(Neff) > self.neff:
+                    if self.verbose:
+                        print(f"\nRun Complete with {int(Neff)} effective samples")
+                    runComplete = True
+
+            # always inform other chains whether the run is complete,
+            # even if that's not the case
+            runComplete = self.comm.bcast(runComplete, root=0)
+
+            # Log status of the current chain.
+            elapsed = time.time() - self.tstart
+            logger.debug(f"Chain done in {elapsed:.2f} s")
 
     def PTMCMCOneStep(self, p0, lnlike0, lnprob0, iter):
         """
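The sample() hunk above replaces the old tag-55 send/Iprobe polling with a single collective call: the T=0 chain decides whether the run is finished, and every chain receives that decision at the same iteration, so no chain can run ahead or spin while waiting. A minimal sketch of that pattern, assuming mpi4py is available; the loop bound and the per-rank work are placeholders, not PTMCMCSampler code:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

run_complete = False
iteration = 0
while not run_complete:
    iteration += 1
    # ... one sampling step per rank would go here ...

    if rank == 0:
        # only the root (T=0) rank decides when everyone should stop
        run_complete = iteration >= 1000  # toy stopping rule

    # collective call: every rank blocks until the root's decision arrives,
    # so all ranks leave the loop in the same iteration
    run_complete = comm.bcast(run_complete, root=0)

print(f"rank {rank} stopped after {iteration} iterations")

Because bcast is collective, every rank must reach the call in each iteration, which is why the patched code broadcasts even when the run is not yet complete.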
@@ -484,57 +487,55 @@ def PTMCMCOneStep(self, p0, lnlike0, lnprob0, iter):
         @return lnprob0: next value of posterior after one MCMC step
 
         """
-        # update covariance matrix
-        if (iter - 1) % self.covUpdate == 0 and (iter - 1) != 0 and self.MPIrank == 0:
-            self._updateRecursive(iter - 1, self.covUpdate)
-
-            # broadcast to other chains
-            [self.comm.send(self.cov, dest=rank + 1, tag=111) for rank in range(self.nchain - 1)]
-
-        # check for sent covariance matrix from T = 0 chain
-        getCovariance = self.comm.Iprobe(source=0, tag=111)
-        time.sleep(0.000001)
-        if getCovariance and self.MPIrank > 0:
-            self.cov[:, :] = self.comm.recv(source=0, tag=111)
-            for ct, group in enumerate(self.groups):
-                covgroup = np.zeros((len(group), len(group)))
-                for ii in range(len(group)):
-                    for jj in range(len(group)):
-                        covgroup[ii, jj] = self.cov[group[ii], group[jj]]
-
-                self.U[ct], self.S[ct], v = np.linalg.svd(covgroup)
-            getCovariance = 0
+        # Update covariance matrix
+        # Synchronous checks of runComplete in sample() guarantee that chains
+        # are executing the same step in parallel.
+        cov_update_step = (iter - 1) % self.covUpdate == 0 and (iter - 1) != 0
+        if cov_update_step:
+            cov = None
+            if self.MPIrank == 0:
+                self._updateRecursive(iter - 1, self.covUpdate)
+                cov = self.cov
+            cov = self.comm.bcast(cov, root=0)
+
+            if self.MPIrank != 0 and cov is not None:
+                self.cov[:, :] = cov
+                for ct, group in enumerate(self.groups):
+                    covgroup = np.zeros((len(group), len(group)))
+                    for ii in range(len(group)):
+                        for jj in range(len(group)):
+                            covgroup[ii, jj] = self.cov[group[ii], group[jj]]
+                    self.U[ct], self.S[ct], v = np.linalg.svd(covgroup)
 
         # update DE buffer
-        if (iter - 1) % self.burn == 0 and (iter - 1) != 0 and self.MPIrank == 0:
-            self._updateDEbuffer(iter - 1, self.burn)
+        # Synchronous checks of runComplete in sample() guarantee that chains
+        # are executing the same step in parallel.
+        burn_step = (iter - 1) % self.burn == 0 and (iter - 1) != 0
+        if burn_step:
+            buffer = None
+            if self.MPIrank == 0:
+                self._updateDEbuffer(iter - 1, self.burn)
+                buffer = self._DEbuffer
 
             # broadcast to other chains
-            [self.comm.send(self._DEbuffer, dest=rank + 1, tag=222) for rank in range(self.nchain - 1)]
+            buffer = self.comm.bcast(buffer, root=0)
 
-            # check for sent DE buffer from T = 0 chain
-            getDEbuf = self.comm.Iprobe(source=0, tag=222)
-            time.sleep(0.000001)
+            if self.MPIrank > 0:
+                self._DEbuffer = buffer
 
-        if getDEbuf and self.MPIrank > 0:
-            self._DEbuffer = self.comm.recv(source=0, tag=222)
+                # randomize cycle
+                if self.DEJump not in self.propCycle:
+                    self.addProposalToCycle(self.DEJump, self.DEweight)
+                    self.randomizeProposalCycle()
 
-            # randomize cycle
-            if self.DEJump not in self.propCycle:
+        # after burn in, add DE jumps;
+        if self.MPIrank == 0 and (iter - 1) == self.burn:
+            if self.verbose:
+                print("Adding DE jump with weight {0}".format(self.DEweight))
                 self.addProposalToCycle(self.DEJump, self.DEweight)
-                self.randomizeProposalCycle()
-
-            # reset
-            getDEbuf = 0
 
-        # after burn in, add DE jumps
-        if (iter - 1) == self.burn and self.MPIrank == 0:
-            if self.verbose:
-                print("Adding DE jump with weight {0}".format(self.DEweight))
-            self.addProposalToCycle(self.DEJump, self.DEweight)
-
-            # randomize cycle
-            self.randomizeProposalCycle()
+            # randomize cycle
+            self.randomizeProposalCycle()
 
         # jump proposal ###
 
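Both refactored blocks above share one shape: rank 0 produces an updated object (the proposal covariance or the differential-evolution buffer), all ranks call comm.bcast at the same step, and only the non-root ranks overwrite their local copy. A minimal sketch of that shape with a stand-in NumPy array; update_on_root is a placeholder for the root-only update, not a PTMCMCSampler function:

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

local_cov = np.eye(3)  # every rank starts with its own copy


def update_on_root(cov):
    """Placeholder for the root-only update step."""
    return cov * 2.0


payload = None
if rank == 0:
    payload = update_on_root(local_cov)

# pickle-based broadcast of an arbitrary Python object from rank 0
payload = comm.bcast(payload, root=0)

if rank != 0 and payload is not None:
    # non-root ranks adopt the root's result in place
    local_cov[:, :] = payload

The lowercase bcast pickles arbitrary Python objects, which is why the same call can carry a covariance matrix in one block and the DE buffer in the other.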
@@ -552,11 +553,8 @@ def PTMCMCOneStep(self, p0, lnlike0, lnprob0, iter):
         lp = self.logp(y)
 
         if lp == -np.inf:
-
             newlnprob = -np.inf
-
         else:
-
             newlnlike = self.logl(y)
             newlnprob = 1 / self.temp * newlnlike + lp
 
@@ -571,8 +569,9 @@ def PTMCMCOneStep(self, p0, lnlike0, lnprob0, iter):
             self.naccepted += 1
             self.jumpDict[jump_name][1] += 1
 
-        # temperature swap
-        swapReturn, p0, lnlike0, lnprob0 = self.PTswap(p0, lnlike0, lnprob0, iter)
+        # temperature swap.
+        # note PTswap() assumes iter is 0-indexed, but it's 1-indexed here
+        swapReturn, p0, lnlike0, lnprob0 = self.PTswap(p0, lnlike0, lnprob0, iter - 1)
 
         # check return value
         if swapReturn != 0:
@@ -604,73 +603,55 @@ def PTswap(self, p0, lnlike0, lnprob0, iter):
         """
 
         # initialize variables
-        readyToSwap = 0
         swapAccepted = 0
-        swapProposed = 0
-
-        # if Tskip is reached, block until next chain in ladder is ready for
-        # swap proposal
-        if iter % self.Tskip == 0 and self.MPIrank < self.nchain - 1:
-            swapProposed = 1
-
-            # send current likelihood for swap proposal
-            self.comm.send(lnlike0, dest=self.MPIrank + 1, tag=18)
-
-            # determine if swap was accepted
-            swapAccepted = self.comm.recv(source=self.MPIrank + 1, tag=888)
-
-            # perform swap
-            if swapAccepted:
-
-                # exchange likelihood
-                lnlike0 = self.comm.recv(source=self.MPIrank + 1, tag=18)
+        swapProposed = (iter % self.Tskip) == 0 and iter != 0
 
-                # exchange parameters
-                pnew = np.empty(self.ndim)
-                self.comm.Sendrecv(
-                    p0, dest=self.MPIrank + 1, sendtag=19, recvbuf=pnew, source=self.MPIrank + 1, recvtag=19
-                )
-                p0 = pnew
+        # Only exchange swap messages when a swap will be proposed.
+        # Synchronous checks of runComplete in sample() guarantee that chains
+        # are executing the same step in parallel.
+        if swapProposed:
+            if self.MPIrank < self.nchain - 1:
+                hotter_chain = self.MPIrank + 1
+                # send current likelihood for swap proposal
+                self.comm.send(lnlike0, dest=hotter_chain, tag=18)
 
-                # calculate new posterior values
-                lnprob0 = 1 / self.temp * lnlike0 + self.logp(p0)
+                # determine if swap was accepted
+                swapAccepted = self.comm.recv(source=hotter_chain, tag=888)
 
-        # check if next lowest temperature is ready to swap
-        elif self.MPIrank > 0:
+                # perform swap
+                if swapAccepted:
+                    # exchange likelihood
+                    lnlike0 = self.comm.recv(source=hotter_chain, tag=18)
 
-            readyToSwap = self.comm.Iprobe(source=self.MPIrank - 1, tag=18)
-            # trick to get around processor using 100% cpu while waiting
-            time.sleep(0.000001)
+                    # exchange parameters
+                    pnew = np.empty(self.ndim)
+                    self.comm.Sendrecv(p0, dest=hotter_chain, sendtag=19, recvbuf=pnew, source=hotter_chain, recvtag=19)
+                    p0 = pnew
 
-            # hotter chain decides acceptance
-            if readyToSwap:
-                newlnlike = self.comm.recv(source=self.MPIrank - 1, tag=18)
+                    # calculate new posterior values
+                    lnprob0 = 1 / self.temp * lnlike0 + self.logp(p0)
 
-                # determine if swap is accepted and tell other chain
-                logChainSwap = (1 / self.ladder[self.MPIrank - 1] - 1 / self.ladder[self.MPIrank]) * (
-                    lnlike0 - newlnlike
-                )
+            # check if swap is accepted and inform lower temp chain
+            # (hotter chain decides acceptance)
+            if self.MPIrank > 0:
+                cooler_chain = self.MPIrank - 1
+                newlnlike = self.comm.recv(source=cooler_chain, tag=18)
 
-                if logChainSwap > np.log(np.random.rand()):
-                    swapAccepted = 1
-                else:
-                    swapAccepted = 0
+                logChainSwap = (1 / self.ladder[cooler_chain] - 1 / self.ladder[self.MPIrank]) * (lnlike0 - newlnlike)
+                swapAccepted = logChainSwap > np.log(np.random.rand())
 
                 # send out result
-                self.comm.send(swapAccepted, dest=self.MPIrank - 1, tag=888)
+                self.comm.send(swapAccepted, dest=cooler_chain, tag=888)
 
                 # perform swap
                 if swapAccepted:
-
-                    # exchange likelihood
-                    self.comm.send(lnlike0, dest=self.MPIrank - 1, tag=18)
+                    self.comm.send(lnlike0, dest=cooler_chain, tag=18)
                     lnlike0 = newlnlike
 
                     # exchange parameters
                     pnew = np.empty(self.ndim)
-                    self.comm.Sendrecv(
-                        p0, dest=self.MPIrank - 1, sendtag=19, recvbuf=pnew, source=self.MPIrank - 1, recvtag=19
-                    )
+                    self.comm.Sendrecv(p0, dest=cooler_chain, sendtag=19, recvbuf=pnew, source=cooler_chain, recvtag=19)
                    p0 = pnew
 
                     # calculate new posterior values
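The PTswap() rewrite keeps the swap itself point-to-point: a chain offers its log-likelihood to the next hotter chain (tag 18), the hotter chain decides and replies (tag 888), and an accepted swap exchanges parameter vectors with Sendrecv. A stripped-down sketch of that handshake for exactly two MPI ranks, with toy values standing in for the real chain state:

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

params = np.full(4, float(rank))  # toy parameter vector per rank
loglike = float(-rank)            # toy log-likelihood per rank

if rank == 0:
    # cooler chain: propose a swap to the hotter neighbour
    comm.send(loglike, dest=1, tag=18)
    accepted = comm.recv(source=1, tag=888)
    if accepted:
        loglike = comm.recv(source=1, tag=18)
        incoming = np.empty_like(params)
        comm.Sendrecv(params, dest=1, sendtag=19, recvbuf=incoming, source=1, recvtag=19)
        params = incoming
elif rank == 1:
    # hotter chain: receive the proposal and decide
    other_loglike = comm.recv(source=0, tag=18)
    accepted = True  # the real code uses the log acceptance ratio here
    comm.send(accepted, dest=0, tag=888)
    if accepted:
        comm.send(loglike, dest=0, tag=18)
        loglike = other_loglike
        incoming = np.empty_like(params)
        comm.Sendrecv(params, dest=0, sendtag=19, recvbuf=incoming, source=0, recvtag=19)
        params = incoming

The sketch needs exactly two processes (mpirun -np 2); the patched method repeats this handshake between each neighbouring pair in the temperature ladder.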
diff --git a/examples/gaussian_mpi_example.py b/examples/gaussian_mpi_example.py
new file mode 100644
index 0000000..42d2b2e
--- /dev/null
+++ b/examples/gaussian_mpi_example.py
@@ -0,0 +1,201 @@
+"""
+A Python script that repackages PTMCMCSampler's "simple" example to run from the command line using
+mpirun. https://github.com/jellis18/PTMCMCSampler/blob/master/examples/simple.ipynb
+------------------
+Runtime environment:
+This script requires a Python runtime environment with installed dependencies, and also a working
+MPI installation.
+
+------------------
+Sample commands:
+
+Show help for the script:
+python -m gaussian_mpi_example -h
+
+Testing the script (one chain):
+python -m gaussian_mpi_example [output_dir]
+
+Basic use (multiple chains):
+mpirun -np [n_chains] python -m gaussian_mpi_example [output_dir]
+
+Running 2 chains with repeatable results:
+mpirun -np 2 python -m gaussian_mpi_example --seeds 10 11 /code/results/2-chains
+
+Result files are written to the configured output directory, overwriting any previous results
+with the same names. Clients are responsible for cleaning up result files generated by the script
+(from the PTMCMCSampler library).
+
+See also supporting library PTMCMCSampler https://github.com/jellis18/PTMCMCSampler
+"""
+import argparse
+import logging
+import random
+
+import numpy as np
+from mpi4py import MPI
+
+from PTMCMCSampler import PTMCMCSampler
+
+logger = logging.getLogger(__name__)
+
+
+class GaussianLikelihood:
+    def __init__(self, ndim=2, pmin=-10, pmax=10):
+
+        self.a = np.ones(ndim) * pmin
+        self.b = np.ones(ndim) * pmax
+
+        # get means
+        self.mu = np.random.uniform(pmin, pmax, ndim)
+
+        # ... and a positive definite, non-trivial covariance matrix.
+        cov = 0.5 - np.random.rand(ndim**2).reshape((ndim, ndim))
+        cov = np.triu(cov)
+        cov += cov.T - np.diag(cov.diagonal())
+        self.cov = np.dot(cov, cov)
+
+        # Invert the covariance matrix first.
+        self.icov = np.linalg.inv(self.cov)
+
+    def lnlikefn(self, x):
+        diff = x - self.mu
+        return -np.dot(diff, np.dot(self.icov, diff)) / 2.0
+
+    def lnpriorfn(self, x):
+
+        if np.all(self.a <= x) and np.all(self.b >= x):
+            return 0.0
+        else:
+            return -np.inf
+
+
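GaussianLikelihood builds its covariance by symmetrizing a random matrix and then multiplying it by itself; the square of a real symmetric matrix is symmetric positive semidefinite (and, for a random draw, almost surely positive definite), which is what makes the np.linalg.inv call safe in practice. A quick standalone check of that property, repeating the same construction outside the class:

import numpy as np

ndim = 20
raw = 0.5 - np.random.rand(ndim**2).reshape((ndim, ndim))
sym = np.triu(raw)
sym += sym.T - np.diag(sym.diagonal())  # symmetric matrix
cov = np.dot(sym, sym)                  # square of a symmetric matrix

eigvals = np.linalg.eigvalsh(cov)       # real eigenvalues of a symmetric matrix
print("smallest eigenvalue:", eigvals.min())
assert eigvals.min() > -1e-8            # nonnegative up to rounding error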
+class UniformJump:
+    def __init__(self, pmin, pmax):
+        """Draw random parameters from pmin, pmax"""
+        self.pmin = pmin
+        self.pmax = pmax
+
+    def jump(self, x, it, beta):
+        """
+        Function prototype must read in parameter vector x,
+        sampler iteration number it, and inverse temperature beta
+        """
+
+        # log of forward-backward jump probability
+        lqxy = 0
+
+        # uniformly drawn parameters
+        q = np.random.uniform(self.pmin, self.pmax, len(x))
+
+        return q, lqxy
+
+
+def parallel_tempering_opt(output_dir, verbose=False):
+    """
+    Runs the parallel tempering example based on code from PTMCMCSampler's simple.ipynb.
+    Note this function purposefully omits a random seed parameter. For repeatable results when
+    calling it directly, call np.random.seed() in client code (and only once per process,
+    since otherwise it may cause random numbers to be repeated).
+    :param output_dir: the output directory for results
+    """
+
+    # Setup Gaussian model class
+    ndim = 20
+    pmin, pmax = 0.0, 10
+    glo = GaussianLikelihood(ndim=ndim, pmin=pmin, pmax=pmax)
+
+    # Setup sampler
+    # Set the start position and the covariance
+    p0 = np.random.uniform(pmin, pmax, ndim)
+    cov = np.eye(ndim) * 0.1**2
+
+    sampler = PTMCMCSampler.PTSampler(ndim, glo.lnlikefn, glo.lnpriorfn, np.copy(cov), outDir=output_dir)
+
+    # Add custom jump
+    ujump = UniformJump(pmin, pmax)
+    sampler.addProposalToCycle(ujump.jump, 5)
+
+    # Run Sampler for 100,000 steps
+    sampler.sample(
+        p0,
+        100000,
+        burn=500,
+        thin=1,
+        covUpdate=500,
+        SCAMweight=20,
+        AMweight=20,
+        DEweight=20,
+    )
+
+
+if __name__ == "__main__":
+    """
+    Command line program to run a single parallel tempering chain.
+    """
+
+    # Get info about the MPI runtime environment
+    comm = MPI.COMM_WORLD
+    n_chains = comm.Get_size()  # total number of chains / MPI processes
+    process_rank = comm.Get_rank()  # ID of *this* chain / MPI process (zero-indexed)
+
+    # Configure program arguments.
+    parser = argparse.ArgumentParser(
+        description="A wrapper script to run PTMCMCSampler's parallel tempering example using " "mpirun"
+    )
+
+    parser.add_argument(
+        "output_dir",
+        type=str,
+        default=None,
+        help="Path of the output directory for run artifacts",
+    )
+    parser.add_argument(
+        "-s",
+        "--seeds",
+        type=int,
+        default=None,
+        # If any random seeds are provided, require a seed for each chain.
+        nargs=n_chains,
+        help="A list of integer random seeds to pass to each temperature chain, "
+        "respectively. Values must be convertible to 32-bit unsigned "
+        "integers for compatibility with NumPy's random.seed()",
+    )
+    parser.add_argument("-v", "--verbose", action="store_true", default=False)
+    parser.add_argument("-d", "--debug", action="store_true", default=False)
+
+    # Parse the input.
+    args = parser.parse_args()
+
+    # Seed random state for this process/chain.
+    seeds = args.seeds
+    if seeds:
+        # Since PTMCMCSampler uses np.random*() functions to get random numbers, and we're
+        # running each chain in a separate Python process, we need to seed each process to get
+        # predictable results.
+        # Note we only set random state here since any Python client importing this module directly
+        # should retain control of random state, and only seed it once per process.
+
+        # extract & set the seed for this process
+        seed = seeds[process_rank]
+        np.random.seed(seed)
+        random.seed(seed)
+
+    # Configure logging for this process.
+    debug = args.debug
+    if debug:
+        # Get info about the MPI runtime environment
+        comm = MPI.COMM_WORLD
+        n_chains = comm.Get_size()  # total number of chains / MPI processes
+        process_rank = comm.Get_rank()  # ID of *this* chain / MPI process (zero-indexed)
+
+        # Configure logging in this process to DEBUG level
+        # & prepend the chain # to all messages
+        chain = f"Chain {process_rank + 1} of {n_chains}:"
+        logging.basicConfig(
+            format=f"%(levelname)s:%(asctime)s:{chain}:%(name)s: %(message)s",
+            level=logging.DEBUG,
+            datefmt="%H:%M:%S",
+        )
+
+    # Do the work!
+    logger.debug("Starting")
+    parallel_tempering_opt(args.output_dir, args.verbose)
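As the parallel_tempering_opt docstring points out, the function takes no seed argument; a caller that imports the module directly owns the random state and should seed it once per process before calling in. A minimal sketch of such a caller, assuming examples/gaussian_mpi_example.py is importable and using an illustrative ./chains output directory:

import numpy as np

import gaussian_mpi_example

# seed exactly once in this process for repeatable draws
# (for multiple chains under mpirun you would normally vary the seed by rank)
np.random.seed(42)

# run the example and write chain files under ./chains
gaussian_mpi_example.parallel_tempering_opt("./chains", verbose=True)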