Augmenting PCA functionality in Spark 1.5

Christos - Iraklis Tsatsoulis Dimensionality Reduction, Spark 7 Comments

Surprisingly enough, although the relatively new Spark ML library (not to be confused with Spark MLlib) includes a method for principal components analysis (PCA), there is no way to extract some very useful information regarding the PCA transformation, namely the resulting eigenvalues (check the Python API documentation); and, without the eigenvalues, one cannot compute the proportion of variance explained (PVE), which is a key diagnostic when utilizing PCA for dimensionality reduction. Simply speaking, the proportion of variance explained by the k first principal components is the ratio of the sum of the k first eigenvalues (sorted in decreasing order) to the sum of all eigenvalues (see here for the mathematical details); and this information, which is crucial during the analysis, is unfortunately not available through the Spark ML API (at least as of version 1.5.1).

Having realized the issue due to a question in Stack Overflow (which I answered), I expose here my workaround solution for the Python API of Spark (Pyspark). Let me notice that the solution is efficient and operational, since it takes fully into account the relevant Spark programming patterns (e.g. no collect() statements), so you can hopefully include it in your projects as-is, or with slight only modifications. It has been developed and tested with Spark 1.5.0 & 1.5.1.

The input to our pca procedure consists of a Spark dataframe, which includes a column named features containing the features as DenseVectors. The reason for this particular choice is the compatibility with the format preferred in the existing Spark ML PCA implementation; nevertheless, it can be easily modified so as to be suitable to different input formats.

In order to construct a pca procedure, we must first define an intermediate function for computing the covariance matrix, as follows:

import numpy as np

def estimateCovariance(df):
    """Compute the covariance matrix for a given dataframe.

    Note:
        The multi-dimensional covariance array should be calculated using outer products.  Don't
        forget to normalize the data by first subtracting the mean.

    Args:
        df:  A Spark dataframe with a column named 'features', which (column) consists of DenseVectors.

    Returns:
        np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
            length of the arrays in the input dataframe.
    """
    m = df.select(df['features']).map(lambda x: x[0]).mean()
    dfZeroMean = df.select(df['features']).map(lambda x: x[0]).map(lambda x: x-m)  # subtract the mean
    
    return dfZeroMean.map(lambda x: np.outer(x,x)).sum()/df.count()

The above function returns the rectangular covariance matrix of the input variables, after having first normalized them by subtracting their respective mean values (this is a standard procedure when computing the PCA and, as we will see below, it is also included in the existing implementation, although not mentioned in the documentation).

Now, we can write our main pca function, as follows:

from numpy.linalg import eigh
from pyspark.mllib.linalg import *

def pca(df, k=2):
    """Computes the top `k` principal components, corresponding scores, and all eigenvalues.

    Note:
        All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
        each eigenvectors as a column.  This function should also return eigenvectors as columns.

    Args:
        df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
        k (int): The number of principal components to return.

    Returns:
        tuple of (np.ndarray, DF of DenseVector, np.ndarray): A tuple of (eigenvectors, DF of
            scores, eigenvalues).  Eigenvectors is a multi-dimensional array where the number of
            rows equals the length of the arrays in the input `DF` and the number of columns equals
            `k`.  The `DF` of scores has the same number of rows as `df` and consists of DenseVectors
            of length `k`.  Eigenvalues is an array of length d (the number of features).
    """
    cov = estimateCovariance(df)
    col = cov.shape[1]
    eigVals, eigVecs = eigh(cov)
    inds = np.argsort(eigVals)
    eigVecs = eigVecs.T[inds[-1:-(col+1):-1]]  
    components = eigVecs[0:k]
    eigVals = eigVals[inds[-1:-(col+1):-1]]  # sort eigenvalues
    score = df.select(df['features']).map(lambda x: x[0]).map(lambda x: np.dot(x, components.T) )
    scoreDF = sqlContext.createDataFrame(score.map(lambda x: (DenseVector(x),)), ['pca_features'])
    # Return the `k` principal components, `k` scores, and all eigenvalues
    
    return components.T, scoreDF, eigVals

Testing

First, let’s have at look at the results provided by the existing implementation:

>>> pyspark
[...]
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Python version 2.7.6 (default, Mar 22 2014 22:59:38)
SparkContext available as sc, HiveContext available as sqlContext.
>>> from pyspark.ml.feature import PCA
>>> from pyspark.mllib.linalg import *
>>> data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
...     (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
...     (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
>>> df = sqlContext.createDataFrame(data,["features"])
>>> pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")
>>> model = pca_extracted.fit(df)
>>> df_pca = model.transform(df)
>>> df_pca.collect()
[Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), pca_features=DenseVector([1.6486, -4.0133])),
 Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), pca_features=DenseVector([-4.6451, -1.1168])),
 Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]), pca_features=DenseVector([-6.4289, -5.338]))]

