Unexpected behavior of Spark dataframe filter method

Christos - Iraklis Tsatsoulis | Big Data, Spark

[EDIT: Thanks to this post, the issue reported here has been resolved since Spark 1.4.1 – see the comments below]

While writing the previous post on Spark dataframes, I encountered an unexpected behavior of the respective .filter method; but, on the one hand, I needed some more time to experiment and confirm it and, on the other hand, I knew that Spark 1.4 was just around the corner, where the issue might possibly have been resolved. So, now that I have a working installation of Spark 1.4, I can confirm that the issue persists there, too.

The issue arises when we use composite conditions (i.e. conditions built from simpler ones using the logical operators and/or) as arguments in a DataFrame.filter() call. I will demonstrate it below using a toy example of a 1-D dataframe, but I will also include the findings from my previous post with a real-world dataset, which can be replicated by interested readers (all code and data from the previous post have been provided).

After starting pyspark, we proceed to import the necessary modules, as shown below:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/

Using Python version 2.7.6 (default, Jun 23 2015 04:02:15)
SparkContext available as sc, HiveContext available as sqlContext.
>>> import numpy as np
>>> import pandas as pd
>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.types import *
>>> sqlContext = SQLContext(sc)

In what follows, we use uppercase names for our local numpy/pandas variables (A, A_df) and lowercase ones for the respective distributed Spark versions (a, a_df).

We first build a simple 1-D numpy array, and we use it to construct a pandas dataframe:

>>> A = np.arange(1,11)
>>> A_df = pd.DataFrame(A)
>>> A
array([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10])
>>> A_df
    0
0   1
1   2
2   3
3   4
4   5
5   6
6   7
7   8
8   9
9  10

Contrary to what is implied in this introductory post on dataframes by Databricks, converting a pandas dataframe to a Spark one is not as straightforward as spark_df = context.createDataFrame(pandas_df) (the example in the above post works because pandas_df itself comes from a Spark RDD); as detailed in the pyspark.sql.SQLContext documentation, in the case of a pandas dataframe we have to define a schema first:

>>> schema = StructType([StructField("A", IntegerType(), True)]) # a single column named 'A'
>>> a_df = sqlContext.createDataFrame(A_df, schema)
>>> a_df.collect()
[Row(A=1), Row(A=2), Row(A=3), Row(A=4), Row(A=5), Row(A=6), Row(A=7), Row(A=8), Row(A=9), Row(A=10)]

Let’s try a simple filter operation in our Spark dataframe, e.g. let’s take all entries with A > 3:

>>> a_df.filter(a_df.A > 3).collect()
[Row(A=4), Row(A=5), Row(A=6), Row(A=7), Row(A=8), Row(A=9), Row(A=10)]

So far so good. But look at what happens if we try to take, say, entries with A > 3 and A < 9:

>>> a_df.filter(a_df.A > 3 and a_df.A < 9).collect()
[Row(A=1), Row(A=2), Row(A=3), Row(A=4), Row(A=5), Row(A=6), Row(A=7), Row(A=8)]

What is actually happening is that filter takes into account only the second condition and silently ignores the first, without throwing an exception. Consider the following filtering operations, which should clearly return no entries:

>>> a_df.filter(a_df.A > 9 and a_df.A < 3).collect()
[Row(A=1), Row(A=2)]
>>> a_df.filter(a_df.A == 1 and a_df.A > 3).collect()
[Row(A=4), Row(A=5), Row(A=6), Row(A=7), Row(A=8), Row(A=9), Row(A=10)]
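
As it turns out (see Reynold Xin’s comment below), the culprit is Python’s and keyword, which cannot be overloaded: a Column object such as a_df.A > 3 is simply treated as truthy, so an expression like cond1 and cond2 evaluates to cond2, and only the second condition ever reaches filter. The supported way of combining Column conditions is the bitwise & operator, with each condition parenthesized. Here is a quick sketch against our toy dataframe (behavior as observed with the 1.4.0 build used here):

>>> cond1 = a_df.A > 3   # a Column object, which plain Python treats as truthy
>>> cond2 = a_df.A < 9
>>> (cond1 and cond2) is cond2   # 'and' silently returns its second operand
True
>>> a_df.filter((a_df.A > 3) & (a_df.A < 9)).collect()   # bitwise '&' combines the conditions correctly
[Row(A=4), Row(A=5), Row(A=6), Row(A=7), Row(A=8)]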

Passing the condition as a string expression to filter, as well as using the SQL-like API, both work OK:

>>> a_df.filter("A > 3 and A < 9").collect()
[Row(A=4), Row(A=5), Row(A=6), Row(A=7), Row(A=8)]
>>> a_df.filter("A > 9 and A < 3").collect()
[]
>>> a_df.filter("A = 1 and A > 3").collect()
[]
>>> a_df.registerTempTable("df")
>>> sqlContext.sql("SELECT * FROM df WHERE A > 3 AND A < 9").show()
+-+
|A|
+-+
|4|
|5|
|6|
|7|
|8|
+-+

As shown below, this kind of composite condition is indeed valid in the filter method of “simple” (i.e. non-dataframe) RDD’s:

>>> a = sc.parallelize(A) # parallelize the initial numpy array A
>>> a.filter(lambda x: x > 3 and x < 9).collect()
[4, 5, 6, 7, 8]
>>> a.filter(lambda x: x == 1 and x > 3).collect()
[]

As I mentioned in the beginning, although this was a toy dataframe, I have observed the same behavior in the “real-world” dataset used in my previous post; consider the following filter operations on the taxi_df dataframe we constructed there (tested with Spark 1.3.1):

>>> taxiFence1 = taxi_df.filter(taxi_df.pickup_lat > 40.7).filter(taxi_df.pickup_lat < 40.86) # correct result
>>> taxiFence1.count()
232496L
>>> taxiFence2 = taxi_df.filter("pickup_lat > 40.7 and pickup_lat < 40.86") # correct result
>>> taxiFence2.count()
232496L
>>> taxiFence3 = taxi_df.filter(taxi_df.pickup_lat > 40.7 and taxi_df.pickup_lat < 40.86)  # wrong result
>>> taxiFence3.count()
249896L
>>> taxiFence4 = taxi_df.filter(taxi_df.pickup_lat < 40.86) # same as the 2nd condition of taxiFence3 above (and same result)
>>> taxiFence4.count()
249896L

It is not at all clear from the pyspark.sql.DataFrame.filter documentation that the use of such composite logical expressions is not valid; and indeed, this is not an “operational” issue (in the sense that workarounds exist, as demonstrated above). However, I guess everyone will agree that the combination of the facts that

  1. such expressions look valid
  2. they work as expected in the RDD filter method
  3. the method does not throw an exception and
  4. it returns some “results”

is potentially dangerous, if not known beforehand, in the sense that it can very easily lead to incorrect results. IMHO, if such expressions are not intended for use with the DataFrame.filter method, they should be explicitly forbidden (exception thrown), and this fact should be clearly indicated in the API documentation.
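
For illustration, such a guard could be as simple as making the boolean conversion of a Column object raise, so that the and/or misuse shown above fails loudly instead of silently dropping a condition. What follows is a purely illustrative, stripped-down Python sketch of the idea (a toy stand-in, not the actual Spark source), which also seems to be essentially the route taken by the fix mentioned in the comments below:

class Column(object):
    """Toy stand-in for pyspark.sql.Column, for illustration only."""
    def __gt__(self, other):
        return Column()  # comparisons build new Column expressions, not booleans
    def __lt__(self, other):
        return Column()
    def __nonzero__(self):  # invoked by 'and', 'or', 'not' and 'if' in Python 2
        raise ValueError("Cannot convert a Column into a bool; "
                         "use '&' for 'and' and '|' for 'or' when combining conditions")
    __bool__ = __nonzero__  # the Python 3 name of the same hook

col = Column()
try:
    col > 3 and col < 9  # 'and' first calls bool() on its left operand...
except ValueError as e:
    print(e)  # ...which now fails loudly instead of silently returning 'col < 9'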

We all know that Spark dataframes are a new feature in a relatively new project (Spark itself); I am thankful to the good people at Databricks (and to all independent Spark contributors) for their wonderful work, and I hope that posts such as this one will help in resolving issues and making Spark even better.

Comments
Reynold Xin
June 23, 2015 23:55

“and” actually doesn’t work since there is no way to override “and” in Python. As a result, both Pandas and Spark DF use bitwise and “&” for conjunction. However, I agree this behavior to turn “x > 5 and y > 6” automatically into “y > 6” is super confusing, even though it came from the Python language.

I looked into Pandas and found a way to throw a better exception. Ticket filed here: https://issues.apache.org/jira/browse/SPARK-8568

Reynold Xin
Reply to Reynold Xin
June 24, 2015 00:24

Fix is out: https://github.com/apache/spark/pull/6961

It will be released as part of Spark 1.4.1 hopefully next week.

trackback
July 4, 2016 12:56

[…] a result of an issue we raised recently, expressions like the following will not work from Spark 1.4.1 […]