How To Create Dataframe From Rdd In Pyspark
5 Ways to add a new column in a PySpark Dataframe
And, all of them are useful
Too much data is getting generated day by day.
Although sometimes we can manage our big data using tools like Rapids or Parallelization , Spark is an excellent tool to have in your repertoire if you are working with Terabytes of data.
In my last post on Spark, I explained how to work with PySpark RDDs and Dataframes.
Although this post explains a lot on how to work with RDDs and basic Dataframe operations, I missed quite a lot when it comes to working with PySpark Dataframes.
And it is only when I required more functionality that I read up and came up with multiple solutions to do one single thing.
How to create a new column in spark?
Now, this might sound trivial, but believe me, it isn't. With so much you might want to do with your data, I am pretty sure you will end up using most of these column creation processes in your workflow. Sometimes to utilize Pandas functionality, or occasionally to use RDDs based partitioning or sometimes to make use of the mature python ecosystem.
This post is going to be about — "Multiple ways to create a new column in Pyspark Dataframe."
If you have PySpark installed, you can skip the Getting Started section below.
Getting Started with Spark
I know that a lot of you won't have spark installed in your system to try and learn. But installing Spark is a headache of its own.
Since we want to understand how it works and work with it, I would suggest that you use Spark on Databricks here online with the community edition. Don't worry, it is free, albeit fewer resources, but that works for us right now for learning purposes.
Once you register and login will be presented with the following screen.
You can start a new notebook here.
Select the Python notebook and give any name to your notebook.
Once you start a new notebook and try to execute any command, the notebook will ask you if you want to start a new cluster. Do it.
The next step will be to check if the sparkcontext is present. To check if the sparkcontext is present, you have to run this command:
sc
This means that we are set up with a notebook where we can run Spark.
Data
Here, I will work on the Movielens ml-100k.zip dataset. 100,000 ratings from 1000 users on 1700 movies. In this zipped folder, the file we will specifically work with is the rating file. This filename is kept as "u.data"
If you want to upload this data or any data, you can click on the Data tab in the left and then Add Data by using the GUI provided.
We can then load the data using the following commands:
ratings = spark.read.load("/FileStore/tables/u.data",format="csv", sep="\t", inferSchema="true", header="false") ratings = ratings.toDF(*['user_id', 'movie_id', 'rating', 'unix_timestamp'])
Here is how it looks:
ratings.show()
Ok, so now we are set up to begin the part we are interested in finally. How to create a new column in PySpark Dataframe?
1. Using Spark Native Functions
The most pysparkish way to create a new column in a PySpark DataFrame is by using built-in functions. This is the most performant programmatical way to create a new column, so this is the first place I go whenever I want to do some column manipulation.
We can use .withcolumn
along with PySpark SQL functions to create a new column. In essence, you can find String functions, Date functions, and Math functions already implemented using Spark functions. We can import spark functions as:
import pyspark.sql.functions as F
Our first function, the F.col
function gives us access to the column. So if we wanted to multiply a column by 2, we could use F.col
as:
ratings_with_scale10 = ratings.withColumn("ScaledRating", 2*F.col("rating")) ratings_with_scale10.show()
We can also use math functions like F.exp
function:
ratings_with_exp = ratings.withColumn("expRating", 2*F.exp("rating")) ratings_with_exp.show()
There are a lot of other functions provided in this module, which are enough for most simple use cases. You can check out the functions list here.
2. Spark UDFs
Sometimes we want to do complicated things to a column or multiple columns. This could be thought of as a map operation on a PySpark Dataframe to a single column or multiple columns. While Spark SQL functions do solve many use cases when it comes to column creation, I use Spark UDF whenever I want to use the more matured Python functionality.
To use Spark UDFs, we need to use the F.udf
function to convert a regular python function to a Spark UDF. We also need to specify the return type of the function. In this example the return type is StringType()
import pyspark.sql.functions as F
from pyspark.sql.types import * def somefunc(value):
if value < 3:
return 'low'
else:
return 'high' #convert to a UDF Function by passing in the function and return type of function udfsomefunc = F.udf(somefunc, StringType()) ratings_with_high_low = ratings.withColumn("high_low", udfsomefunc("rating")) ratings_with_high_low.show()
3. Using RDDs
Sometimes both the spark UDFs and SQL Functions are not enough for a particular use-case. You might want to utilize the better partitioning that you get with spark RDDs. Or you may want to use group functions in Spark RDDs. You can use this one, mainly when you need access to all the columns in the spark data frame inside a python function.
Whatever the case be, I find this way of using RDD to create new columns pretty useful for people who have experience working with RDDs that is the basic building block in the Spark ecosystem.
The process below makes use of the functionality to convert between Row
and pythondict
objects. We convert a row object to a dictionary. Work with the dictionary as we are used to and convert that dictionary back to row again.
import math
from pyspark.sql import Row def rowwise_function(row):
# convert row to dict:
row_dict = row.asDict()
# Add a new key in the dictionary with the new column name and value.
row_dict['Newcol'] = math.exp(row_dict['rating'])
# convert dict to row:
newrow = Row(**row_dict)
# return new row
return newrow # convert ratings dataframe to RDD
ratings_rdd = ratings.rdd
# apply our function to RDD
ratings_rdd_new = ratings_rdd.map(lambda row: rowwise_function(row))
# Convert RDD Back to DataFrame
ratings_new_df = sqlContext.createDataFrame(ratings_rdd_new) ratings_new_df.show()
4. Pandas UDF
This functionality was introduced in the Spark version 2.3.1. And this allows you to use pandas functionality with Spark. I generally use it when I have to run a groupby operation on a Spark dataframe or whenever I need to create rolling features and want to use Pandas rolling functions/window functions.
The way we use it is by using the F.pandas_udf
decorator. We assume here that the input to the function will be a pandas data frame. And we need to return a pandas dataframe in turn from this function.
The only complexity here is that we have to provide a schema for the output Dataframe. We can make that using the format below.
# Declare the schema for the output of our function
outSchema = StructType([StructField('user_id',IntegerType(),True),StructField('movie_id',IntegerType(),True),StructField('rating',IntegerType(),True),StructField('unix_timestamp',IntegerType(),True),StructField('normalized_rating',DoubleType(),True)]) # decorate our function with pandas_udf decorator
@F.pandas_udf(outSchema, F.PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.rating
v = v - v.mean()
pdf['normalized_rating'] =v
return pdf rating_groupwise_normalization = ratings.groupby("movie_id").apply(subtract_mean) rating_groupwise_normalization.show()
We can also make use of this to train multiple individual models on each spark node. For that, we replicate our data and give each replication a key and some training params like max_depth, etc. Our function then takes the pandas Dataframe, runs the required model, and returns the result. The structure would look something like below.
# 0. Declare the schema for the output of our function
outSchema = StructType([StructField('replication_id',IntegerType(),True),StructField('RMSE',DoubleType(),True)]) # decorate our function with pandas_udf decorator
@F.pandas_udf(outSchema, F.PandasUDFType.GROUPED_MAP)
def run_model(pdf):
# 1. Get hyperparam values
num_trees = pdf.num_trees.values[0]
depth = pdf.depth.values[0]
replication_id = pdf.replication_id.values[0]
# 2. Train test split
Xtrain,Xcv,ytrain,ycv = train_test_split.....
# 3. Create model using the pandas dataframe
clf = RandomForestRegressor(max_depth = depth, num_trees=num_trees,....)
clf.fit(Xtrain,ytrain)
# 4. Evaluate the model
rmse = RMSE(clf.predict(Xcv,ycv)
# 5. return results as pandas DF
res =pd.DataFrame({'replication_id':replication_id,'RMSE':rmse})
return resresults = replicated_data.groupby("replication_id").apply(run_model)
Above is just an idea and not a working code. Though it should work with minor modifications.
5. Using SQL
For people who like SQL, there is a way even to create columns using SQL. For this, we need to register a temporary SQL table and then use simple select queries with an additional column. One might also use it to do joins.
ratings.registerTempTable('ratings_table')
newDF = sqlContext.sql('select *, 2*rating as newCol from ratings_table')
newDF.show()
Conclusion
And that is the end of this column(pun intended)
Hopefully, I've covered the column creation process well to help you with your Spark problems. If you need to learn more of spark basics, take a look at:
You can find all the code for this post at the GitHub repository or the published notebook on databricks.
Also, if you want to learn more about Spark and Spark DataFrames, I would like to call out an excellent course on Big Data Essentials , which is part of the Big Data Specialization provided by Yandex.
Thanks for the read. I am going to be writing more beginner-friendly posts in the future too. Follow me up at Medium or Subscribe to my blog to be informed about them. As always, I welcome feedback and constructive criticism and can be reached on Twitter @mlwhiz
Also, a small disclaimer — There might be some affiliate links in this post to relevant resources, as sharing knowledge is never a bad idea.
How To Create Dataframe From Rdd In Pyspark
Source: https://towardsdatascience.com/5-ways-to-add-a-new-column-in-a-pyspark-dataframe-4e75c2fd8c08
Posted by: dixonaname1987.blogspot.com
0 Response to "How To Create Dataframe From Rdd In Pyspark"
Post a Comment