Dataframes from CSV files in Spark 1.5: automatic schema extraction, neat summary statistics, & elementary data exploration

Christos - Iraklis Tsatsoulis · Big Data, Spark

In a previous post, we took a brief look at creating and manipulating Spark dataframes from CSV files. In the couple of months since, Spark has already gone from version 1.3.0 to 1.5, with more than 100 built-in functions introduced in Spark 1.5 alone; so, we thought it was a good time to revisit the subject, this time also utilizing the external package spark-csv, provided by Databricks. We’ll use the same CSV file with a header as in the previous post, which you can download here.

In order to include the spark-csv package, we must start pyspark with the following argument:

$ pyspark --packages com.databricks:spark-csv_2.10:1.2.0

If this is the first time we are using it, Spark will download the package from Databricks’ repository, and it will subsequently be available for inclusion in future sessions. So, after the numerous INFO messages, we get the welcome screen, and we proceed to import the necessary modules:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.0
      /_/

Using Python version 2.7.6 (default, Mar 22 2014 22:59:38)
SparkContext available as sc, HiveContext available as sqlContext.
>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.types import *
>>> sqlContext = SQLContext(sc)
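As a side note (not part of the original session above), the same package can be made available to a standalone Python script or a notebook, where we cannot pass command-line arguments to pyspark; one common way is to set the submit arguments in the environment before the SparkContext is created. The snippet below is only a minimal sketch under that assumption (the application name is illustrative, and we assume pyspark is importable from the script):

import os

# Hypothetical standalone-script setup: the --packages argument has to be in place
# before pyspark launches its JVM gateway; the trailing 'pyspark-shell' token is
# needed in recent Spark versions when this environment variable is used.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages com.databricks:spark-csv_2.10:1.2.0 pyspark-shell')

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName='spark-csv-demo')  # illustrative application name
sqlContext = SQLContext(sc)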

Automatic schema extraction

Since Spark 1.4, a new (and still experimental) interface class, pyspark.sql.DataFrameReader, has been introduced, specifically for loading dataframes from external storage systems. The syntax shown in the examples provided with spark-csv for loading a CSV file is:

>>> df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('file:///home/vagrant/data/nyctaxisub.csv')

However, we found this syntax rather complicated and unintuitive; despite the sparsity of the relevant documentation, we were able to come up with the following, much clearer syntax, where we have also set the flag for inferring the schema (which slows down the process, as it requires one extra pass over the data, and is false by default):

>>> df = sqlContext.read.load('file:///home/vagrant/data/nyctaxisub.csv', 
                          format='com.databricks.spark.csv', 
                          header='true', 
                          inferSchema='true')
>>> df.count()
249999

Recall from the previous post that our CSV file has 250,000 rows, including the header; hence, our record count is indeed correct.
Let’s check how successful the automatic schema extraction was:

>>> df.dtypes
[('_id', 'string'),
 ('_rev', 'string'),
 ('dropoff_datetime', 'string'),
 ('dropoff_latitude', 'double'),
 ('dropoff_longitude', 'double'),
 ('hack_license', 'string'),
 ('medallion', 'string'),
 ('passenger_count', 'int'),
 ('pickup_datetime', 'string'),
 ('pickup_latitude', 'double'),
 ('pickup_longitude', 'double'),
 ('rate_code', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('trip_distance', 'double'),
 ('trip_time_in_secs', 'int'),
 ('vendor_id', 'string')]

Well, not bad at all! It seems that, apart from the two datetime columns, all other column types have been recognized correctly.
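Incidentally, if we already know the column types, we can avoid the extra pass over the data altogether by supplying the schema ourselves instead of inferring it. Here is a rough sketch, using the types reported above and assuming the columns appear in the file in the same order; the two datetime columns are kept as strings, to be cast to timestamps afterwards, exactly as we do below:

from pyspark.sql.types import (StructType, StructField, StringType,
                               DoubleType, IntegerType)

# Hand-built schema mirroring what inferSchema detected above; with an explicit
# schema, spark-csv does not need the extra pass over the data.
customSchema = StructType([
    StructField('_id', StringType(), True),
    StructField('_rev', StringType(), True),
    StructField('dropoff_datetime', StringType(), True),
    StructField('dropoff_latitude', DoubleType(), True),
    StructField('dropoff_longitude', DoubleType(), True),
    StructField('hack_license', StringType(), True),
    StructField('medallion', StringType(), True),
    StructField('passenger_count', IntegerType(), True),
    StructField('pickup_datetime', StringType(), True),
    StructField('pickup_latitude', DoubleType(), True),
    StructField('pickup_longitude', DoubleType(), True),
    StructField('rate_code', IntegerType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('trip_time_in_secs', IntegerType(), True),
    StructField('vendor_id', StringType(), True)])

df = sqlContext.read.load('file:///home/vagrant/data/nyctaxisub.csv',
                          format='com.databricks.spark.csv',
                          header='true',
                          schema=customSchema)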

We are already a long way since the previous post: recall that there, we had to prepare the schema of our dataframe (column names and types) using a long (and possibly error-prone) “manual” chain of .map operations on the initial RDD, including special parse settings for each column, and without any handy reference to the column names (which had to be manipulated separately). And there is more good news: although we still have some columns with incorrect data types, there is now a much more straightforward and easier way to fix them, directly on the newly created dataframe (instead of on the “raw” RDD). In the following snippet, we will use the pyspark.sql.DataFrame class methods withColumn and withColumnRenamed in order to:

  1. Correctly set the types for the two datetime columns
  2. Rename the first two columns, by removing the annoying leading underscores
  3. Also rename the longitude/latitude columns to long/lat, for brevity:
>>> df = (df.withColumn('dropoff_datetime', df.dropoff_datetime.cast('timestamp'))
       .withColumn('pickup_datetime', df.pickup_datetime.cast('timestamp'))
       .withColumnRenamed('_id', 'id')
       .withColumnRenamed('_rev', 'rev')
       .withColumnRenamed('dropoff_latitude', 'dropoff_lat')
       .withColumnRenamed('dropoff_longitude', 'dropoff_long')
       .withColumnRenamed('pickup_latitude', 'pickup_lat')
       .withColumnRenamed('pickup_longitude', 'pickup_long'))
>>> df.dtypes
[('id', 'string'),
 ('rev', 'string'),
 ('dropoff_datetime', 'timestamp'),
 ('dropoff_lat', 'double'),
 ('dropoff_long', 'double'),
 ('hack_license', 'string'),
 ('medallion', 'string'),
 ('passenger_count', 'int'),
 ('pickup_datetime', 'timestamp'),
 ('pickup_lat', 'double'),
 ('pickup_long', 'double'),
 ('rate_code', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('trip_distance', 'double'),
 ('trip_time_in_secs', 'int'),
 ('vendor_id', 'string')]
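As a side note, if a dataframe had many columns with leading underscores, the same renaming could be done generically with a small loop over df.columns; a minimal sketch (the helper name is ours):

def strip_leading_underscores(dataframe):
    """Return a new dataframe with leading underscores removed from column names."""
    for name in dataframe.columns:
        if name.startswith('_'):
            # withColumnRenamed returns a new dataframe, so we simply rebind the variable
            dataframe = dataframe.withColumnRenamed(name, name.lstrip('_'))
    return dataframe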

Neat summary statistics

One of the new Spark SQL functions introduced in version 1.3.1 was describe(), providing a quick summary of the dataframe numeric columns. Let’s try it here:

>>> df.describe().show()
+-------+------------------+------------------+------------------+-----------------+------------------+-------------------+-----------------+-----------------+
|summary|       dropoff_lat|      dropoff_long|   passenger_count|       pickup_lat|       pickup_long|          rate_code|    trip_distance|trip_time_in_secs|
+-------+------------------+------------------+------------------+-----------------+------------------+-------------------+-----------------+-----------------+
|  count|            249999|            249999|            249999|           249999|            249999|             249999|           249999|           249999|
|   mean| 39.94324114547672| -72.5620767207705|1.7390429561718246|39.98826507731672|-72.57300293825547| 1.0568522274089096|4.790398561594238|1319.435301741207|
| stddev|12.606727776625927|11.520652216641365|1.4045560098412289|5.520016838901558|10.065188912895966|0.30606569862073096|4.248816817765342|  672.14462665616|
|    min|        -3481.1343|        -1814.2775|                 0|              0.0|        -98.150002|                  0|              0.0|              751|
|    max|         404.95761|            2084.3|                 6|        73.989571|               0.0|                  6|             86.3|            10680|
+-------+------------------+------------------+------------------+-----------------+------------------+-------------------+-----------------+-----------------+

Although the function is definitely a welcome addition, especially for exploratory data analysis, the output feels cluttered, due to the unnecessarily high precision displayed, even for integer-type columns (passenger_count and trip_time_in_secs). Could we have it in a more concise form?
It turns out that the output of df.describe() is itself a Spark dataframe with string-type columns:

>>> type(df.describe())
pyspark.sql.dataframe.DataFrame
>>> df.describe().dtypes
[('summary', 'string'),
 ('dropoff_lat', 'string'),
 ('dropoff_long', 'string'),
 ('passenger_count', 'string'),
 ('pickup_lat', 'string'),
 ('pickup_long', 'string'),
 ('rate_code', 'string'),
 ('trip_distance', 'string'),
 ('trip_time_in_secs', 'string')]

Moreover, it is by construction a small Spark dataframe, i.e. one that we can safely convert to pandas format and manipulate further as we wish, without fear that it may not fit in main memory.
Given the above, and keeping in mind that we only want to intervene on the mean and stddev fields of the summary description, it is not difficult to come up with a simple function providing a prettier summary:

def prettySummary(df):
    """Neat summary statistics of a Spark dataframe.
    Args:
        df (pyspark.sql.dataframe.DataFrame): input dataframe
    Returns:
        pandas.core.frame.DataFrame: a pandas dataframe with the summary statistics of df
    """
    import pandas as pd
    temp = df.describe().toPandas()
    # rows 1-2 ('mean' and 'stddev') come back as strings; convert them to numeric
    # so that the float format set below takes effect
    # (convert_objects is deprecated in newer pandas versions, where
    #  temp.iloc[1:3, 1:].apply(pd.to_numeric) is an equivalent alternative)
    temp.iloc[1:3, 1:] = temp.iloc[1:3, 1:].convert_objects(convert_numeric=True)
    pd.options.display.float_format = '{:,.2f}'.format
    return temp
>>> prettySummary(df)
  summary dropoff_lat dropoff_long passenger_count pickup_lat pickup_long  \
0   count      249999       249999          249999     249999      249999
1    mean       39.94       -72.56            1.74      39.99      -72.57
2  stddev       12.61        11.52            1.40       5.52       10.07
3     min  -3481.1343   -1814.2775               0        0.0  -98.150002
4     max   404.95761       2084.3               6  73.989571         0.0

  rate_code trip_distance trip_time_in_secs
0    249999        249999            249999
1      1.06          4.79          1,319.44
2      0.31          4.25            672.14
3         0           0.0               751
4         6          86.3             10680

From the summary statistics displayed, it is already apparent that we have a data quality issue, as the min and max values of several columns are obviously unrealistic (and we have not even touched the non-numeric variables yet!).

Elementary data exploration

We can explore a little further, to get a rough idea of how many records contain such unrealistic values; for example:

>>> df.filter(df.dropoff_lat < 0).count() 
3 
>>> df.filter(df.dropoff_lat < 10).count() 
4715 
>>> df.filter(df.trip_distance == 0.0).count()
290
>>> df.filter(df.dropoff_long > -50).count()
4751

Although not apparent from the documentation, numeric column indices work, too; here is trip_distance (column #13):

>>> df.filter(df[13] == 0.0).count()
290
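To double-check which name a numeric index corresponds to, we can always consult the df.columns list:

>>> df.columns[13]
'trip_distance'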

As a result of an issue we raised recently, expressions like the following will not work from Spark 1.4.1 onwards:

>>> df.filter(df.dropoff_lat < 10 and df.dropoff_long > -50).count()
[...]
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

The error message guides us to use & instead of and, but this simple replacement alone leads to yet another error (not shown): since & binds more tightly than the comparison operators, we also need to wrap each condition in an extra set of parentheses:

>>> df.filter( (df.dropoff_lat < 10) & (df.dropoff_long > -50) ).count()
4710
>>> df.filter(df.dropoff_lat < 10).filter(df.dropoff_long > -50).count()  # confirm the above result
4710
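Compound conditions can also be built with column methods such as between(), which already return boolean columns and thus need no extra parentheses around the comparisons; a quick sketch, with purely illustrative bounds for a plausible New York bounding box (the bounds are our assumption, not taken from the data):

>>> from pyspark.sql.functions import col
>>> plausible = df.filter(col('dropoff_lat').between(40.0, 41.5) &
...                       col('dropoff_long').between(-75.0, -72.0))

Comparing plausible.count() with df.count() then gives another quick view of how many dubious records there are.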

Max and min column values can be computed for timestamp data, too:

>>> from pyspark.sql.functions import *
>>> df.select(max("pickup_datetime")).show()
+--------------------+
|max(pickup_datetime)|
+--------------------+
|2013-11-26 23:46:...|
+--------------------+
>>> df.select(max("dropoff_datetime")).show()
+---------------------+
|max(dropoff_datetime)|
+---------------------+
| 2013-11-26 23:59:...|
+---------------------+
>>> df.select(min("pickup_datetime")).show()
+--------------------+
|min(pickup_datetime)|
+--------------------+
|2013-01-10 21:27:...|
+--------------------+
>>> df.select(min("dropoff_datetime")).show()
+---------------------+
|min(dropoff_datetime)|
+---------------------+
| 2013-01-11 00:00:...|
+---------------------+

Knowing that our data are supposed to cover January to December 2013, the above values indicate that, apart from there being no records for December, the max/min limits for our timestamp columns look OK.
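Both extremes of a column can also be obtained in a single job with agg() (the output, not repeated here, matches the values shown above):

>>> df.agg(min("pickup_datetime"), max("pickup_datetime")).show()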

Let’s do another data quality check: we expect that, in each record, pickup_datetime should be strictly less than dropoff_datetime. To check whether this is the case, we will first create a new boolean column, pickup_1st, based on the two datetime columns (creating new columns from existing ones in Spark dataframes is a frequently raised question – see Patrick’s comment in our previous post); then, we will check in how many records this column is false (i.e. the dropoff does not happen strictly after the pickup). Although not strictly necessary, we will also grab the opportunity to demonstrate how to create a new dataframe from selected columns of an existing one. So, our steps will be:

  1. Create a new dataframe, df_dates, consisting only of the datetime columns of the existing one, using select()
  2. Create a new boolean column in the df_dates dataframe, using withColumn()
  3. Check in how many records this boolean column is False, indicating a data quality issue:
>>> df_dates = df.select(df['pickup_datetime'], df['dropoff_datetime'])
>>> df_dates.show(5, truncate=False)
+---------------------+---------------------+
|pickup_datetime      |dropoff_datetime     |
+---------------------+---------------------+
|2013-01-11 21:48:00.0|2013-01-11 22:03:00.0|
|2013-01-11 04:07:00.0|2013-01-11 04:28:00.0|
|2013-01-11 21:46:00.0|2013-01-11 22:02:00.0|
|2013-01-11 09:44:00.0|2013-01-11 10:03:00.0|
|2013-01-11 21:48:00.0|2013-01-11 22:02:00.0|
+---------------------+---------------------+
only showing top 5 rows

>>> df_dates = df_dates.withColumn('pickup_1st', df_dates.pickup_datetime < df_dates.dropoff_datetime) 
>>> df_dates.filter(~df_dates.pickup_1st).count()  # '~' for 'not'
0

That is, we indeed have no records with this data quality issue.
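Equivalently, we could have skipped the helper column altogether and filtered on the comparison directly:

>>> df.filter(df.pickup_datetime >= df.dropoff_datetime).count()
0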

We wouldn’t like to close this post without mentioning, however briefly, at least one of the new Spark SQL functions introduced in version 1.5 (a whole bunch of functions for handling datetime data were added in this release). Let’s quickly demonstrate the month() function, which extracts the month of a given date as an integer; here we will use it in conjunction with groupBy(), in order to get the number of records per month:

>>> df.groupBy(month(df.dropoff_datetime)).count().show()
+-----------------------+-----+
|month(dropoff_datetime)|count|
+-----------------------+-----+
|                      1|91075|
|                      2|64917|
|                     11|94007|
+-----------------------+-----+

Well, it certainly seems that the advertised description of our data was not exactly accurate: in fact, we only have data for three months (January, February, and November) of 2013…!
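As a small variation, we can give the derived column a friendlier name with alias() and sort by it, which keeps the months in calendar order even when the groups come back unordered:

>>> df.groupBy(month(df.dropoff_datetime).alias('month')).count().orderBy('month').show()
+-----+-----+
|month|count|
+-----+-----+
|    1|91075|
|    2|64917|
|   11|94007|
+-----+-----+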

A note on show() versus collect()

The dataframe method show(), introduced in Spark 1.3, is meant to be used interactively; we cannot use it if we want to extract the value programmatically (e.g. in order to use it further downstream in an automated pipeline), as it returns None:

>>> type(df.select(max("pickup_datetime")).show() )
+--------------------+
|max(pickup_datetime)|
+--------------------+
|2013-11-26 23:46:...|
+--------------------+

<type 'NoneType'>

In order to store and use the returned value programmatically, we have to use the dataframe method collect(), which returns all the records as a list of Row objects:

>>> df.select(max("pickup_datetime")).collect()
[Row(max(pickup_datetime)=datetime.datetime(2013, 11, 26, 23, 46, 38))]
>>> type(df.select(max("pickup_datetime")).collect())
<type 'list'>

Now, since we have a 1-element list containing a 1-element Row, we can simply get the first (and only) element, as we would do in any similar situation in pure Python:

>>> max_pickup = df.select(max("pickup_datetime")).collect()[0][0]
>>> max_pickup
datetime.datetime(2013, 11, 26, 23, 46, 38)
>>> from datetime import *
>>> max_pickup < datetime(2013,12,31) # is it before end of year 2013?
True
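Since here we expect a single row anyway, the dataframe method first(), which returns just the first Row, offers a slightly shorter route to the same value:

>>> df.select(max("pickup_datetime")).first()[0]
datetime.datetime(2013, 11, 26, 23, 46, 38)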

Perhaps not unexpectedly, the exact same situation also holds for the SQL-like interface (remember that table and column names are case-sensitive):

>>> df.registerTempTable("taxi")
>>> sqlContext.sql("SELECT vendor_id, COUNT(*) FROM taxi GROUP BY vendor_id ").show()
+---------+------+
|vendor_id|   _c1|
+---------+------+
|      CMT|114387|
|      VTS|135612|
+---------+------+
>>> sqlContext.sql("SELECT vendor_id, COUNT(*) FROM taxi GROUP BY vendor_id ").collect()
[Row(vendor_id=u'CMT', _c1=114387), Row(vendor_id=u'VTS', _c1=135612)]
>>> type( sqlContext.sql("SELECT vendor_id, COUNT(*) FROM taxi GROUP BY vendor_id ").collect() )
<type 'list'>
>>> sqlContext.sql("SELECT vendor_id, COUNT(*) FROM taxi GROUP BY vendor_id ").withColumnRenamed('_c1', 'count').show()
+---------+------+
|vendor_id| count|
+---------+------+
|      CMT|114387|
|      VTS|135612|
+---------+------+

The last command shows that methods like withColumnRenamed can be used in this context, too.
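Of course, the renaming can also be done directly in the SQL statement with a column alias (cnt is just an illustrative name):

>>> sqlContext.sql("SELECT vendor_id, COUNT(*) AS cnt FROM taxi GROUP BY vendor_id").show()
+---------+------+
|vendor_id|   cnt|
+---------+------+
|      CMT|114387|
|      VTS|135612|
+---------+------+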

Summary

To summarize, we have touched on several aspects of Spark dataframes in this post; in particular, we have demonstrated how to:

  • Automatically extract (part of) the schema of a CSV file using the spark-csv package
  • Change the data type of selected columns
  • Rename selected columns
  • Create a more concise summary of a dataframe
  • Create a new dataframe from selected columns of an existing one
  • Create new columns in a dataframe, based on the existing ones
  • Extract summary statistics programmatically

As always, comments and feedback most welcome. Stay tuned as we will continue our Spark exploration.

Christos - Iraklis Tsatsoulis

Comments
Tristan · November 7, 2015 00:06

Thank you for your nice blog! I really enjoy this kind of hands-on practical information.

The reason for needing parentheses for compound comparisons within filter operations is operator precedence. This is familiar from pandas, which has the same syntax, and hence we run into the same issue there. When you use a bitwise '&' to combine comparisons such as `a > b & c < d`, it is parsed as `a > (b & c) < d`, because '&' binds more tightly than the comparison operators.

Best,

Ajay · February 17, 2016 12:55

Hello, Nice Blog. I want to know how to handle categorical Features/Labels in MLlib. For example I want to do classification on IRIS data. Can you please tell me how to do it

Ajay · Reply to Christos - Iraklis Tsatsoulis · February 18, 2016 09:01

Hi, Thanks for sharing the stuff. Yes you are right. to convert features/labels from categorical to Numeric it requires OneHotEncoder, StringIndexer. But I didnt got any source how to do it in Pyspark. What I did is import os from pyspark import SparkContext from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.linalg import DenseVector os.environ["SPARK_HOME"] = "/home/ajadhav/Documents/spark-1.6.0-bin-hadoop2.6" sc = SparkContext() documents = sc.textFile("/media/ajadhav/New Volume/Downloads/iris.csv") csv_data = documents.map(lambda x: x.split(",")) header = csv_data.first() csv_data = csv_data.filter(lambda x: x!= header) a = csv_data.map(lambda x: LabeledPoint(x[4], DenseVector(x[1:4]))) print a.first() >>>(0.2,[5.1,3.5,1.4]) But If i take categorical label a = csv_data.map(lambda x: LabeledPoint(x[5], DenseVector(x[1:4]))) print a.first() ValueError: could… Read more »

Ajay · Reply to Ajay · February 18, 2016 12:08

Hey, I implemented your suggestions. It works for me. Thanks a lot. Keep posting. I like it very much. Nice. 1 thing i want to ask I ran my code in which i used sqlContext.read.load() it works fine on Terminal where I started the terminal using the command ./pyspark --packages com.databricks:spark-csv_2.10:1.2.0. But i tried same code on Eclipse Pydev also on Spyder. its not working because of error py4j.protocol.Py4JJavaError: An error occurred while calling o21.load. : java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.csv. Please find packages at http://spark-packages.org Can you tell me how to add this repository to Eclipse or tell… Read more »

Ajay · Reply to Christos - Iraklis Tsatsoulis · February 22, 2016 07:25

Cooool. It works for me.

Just Added
spark.driver.extraClassPath /home/ajadhav/.ivy2/cache/com.databricks/spark-csv_2.10/jars/spark-csv_2.10-1.2.0.jar:/home/ajadhav/.ivy2/cache/org.apache.commons/commons-csv/jars/commons-csv-1.1.jar
this command in spark-defaults.conf file. Thanks a lot. Thank you very much. No need to 1st step. Just edited spark-defaults.conf file.

1 more question.
For now My Response variable is at 6th position (Last Column)
So my syntax for Label Point is
parsedData = encoded.map(lambda x: LabeledPoint(x[6], DenseVector(x[1:5])))

But what if my response variable is at 3rd column. and my features are column number 1,2,4,5,6

parsedData = encoded.map(lambda x: LabeledPoint(x[3], DenseVector(x[? ?])))

How to do this ?????

Thanks in advance

Raghavendra · Reply to Ajay · July 8, 2016 20:16

Hi, I am facing same issue as
Failed to find data source: com.databricks.spark.csv. Please find packages at http://spark-packages.org.

Caused by: java.lang.ClassNotFoundException: com.databricks.spark.csv.DefaultSource

when i ran below :

ubuntu > spark-submit --class AutoSchemaDiscovery --master local[*] /home/user/IdeaProjects/SchemaAutoDiscover/target/scala-2.10/schemaautodiscover_2.10-1.0.jar

I added below in spark-defaults.conf file:

spark.driver.extraClassPath /home/user/.ivy2/cache/com.databricks/spark-csv_2.10/spark-csv_2.10-1.4.0.jar:/home/user/.ivy2/cache/org.apache.commons/commons-csv/commons-csv-1.1.jar

still i am getting same error. after adding that, should i need to do any thing. kindly,please tell me . do you have any other post regarding this.

Raghavendra · Reply to Christos - Iraklis Tsatsoulis · July 9, 2016 22:04

Thanks for reply. i have tried this “spark-submit…--packages com.databricks:spark-csv_2.10:1.4.0” at first, which works fine. But, with out this argument --packages com.databricks:spark-csv_2.10:1.4.0, its not working. then , i found your comment. as you mentioned, in 1st point, i checked that jar locations . i found that jars : spark-csv_2.10-1.4.0.jar & commons-csv-1.1.jar. As per 2nd point, i added in spark-defaults.conf file. but, it is not working in my case(using Scala 2.10, SBT 0.13.11, Spark 1.6.1). when i did ‘spark-submit’ with that --packages argument , i found one more dependency called “com.univocity#univocity-parsers;1.5.1”. i added that also to spark-defaults.conf file. but not working. i… Read more »

ebisa · June 15, 2016 11:56

Thanks for this tutorial. It has worked for me very well. I have one question though. Each entry of one of the columns of my CSV file has multiple lines, and the csv reader assumes them as different rows. I would like to know how I can read multiple lines as single column element. Any suggestion?

Ajay Jadhav · Reply to ebisa · June 15, 2016 13:06

Its working properly here this is my csv file having 2 columns ID Subject 1 Thanks for this tutorial. It has worked for me very well. 2 I have one question though. Each entry of one of the columns of my CSV file has multiple lines 3 and the csv reader assumes them as different rows. I would like to know how I can read multiple lines as single >>> from pyspark.sql import SQLContext >>> from pyspark.sql.types import * >>> sqlContext = SQLContext(sc) >>> df = sqlContext.read.format(‘com.databricks.spark.csv’).options(header=’true’).load(‘/home/ajadhav/Desktop/temp.csv’) >>> df.show() +—+——————–+ | ID| Subject| +—+——————–+ | 1|Thanks for this t…| |… Read more »

ebisa · Reply to Ajay Jadhav · June 15, 2016 13:57

Thank you for your reply! My problem is not a single data spanning multiple lines though, but it is a data of a single column with newline characters. Can you suggest a fix for that?

Ajay Jadhav · Reply to Christos - Iraklis Tsatsoulis · June 15, 2016 14:34

if possible send me your csv file or duplicate file on my mail id. avj171991@gmail.com
I will try on that

ebisa · Reply to Ajay Jadhav · June 16, 2016 17:17
June 16, 2016 17:17

Thanks Ajay for your offer. I have done some preprocessing on my CSV files to remove the new lines in each column element. Thanks again!

Arun · Reply to ebisa · June 17, 2016 20:33
June 17, 2016 20:33

I have the same issue! One of my fields has multiple new line characters in it
I am giving below an example of one row:

Field1 | Field2 | Field3 | “String1 \n String2 \n String3”

So, the contents from Field1 to String1 are considered to be one row, String2 is considered as another row and String3 is considered as another row.

Is there any way to imply that one of the fields has new line characters in it?


Paulo Moniz · August 20, 2017 23:40

Thank you for your nice blog! I would like to receive your help. I am using cloudera VM 5.10, Spark 1.6.0 Python 3.5.1 and am trying to do this exercise using the jupyter notebook.

It turns out that the command df = sqlContext.read.load('file:///home/cloudera/Downloads/nyctaxisub.csv', format='com.databricks.spark.csv', header='true', inferSchema='true') raises an error in the Jupyter notebook, although it works perfectly well at the cloudera prompt.

I already researched and could not get any tips that mention some kind of configuration for the jupyter notebook.

Congratulations for the post.

Thank you very much in advance.

jlayel · July 5, 2018 11:49

Please can someone guess with me why this error “Path does not exist: file:/home/vagrant/data/nyctaxisub.csv;'” where can i find this file

Thanasis · August 17, 2018 17:12

Thank you for the post. Helped a lot.