Code 10: Probabilistic Programming Languages#
This is a reference notebook for the book Bayesian Modeling and Computation in Python.
The textbook is not needed to use or run this code, though the context and explanations it provides are missing from this notebook.
If you'd like a copy, it's available from CRC Press or from Amazon.
from scipy import stats
import pymc3 as pm
import theano
import numpy as np
import matplotlib.pyplot as plt
import arviz as az
import datetime
print(f"Last Run {datetime.datetime.now()}")
Last Run 2021-11-19 16:52:42.074557
az.style.use("arviz-grayscale")
plt.rcParams["figure.dpi"] = 300
Posterior Computation#
Code 10.1#
from jax import grad
simple_grad = grad(lambda x: x**2)
print(simple_grad(4.0))
WARNING:absl:No GPU/TPU found, falling back to CPU. (Set TF_CPP_MIN_LOG_LEVEL=0 and rerun for more info.)
8.0
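As a quick sanity check (not from the book), jax.grad composes, so applying it twice gives the second derivative of x**2, which is the constant 2:
second_grad = grad(grad(lambda x: x**2))
print(second_grad(4.0))  # expected 2.0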
Code 10.2#
from jax import grad
from jax.scipy.stats import norm
def model(test_point, observed):
z_pdf = norm.logpdf(test_point, loc=0, scale=5)
x_pdf = norm.logpdf(observed, loc=test_point, scale=1)
logpdf = z_pdf + x_pdf
return logpdf
model_grad = grad(model)
observed, test_point = 5.0, 2.5
logp_val = model(test_point, observed)
grad = model_grad(test_point, observed)
print(f"log_p_val: {logp_val}")
print(f"grad: {grad}")
log_p_val: -6.697315216064453
grad: 2.4000000953674316
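The gradient above can be verified by hand (a check not in the book): differentiating the two log-densities with respect to test_point gives -test_point/5**2 + (observed - test_point)/1**2 = -0.1 + 2.5 = 2.4.
analytic_grad = -test_point / 5**2 + (observed - test_point) / 1**2
print(f"analytic grad: {analytic_grad}")  # 2.4, matching the JAX result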
Code 10.3#
with pm.Model() as model:
z = pm.Normal("z", 0., 5.)
x = pm.Normal("x", mu=z, sd=1., observed=observed)
func = model.logp_dlogp_function()
func.set_extra_values({})
print(func(np.array([test_point])))
[array(-6.69731498), array([2.4])]
Code 10.4#
def fraud_detector(fraud_observations, non_fraud_observations, fraud_prior=8, non_fraud_prior=6):
"""Conjugate Beta Binomial model for fraud detection"""
expectation = (fraud_prior + fraud_observations) / (
fraud_prior + fraud_observations + non_fraud_prior + non_fraud_observations)
if expectation > .5:
return {"suspend_card":True}
%timeit fraud_detector(2, 0)
188 ns ± 6.11 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)
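As a worked check (not from the book), two observed frauds update the Beta prior to a posterior mean of (8 + 2) / (8 + 2 + 6 + 0) = 0.625, which exceeds 0.5 and so suspends the card:
print(fraud_detector(2, 0))  # {'suspend_card': True}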
PPL Driven Transformations#
Code 10.9#
observed = np.repeat(2, 2)
pdf = stats.norm(0, 1).pdf(observed)
np.prod(pdf, axis=0)
0.0029150244650281948
Code 10.10#
observed = np.repeat(2, 1000)
pdf = stats.norm(0, 1).pdf(observed)
np.prod(pdf, axis=0)
0.0
Code 10.11#
1.2 - 1
0.19999999999999996
pdf[0], np.prod(pdf, axis=0)
(0.05399096651318806, 0.0)
Code 10.12#
logpdf = stats.norm(0, 1).logpdf(observed)
np.log(pdf[0]), logpdf[0], logpdf.sum()
(-2.9189385332046727, -2.9189385332046727, -2918.9385332046736)
np.log(pdf[0])
-2.9189385332046727
Distribution Transforms#
Code 10.13#
lower, upper = -1, 2
domain = np.linspace(lower, upper, 5)
transform = np.log(domain - lower) - np.log(upper - domain)
print(f"Original domain: {domain}")
print(f"Transformed domain: {transform}")
Original domain: [-1. -0.25 0.5 1.25 2. ]
Transformed domain: [ -inf -1.09861229 0. 1.09861229 inf]
/var/folders/7p/srk5qjp563l5f9mrjtp44bh800jqsw/T/ipykernel_3646/2568761612.py:3: RuntimeWarning: divide by zero encountered in log
transform = np.log(domain - lower) - np.log(upper - domain)
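The mapping can be undone with the inverse (logistic) transform; this sketch is not in the book and simply recovers the original domain, with the infinite endpoints mapping back to the boundaries:
inverse = lower + (upper - lower) / (1 + np.exp(-transform))
print(f"Recovered domain: {inverse}")  # [-1., -0.25, 0.5, 1.25, 2.]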
Code 10.14#
with pm.Model() as model:
x = pm.Uniform("x", -1., 2.)
model.vars
[x_interval__ ~ TransformedDistribution]
Code 10.15#
print(model.logp({"x_interval__":-2}),
model.logp_nojac({"x_interval__":-2}))
print(model.logp({"x_interval__":1}),
model.logp_nojac({"x_interval__":1}))
-2.2538560220859454 -1.0986122886681098
-1.6265233750364456 -1.0986122886681098
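The difference between the two log-probabilities is the log-Jacobian of the interval transform x = lower + (upper - lower) * sigmoid(x_interval); this check is not in the book:
for x_interval in (-2, 1):
    s = 1 / (1 + np.exp(-x_interval))
    log_jac = np.log(upper - lower) + np.log(s) + np.log(1 - s)
    diff = model.logp({"x_interval__": x_interval}) - model.logp_nojac({"x_interval__": x_interval})
    print(diff, log_jac)  # the two columns agree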
Code 10.16#
import tensorflow_probability as tfp
tfd = tfp.distributions
tfb = tfp.bijectors
lognormal0 = tfd.LogNormal(0., 1.)
lognormal1 = tfd.TransformedDistribution(tfd.Normal(0., 1.), tfb.Exp())
x = lognormal0.sample(100)
np.testing.assert_array_equal(lognormal0.log_prob(x), lognormal1.log_prob(x))
Code 10.17#
y_observed = stats.norm(0, .01).rvs(20)
with pm.Model() as model_transform:
sd = pm.HalfNormal("sd", 5)
y = pm.Normal("y", mu=0, sigma=sd, observed=y_observed)
trace_transform = pm.sample(chains=1, draws=100000)
print(model_transform.vars)
print(f"Diverging: {trace_transform.get_sampler_stats('diverging').sum()}")
/var/folders/7p/srk5qjp563l5f9mrjtp44bh800jqsw/T/ipykernel_3646/2626254216.py:6: FutureWarning: In v4.0, pm.sample will return an `arviz.InferenceData` object instead of a `MultiTrace` by default. You can pass return_inferencedata=True or return_inferencedata=False to be safe and silence this warning.
trace_transform = pm.sample(chains=1, draws=100000)
Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Sequential sampling (1 chains in 1 job)
NUTS: [sd]
100.00% [101000/101000 00:33<00:00 Sampling chain 0, 0 divergences]
Sampling 1 chain for 1_000 tune and 100_000 draw iterations (1_000 + 100_000 draws total) took 34 seconds.
Only one chain was sampled, this makes it impossible to run some convergence checks
[sd_log__ ~ TransformedDistribution]
Diverging: 0
Code 10.18#
with pm.Model() as model_no_transform:
sd = pm.HalfNormal("sd", 5, transform=None)
y = pm.Normal("y", mu=0, sigma=sd, observed=y_observed)
trace_no_transform = pm.sample(chains=1, draws=100000)
print(model_no_transform.vars)
print(f"Diverging: {trace_no_transform.get_sampler_stats('diverging').sum()}")
/var/folders/7p/srk5qjp563l5f9mrjtp44bh800jqsw/T/ipykernel_3646/2939584159.py:4: FutureWarning: In v4.0, pm.sample will return an `arviz.InferenceData` object instead of a `MultiTrace` by default. You can pass return_inferencedata=True or return_inferencedata=False to be safe and silence this warning.
trace_no_transform = pm.sample(chains=1, draws=100000)
Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Sequential sampling (1 chains in 1 job)
NUTS: [sd]
100.00% [101000/101000 00:33<00:00 Sampling chain 0, 17 divergences]
Sampling 1 chain for 1_000 tune and 100_000 draw iterations (1_000 + 100_000 draws total) took 34 seconds.
There were 17 divergences after tuning. Increase `target_accept` or reparameterize.
Only one chain was sampled, this makes it impossible to run some convergence checks
[sd ~ HalfNormal]
Diverging: 17
Operation Graphs and Automatic Reparameterization#
x = 3
y = 1
x * y / x + 2
3.0
Code 10.19#
theano.config.compute_test_value = 'ignore'
x = theano.tensor.vector("x")
y = theano.tensor.vector("y")
out = x*(y/x) + 0
theano.printing.debugprint(out)
Elemwise{add,no_inplace} [id A] ''
|Elemwise{mul,no_inplace} [id B] ''
| |x [id C]
| |Elemwise{true_div,no_inplace} [id D] ''
| |y [id E]
| |x [id C]
|InplaceDimShuffle{x} [id F] ''
|TensorConstant{0} [id G]
Code 10.20#
fgraph = theano.function([x,y], [out])
theano.printing.debugprint(fgraph)
DeepCopyOp [id A] 'y' 0
|y [id B]
Code 10.21#
fgraph([1],[3])
[array([3.])]
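Because Theano has rewritten x*(y/x) + 0 down to y, the compiled function simply copies y; as a small check not in the book, even x = 0 (which would make the unoptimized expression divide by zero) returns y unchanged:
print(fgraph([0.], [3.]))  # [array([3.])]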
Figure 10.1 and Figure 10.2#
theano.printing.pydotprint(
out, outfile="img/chp10/symbolic_graph_unopt.png",
var_with_name_simple=False, high_contrast=False, with_ids=True)
theano.printing.pydotprint(
fgraph,
outfile="img/chp10/symbolic_graph_opt.png",
var_with_name_simple=False, high_contrast=False, with_ids=True)
The output file is available at img/chp10/symbolic_graph_unopt.png
The output file is available at img/chp10/symbolic_graph_opt.png
Code 10.22#
with pm.Model() as model_normal:
x = pm.Normal("x", 0., 1.)
theano.printing.debugprint(model_normal.logpt)
Sum{acc_dtype=float64} [id A] '__logp'
|MakeVector{dtype='float64'} [id B] ''
|Sum{acc_dtype=float64} [id C] ''
|Sum{acc_dtype=float64} [id D] '__logp_x'
|Elemwise{switch,no_inplace} [id E] ''
|Elemwise{mul,no_inplace} [id F] ''
| |TensorConstant{1} [id G]
| |Elemwise{mul,no_inplace} [id H] ''
| |TensorConstant{1} [id I]
| |Elemwise{gt,no_inplace} [id J] ''
| |TensorConstant{1.0} [id K]
| |TensorConstant{0} [id L]
|Elemwise{true_div,no_inplace} [id M] ''
| |Elemwise{add,no_inplace} [id N] ''
| | |Elemwise{mul,no_inplace} [id O] ''
| | | |Elemwise{neg,no_inplace} [id P] ''
| | | | |TensorConstant{1.0} [id Q]
| | | |Elemwise{pow,no_inplace} [id R] ''
| | | |Elemwise{sub,no_inplace} [id S] ''
| | | | |x ~ Normal [id T]
| | | | |TensorConstant{0.0} [id U]
| | | |TensorConstant{2} [id V]
| | |Elemwise{log,no_inplace} [id W] ''
| | |Elemwise{true_div,no_inplace} [id X] ''
| | |Elemwise{true_div,no_inplace} [id Y] ''
| | | |TensorConstant{1.0} [id Q]
| | | |TensorConstant{3.141592653589793} [id Z]
| | |TensorConstant{2.0} [id BA]
| |TensorConstant{2.0} [id BB]
|TensorConstant{-inf} [id BC]
Effect handling#
Code 10.23#
import jax
import numpyro
from tensorflow_probability.substrates import jax as tfp_jax
tfp_dist = tfp_jax.distributions
numpyro_dist = numpyro.distributions
root = tfp_dist.JointDistributionCoroutine.Root
def tfp_model():
x = yield root(tfp_dist.Normal(loc=1.0, scale=2.0, name="x"))
z = yield root(tfp_dist.HalfNormal(scale=1., name="z"))
y = yield tfp_dist.Normal(loc=x, scale=z, name="y")
def numpyro_model():
x = numpyro.sample("x", numpyro_dist.Normal(loc=1.0, scale=2.0))
z = numpyro.sample("z", numpyro_dist.HalfNormal(scale=1.0))
y = numpyro.sample("y", numpyro_dist.Normal(loc=x, scale=z))
try:
print(tfp_model())
except:
pass
<generator object tfp_model at 0x7fec68bafac0>
try:
print(numpyro_model())
except:
pass
Code 10.24#
sample_key = jax.random.PRNGKey(52346)
# Draw samples
jd = tfp_dist.JointDistributionCoroutine(tfp_model)
tfp_sample = jd.sample(1, seed=sample_key)
predictive = numpyro.infer.Predictive(numpyro_model, num_samples=1)
numpyro_sample = predictive(sample_key)
# Evaluate log prob
log_likelihood_tfp = jd.log_prob(tfp_sample)
log_likelihood_numpyro = numpyro.infer.util.log_density(
numpyro_model, [], {},
# Samples returned from JointDistributionCoroutine are in a
# namedtuple-like object; we convert it to a dictionary
# so that NumPyro can recognize it.
params=tfp_sample._asdict())
# Validate that we get the same log prob
np.testing.assert_allclose(log_likelihood_tfp, log_likelihood_numpyro[0], rtol=1e-6)
Code 10.25#
# Condition z to .01 in TFP and sample
jd.sample(z=.01, seed=sample_key)
StructTuple(
x=DeviceArray(0.42507368, dtype=float32),
z=DeviceArray(0.01, dtype=float32),
y=DeviceArray(0.42213476, dtype=float32)
)
# Condition z to .01 in NumPyro and sample
predictive = numpyro.infer.Predictive(
numpyro_model, num_samples=1, params={"z": np.asarray(.01)})
predictive(sample_key)
{'x': DeviceArray([1.1232405], dtype=float32),
'y': DeviceArray([1.1318897], dtype=float32),
'z': array([0.01])}
Code 10.26#
# Condition z to .01 in TFP and construct the conditional distributions
dist, value = jd.sample_distributions(z=.01, seed=sample_key)
assert dist.y.loc == value.x
assert dist.y.scale == value.z
# Condition z to .01 in NumPyro and construct the conditional distributions
model = numpyro.handlers.substitute(numpyro_model, data={"z": .01})
with numpyro.handlers.seed(rng_seed=sample_key):
# Under the seed context, the default behavior of a NumPyro model is the
# same as in Pyro: drawing a sample from the prior.
model_trace = numpyro.handlers.trace(numpyro_model).get_trace()
assert model_trace["y"]["fn"].loc == model_trace["x"]["value"]
assert model_trace["y"]["fn"].scale == model_trace["z"]["value"]
Designing a PPL#
Code 10.27#
Code 10.28#
random_variable_x = stats.norm(loc=1.0, scale=2.0)
x = random_variable_x.rvs(size=2, random_state=1234)
logp = random_variable_x.logpdf(x)
Code 10.29#
x = stats.norm(loc=1.0, scale=2.0)
y = stats.norm(loc=x, scale=0.1)
y.rvs()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
/var/folders/7p/srk5qjp563l5f9mrjtp44bh800jqsw/T/ipykernel_3646/2580115495.py in <module>
1 x = stats.norm(loc=1.0, scale=2.0)
2 y = stats.norm(loc=x, scale=0.1)
----> 3 y.rvs()
/opt/miniconda3/envs/bmcp/lib/python3.9/site-packages/scipy/stats/_distn_infrastructure.py in rvs(self, size, random_state)
471 kwds = self.kwds.copy()
472 kwds.update({'size': size, 'random_state': random_state})
--> 473 return self.dist.rvs(*self.args, **kwds)
474
475 def sf(self, x):
/opt/miniconda3/envs/bmcp/lib/python3.9/site-packages/scipy/stats/_distn_infrastructure.py in rvs(self, *args, **kwds)
1092 vals = self._rvs(*args, size=size, random_state=random_state)
1093
-> 1094 vals = vals * scale + loc
1095
1096 # do not forget to restore the _random_state
TypeError: unsupported operand type(s) for +: 'float' and 'rv_frozen'
Code 10.30#
class RandomVariable:
def __init__(self, distribution):
self.distribution = distribution
def __array__(self):
return np.asarray(self.distribution.rvs())
x = RandomVariable(stats.norm(loc=1.0, scale=2.0))
z = RandomVariable(stats.halfnorm(loc=0., scale=1.))
y = RandomVariable(stats.norm(loc=x, scale=z))
for i in range(5):
print(np.asarray(y))
1.1571257036684732
0.5129008281470354
-0.2750327073812393
-0.1797237136258827
-2.7978765955524114
Code 10.31#
class RandomVariable:
def __init__(self, distribution, value=None):
self.distribution = distribution
self.set_value(value)
def __repr__(self):
return f"{self.__class__.__name__}(value={self.__array__()})"
def __array__(self, dtype=None):
if self.value is None:
return np.asarray(self.distribution.rvs(), dtype=dtype)
return self.value
def set_value(self, value=None):
self.value = value
def log_prob(self, value=None):
if value is not None:
self.set_value(value)
return self.distribution.logpdf(np.array(self))
x = RandomVariable(stats.norm(loc=1.0, scale=2.0))
z = RandomVariable(stats.halfnorm(loc=0., scale=1.))
y = RandomVariable(stats.norm(loc=x, scale=z))
Code 10.32#
for i in range(3):
print(y)
print(f" Set x=5 and z=0.1")
x.set_value(np.asarray(5))
z.set_value(np.asarray(0.05))
for i in range(3):
print(y)
print(f" Reset z")
z.set_value(None)
for i in range(3):
print(y)
RandomVariable(value=2.113889264473519)
RandomVariable(value=-0.9153866580169316)
RandomVariable(value=2.7839356392110224)
Set x=5 and z=0.05
RandomVariable(value=4.971704202268103)
RandomVariable(value=4.925771989212217)
RandomVariable(value=5.024323806499401)
Reset z
RandomVariable(value=4.732558936959928)
RandomVariable(value=4.998079910235703)
RandomVariable(value=4.905656937449344)
Code 10.33#
Code 10.34#
def log_prob(xval, zval, yval=5):
x_dist = stats.norm(loc=1.0, scale=2.0)
z_dist = stats.halfnorm(loc=0., scale=1.)
y_dist = stats.norm(loc=xval, scale=zval)
return x_dist.logpdf(xval) + z_dist.logpdf(zval) + y_dist.logpdf(yval)
log_prob(0, 1)
-15.881815599614018
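As a quick decomposition check (not from the book), the joint log-probability is just the sum of the three individual log-densities evaluated at xval=0, zval=1, yval=5:
x_term = stats.norm(loc=1.0, scale=2.0).logpdf(0)
z_term = stats.halfnorm(loc=0., scale=1.).logpdf(1)
y_term = stats.norm(loc=0, scale=1).logpdf(5)
print(x_term + z_term + y_term)  # roughly -15.88, matching log_prob(0, 1)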
Code 10.35#
def prior_sample():
x = stats.norm(loc=1.0, scale=2.0).rvs()
z = stats.halfnorm(loc=0., scale=1.).rvs()
y = stats.norm(loc=x, scale=z).rvs()
return x, z, y
prior_sample()
(-0.6274777874169912, 0.9358428814217807, 0.029987051428045586)
Shape handling#
Code 10.36#
def prior_sample(size):
x = stats.norm(loc=1.0, scale=2.0).rvs(size=size)
z = stats.halfnorm(loc=0., scale=1.).rvs(size=size)
y = stats.norm(loc=x, scale=z).rvs()
return x, z, y
print([x.shape for x in prior_sample(size=(2))])
print([x.shape for x in prior_sample(size=(2, 3, 5))])
[(2,), (2,), (2,)]
[(2, 3, 5), (2, 3, 5), (2, 3, 5)]
Code 10.37#
n_row, n_feature = 1000, 5
X = np.random.randn(n_row, n_feature)
def lm_prior_sample0():
intercept = stats.norm(loc=0, scale=10.0).rvs()
beta = stats.norm(loc=np.zeros(n_feature), scale=10.0).rvs()
sigma = stats.halfnorm(loc=0., scale=1.).rvs()
y_hat = X @ beta + intercept
y = stats.norm(loc=y_hat, scale=sigma).rvs()
return intercept, beta, sigma, y
def lm_prior_sample(size=10):
if isinstance(size, int):
size = (size,)
else:
size = tuple(size)
intercept = stats.norm(loc=0, scale=10.0).rvs(size=size)
beta = stats.norm(loc=np.zeros(n_feature), scale=10.0).rvs(
size=size + (n_feature,))
sigma = stats.halfnorm(loc=0., scale=1.).rvs(size=size)
y_hat = np.squeeze(X @ beta[..., None]) + intercept[..., None]
y = stats.norm(loc=y_hat, scale=sigma[..., None]).rvs()
return intercept, beta, sigma, y
print([x.shape for x in lm_prior_sample0()])
[(), (5,), (), (1000,)]
print([x.shape for x in lm_prior_sample(size=())])
print([x.shape for x in lm_prior_sample(size=10)])
print([x.shape for x in lm_prior_sample(size=[10, 3])])
[(), (5,), (), (1000,)]
[(10,), (10, 5), (10,), (10, 1000)]
[(10, 3), (10, 3, 5), (10, 3), (10, 3, 1000)]
# def lm_prior_sample2(size=10):
# intercept = stats.norm(loc=0, scale=10.0).rvs(size=size)
# beta = stats.multivariate_normal(
# mean=np.zeros(n_feature), cov=10.0).rvs(size=size)
# sigma = stats.halfnorm(loc=0., scale=1.).rvs(size=size)
# y_hat = np.einsum('ij,...j->...i', X, beta) + intercept[..., None]
# y = stats.norm(loc=y_hat, scale=sigma[..., None]).rvs()
# return intercept, beta, sigma, y
# print([x.shape for x in lm_prior_sample2(size=())])
# print([x.shape for x in lm_prior_sample2(size=10)])
# print([x.shape for x in lm_prior_sample2(size=(10, 3))])
Code 10.38#
import tensorflow as tf
import tensorflow_probability as tfp
tfd = tfp.distributions
jd = tfd.JointDistributionSequential([
tfd.Normal(0, 10),
tfd.Sample(tfd.Normal(0, 10), n_feature),
tfd.HalfNormal(1),
lambda sigma, beta, intercept: tfd.Independent(
tfd.Normal(
loc=tf.einsum("ij,...j->...i", X, beta) + intercept[..., None],
scale=sigma[..., None]),
reinterpreted_batch_ndims=1,
name="y")
])
print(jd)
n_sample = [3, 2]
for log_prob_part in jd.log_prob_parts(jd.sample(n_sample)):
assert log_prob_part.shape == n_sample
tfp.distributions.JointDistributionSequential("JointDistributionSequential", batch_shape=[[], [], [], []], event_shape=[[], [5], [], [1000]], dtype=[float32, float32, float32, float32])
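The full joint log-probability should carry the same batch shape as the sample; this small assertion is not from the book:
log_prob_full = jd.log_prob(jd.sample(n_sample))
assert tuple(log_prob_full.shape) == tuple(n_sample)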