Homework 03: Important Probability Distributions#

%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns; sns.set()
import numpy as np
import pandas as pd

Problem 1#

Consider a sequence of \(n\) Bernoulli trials with success probability \(p\) per trial. A string of consecutive successes is known as a success run. Write a function that returns the counts of runs of length \(k\), for each \(k\) observed, as a Python dictionary.

For example, if the trials were [0, 1, 0, 1, 1, 0, 0, 0, 0, 1], the function should return {1: 2, 2: 1}.

from collections import Counter

def count_runs(xs):
    """Count number of success runs of length k.

    Parameters
    ----------
    xs : array of shape (nx,)
        Vector of Bernoulli trials

    Returns
    -------
    dict
        Dictionary of counts (values) for each observed run length k (keys).
    """

    # YOUR CODE HERE
    raise NotImplementedError()
cnt = count_runs([0, 1, 0, 1, 1, 0, 0, 0, 0, 1])
assert [cnt[1], cnt[2]] == [2, 1]

cnt = count_runs([0, 1, 0, 1, 1, 0, 0, 0, 0, 1, 1, 1])
assert [cnt[1], cnt[2], cnt[3]] == [1, 1, 1]

rng = np.random.default_rng(seed=1234)
cnt = count_runs(rng.integers(2, size=1000))
assert [cnt[1], cnt[2], cnt[3], cnt[4], cnt[5], cnt[6], cnt[7], cnt[8], cnt[9]] == [127, 60, 26, 24, 6, 4, 1, 1, 1]
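
For reference, here is a minimal sketch of one way count_runs could be implemented (the helper name count_runs_sketch and the use of itertools.groupby are illustrative choices, not the required approach):

from collections import Counter
from itertools import groupby

def count_runs_sketch(xs):
    """Sketch of count_runs: tally maximal blocks of consecutive successes."""
    # groupby yields maximal blocks of consecutive equal values; keep the
    # blocks of successes (value == 1) and count how often each length occurs.
    return dict(Counter(len(list(block)) for value, block in groupby(xs) if value == 1))

# Reproduces the example above.
assert count_runs_sketch([0, 1, 0, 1, 1, 0, 0, 0, 0, 1]) == {1: 2, 2: 1}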

Using count_runs above, write a function run_prob that returns the probability of observing at least one run of length k or more from n trials with success probability p per trial. This probability is estimated from expts simulated experiments.

def run_prob(expts, n, k, p, seed=1234):
    """Calculate the probability of observing at least one run of length `k` or more from `n` trials with success probabilty `p` per trial. This probability is estimated from `expts` simulated experinents.

    Parameters
    ----------
    expts : int
        Number of simulated experiments
    k: int
        Number of consecutive successes defining a success run
    n: int
        Number of trials per experiment
    p: float
        Success probability per trial
    seed : int
        Random number seed to use.

    Returns
    -------
    float
        Returns the probability of observing at least one run of length `k` or more
    """

    # Initialize random generator. Use this generator in your code below
    rng = np.random.default_rng(seed=seed)

    # YOUR CODE HERE
    raise NotImplementedError()

Determine the probability of observing at least one run of length \(k\)=5 or more when \(n\)=100 and \(p\)=0.5. Estimate this probability from 100,000 simulated experiments.

prob = run_prob(expts=100000, n=100, k=5, p=0.5)
print(np.round(prob * 100, 1), '%')
assert np.allclose(prob, 0.81044, rtol=0.001, atol=0.1)

Determine the probability of observing at least one run of length \(k\)=7 or more when \(n\)=100 and \(p\)=0.7. Estimate this probability from 100,000 simulated experiments.

prob = run_prob(expts=100000, n=100, k=7, p=0.7)
print(np.round(prob * 100, 1), '%')
assert np.allclose(prob, 0.9489, rtol=0.001, atol=0.1)
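
A possible sketch of run_prob, assuming the count_runs behavior above (count_runs_sketch stands in for your count_runs): simulate each experiment with the supplied generator and check whether any observed run reaches length k.

def run_prob_sketch(expts, n, k, p, seed=1234):
    """Sketch: estimate P(at least one success run of length >= k) by simulation."""
    rng = np.random.default_rng(seed=seed)
    hits = 0
    for _ in range(expts):
        # One experiment: n Bernoulli(p) trials as 0/1 values.
        xs = (rng.random(n) < p).astype(int)
        # The dictionary keys are the observed run lengths.
        if any(length >= k for length in count_runs_sketch(xs)):
            hits += 1
    return hits / expts

The Python-level loop is easy to read but slow for 100,000 experiments; generating all expts x n trials as a single array and vectorizing the run detection would be faster.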

Problem 2#

The normal (aka Gaussian) distribution is one of the fundamental probability densities that we will encounter often.

Implement the function below using np.random.multivariate_normal to generate random samples from an arbitrary multidimensional normal distribution, for a specified random seed:

def generate_normal(mu, C, n, seed=123):
    """Generate random samples from a normal distribution.

    Parameters
    ----------
    mu : array
        1D array of mean values of length N.
    C : array
        2D array of covariances of shape (N, N). Must be a positive-definite matrix.
    n : int
        Number of random samples to generate.
    seed : int
        Random number seed to use.

    Returns
    -------
    array
        2D array of shape (n, N) where each row is a random N-dimensional sample.
    """
    assert len(mu.shape) == 1, 'mu must be 1D.'
    assert C.shape == (len(mu), len(mu)), 'C must be N x N.'
    assert np.all(np.linalg.eigvals(C) > 0), 'C must be positive definite.'
    # YOUR CODE HERE
    raise NotImplementedError()
# A correct solution should pass these tests.
mu = np.array([-1., 0., +1.])
C = np.identity(3)
C[0, 1] = C[1, 0] = -0.9
Xa = generate_normal(mu, C, n=500, seed=1)
Xb = generate_normal(mu, C, n=500, seed=1)
Xc = generate_normal(mu, C, n=500, seed=2)
assert np.array_equal(Xa, Xb)
assert not np.array_equal(Xb, Xc)
X = generate_normal(mu, C, n=2000, seed=3)
assert np.allclose(np.mean(X, axis=0), mu, rtol=0.001, atol=0.1)
assert np.allclose(np.cov(X, rowvar=False), C, rtol=0.001, atol=0.1)
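
A minimal sketch of generate_normal: the prompt mentions np.random.multivariate_normal, but to honor the per-call seed the sketch below uses the equivalent method on a locally constructed generator (an assumption about what is acceptable here):

def generate_normal_sketch(mu, C, n, seed=123):
    """Sketch: draw n samples from the multivariate normal N(mu, C)."""
    gen = np.random.default_rng(seed=seed)
    # size=n returns an (n, N) array with one N-dimensional sample per row.
    return gen.multivariate_normal(mu, C, size=n)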

Visualize a generated 3D dataset using:

def visualize_3d():
    mu = np.array([-1., 0., +1.])
    C = np.identity(3)
    C[0, 1] = C[1, 0] = -0.9
    X = generate_normal(mu, C, n=2000, seed=3)
    df = pd.DataFrame(X, columns=('x0', 'x1', 'x2'))
    sns.pairplot(df)
visualize_3d()

Read about correlation and covariance, then implement the function below to create a 2x2 covariance matrix given values of \(\sigma_x\), \(\sigma_y\) and the correlation coefficient \(\rho\):

def create_2d_covariance(sigma_x, sigma_y, rho):
    """Create and return the 2x2 covariance matrix specified by the input args.
    """
    assert (sigma_x > 0) and (sigma_y > 0), 'sigmas must be > 0.'
    # YOUR CODE HERE
    raise NotImplementedError()
# A correct solution should pass these tests.
assert np.array_equal(create_2d_covariance(1., 1.,  0.0), [[1.,  0.], [ 0., 1.]])
assert np.array_equal(create_2d_covariance(2., 1.,  0.0), [[4.,  0.], [ 0., 1.]])
assert np.array_equal(create_2d_covariance(2., 1.,  0.5), [[4.,  1.], [ 1., 1.]])
assert np.array_equal(create_2d_covariance(2., 1., -0.5), [[4., -1.], [-1., 1.]])
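
The construction itself is short: the diagonal holds the variances \(\sigma_x^2\) and \(\sigma_y^2\), and both off-diagonal entries are the covariance \(\rho \sigma_x \sigma_y\). A sketch (create_2d_covariance_sketch is an illustrative name):

def create_2d_covariance_sketch(sigma_x, sigma_y, rho):
    """Sketch: 2x2 covariance matrix from the sigmas and the correlation rho."""
    cov_xy = rho * sigma_x * sigma_y  # Cov(x, y) = rho * sigma_x * sigma_y
    return np.array([[sigma_x ** 2, cov_xy],
                     [cov_xy, sigma_y ** 2]])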

Run the following cell that uses your create_2d_covariance and generate_normal functions to compare the 2D normal distributions with \(\rho = 0\) (blue), \(\rho = +0.9\) (red) and \(\rho = -0.9\) (green):

def compare_rhos():
    mu = np.zeros(2)
    sigma_x, sigma_y = 2., 1.
    for rho, cmap in zip((0., +0.9, -0.9), ('Blues', 'Reds', 'Greens')):
        C = create_2d_covariance(sigma_x, sigma_y, rho)
        X = generate_normal(mu, C, 10000)
        sns.kdeplot(x=X[:, 0], y=X[:, 1], cmap=cmap)
    plt.xlim(-4, +4)
    plt.ylim(-2, +2)

compare_rhos()

Problem 3#

We showed that the conditional probability density of a multidimensional feature space can be calculated as:

$$
f_{X|Y}(\vec{x}\mid \vec{y}) = \frac{f(\vec{x}, \vec{y})}{f(\vec{y})} \; ,
$$

where features \(\vec{x}\) are conditioned on the values of features \(\vec{y}\).

For the ubiquitous multivariate normal probability density, we can split the joint mean \(\vec{\mu}_{X,Y}\) into separate means for the \(X\) and \(Y\) features,

$$
\vec{\mu}_{X,Y} = \begin{bmatrix}\vec{\mu}_X \\ \vec{\mu}_Y\end{bmatrix}
$$

and, similarly, for the joint covariance:

$$
C_{X,Y} = \begin{bmatrix} C_{XX} & C_{XY} \\ C_{XY}^T & C_{YY} \end{bmatrix} \; ,
$$

where \(C_{XX}\) is the submatrix of covariances for the \(X\) features, etc.

We can then explicitly calculate the resulting conditional mean:

$$
\vec{\mu}_{X|Y} \equiv \langle \vec{x}\rangle_{X|Y} = \vec{\mu}_X + C_{XY} C_{YY}^{-1} \left(\vec{y} - \vec{\mu}_Y\right) \; ,
$$

and covariance:

$$
C_{X|Y} \equiv \langle \left( \vec{x} - \vec{\mu}_{X|Y}\right) \left( \vec{x} - \vec{\mu}_{X|Y}\right)^T \rangle_{X|Y} = C_{XX} - C_{XY} C_{YY}^{-1} C_{XY}^T \; .
$$

(see this worked out in detail here)

Note that \(\vec{\mu}_{X|Y}\) depends on the conditioned feature values \(\vec{y}\), but \(C_{X|Y}\) does not. These Gaussian conditional probability densities are central to the Factor Analysis (FA) and Gaussian Process (GP) methods.
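
As a quick sanity check of these formulas (a worked special case, not part of the assignment), take a single \(x\) conditioned on a single \(y\), so the blocks reduce to scalars \(C_{XX} = \sigma_x^2\), \(C_{YY} = \sigma_y^2\), and \(C_{XY} = \rho\sigma_x\sigma_y\). The formulas then give the familiar bivariate results:

$$
\mu_{x|y} = \mu_x + \rho\,\frac{\sigma_x}{\sigma_y}\left(y - \mu_y\right) \; , \qquad
C_{x|y} = \sigma_x^2\left(1 - \rho^2\right) \; ,
$$

so conditioning shifts the mean linearly in \(y\) and shrinks the variance by the factor \(1 - \rho^2\), independently of \(y\).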

Implement the function below to calculate these expressions:

def gauss_conditional_predicted(y0, muX, muY, CXX, CXY, CYY):
    """Predicted conditional Gaussian means and covariances.

    Parameters
    ----------
    y0 : array of shape (ny,)
        Fixed y values used for conditioning.
    muX : array of shape (nx,)
        Mean value of X features.
    muY : array of shape (ny,)
        Mean value of Y features.
    CXX : array of shape (nx, nx)
        Covariances between X features.
    CXY : array of shape (nx, ny)
        Covariances between X and Y features.
    CYY : array of shape (ny, ny)
        Covariances between Y features.

    Returns
    -------
    tuple
        Tuple (mu, C) of arrays with shapes (nx,) and (nx,nx), respectively,
        giving the means and covariances of X features conditioned on Y=y0.
    """
    nx = len(muX)
    ny = len(muY)
    assert y0.shape == (ny,)
    assert CXX.shape == (nx, nx)
    assert CXY.shape == (nx, ny)
    assert CYY.shape == (ny, ny)
    # YOUR CODE HERE
    raise NotImplementedError()
# A correct solution should pass these tests.
nx, ny = 2, 3
y0, muX, muY = np.ones(ny), np.zeros(nx), np.zeros(ny)
CXX, CXY, CYY = np.identity(nx), np.zeros((nx, ny)), np.identity(ny)
mu, C = gauss_conditional_predicted(y0, muX, muY, CXX, CXY, CYY)
assert np.allclose(mu, [0, 0])
assert np.allclose(C, [[1, 0], [0, 1]])

y0, muX, muY = np.array([1, 2]), np.array([3, 2, 1]), np.array([0, 0])
CXX, CXY, CYY = np.array([[2, 0, 1], [0, 1, 0], [1, 0, 1]]), np.zeros((3, 2)), np.array([[1, 1], [1, 2]])
mu, C = gauss_conditional_predicted(y0, muX, muY, CXX, CXY, CYY)
assert np.allclose(mu, [3, 2, 1])
assert np.allclose(C, [[ 2, 0, 1],  [0, 1, 0], [1, 0, 1]])
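
The two formulas above transcribe almost directly into NumPy. A sketch (using np.linalg.solve rather than forming \(C_{YY}^{-1}\) explicitly, a standard numerical-stability choice; the helper name is illustrative):

def gauss_conditional_predicted_sketch(y0, muX, muY, CXX, CXY, CYY):
    """Sketch: conditional Gaussian mean and covariance via the block formulas."""
    # muX + CXY @ CYY^{-1} @ (y0 - muY), computed with a linear solve.
    mu = muX + CXY @ np.linalg.solve(CYY, y0 - muY)
    # CXX - CXY @ CYY^{-1} @ CXY.T, again via a solve.
    C = CXX - CXY @ np.linalg.solve(CYY, CXY.T)
    return mu, C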

Implement the function below to calculate empirical estimates of the conditional Gaussian means and covariances. Since the requirement \(Y = y_0\) will likely not select any samples from a finite dataset, we relax this condition to:

\[ \left| \vec{y} - \vec{y}_0\right|^2 < \epsilon^2 \; , \]

and apply np.mean and np.cov to the resulting subset of samples. Hint: pay attention to the rowvar parameter of np.cov.

def gauss_conditional_measured(X, Y, y0, eps=1.5):
    """Measured conditional Gaussian means and covariances.

    Parameters
    ----------
    X : array of shape (N, nx)
        X feature values for dataset with N samples.
    Y : array of shape (N, ny)
        Y feature values for dataset with N samples.
    y0 : array of shape (ny,)
        Fixed y values used for conditioning.
    eps : float
        Tolerance for selecting samples with Y ~ y0.

    Returns
    -------
    tuple
        Tuple (mu, C) of arrays with shapes (nx,) and (nx,nx), respectively,
        giving the means and covariances of X features conditioned on Y=y0.
    """
    # YOUR CODE HERE
    raise NotImplementedError()
# A correct solution should pass these tests.
# Repeat the first test above, but numerically this time.
gen = np.random.RandomState(seed=123)
nx, ny = 2, 3
y, muX, muY = np.ones(ny), np.zeros(nx), np.zeros(ny)
CXX, CXY, CYY = np.identity(nx), np.zeros((nx, ny)), np.identity(ny)
mu = np.hstack([muX, muY])
C = np.block([[CXX, CXY], [CXY.T, CYY]])
XY = gen.multivariate_normal(mu, C, size=1000000)
mu, C = gauss_conditional_measured(XY[:, :nx], XY[:, nx:], y)
assert np.allclose(np.round(mu, 2), [0, 0])
assert np.allclose(np.round(C, 2), [[1, 0], [0, 1]])

# Repeat the second test above, but numerically this time.
y, muX, muY = np.array([1, 2]), np.array([3, 2, 1]), np.array([0, 0])
CXX, CXY, CYY = np.array([[2, 0, 1], [0, 1, 0], [1, 0, 1]]), np.zeros((3, 2)), np.array([[1, 1], [1, 2]])
mu = np.hstack([muX, muY])
C = np.block([[CXX, CXY], [CXY.T, CYY]])
XY = gen.multivariate_normal(mu, C, size=1000000)
mu, C = gauss_conditional_measured(XY[:, :3], XY[:, 3:], y)
assert np.allclose(np.round(mu, 2), [3, 2, 1])
assert np.allclose(np.round(C, 2), [[ 2, 0, 1],  [0, 1, 0], [1, 0, 1]])
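
A possible sketch of the empirical version: build a boolean mask for samples with \(\left|\vec{y} - \vec{y}_0\right|^2 < \epsilon^2\), then apply np.mean and np.cov to the selected rows of X, with rowvar=False so that columns are treated as features:

def gauss_conditional_measured_sketch(X, Y, y0, eps=1.5):
    """Sketch: empirical conditional mean and covariance near Y = y0."""
    # Select samples whose Y values lie within eps of y0 (Euclidean distance).
    keep = np.sum((Y - y0) ** 2, axis=1) < eps ** 2
    mu = np.mean(X[keep], axis=0)
    # rowvar=False: each column of X[keep] is a feature, each row a sample.
    C = np.cov(X[keep], rowvar=False)
    return mu, C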