Sunday, November 11, 2018

(T) Using Apache Spark in ETL tasks - Part I

Using Apache Spark in ETL's tasks
In this post, I’ll show you how to use Apache Spark to do a basic ETL (extract, load and transform) operation. Apache Spark™ is a fast and general engine for large-scale data processing. And you can use it in your BI or Big Data Projects.
If you want to know more about it, please visit the site.
Let’s start with the Spark Shell in Scala:
$ cd $SPARK_HOME
$ spark-shell
Let’s check if the SparkContext was initialized:
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@733fae8
Using the spark-shell in Scala let’s load a file into a variable with the help of SparkContext:
scala> var lines = sc.textFile("/home/df/Temp/alerts_news.csv")
lines: org.apache.spark.rdd.RDD[String] = /home/df/Temp/alerts_news.csv MapPartitionsRDD[1] at textFile at :24
SparkContext is a class defined in the Spark library. It is the main entry point into the Spark library. It represents a connection to a Spark cluster. It is also required to create other important objects provided by the Spark API. A Spark application must create an instance of the SparkContext class. Currently, an application can have only one active instance of SparkContext. To create another instance, it must first stop the active instance. The SparkContext class provides multiple constructors. The simplest one does not take any arguments.
The textFile method creates an RDD from a text file. It can read a file or multiple files in a directory stored on a local file system, HDFS, Amazon S3, or any other Hadoop-supported storage system. It returns an RDD of Strings, where each element represents a line in the input file.
After we’ll split each line into “pieces” of separated word:
scala> var pieces = lines.flatMap(s => s.split(" "))
pieces: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at :26
The flatMap method of a Scala collection is similar to the map. It takes a function as input, applies it to each element in a collection, and returns another collection as a result. However, the function passed to flatMap generates a collection for each element in the original collection. Thus, the result of applying the input function is a collection of collections. If the same input function were passed to the map method, it would return a collection of collections. The flatMap method instead returns a flattened collection.
Append one with each word:
scala> var tokens = pieces.map(s => (s,1))
tokens: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at :28
The map method of a Scala collection applies its input function to all the elements in the collection and returns another collection. The returned collection has the exact same number of elements as the collection on which map was called. However, the elements in the returned collection need not be of the same type as that in the original collection.
Let’s calculate the frequency of each word by adding all the one’s against one word:
scala> var sumEachWord = tokens.reduceByKey((a, b) => a + b)
sumEachWord: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at :30
The higher-order reduceByKey method takes an associative binary operator as input and reduces values with the same key to a single value using the specified binary operator. A binary operator takes two values as input and returns a single value as output. An associative operator returns the same result regardless of the grouping of the operands. The reduceByKey method can be used for aggregating values by key.
Let’s check out the output:
scala> sumEachWord.collect()
res1: Array[(String, Int)] = Array((PÇA,2), (VERBAIS,1), (TJRJ,4), (PERITOS,1), (11DP,1), (RESISTIU,2), (HAVER,1), (GOVERNO,21), (G1,,1), (ALEGAÇÕES,1), (SOB,2), (POSSE:,1), (ASSEMBLEIA,1), (EXCLUSIVO,1), (INFECÇÃO,1), (AUTORIZOU,1), (RIO,,3), (TUDO,3), (CARGA,1), (APRESENTOU,1), (LAGOA,,1), (AÇÃO,15), (PULMÃO,,1), (DETIDOS,1), (LANÇADO,1), (DIÁRIO,1), (TRATAR,1), (RODOANEL,2), (QUARTO,3), (AULAS,18), (IRMÃO,5), (CONCESSÃO,1), (DELE,2), (INDICIADO,2), (DETRAN,2), (ONIBUS,1), (CONTROLARAM,1), (MÉDIA,2), (AMANHÃ,,12), (-É,1), (AMANHÃ",1), (LACERDA,,1), (IDOSO,1), (7,1,1), (RECONHECER,1), (OCEÂNICO,1), (PF,,1), (INFORMA,7), (ESCORIAÇÕES,1), (31,5), (BARRA,8), (A.I.,1), (ANDERSON,1), (33,2), (CORRIGINDO,1), (RECREATIVO,1), (8H,5), (MARCO,3), (UPA,2), (METROS,2), (FAZENDINHA,,1), (NITERÓI,9)...
The collect method returns the elements in the source RDD as an array. This method should be used with caution since it moves data from all the worker nodes to the driver program. It can crash the driver program if called on a very large RDD.
Instead of print output on console, let’s save the output to a file in the local file system:
scala> sumEachWord.saveAsTextFile("/home/df/Temp/scala_news.csv")
The saveAsTextFile method saves the elements of the source RDD in the specified directory on any Hadoop-supported file system. Each RDD element is converted to its string representation and stored as a line of text.
Now let’s check out the file in the path and to confirm the content:
$ cd /home/df/Temp/scala_news.csv/
$ ls -la
$ head part-00000
$ less part-00000
That’s it!
Well, as you can see in a few minutes and some lines we did an ETL task:
  • Extract [E] - we extracted the information from a text file, using a SparkContext.
  • Transform [T] - we transformed the data into a key-value pair using the methods: flatMap, map, and reduceByKey.
  • Load [L] - we loaded the final result on the console and then we did the persistence, saving it to a local file system, using the methods: collect and saveAsTextFile.
Apache Spark is a computer platform designed to be fast and general-purpose, and easy to use. The ease of use with Spark enables you to quickly pick it up using simple APIs for Scala, Python, and Java.
Now, let’s do the same operation, but at this time we will use the spark-shell in Python instead of Scala as the programming language.
First of all, let’s start the Spark Shell in Python:
$ cd $SPARK_HOME
$ pyspark
Let’s check if the SparkContext was initialized:
>>> sc

Using the spark-shell let’s load a file into a variable with the help of SparkContext:
>>> lines = sc.textFile("/home/df/Temp/alerts_news.csv")
After we’ll split each line into “pieces” of separated word:
>>> pieces = lines.flatMap(lambda s: s.split(" "))
Append one with each word:
>>> tokens = pieces.map(lambda s: (s,1))
Let’s calculate the frequency of each word by adding all the ones against one word:
>>> sumEachWord = tokens.reduceByKey(lambda a, b: a + b)
Let’s check out the output:
>>> sumEachWord.collect()
Instead of print output on console, let’s save the output to a file in the local file system:
>>> sumEachWord.saveAsTextFile("/home/df/Temp/python_news.csv")
Now let’s check out the file in the path and to confirm the content:
$ cd /home/df/Temp/python_news.csv/
$ ls -la
$ head part-00000
$ less part-00000
As you can see, both programming languages are very similar in doing the same task. But, we can behold some differences between usage of Scala and Python for Spark, are:
  • In Scala, variables are initialized using var keyword. In Python is not necessary.
  • We use “anonymous functions” the different forms either in Python or Scala. It can be used in an application just like a string literal. It can be passed as an input to a higher-order method or function. It can also be assigned to a variable. A function literal is defined with input parameters in parenthesis, followed by a right arrow and the body of the function. The body of a function literal is enclosed in optional curly braces. An example is shown next.
Scala:
scala> def make_incrementor(n: Int) = (x: Int) => x + n

scala> val f = make_incrementor(42)
Python:
>>> def make_incrementor(n):

... return lambda x: x + n

...

>>> f = make_incrementor(42)
We can do the same tasks even in batch mode with “spark-submit” script.
A Spark application can be launched using the spark-submit script that comes with Spark. It is available in the SPARK_HOME/bin directory. The spark-submit script can be used for both running a Spark application on your development machine and deploying it on a real Spark cluster. It provides a unified interface for running a Spark application with any Spark-supported cluster manager.
However, when running in batch mode the SparkContext object will have to be initialized by the programmer. It is not available by default as it is an interactive shell.
Next steps:
You can extend this tutorial and improve your results by:
  • Load data to the database, for example, MySQL.
  • Explore others transformation methods.
  • Explore others actions methods.
For example, what are the words most popular?
With this tool, you can build upon your work and improve your results.
Good luck!
If you want you can contact me to explain some doubts, make suggestions or critical…or to know my services - please send me an e-mail, or visit my website for more details.

Lastest Posts

(T) Using shared variables and key-value pairs