CS6601 Assignment 5 submission

Assignment 5 – Expectation Maximization

Automatic image processing is a key component of many AI systems, including facial recognition, video compression, and instance segmentation of images and point cloud data. One basic processing method is segmentation, by which we divide an image into a fixed number of components in order to simplify its representation. For example, we can train a mixture of Gaussians to represent an image and segment it according to the simplified representation, as shown in the images below.

Or we could cluster a point cloud in order to separate different objects, backgrounds, etc., as shown in the image below.

In this assignment, you will learn to perform image compression and point cloud segmentation. To this end, you will implement Gaussian mixture models and iteratively improve their performance. First you will perform segmentation on the "Starry" image (Starry.png), and at the end you will run your algorithm on 3D point cloud data.

To begin, you will implement several methods of image segmentation, with increasing complexity:

Implement k-means clustering to segment a color image.

Familiarize yourself with the algorithm by running it on a simple dataset.

Build a Gaussian mixture model to be trained with expectation-maximization.

Experiment with varying the details of the Gaussian mixture model’s implementation.

Implement and test a new metric called the Bayesian information criterion, which guarantees a more robust image segmentation.

Part 0: NumPy
The concept of Vectorization was introduced in the last section of Assignment 4. For this assignment, please vectorize your code wherever possible using numpy arrays, instead of running for-loops over the images being processed.

For an example of how this might be useful, consider the following array:
A = [12 34 1234 764 …(has a million values)… 91, 78]

Now you need to calculate another array B, which has the same dimensions as A above. Say each value in B is calculated as follows:
(each value in B) = sqrt(pi * log(k) * (each value in A)) / 7, where pi and log(k) are constants.

You might wish to use a for-loop to compute this. However, it would take a very long time to run on an array of this magnitude.
Alternatively, you can use numpy and perform this calculation in a single line. Pass A as a numpy array and the entire calculation is done at once, with B populated with the corresponding values that come out of the formula.
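For instance, the computation above reduces to one vectorized line (a minimal sketch; np.pi is used for the constant and k is a stand-in value for the unspecified constant):

import numpy as np

A = np.random.rand(1_000_000)            # stand-in for the million-value array above
k = 10.0                                 # illustrative value for the unspecified constant k
B = np.sqrt(np.pi * np.log(k) * A) / 7   # the whole array is computed in one call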

Check out the Basic Operations section of the NumPy tutorial if you are not familiar with numpy vector/matrix operations: https://docs.scipy.org/doc/numpy/user/quickstart.html#basic-operations. Please also check the resources linked in the assignment GitHub README.

Please note that numpy.vectorize DOES NOT perform vectorization; it only runs a loop.
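You can convince yourself of this with a quick comparison (a minimal sketch; the constants are illustrative and timings will vary by machine):

import numpy as np

A = np.random.rand(1_000_000)

# np.vectorize merely wraps a Python-level loop around the function...
looped = np.vectorize(lambda a: np.sqrt(np.pi * np.log(10.0) * a) / 7)

# ...whereas true vectorization dispatches a single call on the whole array.
def vectorized(arr):
    return np.sqrt(np.pi * np.log(10.0) * arr) / 7

assert np.allclose(looped(A), vectorized(A))  # same values...
# %timeit looped(A)      # ...but orders of magnitude slower
# %timeit vectorized(A)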

Let's look at a few examples below.

Element-wise multiplication
Use np.multiply or the * operator to perform element-wise multiplication.

import numpy as np
x = np.array([[1, 2, 3, 4],
              [5, 6, 7, 8]])
y = np.array([[1, 2, 3, 4],
              [5, 6, 7, 8]])
output_multiply = np.multiply(x, y)
output_op = x * y
print(f'using np.multiply() \n {output_multiply}')
print(f'using * \n {output_op}')

# np.power is another element-wise function: it raises each entry of the
# array to the given power. Note x and y defined above are the same.
output_power = np.power(x, 2)
print(f'using np.power \n {output_power}')

Inner product
Use np.dot() or the array's .dot() method to compute the inner product between vectors. There are some nuances between 1-D vectors and higher-dimensional arrays.

p = np.array([1, 2, 3, 4])
q = np.array([5, 6, 7, 8])
output_dot = np.dot(p, q)
output_sdot = p.dot(q)
print(f'using np.dot() \n {output_dot}')
print(f'using self.dot() \n {output_sdot}')

m = np.array([[1, 2, 3, 4]])
n = np.array([[5, 6, 7, 8]])
output_ndot = np.dot(m, n.T)
output_nsdot = m.dot(n.T)
print(f'using np.dot() \n {output_ndot}')
print(f'using self.dot() \n {output_nsdot}')

Technically, in the second example above, m and n are 1 x 4 matrices, so we have to transpose n to align the dimensions for matrix multiplication. More examples of matrix multiplication follow.
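To see that nuance concretely (a quick check you can run as-is):

import numpy as np

p = np.array([1, 2, 3, 4])    # 1-D vector, shape (4,)
q = np.array([5, 6, 7, 8])
m = np.array([[1, 2, 3, 4]])  # 2-D row matrix, shape (1, 4)
n = np.array([[5, 6, 7, 8]])

print(np.dot(p, q), np.shape(np.dot(p, q)))      # 70 () - a scalar; no transpose needed
print(np.dot(m, n.T), np.shape(np.dot(m, n.T)))  # [[70]] (1, 1) - a matrix; n must be transposed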

Matrix multiplication
There are many ways to do matrix multiplication in NumPy. Notably, you can use np.dot(), the .dot() method, the @ operator, np.matmul(), or np.einsum().

x = np.array([[1, 2, 3, 4],
              [5, 6, 7, 8]])
y = np.array([[1, 2, 3, 4],
              [5, 6, 7, 8]])
output_ndot = np.dot(x, y.T)
output_nsdot = x.dot(y.T)
output_nop = x @ y.T
output_matmul = np.matmul(x, y.T)
output_esum = np.einsum('ij, jk-> ik', x, y.T)
print(f'using np.dot() \n {output_ndot}')
print(f'using self.dot() \n {output_nsdot}')
print(f'using operator @ \n {output_nop}')
print(f'using np.matmul \n {output_matmul}')
print(f'using np.einsum \n {output_esum}')

Note that einsum can do a lot more. For full documentation, see https://numpy.org/doc/stable/reference/generated/numpy.einsum.html. Some good examples are listed at https://stackoverflow.com/questions/26089893/understanding-numpys-einsum and https://rockt.github.io/2018/04/30/einsum.

# x and y are now random matrices of a larger size
m, n = 1000, 500

x = np.random.rand(m, n)
y = np.random.rand(m, n)
# trace of the matrix multiplication
output_trace_fast = np.einsum('ij, ji->', x, y.T)
print(f'Trace of the product of x and y: {output_trace_fast}')
# compare the time spent computing the trace of the matrix multiplication
# vanilla
%timeit np.trace(x.dot(y.T))
# einsum
%timeit output_trace_fast = np.einsum('ij, ji->', x, y.T)

Additional useful NumPy tricks and functions

# axis-wise slicing
x = np.random.rand(50, 50, 50)
y = x[0, :, :]

# reverse order
x = np.arange(10)
# general rule: start:end:step, a negative step means reverse
print(x[::-1])

# conditional indexing and np.where
x = np.arange(10)
print(x == 1)
print(np.where(x == 1))
# replace the entry where x has value 1 with a new value of 11
# note the third argument supplies the values used where the condition fails: !(x != 1) is x == 1
print(np.where(x != 1, x, 11))

A list of useful functions and operators
np.concatenate() – lets you put together two arrays along a certain axis
np.zeros(), np.ones() – create arrays of zeros and ones with the specified dimensions
x[start:end] – slices an array from the 'start' position to the 'end' position; can be used along different axes
np.diag() / np.diagonal() – np.diag() creates a matrix with the specified diagonal elements (a 1-D array) and returns the diagonal elements when passed a 2-D array; np.diagonal() extracts diagonals from higher-dimensional arrays
np.reshape() – reshapes the dimensions of your array
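A few quick, runnable examples of these (nothing assignment-specific; the values are illustrative):

import numpy as np

a = np.zeros((2, 2))                  # 2 x 2 array of zeros
b = np.ones((2, 2))                   # 2 x 2 array of ones
c = np.concatenate([a, b], axis=0)    # stack vertically -> shape (4, 2)
print(c.shape)

d = np.diag([1.0, 2.0, 3.0])          # 3 x 3 matrix with the given diagonal
print(np.diag(d))                     # [1. 2. 3.] - extracts the diagonal back

e = np.arange(6).reshape(2, 3)        # reshape a length-6 vector into 2 x 3
print(e[:, 1:3])                      # slice columns 1..2 of every row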

Part 1: K-means Clustering (19 pts)
One easy method for image segmentation is to simply cluster all similar data points together and then replace their values with the mean value. Thus, we'll warm up using k-means clustering. This will also provide a baseline to compare your later segmentations against. Please note that clustering will come in handy later.

Fill out the get_initial_means() and k_means_step() functions below.

In get_initial_means(), you should choose k random points from the data (without replacement) to use as initial cluster means.

Your code will be unit tested automatically when you run the cell (Cell > Run Cells OR Shift + Enter).

Try to vectorize the code so that it runs faster. Without vectorization it takes 25-30 minutes for the code to run.

%load_ext autoreload
%autoreload 2
# Run this cell and check if you have all necessary modules
from ipywidgets import *
import mixture_tests as tests
import matplotlib.pyplot as plt
import matplotlib.patches as pat
from scipy.stats import norm
import numpy as np
from helper_functions import *
# Please don't modify this cell

import numpy as np
from helper_functions import *

def get_initial_means(array, k):
    """
    Picks k random points from the 2D array
    (without replacement) to use as initial
    cluster means

    array = numpy.ndarray[numpy.ndarray[float]] - m x n | datapoints x features

    initial_means = numpy.ndarray[numpy.ndarray[float]]
    """
    # TODO: finish this function
    raise NotImplementedError()

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######
tests.K_means_test().test_initial_means(get_initial_means)
################ END OF LOCAL TEST CODE SECTION ######################

def k_means_step(X, k, means):
    """
    A single update/step of the K-means algorithm.
    Based on an input X and the current mean estimate,
    predict clusters for each of the pixels and
    calculate new means.

    X = numpy.ndarray[numpy.ndarray[float]] - m x n | pixels x features (already flattened)
    means = numpy.ndarray[numpy.ndarray[float]] - k x n

    (new_means, clusters)
    new_means = numpy.ndarray[numpy.ndarray[float]] - k x n
    clusters = numpy.ndarray[int] - m sized vector
    """
    # TODO: finish this function
    raise NotImplementedError()

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######
tests.K_means_test().test_k_means_step(k_means_step)
################ END OF LOCAL TEST CODE SECTION ######################

K-means – Visualizing the results

Now that you are done with the K-means step implementation, let's try to visualize what happens if you repeat these steps multiple times.

You don't need to implement anything in the next few cells until the Image Segmentation section, but you are highly encouraged to play with the parameters and datasets to get a sense of what is happening at every iteration of the algorithm.

Feel free to explore and improve the function below; it will be used for visualizing K-means progress, but it's not required and WON'T affect your grade.

# This cell contains code for loading a dataset from the `data` folder
# Each of these datasets contains synthetic (generated) data
# You can simply run this cell for now and come back to it later if you want to make changes
# Make sure you implemented everything in the cells above and passed the unit tests
def K_means_2D_dataset(dataset_index, K):
    # Load the dataset from the data folder
    X = np.loadtxt("data/%d_dataset_X.csv" % dataset_index, delimiter=",")
    print("The dataset is of a size:", X.shape)

    # Load the labels
    # Clustering is an unsupervised method, where no labels are provided
    # However, since we generated the data ourselves we know the clusters,
    # and load them for illustration purposes.
    y = np.int16(np.loadtxt("data/%d_dataset_y.csv" % dataset_index, delimiter=","))

    # Feel free to edit the termination condition for the K-means algorithm
    # Currently it just runs for n_iterations before terminating
    n_iterations = 10
    m, n = X.shape
    means = get_initial_means(X, K)
    clusters = np.zeros([m])  # one cluster index per datapoint
    # keep track of how clusters and means changed, for visualization purposes
    means_history = [means]
    clusters_history = [clusters]
    for iteration_i in range(n_iterations):
        means, clusters = k_means_step(X, K, means)
        means_history.append(means)
        clusters_history.append(clusters)

    return X, y, means_history, clusters_history

# Things to try:
# - Try different initializations to check initialization robustness
# - Improve the termination condition (you will be able to reuse it later as well!)
# - Try creating your own dataset in the `data/` folder

# RUN - TRY DIFFERENT PARAMETERS - REPEAT
dataset_index = 2  # for a different dataset change it to a number from [0,4]
K = 5  # Number of clusters - play with this number

X, y, means_history, clusters_history = K_means_2D_dataset(dataset_index, K)

# This is an interactive cell to see the progress of training your K-means algorithm.
# Feel free to improve the visualization code and share it with your classmates on Piazza
def get_cluster(i):
    clusters = clusters_history[i]     # Get the clusters from K-means' i-th iteration
    plt.figure(None, figsize=(15, 6))  # Set the plot size
    plt.suptitle('Drag the slider to see the algorithm training progress')
    ax1 = plt.subplot(1, 2, 1)
    ax1.set_title('K-means clusters - step %d' % i)
    for k in range(K):
        plt.plot(X[clusters == k, 0], X[clusters == k, 1], '.')
    # Try to plot the centers of the clusters
    # You can access them by calling means_history[i]
    # How could you plot the area that belongs to that cluster?

    # Just to get a flavour of how the data looks
    ax2 = plt.subplot(1, 2, 2)
    ax2.set_title('Ground truth clusters')
    for i in np.unique(y):
        ax2.plot(X[y == i, 0], X[y == i, 1], '.')

    plt.show()

interactive(get_cluster, i=(1, len(clusters_history) - 1, 1))

Image segmentation

2D data clustering is all well and good, but now it's time to use K-means for image compression!

Fill in the k_means_segment() function below; you will find your k_means_step() and get_initial_means() very handy here.

You will separate the provided RGB values into k clusters using the k-means algorithm, then return an updated version of the image with the original values replaced with the corresponding cluster center values.

Your convergence test should be whether the assigned clusters stop changing. Note that this convergence test is rather slow. When no initial cluster means (initial_means) are provided, you need to initialize them yourself, based on the given k.

For this part of the assignment, since clustering is best used on multidimensional data, we will be using the color image Starry.png.

Please pay close attention to the dimensions of the data. In k_means_step() you were working with m x n data; here the input is an image (image_values), which has a shape of rows x columns x color_channels.

The function should return an updated version of the image with the original values replaced with the corresponding cluster values.
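One simple way to realize the convergence test described above (a minimal sketch under illustrative names; run_until_converged and X_flat are not part of the required API):

import numpy as np

# Sketch: iterate k-means steps until the cluster assignments stop changing.
# `X_flat` is the image flattened to m x n; `k_means_step` is your function above.
def run_until_converged(X_flat, k, means):
    prev_clusters = None
    while True:
        means, clusters = k_means_step(X_flat, k, means)
        # converged once no pixel changes its assigned cluster
        if prev_clusters is not None and np.array_equal(clusters, prev_clusters):
            return means, clusters
        prev_clusters = clusters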

def k_means_segment(image_values, k=3, initial_means=None):
    """
    Separate the provided RGB values into
    k separate clusters using the k-means algorithm,
    then return an updated version of the image
    with the original values replaced with
    the corresponding cluster values.

    image_values = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - r x c x ch
    initial_means = numpy.ndarray[numpy.ndarray[float]] or None

    updated_image_values = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - r x c x ch
    """
    # TODO: finish this function
    raise NotImplementedError()

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######
tests.K_means_test().test_k_means(k_means_segment)
################ END OF LOCAL TEST CODE SECTION ######################

Visualizing K-means segmentation results

k = 5  # number of clusters - feel free to play with it

image_values = image_to_matrix('images/Starry.png')
# Play with the k value above to see the effect the number of clusters has
new_image = k_means_segment(image_values, k=k)

plt.figure(None, figsize=(9, 12))
plt.imshow(new_image)
plt.show()

You can reuse the K-means visualization code from the previous section to show the training progress on the image across iterations, and even across different numbers of clusters.

Part 2: Implementing a Multivariate Gaussian Mixture Model (48 pts)
Next, we will step beyond clustering and implement a complete Gaussian mixture model.

But before you dive into the code, you are highly encouraged to go over the read/gaussians.pdf file, to familiarize yourself with the multivariate case of the Gaussian distribution.

In addition to that, there is a great ~17-minute YouTube video where Alexander Ihler goes over the nuts and bolts of the multivariate EM algorithm.

Another resource you can refer to is the attached read/em.pdf document, which is a chapter from the book Pattern Recognition and Machine Learning by Christopher M. Bishop.

Now it's time to complete the implementation of the functions below, which will later assemble into a multivariate Gaussian expectation-maximization algorithm:

Calculate the probability of a given data point (e.g. the RGB value of a pixel) belonging to a specific Gaussian component. (7 points)

Use expectation-maximization (EM) to train the model to represent the image as a mixture of Gaussians. (20 points)

To initialize EM, set each component's mean to the value of a randomly chosen pixel (same as for K-means), calculate covariances based on the selected means, and set the mixing coefficients to a uniform distribution.

We've set the convergence condition for you in default_convergence() (see the helper_functions.py file): if the new likelihood is within 10% of the previous likelihood for 10 consecutive iterations, the model has converged (a rough sketch of this rule appears after this list).

Note: there are packages that can run EM automagically, but you have to implement your own version of EM without using these extra packages. This also means that you are not allowed to look at any implementations of the algorithm, e.g. scikit-learn and many others. NumPy is your only tool here.

Calculate the log likelihood of the trained model. (7 points)
Segment the image according to the trained model. (7 points)
Determine the best segmentation by iterating over model training and scoring, since EM isn’t guaranteed to converge to the global maximum. (7 points)
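Based on the description above, the rule in default_convergence() behaves roughly like the following sketch (illustrative only; helper_functions.py holds the authoritative version):

# NOT the actual helper - a sketch of the rule described above:
# converged once the new likelihood stays within 10% of the previous
# likelihood for 10 consecutive iterations.
def convergence_sketch(prev_likelihood, new_likelihood, conv_ctr, conv_ctr_cap=10):
    within_10_percent = abs(prev_likelihood) * 0.9 < abs(new_likelihood) < abs(prev_likelihood) * 1.1
    conv_ctr = conv_ctr + 1 if within_10_percent else 0
    return conv_ctr, conv_ctr >= conv_ctr_cap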

It'd be helpful to implement the above functions in the following order:

initialize_parameters
likelihood
train_model
best_segment

Warning: You may lose all marks for this part if your code runs for too long.

You will need to vectorize your code in this part. Specifically, the E_step() and M_step() methods, which make up train_model(), perform operations using numpy arrays. These are time-sensitive functions and will be called over and over as you proceed with this assignment.

For the synthetic-data test we provide to check that your training works, the dataset is too small for vectorization to make a difference. But with the actual image used later, for-loops won't do. Vectorized code should take under 30 seconds to converge, which typically involves about 15-20 iterations with the convergence function we have here. Inefficient code that uses loops or iterates over each pixel value sequentially will take hours to run. You don't want to do that.

The following cell (compute_sigma) will not be graded, but we highly recommend using this function and the paired test to make sure your covariance-matrix implementation is correct. Computing the covariance matrix incorrectly can cause problems that become extremely hard to debug later in the assignment, so please take advantage of this section.
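For reference, one standard covariance estimate for component j, given its mean MU_j and all m datapoints, is:

SIGMA_j = (1/m) * sum((i=1 to m), (x_i - MU_j) * (x_i - MU_j)^T)

which yields one n x n matrix per component; rely on the paired local test below to confirm the exact normalization expected here.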

Make sure to put #export (as the first line in this cell) only if you call/use this function elsewhere in the code.
def compute_sigma(X, MU):
    """
    Calculate the covariance matrix, based on the given X and MU values

    X = numpy.ndarray[numpy.ndarray[float]] - m x n
    MU = numpy.ndarray[numpy.ndarray[float]] - k x n

    SIGMA = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - k x n x n
    """
    # TODO: finish this function
    raise NotImplementedError()

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######
tests.GMMTests().test_gmm_covariance(compute_sigma)
################ END OF LOCAL TEST CODE SECTION ######################

Same as in K-means, you will be working with data of size (m x n).

def initialize_parameters(X, k):
    """
    Return initial values for training of the GMM.
    Set each component mean to a random
    pixel's value (without replacement),
    calculate covariance matrices based on those means,
    and set each component mixing coefficient (PI)
    to a uniform value
    (e.g. 4 components -> [0.25, 0.25, 0.25, 0.25]).

    X = numpy.ndarray[numpy.ndarray[float]] - m x n

    (MU, SIGMA, PI)
    MU = numpy.ndarray[numpy.ndarray[float]] - k x n
    SIGMA = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - k x n x n
    PI = numpy.ndarray[float] - k
    """
    # TODO: finish this function
    # Hint: for initializing SIGMA you could choose to use compute_sigma
    raise NotImplementedError()

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######
tests.GMMTests().test_gmm_initialization(initialize_parameters)
################ END OF LOCAL TEST CODE SECTION ######################

Be careful when coding up prob() below. It is fine for prob() to take the vectorized approach, but you may have to adjust your implementation to handle both cases: where x is a single datapoint and where x is an entire array of datapoints.

Note: the prob function below gives a probability density estimate, which we loosely (and definitely not accurately) call a probability. As you can imagine, the density function can take any non-negative value and can certainly be greater than 1. The prob function isn't really calculating a probability but a probability density.
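For reference, the multivariate normal density being computed is the standard formula (with n the number of features):

N(x; MU, SIGMA) = exp(-1/2 * (x - MU)^T * SIGMA^(-1) * (x - MU)) / sqrt((2*pi)^n * det(SIGMA))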

def prob(x, mu, sigma):
    """
    Calculate the probability of x (a single
    data point or an array of data points) under the
    component with the given mean and covariance.
    The function is intended to compute the multivariate
    normal distribution, which is given by N(x;MU,SIGMA).

    x = numpy.ndarray[float] (for a single datapoint)
        or numpy.ndarray[numpy.ndarray[float]] (for an array of datapoints)
    mu = numpy.ndarray[float]
    sigma = numpy.ndarray[numpy.ndarray[float]]

    probability = float (for a single datapoint)
                  or numpy.ndarray[float] (for an array of datapoints)
    """
    # TODO: finish this function
    raise NotImplementedError()

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######
tests.GMMTests().test_gmm_prob(prob)
################ END OF LOCAL TEST CODE SECTION ######################

def E_step(X, MU, SIGMA, PI, k):
    """
    E-step - Expectation
    Calculate the responsibility for each
    of the data points, for the given
    MU, SIGMA and PI.

    X = numpy.ndarray[numpy.ndarray[float]] - m x n
    MU = numpy.ndarray[numpy.ndarray[float]] - k x n
    SIGMA = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - k x n x n
    PI = numpy.ndarray[float] - k

    responsibility = numpy.ndarray[numpy.ndarray[float]] - k x m
    """
    # TODO: finish this function
    raise NotImplementedError()

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######
tests.GMMTests().test_gmm_e_step(E_step)
################ END OF LOCAL TEST CODE SECTION ######################

def M_step(X, r, k):
    """
    M-step - Maximization
    Calculate new MU, SIGMA and PI matrices
    based on the given responsibilities.

    X = numpy.ndarray[numpy.ndarray[float]] - m x n
    r = numpy.ndarray[numpy.ndarray[float]] - k x m

    (new_MU, new_SIGMA, new_PI)
    new_MU = numpy.ndarray[numpy.ndarray[float]] - k x n
    new_SIGMA = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - k x n x n
    new_PI = numpy.ndarray[float] - k
    """
    # TODO: finish this function
    raise NotImplementedError()

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######
tests.GMMTests().test_gmm_m_step(M_step)
################ END OF LOCAL TEST CODE SECTION ######################

def loglikelihood(X, PI, MU, SIGMA, k):
    """
    Calculate the log likelihood of the
    trained model based on the following
    formula for posterior probability:

    log(Pr(X | mixing, mean, stdev)) = sum((i=1 to m), log(sum((j=1 to k),
                                          mixing_j * N(x_i | mean_j, stdev_j))))

    Make sure you are using the natural log, instead of log base 2 or base 10.

    X = numpy.ndarray[numpy.ndarray[float]] - m x n
    MU = numpy.ndarray[numpy.ndarray[float]] - k x n
    SIGMA = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - k x n x n
    PI = numpy.ndarray[float] - k

    log_likelihood = float
    """
    # TODO: finish this function
    raise NotImplementedError()

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######
tests.GMMTests().test_gmm_likelihood(loglikelihood)
################ END OF LOCAL TEST CODE SECTION ######################

def train_model(X, k, convergence_function, initial_values=None):
    """
    Train the mixture model using the
    expectation-maximization algorithm,
    i.e., iterate the E and M steps from
    above until convergence.
    If initial_values is None, initialize them;
    else it is a tuple of the format (MU, SIGMA, PI).
    Convergence is reached when convergence_function
    returns terminate as True;
    see the default convergence_function example
    in `helper_functions.py`.

    X = numpy.ndarray[numpy.ndarray[float]] - m x n
    convergence_function = func
    initial_values = None or (MU, SIGMA, PI)

    (new_MU, new_SIGMA, new_PI, responsibility)
    new_MU = numpy.ndarray[numpy.ndarray[float]] - k x n
    new_SIGMA = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - k x n x n
    new_PI = numpy.ndarray[float] - k
    responsibility = numpy.ndarray[numpy.ndarray[float]] - k x m
    """
    # TODO: finish this function
    raise NotImplementedError()

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######
tests.GMMTests().test_gmm_train(train_model, loglikelihood)
################ END OF LOCAL TEST CODE SECTION ######################

def cluster(r):
    """
    Based on a given responsibility matrix,
    return an array of cluster indices.
    Assign each datapoint to the component
    with the maximum likelihood
    (maximum responsibility value).

    r = numpy.ndarray[numpy.ndarray[float]] - k x m - responsibility matrix

    clusters = numpy.ndarray[int] - m x 1
    """
    # TODO: finish this
    raise NotImplementedError()

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######
tests.GMMTests().test_gmm_cluster(cluster)
################ END OF LOCAL TEST CODE SECTION ######################

def segment(X, MU, k, r):
    """
    Segment the X matrix into k components.
    Return a matrix where each data point is
    replaced with its max-likelihood component mean.
    E.g., return the original matrix where each pixel's
    intensity is replaced with its max-likelihood
    component mean. (The shape is still m x n, not
    the original image size.)

    X = numpy.ndarray[numpy.ndarray[float]] - m x n
    MU = numpy.ndarray[numpy.ndarray[float]] - k x n
    r = numpy.ndarray[numpy.ndarray[float]] - k x m - responsibility matrix

    new_X = numpy.ndarray[numpy.ndarray[float]] - m x n
    """
    # TODO: finish this function
    raise NotImplementedError()

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######