Unexpected behavior of Spark dataframe filter method

Christos - Iraklis Tsatsoulis | Big Data, Spark

[EDIT: Thanks to this post, the issue reported here has been resolved since Spark 1.4.1 – see the comments below]

While writing the previous post on Spark dataframes, I encountered an unexpected behavior of the respective .filter method; but, on the one hand, I needed some more time to experiment and confirm it and, on the other hand, I knew that Spark 1.4 was just around the corner, in which the issue might possibly have been resolved. So, now that I have a working installation of Spark 1.4, I can confirm that the issue persists there, too.

The issue arises when we use composite conditions (i.e. conditions composed from simpler ones using logical operators such as and & or) as arguments in a DataFrame.filter() call. I will demonstrate it below using just a toy example of a 1-D dataframe, but I will also include the findings from my previous post with a real-world dataset, which can be replicated by interested readers (all code and data from the previous post have been provided).

After starting pyspark, we proceed to import the necessary modules, as shown below:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/

Using Python version 2.7.6 (default, Jun 23 2015 04:02:15)
SparkContext available as sc, HiveContext available as sqlContext.
>>> import numpy as np
>>> import pandas as pd
>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.types import *
>>> sqlContext = SQLContext(sc)

In what follows, we use uppercase names for our local variables (A, A_df) and lowercase ones for the respective Spark objects (the RDD a and the Spark dataframe a_df).

We first build a simple 1-D numpy array, and we use it to construct a pandas dataframe:

>>> A = np.arange(1,11)
>>> A_df = pd.DataFrame(A)
>>> A
array([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10])
>>> A_df
    0
0   1
1   2
2   3
3   4
4   5
5   6
6   7
7   8
8   9
9  10

Contrary to what is implied in this introductory post to dataframes by Databricks, converting pandas dataframes to Spark ones is not as straightforward as spark_df = context.createDataFrame(pandas_df) (the example in the above post works because pandas_df itself comes from a Spark RDD); as detailed in the pyspark.sql.SQLContext documentation, in the case of a pandas dataframe we have to define a schema first:

>>> schema = StructType([StructField("A", IntegerType(), True)]) # a single column named 'A'
>>> a_df = sqlContext.createDataFrame(A_df, schema)
>>> a_df.collect()
[Row(A=1), Row(A=2), Row(A=3), Row(A=4), Row(A=5), Row(A=6), Row(A=7), Row(A=8), Row(A=9), Row(A=10)]

Let’s try a simple filter operation in our Spark dataframe, e.g. let’s take all entries with A > 3:

>>> a_df.filter(a_df.A > 3).collect()
[Row(A=4), Row(A=5), Row(A=6), Row(A=7), Row(A=8), Row(A=9), Row(A=10)]

So far so good. But look at what happens if we try to take, say, entries with A > 3 and A < 9:

>>> a_df.filter(a_df.A > 3 and a_df.A < 9).collect()
[Row(A=1), Row(A=2), Row(A=3), Row(A=4), Row(A=5), Row(A=6), Row(A=7), Row(A=8)]

What is actually happening is that filter takes into account only the second (single) condition and completely ignores the first, without throwing an exception. Consider the following filtering operations, which should clearly return no entries:

>>> a_df.filter(a_df.A > 9 and a_df.A < 3).collect()
[Row(A=1), Row(A=2)]
>>> a_df.filter(a_df.A == 1 and a_df.A > 3).collect()
[Row(A=4), Row(A=5), Row(A=6), Row(A=7), Row(A=8), Row(A=9), Row(A=10)]
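
As Reynold Xin explains in the comments below, the culprit is Python itself: the and operator cannot be overridden, and for truthy operands it simply evaluates to its second operand; since a column expression such as a_df.A > 3 is treated as truthy, a_df.A > 3 and a_df.A < 9 silently collapses to a_df.A < 9. Here is a minimal illustration in plain Python (no Spark involved), using a hypothetical Dummy class to stand in for any truthy object:

>>> class Dummy(object):   # stands in for any truthy object, such as a column expression
...     pass
...
>>> x, y = Dummy(), Dummy()
>>> (x and y) is y         # "and" does not combine its operands - it just returns the second one
True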

The alternative way of querying (passing a string expression to filter), as well as the SQL-like API, both work OK:

>>> a_df.filter("A > 3 and A < 9").collect()
[Row(A=4), Row(A=5), Row(A=6), Row(A=7), Row(A=8)]
>>> a_df.filter("A > 9 and A < 3").collect()
[]
>>> a_df.filter("A = 1 and A > 3").collect()
[]
>>> a_df.registerTempTable("df")
>>> sqlContext.sql("SELECT * FROM df WHERE A > 3 AND A < 9").show()
+-+
|A|
+-+
|4|
|5|
|6|
|7|
|8|
+-+
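
For completeness, the column-expression API does offer a working alternative for conjunction, namely the bitwise & operator (the same convention pandas uses, as Reynold Xin notes in the comments below); each condition needs its own parentheses, due to operator precedence. A quick sketch with the same toy dataframe (output shown as expected):

>>> a_df.filter((a_df.A > 3) & (a_df.A < 9)).collect()
[Row(A=4), Row(A=5), Row(A=6), Row(A=7), Row(A=8)]
>>> a_df.filter((a_df.A > 9) & (a_df.A < 3)).collect()
[]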

As we show below, such composite conditions are indeed valid in the filter method of “simple” (i.e. non-dataframe) RDDs:

>>> a = sc.parallelize(A) # parallelize the initial numpy array A
>>> a.filter(lambda x: x > 3 and x < 9).collect()
[4, 5, 6, 7, 8]
>>> a.filter(lambda x: x == 1 and x > 3).collect()
[]

As I mentioned in the beginning, although this was a toy dataframe, I have observed the same behavior in the “real-world” dataset used in my previous post; consider the following filter operations on the taxi_df dataframe we constructed there (tested with Spark 1.3.1):

>>> taxiFence1 = taxi_df.filter(taxi_df.pickup_lat > 40.7).filter(taxi_df.pickup_lat < 40.86) # correct result
>>> taxiFence1.count()
232496L
>>> taxiFence2 = taxi_df.filter("pickup_lat > 40.7 and pickup_lat < 40.86") # correct result
>>> taxiFence2.count()
232496L
>>> taxiFence3 = taxi_df.filter(taxi_df.pickup_lat > 40.7 and taxi_df.pickup_lat < 40.86)  # wrong result
>>> taxiFence3.count()
249896L
>>> taxiFence4 = taxi_df.filter(taxi_df.pickup_lat < 40.86) # only the 2nd condition of taxiFence3 - same (wrong) count
>>> taxiFence4.count()
249896L

It is not at all clear from the pyspark.sql.DataFrame.filter documentation that such composite logical expressions are not valid; and indeed, this is not an “operational” issue (in the sense that workarounds exist, as demonstrated above). However, I guess everyone will agree that the combination of the facts that

  1. such expressions look valid
  2. they work as expected in the RDD filter method
  3. the method does not throw an exception and
  4. it returns some “results”

is potentially dangerous, if not known beforehand, in the sense that it can very easily lead to incorrect results. IMHO, if such expressions are not intended for use with the DataFrame.filter method, they should be explicitly forbidden (exception thrown), and this fact should be clearly indicated in the API documentation.
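
Just to sketch what such a safeguard might look like (this is only an illustration, not the actual Spark source): if truth-testing a column raised an exception, the silent mis-filtering shown above would turn into an immediate, self-explanatory error:

class Column(object):
    # Hypothetical sketch: "and"/"or"/bool() on a column raises instead of silently succeeding
    def __nonzero__(self):   # invoked by truth-testing in Python 2
        raise ValueError("Cannot convert column into bool; use '&' for 'and', '|' for 'or'")
    __bool__ = __nonzero__   # Python 3 equivalent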

We all know that Spark dataframes are a new feature in a relatively new project (Spark itself); I am thankful to the good people at Databricks (and to all independent Spark contributors) for their wonderful work, and I hope that posts such as this one will help in resolving issues and making Spark even better.

Christos - Iraklis Tsatsoulis

Christos - Iraklis is one of our resident Data Scientists. He holds advanced graduate degrees in applied mathematics, engineering, and computing. He has been awarded both Chartered Engineer and Chartered Manager status in the UK, as well as Master status in Kaggle.com due to "consistent and stellar results" in predictive analytics contests.
Comments

Reynold Xin:

“and” actually doesn’t work since there is no way to override “and” in Python. As a result, both Pandas and Spark DF use bitwise and “&” for conjunction. However, I agree this behavior to turn “x > 5 and y > 6” automatically into “y > 6” is super confusing, even though it came from the Python language.

I looked into Pandas and found a way to throw a better exception. Ticket filed here: https://issues.apache.org/jira/browse/SPARK-8568

Reynold Xin:

Fix is out: https://github.com/apache/spark/pull/6961

It will be released as part of Spark 1.4.1 hopefully next week.

Trackback from “Dataframes from CSV files in Spark 1.5” (Nodalpoint): […] a result of an issue we raised recently, expressions like the following will not work from Spark 1.4.1 […]