Augmenting PCA functionality in Spark 1.5

Christos - Iraklis Tsatsoulis Dimensionality Reduction, Spark 7 Comments

Surprisingly enough, although the relatively new Spark ML library (not to be confused with Spark MLlib) includes a method for principal components analysis (PCA), there is no way to extract some very useful information regarding the PCA transformation, namely the resulting eigenvalues (check the Python API documentation); and, without the eigenvalues, one cannot compute the proportion of variance explained (PVE), which is a key diagnostic when utilizing PCA for dimensionality reduction. Simply speaking, the proportion of variance explained by the k first principal components is the ratio of the sum of the k first eigenvalues (sorted in decreasing order) to the sum of all eigenvalues (see here for the mathematical details); and this information, which is crucial during the analysis, is unfortunately not available through the Spark ML API (at least as of version 1.5.1).

Having realized the issue due to a question in Stack Overflow (which I answered), I expose here my workaround solution for the Python API of Spark (Pyspark). Let me notice that the solution is efficient and operational, since it takes fully into account the relevant Spark programming patterns (e.g. no collect() statements), so you can hopefully include it in your projects as-is, or with slight only modifications. It has been developed and tested with Spark 1.5.0 & 1.5.1.

The input to our pca procedure consists of a Spark dataframe, which includes a column named features containing the features as DenseVectors. The reason for this particular choice is the compatibility with the format preferred in the existing Spark ML PCA implementation; nevertheless, it can be easily modified so as to be suitable to different input formats.

In order to construct a pca procedure, we must first define an intermediate function for computing the covariance matrix, as follows:

import numpy as np

def estimateCovariance(df):
    """Compute the covariance matrix for a given dataframe.

    Note:
        The multi-dimensional covariance array should be calculated using outer products.  Don't
        forget to normalize the data by first subtracting the mean.

    Args:
        df:  A Spark dataframe with a column named 'features', which (column) consists of DenseVectors.

    Returns:
        np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
            length of the arrays in the input dataframe.
    """
    m = df.select(df['features']).map(lambda x: x[0]).mean()
    dfZeroMean = df.select(df['features']).map(lambda x: x[0]).map(lambda x: x-m)  # subtract the mean
    
    return dfZeroMean.map(lambda x: np.outer(x,x)).sum()/df.count()

The above function returns the rectangular covariance matrix of the input variables, after having first normalized them by subtracting their respective mean values (this is a standard procedure when computing the PCA and, as we will see below, it is also included in the existing implementation, although not mentioned in the documentation).

Now, we can write our main pca function, as follows:

from numpy.linalg import eigh
from pyspark.mllib.linalg import *

def pca(df, k=2):
    """Computes the top `k` principal components, corresponding scores, and all eigenvalues.

    Note:
        All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
        each eigenvectors as a column.  This function should also return eigenvectors as columns.

    Args:
        df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
        k (int): The number of principal components to return.

    Returns:
        tuple of (np.ndarray, DF of DenseVector, np.ndarray): A tuple of (eigenvectors, DF of
            scores, eigenvalues).  Eigenvectors is a multi-dimensional array where the number of
            rows equals the length of the arrays in the input `DF` and the number of columns equals
            `k`.  The `DF` of scores has the same number of rows as `df` and consists of DenseVectors
            of length `k`.  Eigenvalues is an array of length d (the number of features).
    """
    cov = estimateCovariance(df)
    col = cov.shape[1]
    eigVals, eigVecs = eigh(cov)
    inds = np.argsort(eigVals)
    eigVecs = eigVecs.T[inds[-1:-(col+1):-1]]  
    components = eigVecs[0:k]
    eigVals = eigVals[inds[-1:-(col+1):-1]]  # sort eigenvalues
    score = df.select(df['features']).map(lambda x: x[0]).map(lambda x: np.dot(x, components.T) )
    scoreDF = sqlContext.createDataFrame(score.map(lambda x: (DenseVector(x),)), ['pca_features'])
    # Return the `k` principal components, `k` scores, and all eigenvalues
    
    return components.T, scoreDF, eigVals

Testing

First, let’s have at look at the results provided by the existing implementation:

>>> pyspark
[...]
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Python version 2.7.6 (default, Mar 22 2014 22:59:38)
SparkContext available as sc, HiveContext available as sqlContext.
>>> from pyspark.ml.feature import PCA
>>> from pyspark.mllib.linalg import *
>>> data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
...     (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
...     (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
>>> df = sqlContext.createDataFrame(data,["features"])
>>> pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")
>>> model = pca_extracted.fit(df)
>>> df_pca = model.transform(df)
>>> df_pca.collect()
[Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), pca_features=DenseVector([1.6486, -4.0133])),
 Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), pca_features=DenseVector([-4.6451, -1.1168])),
 Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]), pca_features=DenseVector([-6.4289, -5.338]))]

