Saturday, December 15, 2018

(T) Resilient Distributed Dataset - Part II

(T) Resilient Distributed Dataset - Part II

Resilient Distributed Dataset - Part II

Understanding actions

Let’s start this lesson looking at this code first.
//Creating the RDD
scala> val logFile = sc.textFile("/home/df/Temp/alerts_news.csv")
//Transformations
scala> val errors = logFile.filter(_.startsWith("ERROR"))
scala> val messages = errors.map(_.split(("\t")).map(r => r(1))
//Caching
scala> messages.cache()
//Actions
scala> messages.filter((_.contains("mysql")).count()
scala> messages.filter((_.contains("php")).count()
The goal here is to analyze some log files. The first line you load the log from the hadoop file system. The next two lines you filter out the messages within the log errors. Before you invoke some action on it, you tell it to cache the filtered dataset - it doesn’t actually cache it yet as nothing has been done up until this point. Then you do more filters to get specific error messages relating to mysql and php followed by the count action to find out how many errors were related to each of those filters.
Now let’s walk through each of the steps. The first thing that happens when you load in the text file is the data is partitioned into different blocks across the cluster. Then the driver sends the code to be executed on each block. In the example, it would be the various transformations and actions that will be sent out to the workers. Actually, it is the executor on each workers that is going to be performing the work on each block.
Then the executors read the HDFS blocks to prepare the data for the operations in parallel. After a series of transformations, you want to cache the results up until that point into memory. A cache is created. After the first action completes, the results are sent back to the driver. In this case, we’re looking for messages that relate to mysql. This is then returned back to the driver. To process the second action, Spark will use the data on the cache – it doesn’t need to go to the HDFS data again. It just reads it from the cache and processes the data from there.
Finally the results go back to the driver and we have completed a full cycle.
Here is a screencast with a bit demonstration of this explanation above.
Next post, we’ll deep in RDD Transformations, RDD Actions, and RDD Persistence.
Good luck!
If you want you can contact me to explain some doubts, make suggestions or critical…or to know my services - please send me an e-mail, or visit my website for more details.

(T) Resilient Distributed Dataset - Part I

(T) Resilient Distributed Dataset - Part I

Resilient Distributed Dataset - Part I

This post will cover Resilient Distributed Dataset. After completing this lesson, you should be able to understand and describe Spark’s primary data abstraction, the RDD. You should know how to create parallelized collections from internal and external datasets. You should be able to use RDD opeartions such as Transformations and Actions. Finally, I’ll show you how to take advantage of Spark’s shared variables and key-value pairs.
Resilient Distributed Dataset (RDD) is Spark’s primary abstraction. RDD is a fault tolerant collection of elements that can be parallelized. In other words, they can be made to be operated on in parallel. They are also immutable. These are the fundamental primary units of data in Spark. When RDDs are created, a direct acyclic graph (DAG) is created. This type of operation is called transformations. Transformations makes updates to that graph, but nothing actually happens until some action is called. Actions are another type of operations.
We’ll talk more about this shortly. The notion here is that the graphs can be replayed on nodes that need to get back to the state it was before it went offline - thus providing fault tolerance. The elements of the RDD can be operated on in parallel across the cluster. Remember, transformations return a pointer to the RDD created and actions return values that comes from the action.
There are three methods for creating a RDD. You can parallelize an existing collection. This means that the data already resides within Spark and can now be operated on in parallel.
As an example, if you have an array of data, you can create a RDD out of it by calling the parallelized method. This method returns a pointer to the RDD. So this new distributed dataset can now be operated upon in parallel throughout the cluster.
The second method to create a RDD, is to reference a dataset. This dataset can come from any storage source supported by Hadoop such as HDFS, Cassandra, HBase, Amazon S3, etc.
The third method to create a RDD is from transforming an existing RDD to create a new RDD. In other
words, let’s say you have the array of data that you parallelized earlier. Now you want to filter out strings that are shorter than 20 characters. A new RDD is created using the filter method. And it’s important remember you that Spark supports text files, SequenceFiles and any other Hadoop InputFormat.
Here is a quick example of how to create an RDD from an existing collection of data.
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@525b8922

scala> val myNumbers = (1 to 20000).toList
scala> val myRDD = sc.parallelize(myNumbers)
myRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> myRDD.filter(x => x % 1000 == 0)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at filter at <console>:29
In the examples throughout the course, unless otherwise indicated, we’re going to be using Scala to show how Spark works. In the lab exercises, you will get to work with Python. So the first thing is to launch the Spark shell. This command is located under the $SPARK_HOME/bin directory.
Once the shell is up, let’s create some data with values from 1 to 20,000. Then, create an RDD from that data using the parallelize method from the SparkContext, shown as sc on the example above. This means that the data can now be operated on in parallel.
We will cover more on the SparkContext, the sc object that is invoking the parallelized function, in our programming lesson, so for now, just know that when you initialize a shell, the SparkContext, sc, is initialized for you to use.
The parallelize method returns a pointer to the RDD. Remember, transformations operations such as parallelize, only returns a pointer to the RDD. It actually won’t create that RDD until some action is invoked on it. With this new RDD, you can perform additional transformations or actions on it such as the filter transformation.
Another way to create a RDD is from an external dataset. In the example bellow, we are creating a RDD from a text file using the textFile method of the SparkContext object. You will see plenty more examples of how to create RDD throughout this course.
scala> sc
res4: org.apache.spark.SparkContext = org.apache.spark.SparkContext@525b8922

scala> val lines = sc.textFile(/home/lserra/Temp/alerts_news.csv”)
lines: org.apache.spark.rdd.RDD[String] = /home/lserra/Temp/alerts_news.csv MapPartitionsRDD[3] at textFile at <console>:24

scala> val lineLengths = lines.map(s => s.length)
lineLengths: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:26

scala> val totalLengths = lineLengths.reduce((a, b) => a + b)
totalLengths: Int = 86954

scala> val wordCounts = lines.flatMap(line => line.split(“ “)).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:26

scala> wordCounts.collect()
Here we go over some basic operations. You have seen how to load a file from an external dataset. This time, however, we are loading a file from the HDFS. Loading the file creates a RDD, which is only a pointer to the file. The dataset is not loaded into memory yet.
Nothing will happen until some action is called. The transformation basically updates the direct acyclic graph (DAG). So the transformation here is saying map each line s, to the length of that line. Then, the action operation is reducing it to get the total length of all the lines. When the action is called, Spark goes through the DAG and applies all the transformation up until that point, followed by the action and then a value is returned back to the caller.
A common example is a MapReduce word count. You first split up the file by words and then map each word into a key value pair with the word as the key, and the value of 1. Then you reduce by the key, which adds up all the value of the same key, effectively, counting the number of occurrences of that key. Finally, you call the collect() function, which is an action, to have it print out all the words and its occurrences.
Next post, you will see at a high level what happens when an action is executed.
Good luck!
If you want you can contact me to explain some doubts, make suggestions or critical…or to know my services - please send me an e-mail, or visit my website for more details.

(T) Using Apache Spark in ETL tasks - Part II

(T) Using Apache Spark in ETL tasks - Part II

Why should I use Apache Spark?

In the last post I showed you how we can use Apache Spark in ETL’s tasks. And in this post I’ll try show you how can we use Apache Spark.
Hadoop ecosystem provide abstractions that allow users to treat a cluster of computers more like a single computer - to automatically split up files and distribute storage over many machines, to automatically divide work into smaller tasks and execute them in a distributed manner, and to automatically recover from failures.
MapReduce did a revolution computation over huge data sets by offering a simple model for writing programs that could execute in parellel across hundreds to thousands of machines. The MapReduce engine achieves near linear scalability - as the data size increases, we can throw more computers at it and see jobs complete in the same amount of time - and is resilient to the fact that failures that occur rarely on a single machine occur all the time on clusters of thousands. It breaks up work into small tasks and can gracefully accommodate task failures without compromising the job to which they belong.
Apache Spark maintains MapReduce’s linear scalability and fault tolerance, but extends it in three important ways:
  • rather than relying on a rigid map-then-reduce format, its engine can execute a more general directed acyclic graph (DAG) of operators. This means that, in situations where MapReduce must write out intermediate results to the distributed filesystem, Spark can pass them directly to the next step in the pipeline.
  • it complements this capability with a rich set of transformations that enable users to express computation more naturally. It has a strong developer focus and streamlined API that can represent complex pipelines in a few lines of code.
  • extends its predecessors with in-memory processing. Its Resilient Distrituted Dataset (RDD) abstraction enables developers to materialize any point in a processing pipeline into memory across the cluster, meaning that future steps that want to deal with the same data set need not recompute it or reloaded it from disk. This capability opens up use cases that distribuited processing engines could not previously approach. Spark is well suited for highly iterative algorithms that require multiple passes over data set, as well as reactive applications that quickly respond to user queries by scanning large in-memory data sets.
Spark boasts strong integration with the variety of tools in the Hadoop ecosystem. It can read and write data in all of the data formats supported by MapReduce, allowing it to interact with the formats commonly used to store data on Hadoop like Avro and Parquet and good old CSV. It can read from and write to NoSQL databases like Hbase and Cassandra. Its stream processing library, Spark Streaming, can ingest data continuously from systems like Flume and Kafka. Its SQL library, SparkSQL, can interact with the Hive Metastore. It can run inside YARN, Hadoop’s scheduler and resource manager, allowing it to share cluster resources dynamically and to be managed with the same policies as other processing engines like MapReduce and Impala.

The Spark Programming Model

Spark programming starts with a data set or few, usually residing in some form of distributed, persistent storage like Hadoop Distributed File System (HDFS). Writing a Spark program typically consists of a few related steps:
  • defining a set of transformations on input data sets;
  • invoking actions that output the transformed data sets to persistent storage or return results to the driver’s local memory and
  • running local computations that operate on the results computed in a distributed fashion. These can help you decide what transformations and actions to undertake next.

The Spark Shell and SparkContext

In my last post I forgot to tell you that if you have a Hadoop cluster that runs a versions of Hadoop that supports YARN, you can launch the Spark jobs on the cluster by using the value of yarn-client for the Spark master:
$ spark-shell --master yarn-client
But if you’re just running on your personal computer, you can launch a local Spark cluster by specifying local[N], where N is the number of threads to run. Or you can use the symbol * to match the number of cores available on your machine.
$ spark-shell --master local[*]
Also, I can specify additional arguments to make Spark shell fully utilize your YARN resources. For example:

$ spark-shell \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue thequeue \
    examples/jars/spark-examples*.jar \
    10
The example above starts a YARN client program which starts the default Application Master. Then the shell will be run as a child thread of Application Master. The client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running.
For more details about it, you can read here.

What is yarn-client mode in Spark?

A Spark application consists of a driver and one or many executors. The driver program is the main program (where you instantiate SparkContext), which coordinates the executors to run the Spark application. The executors run tasks assigned by the driver.
A YARN application has the following roles: yarn client, yarn application master and list of containers running on the node managers.
When Spark application runs on YARN, it has its own implementation of yarn client and yarn application master.
With those background, the major difference is where the driver program runs.
  1. YARN Standalone Mode_: your driver program is running as a thread of the yarn application master, which itself runs on one of the node managers in the cluster. The Yarn client just pulls status from the application master. This mode is same as a mapreduce job, where the MapReduce application master coordinates the containers to run the map/reduce tasks.
  2. YARN Client Mode: your driver program is running on the yarn client where you type the command to submit the spark application (may not be a machine in the yarn cluster). In this mode, although the drive program is running on the client machine, the tasks are executed on the executors in the node managers of the YARN cluster.
For more details about it, you can read here
From the next post, we’ll deep in Resilient Distributed Dataset (RDD).
Good luck!
If you want you can contact me to explain some doubts, make suggestions or critical…or to know my services - please send me an e-mail, or visit my website for more details.

Sunday, November 11, 2018

(T) Using Apache Spark in ETL tasks - Part I

Using Apache Spark in ETL's tasks
In this post, I’ll show you how to use Apache Spark to do a basic ETL (extract, load and transform) operation. Apache Spark™ is a fast and general engine for large-scale data processing. And you can use it in your BI or Big Data Projects.
If you want to know more about it, please visit the site.
Let’s start with the Spark Shell in Scala:
$ cd $SPARK_HOME
$ spark-shell
Let’s check if the SparkContext was initialized:
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@733fae8
Using the spark-shell in Scala let’s load a file into a variable with the help of SparkContext:
scala> var lines = sc.textFile("/home/df/Temp/alerts_news.csv")
lines: org.apache.spark.rdd.RDD[String] = /home/df/Temp/alerts_news.csv MapPartitionsRDD[1] at textFile at :24
SparkContext is a class defined in the Spark library. It is the main entry point into the Spark library. It represents a connection to a Spark cluster. It is also required to create other important objects provided by the Spark API. A Spark application must create an instance of the SparkContext class. Currently, an application can have only one active instance of SparkContext. To create another instance, it must first stop the active instance. The SparkContext class provides multiple constructors. The simplest one does not take any arguments.
The textFile method creates an RDD from a text file. It can read a file or multiple files in a directory stored on a local file system, HDFS, Amazon S3, or any other Hadoop-supported storage system. It returns an RDD of Strings, where each element represents a line in the input file.
After we’ll split each line into “pieces” of separated word:
scala> var pieces = lines.flatMap(s => s.split(" "))
pieces: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at :26
The flatMap method of a Scala collection is similar to the map. It takes a function as input, applies it to each element in a collection, and returns another collection as a result. However, the function passed to flatMap generates a collection for each element in the original collection. Thus, the result of applying the input function is a collection of collections. If the same input function were passed to the map method, it would return a collection of collections. The flatMap method instead returns a flattened collection.
Append one with each word:
scala> var tokens = pieces.map(s => (s,1))
tokens: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at :28
The map method of a Scala collection applies its input function to all the elements in the collection and returns another collection. The returned collection has the exact same number of elements as the collection on which map was called. However, the elements in the returned collection need not be of the same type as that in the original collection.
Let’s calculate the frequency of each word by adding all the one’s against one word:
scala> var sumEachWord = tokens.reduceByKey((a, b) => a + b)
sumEachWord: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at :30
The higher-order reduceByKey method takes an associative binary operator as input and reduces values with the same key to a single value using the specified binary operator. A binary operator takes two values as input and returns a single value as output. An associative operator returns the same result regardless of the grouping of the operands. The reduceByKey method can be used for aggregating values by key.
Let’s check out the output:
scala> sumEachWord.collect()
res1: Array[(String, Int)] = Array((PÇA,2), (VERBAIS,1), (TJRJ,4), (PERITOS,1), (11DP,1), (RESISTIU,2), (HAVER,1), (GOVERNO,21), (G1,,1), (ALEGAÇÕES,1), (SOB,2), (POSSE:,1), (ASSEMBLEIA,1), (EXCLUSIVO,1), (INFECÇÃO,1), (AUTORIZOU,1), (RIO,,3), (TUDO,3), (CARGA,1), (APRESENTOU,1), (LAGOA,,1), (AÇÃO,15), (PULMÃO,,1), (DETIDOS,1), (LANÇADO,1), (DIÁRIO,1), (TRATAR,1), (RODOANEL,2), (QUARTO,3), (AULAS,18), (IRMÃO,5), (CONCESSÃO,1), (DELE,2), (INDICIADO,2), (DETRAN,2), (ONIBUS,1), (CONTROLARAM,1), (MÉDIA,2), (AMANHÃ,,12), (-É,1), (AMANHÃ",1), (LACERDA,,1), (IDOSO,1), (7,1,1), (RECONHECER,1), (OCEÂNICO,1), (PF,,1), (INFORMA,7), (ESCORIAÇÕES,1), (31,5), (BARRA,8), (A.I.,1), (ANDERSON,1), (33,2), (CORRIGINDO,1), (RECREATIVO,1), (8H,5), (MARCO,3), (UPA,2), (METROS,2), (FAZENDINHA,,1), (NITERÓI,9)...
The collect method returns the elements in the source RDD as an array. This method should be used with caution since it moves data from all the worker nodes to the driver program. It can crash the driver program if called on a very large RDD.
Instead of print output on console, let’s save the output to a file in the local file system:
scala> sumEachWord.saveAsTextFile("/home/df/Temp/scala_news.csv")
The saveAsTextFile method saves the elements of the source RDD in the specified directory on any Hadoop-supported file system. Each RDD element is converted to its string representation and stored as a line of text.
Now let’s check out the file in the path and to confirm the content:
$ cd /home/df/Temp/scala_news.csv/
$ ls -la
$ head part-00000
$ less part-00000
That’s it!
Well, as you can see in a few minutes and some lines we did an ETL task:
  • Extract [E] - we extracted the information from a text file, using a SparkContext.
  • Transform [T] - we transformed the data into a key-value pair using the methods: flatMap, map, and reduceByKey.
  • Load [L] - we loaded the final result on the console and then we did the persistence, saving it to a local file system, using the methods: collect and saveAsTextFile.
Apache Spark is a computer platform designed to be fast and general-purpose, and easy to use. The ease of use with Spark enables you to quickly pick it up using simple APIs for Scala, Python, and Java.
Now, let’s do the same operation, but at this time we will use the spark-shell in Python instead of Scala as the programming language.
First of all, let’s start the Spark Shell in Python:
$ cd $SPARK_HOME
$ pyspark
Let’s check if the SparkContext was initialized:
>>> sc

Using the spark-shell let’s load a file into a variable with the help of SparkContext:
>>> lines = sc.textFile("/home/df/Temp/alerts_news.csv")
After we’ll split each line into “pieces” of separated word:
>>> pieces = lines.flatMap(lambda s: s.split(" "))
Append one with each word:
>>> tokens = pieces.map(lambda s: (s,1))
Let’s calculate the frequency of each word by adding all the ones against one word:
>>> sumEachWord = tokens.reduceByKey(lambda a, b: a + b)
Let’s check out the output:
>>> sumEachWord.collect()
Instead of print output on console, let’s save the output to a file in the local file system:
>>> sumEachWord.saveAsTextFile("/home/df/Temp/python_news.csv")
Now let’s check out the file in the path and to confirm the content:
$ cd /home/df/Temp/python_news.csv/
$ ls -la
$ head part-00000
$ less part-00000
As you can see, both programming languages are very similar in doing the same task. But, we can behold some differences between usage of Scala and Python for Spark, are:
  • In Scala, variables are initialized using var keyword. In Python is not necessary.
  • We use “anonymous functions” the different forms either in Python or Scala. It can be used in an application just like a string literal. It can be passed as an input to a higher-order method or function. It can also be assigned to a variable. A function literal is defined with input parameters in parenthesis, followed by a right arrow and the body of the function. The body of a function literal is enclosed in optional curly braces. An example is shown next.
Scala:
scala> def make_incrementor(n: Int) = (x: Int) => x + n

scala> val f = make_incrementor(42)
Python:
>>> def make_incrementor(n):

... return lambda x: x + n

...

>>> f = make_incrementor(42)
We can do the same tasks even in batch mode with “spark-submit” script.
A Spark application can be launched using the spark-submit script that comes with Spark. It is available in the SPARK_HOME/bin directory. The spark-submit script can be used for both running a Spark application on your development machine and deploying it on a real Spark cluster. It provides a unified interface for running a Spark application with any Spark-supported cluster manager.
However, when running in batch mode the SparkContext object will have to be initialized by the programmer. It is not available by default as it is an interactive shell.
Next steps:
You can extend this tutorial and improve your results by:
  • Load data to the database, for example, MySQL.
  • Explore others transformation methods.
  • Explore others actions methods.
For example, what are the words most popular?
With this tool, you can build upon your work and improve your results.
Good luck!
If you want you can contact me to explain some doubts, make suggestions or critical…or to know my services - please send me an e-mail, or visit my website for more details.

Lastest Posts

(T) Using shared variables and key-value pairs