[EDIT: Thanks to this post, the issue reported here has been resolved since Spark 1.4.1 – see the comments below]
While writing the previous post on Spark dataframes, I encountered an unexpected behavior of the respective
.filter method; but, on the one hand, I needed some more time to experiment and confirm it and, on the other hand, I knew that Spark 1.4 was just around the corner, and the issue might have been resolved there. So, now that I have a working installation of Spark 1.4, I can confirm that the issue persists there, too.
The issue arises when we use composite conditions (i.e. conditions composed from simpler ones using logical operators such as or) as arguments in a DataFrame.filter() call. I will demonstrate it below using just a toy example of a 1-D dataframe, but I will also include the findings from my previous post with a real-world dataset, which interested readers can replicate (all code and data from the previous post have been provided).
After starting pyspark, we proceed to import the necessary modules, as shown below:
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/

Using Python version 2.7.6 (default, Jun 23 2015 04:02:15)
SparkContext available as sc, HiveContext available as sqlContext.
>>> import numpy as np
>>> import pandas as pd
>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.types import *
>>> sqlContext = SQLContext(sc)
In what follows, we use uppercase names for our local variables (A, A_df) and lowercase names for their Spark counterparts (a, a_df).
We first build a simple 1-D numpy array, and we use it to construct a pandas dataframe:
>>> A = np.arange(1,11)
>>> A_df = pd.DataFrame(A)
>>> A
array([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10])
>>> A_df
    0
0   1
1   2
2   3
3   4
4   5
5   6
6   7
7   8
8   9
9  10
Contrary to what is implied in this introductory post on dataframes by Databricks, converting a pandas dataframe to a Spark one is not as straightforward as spark_df = context.createDataFrame(pandas_df) (the example in the above post works because pandas_df itself comes from a Spark RDD); as detailed in the pyspark.sql.SQLContext documentation, in the case of a pandas dataframe we have to define a schema first:
>>> schema = StructType([StructField("A", IntegerType(), True)])  # a single column named 'A'
>>> a_df = sqlContext.createDataFrame(A_df, schema)
>>> a_df.collect()
[Row(A=1), Row(A=2), Row(A=3), Row(A=4), Row(A=5), Row(A=6), Row(A=7), Row(A=8), Row(A=9), Row(A=10)]
Let’s try a simple filter operation on our Spark dataframe, e.g. let’s take all entries with A > 3:
>>> a_df.filter(a_df.A > 3).collect()
[Row(A=4), Row(A=5), Row(A=6), Row(A=7), Row(A=8), Row(A=9), Row(A=10)]
So far so good. But look at what happens if we try to take, say, entries with A > 3 and A < 9:
>>> a_df.filter(a_df.A > 3 and a_df.A < 9).collect()
[Row(A=1), Row(A=2), Row(A=3), Row(A=4), Row(A=5), Row(A=6), Row(A=7), Row(A=8)]
What actually happens is that filter takes into account only the second (single) condition and completely ignores the first, without throwing an exception. Consider the following filtering operations, which should clearly return no entries:
>>> a_df.filter(a_df.A > 9 and a_df.A < 3).collect()
[Row(A=1), Row(A=2)]
>>> a_df.filter(a_df.A == 1 and a_df.A > 3).collect()
[Row(A=4), Row(A=5), Row(A=6), Row(A=7), Row(A=8), Row(A=9), Row(A=10)]
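The mechanism behind this (see also the comments below) is Python's own `and` semantics: `and` cannot be overloaded, and when its left operand is truthy it simply returns its right operand. Since a Spark Column object, like any ordinary Python object, is truthy, `a_df.A > 3 and a_df.A < 9` silently evaluates to just `a_df.A < 9`. A minimal pure-Python sketch (the `Col` class here is a hypothetical stand-in for a Spark Column, not actual Spark code):

```python
class Col(object):
    """Hypothetical stand-in for a Spark Column: a deferred expression
    object which, like any ordinary Python object, is always truthy."""
    def __init__(self, expr):
        self.expr = expr
    def __repr__(self):
        return "Col(%s)" % self.expr

c1 = Col("A > 3")
c2 = Col("A < 9")

# 'and' cannot be overloaded; with a truthy left operand it simply
# returns the right operand, so the first condition is silently dropped:
result = c1 and c2
print(result)        # Col(A < 9)
print(result is c2)  # True
```

This is exactly why only the second condition reaches filter: by the time filter is called, the `and` expression has already collapsed to its right operand.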
The alternative querying style (passing the condition as a string), as well as the SQL API, both work correctly:
>>> a_df.filter("A > 3 and A < 9").collect()
[Row(A=4), Row(A=5), Row(A=6), Row(A=7), Row(A=8)]
>>> a_df.filter("A > 9 and A < 3").collect()
[]
>>> a_df.filter("A = 1 and A > 3").collect()
[]
>>> a_df.registerTempTable("df")
>>> sqlContext.sql("SELECT * FROM df WHERE A > 3 AND A < 9").show()
+-+
|A|
+-+
|4|
|5|
|6|
|7|
|8|
+-+
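Yet another workaround, noted in the comments below, is the one pandas users already know: use the bitwise operators & and | (which, unlike `and`/`or`, can be overloaded) and parenthesize each condition. The behavior in pandas is identical, so here is a minimal, Spark-free sketch using the same data as A_df above:

```python
import numpy as np
import pandas as pd

A_df = pd.DataFrame(np.arange(1, 11), columns=["A"])

# Bitwise & can be overloaded (unlike 'and'), so both conditions survive;
# each condition must be parenthesized because & binds tighter than > and <.
subset = A_df[(A_df.A > 3) & (A_df.A < 9)]
print(subset.A.tolist())  # [4, 5, 6, 7, 8]

# The contradictory condition now correctly returns no rows:
empty = A_df[(A_df.A > 9) & (A_df.A < 3)]
print(len(empty))  # 0
```

The Spark analogue would be a_df.filter((a_df.A > 3) & (a_df.A < 9)); the parentheses are mandatory, since without them operator precedence would parse the expression as comparisons against 3 & a_df.A.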
As we show below, such composite conditions are handled correctly by the filter method of “simple” (i.e. non-dataframe) RDDs:
>>> a = sc.parallelize(A)  # parallelize the initial numpy array A
>>> a.filter(lambda x: x > 3 and x < 9).collect()
[4, 5, 6, 7, 8]
>>> a.filter(lambda x: x == 1 and x > 3).collect()
[]
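The reason the RDD version works is that inside the lambda, x is a plain number: x > 3 then yields an actual boolean, so `and` evaluates the conjunction as intended. A quick sketch without Spark, applying the same lambda over an ordinary Python range:

```python
# With scalar operands, comparisons return real booleans,
# so 'and' short-circuits and combines the conditions correctly:
pred = lambda x: x > 3 and x < 9

print([x for x in range(1, 11) if pred(x)])           # [4, 5, 6, 7, 8]
print([x for x in range(1, 11) if x == 1 and x > 3])  # []
```

In the dataframe case, by contrast, the operands are Column objects rather than booleans, which is where the silent misbehavior comes from.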
As I mentioned in the beginning, although this was a toy dataframe, I have observed the same behavior in the “real-world” dataset used in my previous post; consider the following filter operations on the taxi_df dataframe we constructed there (tested with Spark 1.3.1):
>>> taxiFence1 = taxi_df.filter(taxi_df.pickup_lat > 40.7).filter(taxi_df.pickup_lat < 40.86)  # correct result
>>> taxiFence1.count()
232496L
>>> taxiFence2 = taxi_df.filter("pickup_lat > 40.7 and pickup_lat < 40.86")  # correct result
>>> taxiFence2.count()
232496L
>>> taxiFence3 = taxi_df.filter(taxi_df.pickup_lat > 40.7 and taxi_df.pickup_lat < 40.86)  # wrong result
>>> taxiFence3.count()
249896L
>>> taxiFence4 = taxi_df.filter(taxi_df.pickup_lat < 40.86)  # same condition as the 2nd one above (and same result)
>>> taxiFence4.count()
249896L
It is not at all clear from the pyspark.sql.DataFrame.filter documentation that such composite logical expressions are invalid; and indeed, this is not an “operational” issue (in the sense that workarounds exist, as demonstrated above). However, I guess everyone will agree that the combination of the facts that
- such expressions look valid
- they work as expected in the RDD
- the method does not throw an exception and
- it returns some “results”
is potentially dangerous, if not known beforehand, in the sense that it can very easily lead to incorrect results. IMHO, if such expressions are not intended for use with the
DataFrame.filter method, they should be explicitly forbidden (exception thrown), and this fact should be clearly indicated in the API documentation.
We all know that Spark dataframes are a new feature in a relatively new project (Spark itself); I am thankful to the good people at Databricks (and to all independent Spark contributors) for their wonderful work, and I hope that posts such as this one will help resolve issues and make Spark even better.
“and” actually doesn’t work, since there is no way to override “and” in Python. As a result, both pandas and Spark dataframes use the bitwise “&” for conjunction. However, I agree that this behavior of turning “x > 5 and y > 6” automatically into “y > 6” is super confusing, even though it comes from the Python language itself.
I looked into Pandas and found a way to throw a better exception. Ticket filed here: https://issues.apache.org/jira/browse/SPARK-8568
Fix is out: https://github.com/apache/spark/pull/6961
It will be released as part of Spark 1.4.1 hopefully next week.
Many thanks for the lightning-fast response, Reynold! Highly appreciated…
[…] a result of an issue we raised recently, expressions like the following will not work from Spark 1.4.1 […]