Dataframes from CSV files in Spark 1.5: automatic schema extraction, neat summary statistics, & elementary data exploration

Christos - Iraklis Tsatsoulis Big Data, Spark 25 Comments

In a previous post, we took a brief look at creating and manipulating Spark dataframes from CSV files. In the couple of months since, Spark has already gone from version 1.3.0 to 1.5, with more than 100 built-in functions introduced in Spark 1.5 alone; so, we thought it was a good time to revisit the subject, this time also utilizing the external package spark-csv, provided by Databricks. We’ll use the same CSV file with header as in the previous post, which you can download here.

In order to include the spark-csv package, we must start pyspark with the following argument:

$ pyspark --packages com.databricks:spark-csv_2.10:1.2.0

If this is the first time we use it, Spark will download the package from Databricks’ repository, and it will be subsequently available for inclusion in future sessions. So, after the numerous INFO messages, we get the welcome screen, and we proceed to import the necessary modules:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.0
      /_/

Using Python version 2.7.6 (default, Mar 22 2014 22:59:38)
SparkContext available as sc, HiveContext available as sqlContext.
>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.types import *
>>> sqlContext = SQLContext(sc)

Automatic schema extraction

Spark 1.4 introduced a new (and still experimental) interface class, pyspark.sql.DataFrameReader, specifically for loading dataframes from external storage systems. The syntax shown in the examples provided with spark-csv for loading a CSV file is:

>>> df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('file:///home/vagrant/data/nyctaxisub.csv')

However, we found this syntax rather complicated and unintuitive; and despite the sparsity of the relevant documentation, we were able to use the following, much clearer syntax, where we have also set the flag for inferring the schema (which slows down the process, as it requires one extra pass over the data, and is false by default):

>>> df = sqlContext.read.load('file:///home/vagrant/data/nyctaxisub.csv', 
                          format='com.databricks.spark.csv', 
                          header='true', 
                          inferSchema='true')
>>> df.count()
249999

Recall from the previous post that our CSV file has 250,000 rows, including the header; hence, our record count is indeed correct.
Let’s check how successful the automatic schema extraction was:

>>> df.dtypes
[('_id', 'string'),
 ('_rev', 'string'),
 ('dropoff_datetime', 'string'),
 ('dropoff_latitude', 'double'),
 ('dropoff_longitude', 'double'),
 ('hack_license', 'string'),
 ('medallion', 'string'),
 ('passenger_count', 'int'),
 ('pickup_datetime', 'string'),
 ('pickup_latitude', 'double'),
 ('pickup_longitude', 'double'),
 ('rate_code', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('trip_distance', 'double'),
 ('trip_time_in_secs', 'int'),
 ('vendor_id', 'string')]

Well, not bad at all! It seems that, apart from the two datetime columns, all other column types have been recognized correctly.
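
To make the cost of the inferSchema flag more concrete, here is a minimal pure-Python sketch of what type inference with one extra pass over the data can look like. It is an illustration only, not the actual spark-csv implementation: the helper name infer_types, the widening order int → double → string, and the sample rows are all assumptions made for this example.

```python
import csv
import io

def infer_types(csv_text):
    """Infer a type name per column by scanning every data row once (hypothetical helper)."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    # Assumed widening order: start from the narrowest type, widen when a value does not fit.
    order = ['int', 'double', 'string']
    current = [0] * len(header)

    def fits(value, type_name):
        try:
            if type_name == 'int':
                int(value)
            elif type_name == 'double':
                float(value)
            return True  # anything fits 'string'
        except ValueError:
            return False

    for row in reader:
        for i, value in enumerate(row):
            while not fits(value, order[current[i]]):
                current[i] += 1
    return [(name, order[idx]) for name, idx in zip(header, current)]

# Made-up sample rows, using a few of the column names from the dataframe above
sample = (
    "passenger_count,trip_distance,pickup_datetime,vendor_id\n"
    "1,2.5,2013-01-11 21:48:00,CMT\n"
    "3,10,2013-01-11 04:07:00,VTS\n"
)
inferred = infer_types(sample)
print(inferred)
```

In this toy version the datetime column also ends up as a string, simply because the sketch knows nothing about timestamp formats; every value of every column has to be inspected before the types are known, which is why the extra pass cannot be avoided.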

We have already come a long way since the previous post: recall that there, we had to prepare the schema (column headers and types) of our dataframe using a long (and possibly error-prone) “manual” chain of .map operations in the initial RDD, including special parse settings for each column, and without any handy reference to the column names (which had to be manipulated separately). And there is more good news: although we still have some columns with incorrect data types, there is now a much more straightforward way to perform such operations, directly on the newly created dataframe (instead of the “raw” RDD). In the following snippet, we will use the pyspark.sql.DataFrame class methods withColumn and withColumnRenamed in order to:

  1. Correctly set the types for the two datetime columns
  2. Rename the first two columns, by removing the annoying leading underscores
  3. Also rename the longitude/latitude columns to long/lat, for brevity:

>>> df = (df.withColumn('dropoff_datetime', df.dropoff_datetime.cast('timestamp'))
       .withColumn('pickup_datetime', df.pickup_datetime.cast('timestamp'))
       .withColumnRenamed('_id', 'id')
       .withColumnRenamed('_rev', 'rev')
       .withColumnRenamed('dropoff_latitude', 'dropoff_lat')
       .withColumnRenamed('dropoff_longitude', 'dropoff_long')
       .withColumnRenamed('pickup_latitude', 'pickup_lat')
       .withColumnRenamed('pickup_longitude', 'pickup_long'))
>>> df.dtypes
[('id', 'string'),
 ('rev', 'string'),
 ('dropoff_datetime', 'timestamp'),
 ('dropoff_lat', 'double'),
 ('dropoff_long', 'double'),
 ('hack_license', 'string'),
 ('medallion', 'string'),
 ('passenger_count', 'int'),
 ('pickup_datetime', 'timestamp'),
 ('pickup_lat', 'double'),
 ('pickup_long', 'double'),
 ('rate_code', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('trip_distance', 'double'),
 ('trip_time_in_secs', 'int'),
 ('vendor_id', 'string')]

Neat summary statistics

One of the new Spark SQL functions introduced in version 1.3.1 was describe(), providing a quick summary of the dataframe numeric columns. Let’s try it here:

>>> df.describe().show()
+-------+------------------+------------------+------------------+-----------------+------------------+-------------------+-----------------+-----------------+
|summary|       dropoff_lat|      dropoff_long|   passenger_count|       pickup_lat|       pickup_long|          rate_code|    trip_distance|trip_time_in_secs|
+-------+------------------+------------------+------------------+-----------------+------------------+-------------------+-----------------+-----------------+
|  count|            249999|            249999|            249999|           249999|            249999|             249999|           249999|           249999|
|   mean| 39.94324114547672| -72.5620767207705|1.7390429561718246|39.98826507731672|-72.57300293825547| 1.0568522274089096|4.790398561594238|1319.435301741207|
| stddev|12.606727776625927|11.520652216641365|1.4045560098412289|5.520016838901558|10.065188912895966|0.30606569862073096|4.248816817765342|  672.14462665616|
|    min|        -3481.1343|        -1814.2775|                 0|              0.0|        -98.150002|                  0|              0.0|              751|
|    max|         404.95761|            2084.3|                 6|        73.989571|               0.0|                  6|             86.3|            10680|
+-------+------------------+------------------+------------------+-----------------+------------------+-------------------+-----------------+-----------------+

Although the function is definitely a welcome addition, especially for exploratory data analysis, the output feels cluttered because of the unnecessarily high precision displayed, even for integer-type columns (passenger_count and trip_time_in_secs). Could we have it in a more concise form?
It turns out that the output of df.describe() is itself a Spark dataframe with string-type columns:

>>> type(df.describe())
pyspark.sql.dataframe.DataFrame
>>> df.describe().dtypes
[('summary', 'string'),
 ('dropoff_lat', 'string'),
 ('dropoff_long', 'string'),
 ('passenger_count', 'string'),
 ('pickup_lat', 'string'),
 ('pickup_long', 'string'),
 ('rate_code', 'string'),
 ('trip_distance', 'string'),
 ('trip_time_in_secs', 'string')]

Moreover, it is by definition a small Spark dataframe, i.e. one that we can safely convert to pandas format and manipulate further as we wish, without fearing that it may not fit in the main memory.
Given the above, and keeping in mind that we only want to intervene on the mean and stddev fields of the summary description, it is not difficult to come up with a simple function providing a prettier summary:

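def prettySummary(df):
    """ Neat summary statistics of a Spark dataframe
    Args:
        df (pyspark.sql.dataframe.DataFrame): input dataframe
    Returns:
        pandas.core.frame.DataFrame: a pandas dataframe with the summary statistics of df
    """
    import pandas as pd
    # describe() returns a small dataframe of strings, safe to bring to the driver
    temp = df.describe().toPandas()
    # rows 1-2 are 'mean' and 'stddev'; column 0 is the 'summary' label
    # (convert_objects was deprecated in pandas 0.17, where pd.to_numeric replaces it)
    temp.iloc[1:3,1:] = temp.iloc[1:3,1:].convert_objects(convert_numeric=True)
    # note: this changes the float display format for the whole pandas session
    pd.options.display.float_format = '{:,.2f}'.format
    return temp

>>> prettySummary(df)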
  summary dropoff_lat dropoff_long passenger_count pickup_lat pickup_long  \
0   count      249999       249999          249999     249999      249999
1    mean       39.94       -72.56            1.74      39.99      -72.57
2  stddev       12.61        11.52            1.40       5.52       10.07
3     min  -3481.1343   -1814.2775               0        0.0  -98.150002
4     max   404.95761       2084.3               6  73.989571         0.0

  rate_code trip_distance trip_time_in_secs
0    249999        249999            249999
1      1.06          4.79          1,319.44
2      0.31          4.25            672.14
3         0           0.0               751
4         6          86.3             10680

From the summary statistics displayed it is apparent already that we have a data quality issue, as the min and max values for several columns are obviously unrealistic (and we have not even touched the non-numeric variables yet!).
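
As a side note, the rounding idea itself does not depend on pandas. The following is a minimal sketch, assuming the summary is available as a list of tuples of strings (roughly the shape one would get from collecting the describe() output); the helper name round_summary is hypothetical, and only two of the columns shown above are reproduced here.

```python
def round_summary(rows, digits=2):
    """Round only the 'mean' and 'stddev' rows of a describe()-style table (hypothetical helper).

    rows: list of tuples of strings; the first element is the statistic name.
    """
    pretty = []
    for row in rows:
        name, values = row[0], row[1:]
        if name in ('mean', 'stddev'):
            # Same display format as in prettySummary: thousands separator, fixed decimals
            values = tuple('{:,.{d}f}'.format(float(v), d=digits) for v in values)
        pretty.append((name,) + tuple(values))
    return pretty

# Values copied from the dropoff_lat and trip_time_in_secs columns above
summary = [
    ('count', '249999', '249999'),
    ('mean', '39.94324114547672', '1319.435301741207'),
    ('stddev', '12.606727776625927', '672.14462665616'),
    ('min', '-3481.1343', '751'),
    ('max', '404.95761', '10680'),
]
pretty = round_summary(summary)
for row in pretty:
    print(row)
```

The count, min and max rows are deliberately left untouched, so integer-valued statistics keep their original form.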

Elementary data exploration

We can explore further to see roughly how many records have unrealistic values, for example:

>>> df.filter(df.dropoff_lat < 0).count() 
3 
>>> df.filter(df.dropoff_lat < 10).count() 
4715 
>>> df.filter(df.trip_distance == 0.0).count()
290
>>> df.filter(df.dropoff_long > -50).count()
4751

Although not apparent from the documentation, numeric column indices work, too; here is trip_distance (zero-based column index 13):

>>> df.filter(df[13] == 0.0).count()
290

As a result of an issue we raised recently, expressions like the following will not work from Spark 1.4.1 on:

>>> df.filter(df.dropoff_lat < 10 and df.dropoff_long > -50).count()
[...]
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

The error message guides us to use & instead of and, but this simple replacement will again lead to a different error (not shown); it turns out that we also need to use an extra set of parentheses around our conditions:

>>> df.filter( (df.dropoff_lat < 10) & (df.dropoff_long > -50) ).count()
4710
>>> df.filter(df.dropoff_lat < 10).filter(df.dropoff_long > -50).count()  # confirm the above result
4710
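
The need for the extra parentheses comes from Python itself: & binds more tightly than the comparison operators, and the keyword and always tries to convert its operands to booleans. The sketch below uses a toy class, Col, which is only an assumed stand-in for a dataframe column expression (it is not the pyspark class, and the exact error pyspark raises may differ), to show how Python parses each variant.

```python
def _expr(x):
    """Return the expression text of a Col, or the repr of a plain Python value."""
    return x.expr if isinstance(x, Col) else repr(x)

class Col(object):
    """Toy stand-in for a dataframe column expression (not the pyspark class)."""
    def __init__(self, expr):
        self.expr = expr
    def __lt__(self, other):
        return Col('({} < {})'.format(self.expr, _expr(other)))
    def __gt__(self, other):
        return Col('({} > {})'.format(self.expr, _expr(other)))
    def __and__(self, other):
        return Col('({} AND {})'.format(self.expr, _expr(other)))
    def __rand__(self, other):
        return Col('({} AND {})'.format(_expr(other), self.expr))
    def __bool__(self):
        # 'and', 'or', 'not' and chained comparisons all end up here
        raise ValueError('Cannot convert column into bool')
    __nonzero__ = __bool__  # Python 2 spelling of __bool__

lat, lon = Col('dropoff_lat'), Col('dropoff_long')

# Parenthesized: each comparison is built first, then combined with &
good = (lat < 10) & (lon > -50)
print(good.expr)

# Unparenthesized: Python reads this as lat < (10 & lon) > -50,
# a chained comparison, which needs a boolean for its first half
try:
    bad = lat < 10 & lon > -50
    failed = False
except ValueError:
    failed = True
print(failed)
```

The same reasoning applies to | and ~, so it is a good habit to parenthesize every individual condition inside a filter.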

Max and min column values can be computed for timestamp data, too:

>>> from pyspark.sql.functions import *  # note: this shadows Python's built-in max() and min()
>>> df.select(max("pickup_datetime")).show()
+--------------------+
|max(pickup_datetime)|
+--------------------+
|2013-11-26 23:46:...|
+--------------------+
>>> df.select(max("dropoff_datetime")).show()
+---------------------+
|max(dropoff_datetime)|
+---------------------+
| 2013-11-26 23:59:...|
+---------------------+
>>> df.select(min("pickup_datetime")).show()
+--------------------+
|min(pickup_datetime)|
+--------------------+
|2013-01-10 21:27:...|
+--------------------+
>>> df.select(min("dropoff_datetime")).show()
+---------------------+
|min(dropoff_datetime)|
+---------------------+
| 2013-01-11 00:00:...|
+---------------------+

Knowing that our data are between January and December 2013, the above values indicate that, apart from having no records for December, the max/min limits for our timestamp columns look OK.

Let’s do another data quality check: we expect that, in each record, pickup_datetime should be strictly less than dropoff_datetime. To check if this is the case, we will first create a new boolean column, pickup_1st, based on the two datetime columns (creating new columns from existing ones in Spark dataframes is a frequently raised question; see Patrick’s comment in our previous post); then, we will check in how many records this is false (i.e. dropoff seems to happen before pickup). And although not necessary, we will also take the opportunity to demonstrate how to create a new dataframe from selected columns of an existing one. So, our steps will be:

  1. Create a new dataframe, df_dates, consisting only of the datetime columns of the existing one, using select()
  2. Create a new boolean column in the df_dates dataframe, using withColumn()
  3. Check in how many records this boolean column is False, indicating a data quality issue:

>>> df_dates = df.select(df['pickup_datetime'], df['dropoff_datetime'])
>>> df_dates.show(5, truncate=False)
+---------------------+---------------------+
|pickup_datetime      |dropoff_datetime     |
+---------------------+---------------------+
|2013-01-11 21:48:00.0|2013-01-11 22:03:00.0|
|2013-01-11 04:07:00.0|2013-01-11 04:28:00.0|
|2013-01-11 21:46:00.0|2013-01-11 22:02:00.0|
|2013-01-11 09:44:00.0|2013-01-11 10:03:00.0|
|2013-01-11 21:48:00.0|2013-01-11 22:02:00.0|
+---------------------+---------------------+
only showing top 5 rows

>>> df_dates = df_dates.withColumn('pickup_1st', df_dates.pickup_datetime < df_dates.dropoff_datetime) 
>>> df_dates.filter(~df_dates.pickup_1st).count()  # '~' for 'not'
0

That is, we indeed have no records with this data quality issue.

We wouldn’t like to close this post without mentioning, however briefly, at least one of the new Spark SQL functions introduced in version 1.5. So, let’s quickly demonstrate the month() function, which extracts the month of a given date as an integer (a whole bunch of functions for handling datetime data were introduced in Spark 1.5). Here we will use it in conjunction with the groupBy() function, in order to get the number of records per month:

>>> df.groupBy(month(df.dropoff_datetime)).count().show()
+-----------------------+-----+
|month(dropoff_datetime)|count|
+-----------------------+-----+
|                      1|91075|
|                      2|64917|
|                     11|94007|
+-----------------------+-----+

Well, it certainly seems that the advertised description of our data was not exactly accurate: in fact, we only have data for three months (January, February, and November) of 2013…!
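
For readers less familiar with the groupBy/count pattern, the computation above amounts to counting records per extracted month. Here is a minimal pure-Python sketch of the same idea on a handful of made-up timestamps (the values are assumptions for illustration, not a sample of the actual data):

```python
from collections import Counter
from datetime import datetime

# Made-up dropoff timestamps, standing in for the dropoff_datetime column
dropoffs = [
    datetime(2013, 1, 11, 22, 3),
    datetime(2013, 1, 11, 4, 28),
    datetime(2013, 2, 5, 10, 3),
    datetime(2013, 11, 26, 23, 59),
]

# Equivalent of grouping by month(dropoff_datetime) and counting
per_month = Counter(d.month for d in dropoffs)
for m in sorted(per_month):
    print(m, per_month[m])
```

Months with no records simply do not appear in the result, which is exactly how the missing months showed up in the Spark output.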

A note on show() versus collect()

Dataframe method show(), introduced in Spark 1.3, is meant to be used in an interactive fashion; we cannot use it if we want to extract the value programmatically (e.g. in order to use it further downstream in an automated pipeline), as it only prints the result and returns None:

>>> type(df.select(max("pickup_datetime")).show() )
+--------------------+
|max(pickup_datetime)|
+--------------------+
|2013-11-26 23:46:...|
+--------------------+

<type 'NoneType'>

In order to store and use the returned value programmatically, we have to use the dataframe method collect(), which returns all the records as a list of Row objects:

>>> df.select(max("pickup_datetime")).collect()
[Row(max(pickup_datetime)=datetime.datetime(2013, 11, 26, 23, 46, 38))]
>>> type(df.select(max("pickup_datetime")).collect())
<type 'list'>

Now, since we have a 1-element list containing a 1-element Row, we can simply get the first (and only) element, as we would do for any similar situation in pure Python:

>>> max_pickup = df.select(max("pickup_datetime")).collect()[0][0]
>>> max_pickup
datetime.datetime(2013, 11, 26, 23, 46, 38)
>>> from datetime import *
>>> max_pickup < datetime(2013,12,31) # is it before end of year 2013?
True
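
The indexing works because the collected result behaves like a list of tuples. The sketch below mimics that shape with a namedtuple; the Row defined here is only an assumed stand-in for pyspark.sql.Row, and the field name max_pickup is hypothetical (the real field is named max(pickup_datetime), which is not a valid Python identifier).

```python
from collections import namedtuple
from datetime import datetime

Row = namedtuple('Row', ['max_pickup'])  # stand-in for pyspark.sql.Row
collected = [Row(datetime(2013, 11, 26, 23, 46, 38))]  # same shape as the collect() result above

max_pickup = collected[0][0]          # first row, first field, by position
same_value = collected[0].max_pickup  # same field, by (hypothetical) name

# Once extracted, it is an ordinary datetime that can be used downstream
is_2013 = datetime(2013, 1, 1) <= max_pickup < datetime(2014, 1, 1)
print(max_pickup, is_2013)
```

Positional indexing is the safer choice when the column name is generated by an aggregate function, as is the case here.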

Perhaps not unexpectedly, exactly the same situation also holds for the SQL-like interface (remember that table and column names are case-sensitive):

>>> df.registerTempTable("taxi")
>>> sqlContext.sql("SELECT vendor_id, COUNT(*) FROM taxi GROUP BY vendor_id ").show()
+---------+------+
|vendor_id|   _c1|
+---------+------+
|      CMT|114387|
|      VTS|135612|
+---------+------+
>>> sqlContext.sql("SELECT vendor_id, COUNT(*) FROM taxi GROUP BY vendor_id ").collect()
[Row(vendor_id=u'CMT', _c1=114387), Row(vendor_id=u'VTS', _c1=135612)]
>>> type( sqlContext.sql("SELECT vendor_id, COUNT(*) FROM taxi GROUP BY vendor_id ").collect() )
<type 'list'>
>>> sqlContext.sql("SELECT vendor_id, COUNT(*) FROM taxi GROUP BY vendor_id ").withColumnRenamed('_c1', 'count').show()
+---------+------+
|vendor_id| count|
+---------+------+
|      CMT|114387|
|      VTS|135612|
+---------+------+

where the last command shows how methods like withColumnRenamed can be used in this context, too.

Summary

Summarizing, we have touched on several things in this post regarding Spark dataframes; we have demonstrated how to:

  • Automatically extract (part of) the schema of a CSV file using the spark-csv package
  • Change the data type of selected columns
  • Rename selected columns
  • Create a more concise summary of a dataframe
  • Create a new dataframe from selected columns of an existing one
  • Create new columns in a dataframe, based on the existing ones
  • Extract summary statistics programmatically

As always, comments and feedback are most welcome. Stay tuned as we will continue our Spark exploration.

Christos - Iraklis Tsatsoulis

Christos - Iraklis is one of our resident Data Scientists. He holds advanced graduate degrees in applied mathematics, engineering, and computing. He has been awarded both Chartered Engineer and Chartered Manager status in the UK, as well as Master status in Kaggle.com due to "consistent and stellar results" in predictive analytics contests.

Tristan

Thank you for your nice blog! I really enjoy this kind of hands-on practical information.

The reason for needing parentheses for compound comparisons within filter operations is operator precedence. This is familiar from pandas, which has the same syntax and hence we run into the same issue there. When you use a bitwise '&' to combine comparisons, as in `a > b & c < d`, Python evaluates the `&` first, i.e. as `a > (b & c) < d`.

Best,

Ajay

Hello, Nice Blog. I want to know how to handle categorical Features/Labels in MLlib. For example I want to do classification on IRIS data. Can you please tell me how to do it

ebisa

Thanks for this tutorial. It has worked for me very well. I have one question though. Each entry of one of the columns of my CSV file has multiple lines, and the csv reader assumes them as different rows. I would like to know how I can read multiple lines as single column element. Any suggestion?

Ajay Jadhav

It's working properly here. This is my CSV file, having 2 columns:

ID Subject
1 Thanks for this tutorial. It has worked for me very well.
2 I have one question though. Each entry of one of the columns of my CSV file has multiple lines
3 and the csv reader assumes them as different rows. I would like to know how I can read multiple lines as single

>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.types import *
>>> sqlContext = SQLContext(sc)
>>> df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('/home/ajadhav/Desktop/temp.csv')
>>> df.show()
+---+--------------------+
| ID|             Subject|
+---+--------------------+
|  1|Thanks for this t...|
|…

ebisa

Thank you for your reply! My problem is not a single data spanning multiple lines though, but it is a data of a single column with newline characters. Can you suggest a fix for that?

Arun

I have the same issue! One of my fields has multiple new line characters in it
I am giving below an example of one row:

Field1 | Field2 | Field3 | “String1 \n String2 \n String3”

So, the contents from Field1 to String1 are considered to be one row, String2 is considered as another row and String3 is considered as another row.

Is there any way to imply that one of the fields has new line characters in it?

Paulo Moniz

Thank you for your nice blog! I would like to receive your help. I am using cloudera VM 5.10, Spark 1.6.0 Python 3.5.1 and am trying to do this exercise using the jupyter notebook.

It turns out that the command df = sqlContext.read.load (‘file: ///home/cloudera/Downloads/nyctaxisub.csv’,format=’com.databricks.spark.csv’, header = ‘true’, inferSchema = ‘ True
This error is displayed in the jupyter notebook, however the prompt cloudera works perfectly well.

I already researched and could not get any tips that mention some kind of configuration for the jupyter notebook.

Congratulations for the post.

Thank you very much in advance.

jlayel

Please can someone guess with me why this error “Path does not exist: file:/home/vagrant/data/nyctaxisub.csv;'” where can i find this file

Thanasis

Thank you for the post. Helped a lot.