Now, the resulting score (PCA features) using our implementation is:

>>> comp, score, eigVals = pca(df)
>>> score.collect()
[Row(pca_features=DenseVector([1.6486, 4.0133])),
 Row(pca_features=DenseVector([-4.6451, 1.1168])),
 Row(pca_features=DenseVector([-6.4289, 5.338]))]

Notice that the signs of our second column are all opposite from the ones derived by the existing method; but this is not an issue: according to the (freely downloadable & highly recommended) An Introduction to Statistical Learning, co-authored by Hastie & Tibshirani, p. 382,

Each principal component loading vector is unique, up to a sign flip. This means that two different software packages will yield the same principal component loading vectors, although the signs of those loading vectors may differ. The signs may differ because each principal component loading vector specifies a direction in p-dimensional space: flipping the sign has no effect as the direction does not change. […] Similarly, the score vectors are unique up to a sign flip, since the variance of Z is the same as the variance of −Z.

Our solution has an additional advantage over the existing implementation: the latter unnecessarily binds together in a dataframe both the existing features and the PCA scores, a not so wise choice, especially in a real-world situation, where we may have dataset records in the tens of millions; in such a situation, carrying around dataframes with all these columns seems an unnecessary waste of resources.

Proportion of variance explained

Now that we have our eigenvalues available, it is trivial to write a function for the proportion of variance explained:

def varianceExplained(df, k=1):
    """Calculate the fraction of variance explained by the top `k` eigenvectors.

    Args:
        df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
        k: The number of principal components to consider.

    Returns:
        float: A number between 0 and 1 representing the percentage of variance explained
            by the top `k` eigenvectors.
    """
    eigenvalues = pca(df,k)[2] 
    return sum(eigenvalues[0:k])/sum(eigenvalues)

Applying it to our example data, we get

>>> varianceExplained(df,1)
0.79439325322305299
>>> varianceExplained(df,2)
1.0

Having the 100% of variance explained with only 2 principal components (out of 5 in total) is not strange, if we look at the eigenvalues:

>>> eigVals
array([  1.20041647e+01,   3.10694640e+00,   1.12683068e-15,
        -3.63127567e-16,  -8.99228389e-16])

i.e. the first 2 eigenvalues are much bigger (15 orders of magnitude) than the remaining 3 ones.

Summary

We have presented methods to amend some missing functionality when performing PCA in Spark 1.5; specifically, with the methods presented we are able to:

  • explicitly provide the eigenvalues resulting from the PCA decomposition
  • estimate the proportion of variance explained

Additionally, our methods result in a lighter dataframe carrying only the PCA scores, instead of binding them with the original features (which sounds terribly impractical).

Since being able to compute the proportion of variance explained is of primary importance during PCA dimensionality reduction, we would strongly suggest that this functionality is included in future versions of Spark.

[EDIT Nov 6: see this Stack Overflow question for how to implement this functionality in Scala; I have also opened an issue at Spark JIRA: SPARK-11530]

[EDIT Dec 14: as a result of the JIRA issue I opened, the issue has now been resolved, and it will be included in Spark 2.0]

Christos - Iraklis Tsatsoulis

Christos - Iraklis Tsatsoulis

Christos - Iraklis is one of our resident Data Scientists. He holds advanced graduate degrees in applied mathematics, engineering, and computing. He has been awarded both Chartered Engineer and Chartered Manager status in the UK, as well as Master status in Kaggle.com due to "consistent and stellar results" in predictive analytics contests.
Christos - Iraklis Tsatsoulis

Latest posts by Christos - Iraklis Tsatsoulis (see all)

7
Leave a Reply

avatar
3 Comment threads
4 Thread replies
2 Followers
 
Most reacted comment
Hottest comment thread
4 Comment authors
RajkoManojChristos - Iraklis TsatsoulisLeezewen Recent comment authors
  Subscribe  
newest oldest most voted
Notify of
Leezewen
Guest
Leezewen

Do the Sprak MLlib of PCA fit the R language?

Manoj
Guest
Manoj

How the eigen vectors from different nodes are ensemble into the final set of eigen vectors as it is distributed framework and different nodes results in different set of eigen values?

Rajko
Guest
Rajko

Hi Christos, Thanks for this post, I find it very useful and it, indeed, caused added functionality in the Spark 2.0.0. At the beginning of the post you mentioned the data type “can be easily modified so as to be suitable to different input formats.” Could you please elaborate more on this or give me some references, as I cannot find a workaround for this – keep getting the error “Input column features must be a vector colum” when running PCA on my custom DataFrame with Spark 1.6.1. If the solution for this is not very straightforward (which I tend… Read more »