Now, the resulting score (PCA features) using our implementation is:

>>> comp, score, eigVals = pca(df)
>>> score.collect()
[Row(pca_features=DenseVector([1.6486, 4.0133])),
 Row(pca_features=DenseVector([-4.6451, 1.1168])),
 Row(pca_features=DenseVector([-6.4289, 5.338]))]

Notice that the signs of our second column are all opposite from the ones derived by the existing method; but this is not an issue: according to the (freely downloadable & highly recommended) An Introduction to Statistical Learning, co-authored by Hastie & Tibshirani, p. 382,

Each principal component loading vector is unique, up to a sign flip. This means that two different software packages will yield the same principal component loading vectors, although the signs of those loading vectors may differ. The signs may differ because each principal component loading vector specifies a direction in p-dimensional space: flipping the sign has no effect as the direction does not change. […] Similarly, the score vectors are unique up to a sign flip, since the variance of Z is the same as the variance of −Z.

Our solution has an additional advantage over the existing implementation: the latter unnecessarily binds together in a dataframe both the existing features and the PCA scores, a not so wise choice, especially in a real-world situation, where we may have dataset records in the tens of millions; in such a situation, carrying around dataframes with all these columns seems an unnecessary waste of resources.

Proportion of variance explained

Now that we have our eigenvalues available, it is trivial to write a function for the proportion of variance explained:

def varianceExplained(df, k=1):
    """Calculate the fraction of variance explained by the top `k` eigenvectors.

    Args:
        df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
        k: The number of principal components to consider.

    Returns:
        float: A number between 0 and 1 representing the percentage of variance explained
            by the top `k` eigenvectors.
    """
    eigenvalues = pca(df,k)[2] 
    return sum(eigenvalues[0:k])/sum(eigenvalues)

Applying it to our example data, we get

>>> varianceExplained(df,1)
0.79439325322305299
>>> varianceExplained(df,2)
1.0

Having the 100% of variance explained with only 2 principal components (out of 5 in total) is not strange, if we look at the eigenvalues:

>>> eigVals
array([  1.20041647e+01,   3.10694640e+00,   1.12683068e-15,
        -3.63127567e-16,  -8.99228389e-16])

i.e. the first 2 eigenvalues are much bigger (15 orders of magnitude) than the remaining 3 ones.

Summary

We have presented methods to amend some missing functionality when performing PCA in Spark 1.5; specifically, with the methods presented we are able to:

  • explicitly provide the eigenvalues resulting from the PCA decomposition
  • estimate the proportion of variance explained

Additionally, our methods result in a lighter dataframe carrying only the PCA scores, instead of binding them with the original features (which sounds terribly impractical).

Since being able to compute the proportion of variance explained is of primary importance during PCA dimensionality reduction, we would strongly suggest that this functionality is included in future versions of Spark.

[EDIT Nov 6: see this Stack Overflow question for how to implement this functionality in Scala; I have also opened an issue at Spark JIRA: SPARK-11530]

[EDIT Dec 14: as a result of the JIRA issue I opened, the issue has now been resolved, and it will be included in Spark 2.0]

Christos - Iraklis Tsatsoulis
Latest posts by Christos - Iraklis Tsatsoulis (see all)
Subscribe
Notify of
7 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Leezewen
Leezewen
May 27, 2016 11:41

Do the Sprak MLlib of PCA fit the R language?

Leezewen
Leezewen
May 27, 2016 15:26

Yeah , I had already checked it and found nothing for PCA, So i guess that the SparkR do not support PCA currently, Thanks for your explain.

Manoj
Manoj
June 1, 2016 15:10

How the eigen vectors from different nodes are ensemble into the final set of eigen vectors as it is distributed framework and different nodes results in different set of eigen values?

Rajko
Rajko
June 9, 2016 13:02

Hi Christos, Thanks for this post, I find it very useful and it, indeed, caused added functionality in the Spark 2.0.0. At the beginning of the post you mentioned the data type “can be easily modified so as to be suitable to different input formats.” Could you please elaborate more on this or give me some references, as I cannot find a workaround for this – keep getting the error “Input column features must be a vector colum” when running PCA on my custom DataFrame with Spark 1.6.1. If the solution for this is not very straightforward (which I tend… Read more »