Homework 05: Kernel Density Estimation#
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns; sns.set()
import numpy as np
import pandas as pd
import os.path
import subprocess
import scipy.stats
from sklearn import neighbors, cluster
Helpers for Getting, Loading and Locating Data
def wget_data(url: str):
local_path = './tmp_data'
p = subprocess.Popen(["wget", "-nc", "-P", local_path, url], stderr=subprocess.PIPE, encoding='UTF-8')
rc = None
while rc is None:
line = p.stderr.readline().strip('\n')
if len(line) > 0:
print(line)
rc = p.poll()
def locate_data(name, check_exists=True):
local_path='./tmp_data'
path = os.path.join(local_path, name)
if check_exists and not os.path.exists(path):
raise RuxntimeError('No such data file: {}'.format(path))
return path
wget_data('https://raw.githubusercontent.com/illinois-dap/DataAnalysisForPhysicists/main/data/blobs_data.hf5')
blobs_data = pd.read_hdf(locate_data('blobs_data.hf5'))
Problem 1#
A density estimator should provide a probability density function \(P(\vec{x})\) that is normalized over its feature space \(\vec{x}\) $\( \int d\vec{x}\, P(\vec{x}) = 1 \; . \)$ In this problem you will verify this normalization for KDE using two different numerical approaches for the integral.
First, implement the function below to accept a 1D KDE fit object and estimate its normalization integral using the trapezoid rule with the specified grid. Hint: the np.trapz
function will be useful.
def check_grid_normalization(fit, xlo, xhi, ngrid):
"""Check 1D denstity estimator fit result normlization using grid quadrature.
Parameters
----------
fit : neighbors.KernelDensity fit object
Result of fit to 1D dataset.
xlo : float
Low edge of 1D integration range.
xhi : float
High edge of 1D integration range.
ngrid : int
Number of equally spaced grid points covering [xlo, xhi],
including both end points.
"""
# YOUR CODE HERE
raise NotImplementedError()
# A correct solution should pass these tests.
fit = neighbors.KernelDensity(kernel='gaussian', bandwidth=0.1).fit(blobs_data.drop(columns=['x1', 'x2']).values)
assert np.round(check_grid_normalization(fit, 0, 15, 5), 3) == 1.351
assert np.round(check_grid_normalization(fit, 0, 15, 10), 3) == 1.019
assert np.round(check_grid_normalization(fit, 0, 15, 20), 3) == 0.986
assert np.round(check_grid_normalization(fit, 0, 15, 100), 3) == 1.000
fit = neighbors.KernelDensity(kernel='gaussian', bandwidth=0.1).fit(blobs_data.drop(columns=['x0', 'x2']).values)
assert np.round(check_grid_normalization(fit, -4, 12, 5), 3) == 1.108
assert np.round(check_grid_normalization(fit, -4, 12, 10), 3) == 0.993
assert np.round(check_grid_normalization(fit, -4, 12, 20), 3) == 0.971
assert np.round(check_grid_normalization(fit, -4, 12, 100), 3) == 1.000
fit = neighbors.KernelDensity(kernel='gaussian', bandwidth=0.1).fit(blobs_data.drop(columns=['x0', 'x1']).values)
assert np.round(check_grid_normalization(fit, 2, 18, 5), 3) == 1.311
assert np.round(check_grid_normalization(fit, 2, 18, 10), 3) == 0.954
assert np.round(check_grid_normalization(fit, 2, 18, 20), 3) == 1.028
assert np.round(check_grid_normalization(fit, 2, 18, 100), 3) == 1.000
Next, implement the function below to estimate a multidimensional fit normalization using Monte Carlo integration: $\( \int d\vec{x}\, P(\vec{x}) \simeq \frac{V}{N_{mc}}\, \sum_{j=1}^{N_{mc}} P(\vec{x}_j) = V \langle P\rangle \; , \)\( where the \)\vec{x}_j\( are uniformly distributed over the integration domain and \)V\( is the integration domain volume. Note that `trapz` gives more accurate results for a fixed number of \)P(\vec{x})$ evaluations, but MC integration is much easier to generalize to higher dimensions.
def check_mc_normalization(fit, xlo, xhi, nmc, seed=123):
"""Check denstity estimator fit result normlization using MC integration.
Parameters
----------
fit : neighbors.KernelDensity fit object
Result of fit to arbitrary dataset of dimension D.
xlo : array
1D array of length D with low limits of integration domain along each dimension.
xhi : array
1D array of length D with high limits of integration domain along each dimension.
nmc : int
Number of random MC integration points within the domain to use.
"""
xlo = np.asarray(xlo)
xhi = np.asarray(xhi)
assert xlo.shape == xhi.shape
assert np.all(xhi > xlo)
D = len(xlo)
gen = np.random.RandomState(seed=seed)
# Use gen.uniform() in your solution, not gen.rand(), for consistent random numbers.
# YOUR CODE HERE
raise NotImplementedError()
##### A correct solution should pass these tests.
fit = neighbors.KernelDensity(kernel='gaussian', bandwidth=0.1).fit(blobs_data.drop(columns=['x1', 'x2']).values)
assert np.round(check_mc_normalization(fit, [0], [15], 10), 3) == 1.129
assert np.round(check_mc_normalization(fit, [0], [15], 100), 3) == 1.022
assert np.round(check_mc_normalization(fit, [0], [15], 1000), 3) == 1.010
assert np.round(check_mc_normalization(fit, [0], [15], 10000), 3) == 0.999
fit = neighbors.KernelDensity(kernel='gaussian', bandwidth=0.1).fit(blobs_data.drop(columns=['x2']).values)
assert np.round(check_mc_normalization(fit, [0, -4], [15, 12], 10), 3) == 1.754
assert np.round(check_mc_normalization(fit, [0, -4], [15, 12], 100), 3) == 1.393
assert np.round(check_mc_normalization(fit, [0, -4], [15, 12], 1000), 3) == 0.924
assert np.round(check_mc_normalization(fit, [0, -4], [15, 12], 10000), 3) == 1.019
fit = neighbors.KernelDensity(kernel='gaussian', bandwidth=0.1).fit(blobs_data.values)
assert np.round(check_mc_normalization(fit, [0, -4, 2], [15, 12, 18], 10), 3) == 2.797
assert np.round(check_mc_normalization(fit, [0, -4, 2], [15, 12, 18], 100), 3) == 0.613
assert np.round(check_mc_normalization(fit, [0, -4, 2], [15, 12, 18], 1000), 3) == 1.316
assert np.round(check_mc_normalization(fit, [0, -4, 2], [15, 12, 18], 10000), 3) == 1.139