WIP: clipped importance sampling
han-ol committed May 31, 2024
1 parent 4dc22ef commit fbf07f4
Showing 1 changed file with 184 additions and 0 deletions.
184 changes: 184 additions & 0 deletions inverse_kinematics/benchmark/components.py
@@ -118,6 +118,190 @@ def sample(self, n_samples, only_end=True, n_exclude_last=0):
coords = self.forward(prior_samples, only_end=only_end)
return prior_samples, coords

def sample_posterior_weights(
self, x, n_samples, max_n_batches=1000, clip_at_weight_factor=1e3, red_prior_n_samples_per_batch=100_000
):
"""Sample from the posterior distribution with importance weights.

Parameters
----------
x : array of dimension (n_dim,)
End point of the end effector
n_samples : int
Number of samples to draw
max_n_batches : int, optional, default=1000
Upper limit on the number of batches computed. If sampling does
not succeed even though the end point is reachable by the arm,
increase this number.
clip_at_weight_factor : float, optional, default=1e3
Upper limit on the ratio of the maximum weight to the minimum weight.
If this limit is exceeded, lower weights are increased to comply,
leading to the proportionate rejection of lower-weight samples.
red_prior_n_samples_per_batch : int, optional, default=100_000
Number of reduced-prior samples drawn per batch.

Returns
-------
(posterior_samples, weights) : tuple containing
1. a list of length n_components with an
array of shape (n_samples, n_params) for each component
2. a corresponding array of weights of shape (n_samples,)
"""

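# Overall scheme: each batch draws params_1, params_2 from the reduced prior,
# reconstructs the remaining parameters as preimages of x, and attaches an
# importance weight to each full parameter vector, so that the weighted
# ensemble represents the posterior given x.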
samples = [np.full((n_samples, c.n_params), np.nan) for c in self.components]
params = [np.zeros((0, c.n_params)) for c in self.components]
rel_weights = np.zeros((0,))
max_joint_weight = 1
n_reachable = 0

# Initial guess would be correct if no clipping is necessary
expected_missing = n_samples

for _ in range(max_n_batches):
# Sample params_1, params_2 from the prior
accepted_red_prior_samples = self._sample_reduced_batched(
x,
min(expected_missing, 10_000_000),
n_samples_per_batch=red_prior_n_samples_per_batch,
max_n_batches=10_000,
)
n_reachable += accepted_red_prior_samples[0].shape[0]

# Explicitly calculate preimages param_a and param_b of x given params_1, params_2
new_params, _ = self._get_full_params(accepted_red_prior_samples, x)
del accepted_red_prior_samples

print("max_joint_weight", max_joint_weight)
# print("new_max_joint_weight from _get_full_params()", _)
# Calculate weights for new parameter samples relative to max_joint_weight
new_weights = self._get_weights(new_params)
new_max_joint_weight = new_weights.max()
new_rel_weights = new_weights / max_joint_weight
del new_weights
print("new_max_joint_weight", new_max_joint_weight)
print(new_rel_weights.max(), new_rel_weights.min())

# Join old and new samples and weights
for i in range(len(self.components)):
params[i] = np.concatenate([params[i], new_params[i]], axis=0)
rel_weights = np.concatenate([rel_weights, new_rel_weights], axis=0)

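# If the new batch contains a larger maximum weight, rescale all stored
# relative weights so that they stay expressed relative to the current maximum.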
if new_max_joint_weight > max_joint_weight:
rel_weights = rel_weights * (max_joint_weight / new_max_joint_weight)
max_joint_weight = new_max_joint_weight
print(rel_weights.max(), rel_weights.min())

# Accept each sample with probability min(rel_weight * clip_at_weight_factor, 1)
print(np.min(rel_weights * clip_at_weight_factor), np.max(rel_weights * clip_at_weight_factor))
weight_update = np.clip(rel_weights * clip_at_weight_factor, a_min=0, a_max=1)
accepted_mask = np.random.binomial(1, weight_update, size=(params[0].shape[0],)) == 1
for i in range(len(self.components)):
params[i] = params[i][accepted_mask]
# Compensate for the rejection by dividing by the acceptance probability; the relative weight of accepted samples increases accordingly
rel_weights /= weight_update
rel_weights = rel_weights[accepted_mask]
print("new params", accepted_mask.shape, accepted_mask.sum())
del accepted_mask
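
# Illustrative numbers (not from an actual run): with clip_at_weight_factor=1e3,
# a sample with rel_weight=1e-5 is kept with probability 1e-5 * 1e3 = 0.01 and,
# if kept, its weight is divided by 0.01; a sample with rel_weight >= 1e-3 is
# always kept and its weight stays unchanged. The surviving max/min weight ratio
# is therefore bounded by clip_at_weight_factor while expectations are preserved.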

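# Estimate how many additional reachable prior samples are still needed so that
# roughly n_samples draws survive the rejection step.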
num_accepted = params[0].shape[0]
acc_rate = num_accepted / n_reachable
expected_num = int(n_samples / acc_rate)
expected_missing = max(expected_num - n_reachable, 10_000)
print(expected_missing)
print(
f"Acceptance rate: {acc_rate}, {num_accepted} accepted so far (goal: {n_samples}). "
f"Expected number of further prior samples needed: {expected_missing}"
)
if num_accepted >= n_samples:
for i in range(len(samples)):
samples[i] = params[i][:n_samples]
rel_weights = rel_weights[:n_samples]
print("rel_weights.min()", rel_weights.min())
print("max_joint_weight", max_joint_weight)
return samples, rel_weights / rel_weights.min()
raise RuntimeError(
f"Only {num_accepted} of {n_samples} samples could be produced. "
"Please increase clip_at_weight_factor or max_n_batches."
)

def _multinomial_resampling(self, weights, n_samples):
"""Sample with replacement from a weighted distribution.

Parameters
----------
weights : list-like of float
List of weights as floats
n_samples : int
Number of indexes to draw

Returns
-------
indexes : ndarray of ints
Array of indexes into the weights defining the resample, i.e. the
index of the zeroth resample is indexes[0], etc.
"""
normalized_weights = weights / np.sum(weights)
cumulative_sum = np.cumsum(normalized_weights)
cumulative_sum[-1] = 1.0 # avoid round-off errors: ensures sum is exactly one
indexes = np.searchsorted(cumulative_sum, np.random.random(n_samples))

return indexes
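
# Example with hypothetical weights [1, 1, 2]: the normalized cumulative sum is
# [0.25, 0.5, 1.0], so a uniform random number below 0.25 maps to index 0, one
# between 0.25 and 0.5 maps to index 1, and one above 0.5 maps to index 2, i.e.
# the third entry is drawn on average twice as often as each of the others.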

def sample_posterior_robust(self, x, n_samples, clip_at_weight_factor=2, **kwargs):
"""Sample from the posterior distribution robustly.

Based on clipped importance sampling followed by multinomial resampling.

Parameters
----------
x : array of dimension (n_dim,)
End point of the end effector
n_samples : int
Number of samples to draw
max_n_batches : int, optional, default=1000
Upper limit on the number of batches computed. If sampling does
not succeed even though the end point is reachable by the arm,
increase this number. Passed on to sample_posterior_weights.
clip_at_weight_factor : float, optional, default=2
Upper limit on the ratio of the maximum weight to the minimum weight.
If this limit is exceeded, lower weights are increased to comply,
leading to the proportionate rejection of lower-weight samples.
**kwargs
Additional keyword arguments passed on to sample_posterior_weights.

Returns
-------
posterior_samples : list of length n_components with an
array of shape (n_samples, n_params) for each component
"""
samples = [None for _ in self.components]

# Obtain samples and weights with clipped importance sampling
unique_samples, weights = self.sample_posterior_weights(
x, n_samples, clip_at_weight_factor=clip_at_weight_factor, **kwargs
)

min_weight = weights.min()
min_weight_mask = np.equal(weights, min_weight)
print("min_weight_mask", min_weight_mask.sum())

# Without alteration use the unique samples with minimal weight
for i in range(len(samples)):
samples[i] = unique_samples[i][min_weight_mask]
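
# The weights returned by sample_posterior_weights are normalized so that their
# minimum is 1, hence each minimal-weight sample already counts as exactly one
# posterior draw and needs no resampling.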

# Resample anything with higher than minimal weight
n_resamples = np.round(weights[~min_weight_mask].sum()).astype(int)
print("n_resamples", n_resamples)
indexes = self._multinomial_resampling(weights[~min_weight_mask], n_resamples)

for i in range(len(samples)):
samples[i] = np.concatenate([samples[i], unique_samples[i][~min_weight_mask][indexes]], axis=0)

# Shuffle all samples and cut down to n_samples
permutation = np.random.permutation(len(samples[0]))
for i in range(len(samples)):
samples[i] = samples[i][permutation][:n_samples]

return samples
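
# Hypothetical usage sketch (object and end-point values assumed, not taken
# from this repository):
#     model = ...  # an instance of this class
#     x = np.array([1.5, 0.0])
#     posterior_samples = model.sample_posterior_robust(x, n_samples=10_000)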

def sample_posterior(self, x, n_samples, max_n_batches=1000, M=1000, red_prior_n_samples_per_batch=100_000):
"""Sample from the posterior distribution