I continue to share code examples related to my “Spark with Python” presentation. In my last blog post, I showed how to use RDDs (the core data structures of Spark). This time, I will use DataFrames instead of RDDs. A DataFrame is a distributed collection of data organized into named columns. DataFrames are similar to tables in relational databases, and they provide a domain-specific language (DSL) API to manipulate your distributed data, which makes them easier to use.
DataFrames are provided by the Spark SQL module, and they are the primary API for Spark’s machine learning library and the Structured Streaming module. Spark developers recommend using DataFrames instead of RDDs, because Catalyst (Spark’s query optimizer) will optimize your execution plan and generate better code to process the data.
I will use the “u.user” file of the MovieLens 100K Dataset again (like I did in my previous blog post; the code below reads it as “users.csv”), and calculate the number of men and women in the users data. I recommend comparing this code with the previous examples (in which I used RDDs) to see the difference.
```python
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

spark.read.load("users.csv", format="csv", sep="|") \
    .toDF("id", "age", "gender", "occupation", "zip") \
    .groupby("gender") \
    .count().show()

sc.stop()
```
Here is a step-by-step explanation of the code:
Line 1) Each Spark application needs a SparkContext object to access Spark APIs, so we start by importing the SparkContext class.
Line 2) Because I’ll use DataFrames, I also import the SparkSession class.
Line 4) I create a SparkContext object (as “sc”).
Line 5) I create a SparkSession object (based on the SparkContext). If you run this code in the PySpark shell or in a notebook such as Zeppelin, you should skip these steps (importing SparkContext and SparkSession, and creating the sc and spark objects), because they are already defined. You should also skip the last line, because you don’t need to stop the Spark context.
Line 7) I use the DataFrameReader object of spark (spark.read) to load the CSV data. As you can see, I don’t need to write a mapper to parse the CSV file.
Line 8) If the CSV file had a header row, DataFrameReader could use it for the column names (with the header option), but our sample file has no header, so I give the column names with toDF.
Line 9) Instead of reduceByKey, I use the groupby method (an alias of groupBy) to group the data.
Line 10) count returns a new DataFrame with the number of rows in each group (in a column named “count”), and the show method prints it.
Line 12) sc.stop() stops the context. As I said, it’s not necessary in the PySpark shell or in notebooks such as Zeppelin.
What if we want to group the users based on their occupations?
```python
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

spark.read.load("users.csv", format="csv", sep="|") \
    .toDF("id", "age", "gender", "occupation", "zip") \
    .where("occupation != 'other'") \
    .groupby("occupation") \
    .count().sort("count", ascending=False) \
    .show()

sc.stop()
```
Lines 1-8, 14) I already explained these in the previous code.
Line 9) where is an alias for filter (but it sounds more SQL-like, so I use it). I use the where method to select the rows whose occupation is not “other”.
Line 10) I group the users based on occupation.
Line 11) I count them and sort the output by count in descending order.
Line 12) I use show to print the result.
Please compare these scripts with the RDD versions. You’ll see that using DataFrames is simpler, especially when analyzing data. If you have an Oracle Cloud account, you can download and import the example #2 notebook to test the scripts.