
Wednesday, February 6, 2019

(T) Using shared variables and key-value pairs


In this blog post, I’ll talk about Spark’s shared variables and the types of operations you can do on key-value pairs. Spark provides two limited types of shared variables for common usage patterns: broadcast variables and accumulators. Normally, when a function is passed from the driver to a worker, a separate copy of the variables is used by each worker. Broadcast variables instead allow each machine to work with a read-only variable cached locally. Spark attempts to distribute broadcast variables using efficient broadcast algorithms. As an example, broadcast variables can be used to give every node a copy of a large lookup dataset efficiently.
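As a rough sketch of the idea, assuming a running spark-shell (so sc already exists) and a made-up country-code lookup table:
scala> val countryLookup = Map("BR" -> "Brazil", "US" -> "United States")
scala> val broadcastLookup = sc.broadcast(countryLookup)   // read-only copy cached on each node
scala> val codes = sc.parallelize(Seq("BR", "US", "BR"))
scala> codes.map(code => broadcastLookup.value.getOrElse(code, "Unknown")).collect()
// e.g. Array(Brazil, United States, Brazil)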
The other type of shared variable is the accumulator. Accumulators are used for counters and sums that work well in parallel. These variables can only be added to through an associative operation. Only the driver can read an accumulator’s value, not the tasks; the tasks can only add to it. Spark natively supports numeric types, but programmers can add support for new types. As an example, you can use accumulator variables to implement counters or sums, as in MapReduce.
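Here is a minimal sketch of an accumulator used as a counter, assuming Spark 2.x in the spark-shell; the blankLines name and the sample data are just for illustration:
scala> val blankLines = sc.longAccumulator("blankLines")
scala> val sample = sc.parallelize(Seq("ERROR mysql down", "", "ERROR php timeout", ""))
scala> sample.foreach(line => if (line.isEmpty) blankLines.add(1))   // tasks only add to it
scala> blankLines.value   // only the driver reads the value, e.g. 2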
Last, but not least, key-value pairs are available in Scala, Python and Java. In Scala, you create a key-value pair RDD by typing:
 scala> val kvp = ("a", "b")
To access each element, use the “._n” notation. This is not zero-indexed: “._1” returns the first element and “._2” returns the second.
scala> val kvp = ("a", "b")
scala> kvp._1 // will return "a"
scala> kvp._2 // will return "b"
In Python, indexing is zero-based, so the first element resides at index 0 and the second at index 1.
>>> kvp = ("a", "b")
>>> kvp[0] # will return "a"
>>> kvp[1] # will return "b"
There are special operations available on RDDs of key-value pairs. In an application, you must remember to import the SparkContext package to use the PairRDDFunctions such as reduceByKey(). The most common ones are those that perform grouping or aggregation by a key. If you use custom objects as the key inside your key-value pair, remember that you will need to provide your own equals() method to do the comparison, as well as a matching hashCode() method.
In the example below, textFile creates a normal RDD. The transformations on it then produce a pair RDD, which lets you invoke the reduceByKey() method that is part of the PairRDDFunctions API.
scala> val lines = sc.textFile("/home/df/Temp/alerts_new.csv")
scala> val wordCounts = lines.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_) 
Pay attention to the reduceByKey() method. It simply means that the values for the same key are added up together.
Another thing I want to point out is that, for brevity, all the functions are chained on one line. When you actually code it yourself, you may want to split each of the functions up, as shown in the sketch below. For example, do the flatMap operation and return that to an RDD. Then from that RDD, do the map operation to create another RDD. Then finally, from that last RDD, invoke the reduceByKey method. That would yield multiple lines, but you would be able to test each of the transformations to see if it worked properly.
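For example, a step-by-step version of the same word count (reusing the file path from above) might look like this, so you can call take() or collect() on each intermediate RDD to inspect it:
scala> val lines = sc.textFile("/home/df/Temp/alerts_new.csv")
scala> val words = lines.flatMap(line => line.split(" "))
scala> val pairs = words.map(word => (word, 1))
scala> val wordCounts = pairs.reduceByKey(_ + _)
scala> wordCounts.take(5)   // inspect a few results of the last step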
Next post, let’s understand the purpose and usage of the SparkContext.
Good luck!
If you want, you can contact me to clear up some doubts, make suggestions or criticisms…or to learn about my services - please send me an e-mail, or visit my website for more details.

Tuesday, January 29, 2019

(T) Working with RDDs in Apache Spark


In my last blog post I talked about the RDD (Resilient Distributed Dataset). So, let’s recap a few concepts, especially RDD Actions. RDD Actions return values. Again, you can find more information on Spark’s website. Here is a subset of the actions available (a short sketch of them follows the list):
  • The collect() function returns all the elements of the dataset as an array to the driver program. This is usually useful after a filter or another operation that returns a sufficiently small subset of data, to make sure your filter function works correctly.
  • The count() function returns the number of elements in a dataset and can also be used to check and test transformations.
  • The take() function returns an array with the first n elements. Note that this is currently not executed in parallel. The driver computes all the elements.
  • The foreach() function runs a function func on each element of the dataset.
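Here is that short sketch of the actions above, assuming the spark-shell; the numbers are just illustrative data:
scala> val nums = sc.parallelize(1 to 100)
scala> nums.count()                     // 100
scala> nums.take(3)                     // Array(1, 2, 3) - computed by the driver
scala> nums.filter(_ > 95).collect()    // Array(96, 97, 98, 99, 100)
scala> nums.foreach(n => println(n))    // runs the function on the executors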
And what’s the difference between Actions and Transformations?
Remember that Transformations are essentially lazy evaluations. Nothing is executed until an action is called. Each transformation function basically updates the graph, and when an action is called, the graph is executed. A transformation returns a pointer to the new RDD. Here is a subset of the transformations available (a join() sketch follows the list). The full list can be found on Spark’s website.
  • The flatMap() function is similar to map, but each input item can be mapped to 0 or more output items. This means that the function passed to it should return a sequence of objects rather than a single item, and flatMap will flatten that list of lists for the operations that follow. Basically this would be used in MapReduce-style operations where you might have a text file and, each time a line is read in, you split that line up by spaces to get individual keywords. Each of those lines is ultimately flattened so that you can perform the map operation on it to map each keyword to the value of one.
  • The join() function combines two sets of key-value pairs and returns a set of keys paired with values from the two initial sets. For example, if you have (K, V) pairs and (K, W) pairs, joining them gives you a set of (K, (V, W)) pairs.
  • The reduceByKey() function aggregates on each key by using the given reduce function. This is something you would use in a WordCount to sum up the values for each word to count its occurrences, for example.
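And here is the join() sketch mentioned above, with made-up keys and values, showing how (K, V) and (K, W) pairs combine into (K, (V, W)):
scala> val ages = sc.parallelize(Seq(("ana", 31), ("bob", 25)))          // (K, V)
scala> val cities = sc.parallelize(Seq(("ana", "Rio"), ("bob", "SP")))   // (K, W)
scala> ages.join(cities).collect()
// e.g. Array((ana,(31,Rio)), (bob,(25,SP)))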
Now I want to get a bit into RDD persistence. You have seen this used already: the cache() function. The cache() function is actually just the persist() function with the default MEMORY_ONLY storage level.
One of the key capabilities of Spark is its speed through persisting or caching. Each node stores the partitions it computes in memory, and when a subsequent action is called on the same dataset, or a derived dataset, it uses them from memory instead of having to retrieve them again. Future actions in such cases are often 10 times faster. The first time an RDD is persisted, it is kept in memory on the nodes. Caching is fault tolerant because if any of the partitions is lost, it will automatically be recomputed using the transformations that originally created it.
There are two methods to invoke RDD persistence: persist() and cache().
The persist() method allows you to specify a different storage level of caching. For example, you can choose
to persist the data set on disk, persist it in memory but as serialized objects to save space, etc.
Again the cache() method is just the default way of using persistence by storing deserialized objects in memory.
Here are the different storage levels for caching, and then we’ll look at what they mean (a short persist() sketch follows the list):
  • MEMORY_ONLY
  • MEMORY_AND_DISK
  • MEMORY_ONLY_SER
  • MEMORY_AND_DISK_SER
  • DISK_ONLY
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc
  • OFF_HEAP
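Here is the short persist() sketch mentioned above, reusing the alerts file from the earlier examples; the choice of MEMORY_AND_DISK_SER is just for illustration:
scala> import org.apache.spark.storage.StorageLevel
scala> val lines = sc.textFile("/home/df/Temp/alerts_new.csv")
scala> lines.persist(StorageLevel.MEMORY_AND_DISK_SER)   // serialized in memory, spills to disk if needed
scala> lines.count()   // the first action actually materializes the cache
// lines.cache() would be the same as lines.persist(StorageLevel.MEMORY_ONLY)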
Basically, you can choose to store in memory, or in memory and disk. If a partition does not fit in the specified cache location, it will be recomputed on the fly. You can also decide to serialize the objects before storing them. This is space efficient, but it requires the RDD to be deserialized before it can be read, so it adds CPU overhead. There’s also the option to replicate each partition on two cluster nodes.
There are many different levels, but don’t worry. So, which storage level should you choose?
There are trade-offs between the different storage levels. You should analyze your current situation to decide which level works best. You can find more details about this on Spark’s website.
But, basically if your RDD fits within the default storage level, by all means, use MEMORY_ONLY.
It is the fastest option and fully takes advantage of Spark’s design. If not, you can serialize the RDD and use the MEMORY_ONLY_SER level. Just be sure to choose a fast serialization library to make the objects more space efficient and still reasonably fast to access.
Don’t spill to disk unless the functions that compute your datasets are expensive or they require a large amount of space.
If you want fast recovery, use the replicated storage levels. All levels are fully fault tolerant, but the non-replicated ones still require recomputing the lost data. If you have a replicated copy, you can continue to work while Spark reconstructs a lost partition.
The OFF_HEAP level allows executors to share the same pool of memory and significantly reduces garbage collection costs. Also, the cached data is not lost if individual executors crash.
Next post, I’ll talk about shared variables and key-value pairs.
Good luck!
If you want, you can contact me to clear up some doubts, make suggestions or criticisms…or to learn about my services - please send me an e-mail, or visit my website for more details.

Saturday, December 15, 2018

(T) Resilient Distributed Dataset - Part II


Understanding actions

Let’s start this lesson by looking at the code below.
//Creating the RDD
scala> val logFile = sc.textFile("/home/df/Temp/alerts_news.csv")
//Transformations
scala> val errors = logFile.filter(_.startsWith("ERROR"))
scala> val messages = errors.map(_.split("\t")).map(r => r(1))
//Caching
scala> messages.cache()
//Actions
scala> messages.filter(_.contains("mysql")).count()
scala> messages.filter(_.contains("php")).count()
The goal here is to analyze some log files. In the first line, you load the log from the Hadoop file system. The next two lines filter out the error messages within the log. Before you invoke an action on it, you tell Spark to cache the filtered dataset - it doesn’t actually cache it yet, as nothing has been executed up to this point. Then you apply more filters to get the specific error messages relating to mysql and php, followed by the count action to find out how many errors were related to each of those filters.
Now let’s walk through each of the steps. The first thing that happens when you load in the text file is that the data is partitioned into different blocks across the cluster. Then the driver sends the code to be executed on each block. In the example, it would be the various transformations and actions that are sent out to the workers. Actually, it is the executor on each worker that performs the work on each block.
Then the executors read the HDFS blocks to prepare the data for the operations in parallel. After a series of transformations, you want to cache the results up until that point into memory. A cache is created. After the first action completes, the results are sent back to the driver. In this case, we’re looking for messages that relate to mysql. This is then returned back to the driver. To process the second action, Spark will use the data on the cache – it doesn’t need to go to the HDFS data again. It just reads it from the cache and processes the data from there.
Finally the results go back to the driver and we have completed a full cycle.
Here is a screencast with a brief demonstration of the explanation above.
Next post, we’ll dive deeper into RDD Transformations, RDD Actions, and RDD Persistence.
Good luck!
If you want, you can contact me to clear up some doubts, make suggestions or criticisms…or to learn about my services - please send me an e-mail, or visit my website for more details.

(T) Resilient Distributed Dataset - Part I


This post will cover the Resilient Distributed Dataset. After completing this lesson, you should be able to understand and describe Spark’s primary data abstraction, the RDD. You should know how to create parallelized collections from internal and external datasets. You should be able to use RDD operations such as Transformations and Actions. Finally, I’ll show you how to take advantage of Spark’s shared variables and key-value pairs.
The Resilient Distributed Dataset (RDD) is Spark’s primary abstraction. An RDD is a fault-tolerant collection of elements that can be parallelized; in other words, they can be operated on in parallel. They are also immutable. These are the fundamental primary units of data in Spark. When RDDs are created, a directed acyclic graph (DAG) is created. These operations are called transformations. Transformations make updates to that graph, but nothing actually happens until some action is called. Actions are another type of operation.
We’ll talk more about this shortly. The notion here is that the graph can be replayed on nodes that need to get back to the state they were in before going offline - thus providing fault tolerance. The elements of the RDD can be operated on in parallel across the cluster. Remember, transformations return a pointer to the RDD created, and actions return values that come from the action.
There are three methods for creating a RDD. You can parallelize an existing collection. This means that the data already resides within Spark and can now be operated on in parallel.
As an example, if you have an array of data, you can create an RDD out of it by calling the parallelize method. This method returns a pointer to the RDD. So this new distributed dataset can now be operated upon in parallel throughout the cluster.
The second method to create a RDD, is to reference a dataset. This dataset can come from any storage source supported by Hadoop such as HDFS, Cassandra, HBase, Amazon S3, etc.
The third method to create a RDD is from transforming an existing RDD to create a new RDD. In other
words, let’s say you have the array of data that you parallelized earlier. Now you want to filter out strings that are shorter than 20 characters. A new RDD is created using the filter method. And it’s important to remember that Spark supports text files, SequenceFiles and any other Hadoop InputFormat.
Here is a quick example of how to create an RDD from an existing collection of data.
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@525b8922

scala> val myNumbers = (1 to 20000).toList
scala> val myRDD = sc.parallelize(myNumbers)
myRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> myRDD.filter(x => x % 1000 == 0)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at filter at <console>:29
In the examples throughout the course, unless otherwise indicated, we’re going to be using Scala to show how Spark works. In the lab exercises, you will get to work with Python. So the first thing is to launch the Spark shell. This command is located under the $SPARK_HOME/bin directory.
Once the shell is up, let’s create some data with values from 1 to 20,000. Then, create an RDD from that data using the parallelize method from the SparkContext, shown as sc on the example above. This means that the data can now be operated on in parallel.
We will cover more on the SparkContext, the sc object that is invoking the parallelized function, in our programming lesson, so for now, just know that when you initialize a shell, the SparkContext, sc, is initialized for you to use.
The parallelize method returns a pointer to the RDD. Remember, transformation operations such as parallelize only return a pointer to the RDD; it actually won’t create that RDD until some action is invoked on it. With this new RDD, you can perform additional transformations or actions on it, such as the filter transformation.
Another way to create an RDD is from an external dataset. In the example below, we create an RDD from a text file using the textFile method of the SparkContext object. You will see plenty more examples of how to create RDDs throughout this course.
scala> sc
res4: org.apache.spark.SparkContext = org.apache.spark.SparkContext@525b8922

scala> val lines = sc.textFile("/home/lserra/Temp/alerts_news.csv")
lines: org.apache.spark.rdd.RDD[String] = /home/lserra/Temp/alerts_news.csv MapPartitionsRDD[3] at textFile at <console>:24

scala> val lineLengths = lines.map(s => s.length)
lineLengths: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:26

scala> val totalLengths = lineLengths.reduce((a, b) => a + b)
totalLengths: Int = 86954

scala> val wordCounts = lines.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:26

scala> wordCounts.collect()
Here we go over some basic operations. You have seen how to load a file from an external dataset. This time, however, we are loading a file from the HDFS. Loading the file creates a RDD, which is only a pointer to the file. The dataset is not loaded into memory yet.
Nothing will happen until some action is called. The transformation basically updates the directed acyclic graph (DAG). So the transformation here is saying: map each line s to the length of that line. Then, the action operation reduces it to get the total length of all the lines. When the action is called, Spark goes through the DAG and applies all the transformations up until that point, followed by the action, and then a value is returned back to the caller.
A common example is a MapReduce word count. You first split up the file by words and then map each word into a key-value pair with the word as the key and the value of 1. Then you reduce by the key, which adds up all the values of the same key, effectively counting the number of occurrences of that key. Finally, you call the collect() function, which is an action, to have it print out all the words and their occurrences.
Next post, you will see at a high level what happens when an action is executed.
Good luck!
If you want, you can contact me to clear up some doubts, make suggestions or criticisms…or to learn about my services - please send me an e-mail, or visit my website for more details.

(T) Using Apache Spark in ETL tasks - Part II


Why should I use Apache Spark?

In the last post I showed you how we can use Apache Spark in ETL tasks. And in this post I’ll try to show you why we should use Apache Spark.
The Hadoop ecosystem provides abstractions that allow users to treat a cluster of computers more like a single computer - to automatically split up files and distribute storage over many machines, to automatically divide work into smaller tasks and execute them in a distributed manner, and to automatically recover from failures.
MapReduce revolutionized computation over huge data sets by offering a simple model for writing programs that could execute in parallel across hundreds to thousands of machines. The MapReduce engine achieves near-linear scalability - as the data size increases, we can throw more computers at it and see jobs complete in the same amount of time - and is resilient to the fact that failures that occur rarely on a single machine occur all the time on clusters of thousands. It breaks up work into small tasks and can gracefully accommodate task failures without compromising the job to which they belong.
Apache Spark maintains MapReduce’s linear scalability and fault tolerance, but extends it in three important ways:
  • rather than relying on a rigid map-then-reduce format, its engine can execute a more general directed acyclic graph (DAG) of operators. This means that, in situations where MapReduce must write out intermediate results to the distributed filesystem, Spark can pass them directly to the next step in the pipeline.
  • it complements this capability with a rich set of transformations that enable users to express computation more naturally. It has a strong developer focus and streamlined API that can represent complex pipelines in a few lines of code.
  • it extends its predecessors with in-memory processing. Its Resilient Distributed Dataset (RDD) abstraction enables developers to materialize any point in a processing pipeline into memory across the cluster, meaning that future steps that want to deal with the same data set need not recompute it or reload it from disk. This capability opens up use cases that distributed processing engines could not previously approach. Spark is well suited for highly iterative algorithms that require multiple passes over a data set, as well as for reactive applications that quickly respond to user queries by scanning large in-memory data sets.
Spark boasts strong integration with the variety of tools in the Hadoop ecosystem. It can read and write data in all of the data formats supported by MapReduce, allowing it to interact with the formats commonly used to store data on Hadoop, like Avro and Parquet and good old CSV. It can read from and write to NoSQL databases like HBase and Cassandra. Its stream processing library, Spark Streaming, can ingest data continuously from systems like Flume and Kafka. Its SQL library, Spark SQL, can interact with the Hive Metastore. It can run inside YARN, Hadoop’s scheduler and resource manager, allowing it to share cluster resources dynamically and to be managed with the same policies as other processing engines like MapReduce and Impala.

The Spark Programming Model

Spark programming starts with a data set or few, usually residing in some form of distributed, persistent storage like the Hadoop Distributed File System (HDFS). Writing a Spark program typically consists of a few related steps (a short sketch of them follows the list):
  • defining a set of transformations on input data sets;
  • invoking actions that output the transformed data sets to persistent storage or return results to the driver’s local memory and
  • running local computations that operate on the results computed in a distributed fashion. These can help you decide what transformations and actions to undertake next.
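A minimal sketch of these three steps in the spark-shell, reusing the illustrative alerts file from the other posts, might look like this:
scala> val logs = sc.textFile("/home/df/Temp/alerts_news.csv")   // input data set
scala> val errors = logs.filter(_.contains("ERROR"))             // 1. define transformations
scala> val sample = errors.take(10)                              // 2. action: results back to the driver's memory
scala> sample.count(_.contains("mysql"))                         // 3. local computation on those results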

The Spark Shell and SparkContext

In my last post I forgot to tell you that if you have a Hadoop cluster that runs a version of Hadoop that supports YARN, you can launch Spark jobs on the cluster by using the value yarn-client for the Spark master:
$ spark-shell --master yarn-client
But if you’re just running on your personal computer, you can launch a local Spark cluster by specifying local[N], where N is the number of threads to run. Or you can use the symbol * to match the number of cores available on your machine.
$ spark-shell --master local[*]
Also, you can specify additional arguments to make your Spark application fully utilize the YARN resources. For example (note that these flags apply to spark-submit, since the Spark shell cannot run in cluster deploy mode):

$ spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue thequeue \
    examples/jars/spark-examples*.jar \
    10
The example above starts a YARN client program which starts the default Application Master. Then the application will be run as a child thread of the Application Master. The client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running.
For more details about it, you can read here.

What is yarn-client mode in Spark?

A Spark application consists of a driver and one or many executors. The driver program is the main program (where you instantiate SparkContext), which coordinates the executors to run the Spark application. The executors run tasks assigned by the driver.
A YARN application has the following roles: the YARN client, the YARN application master, and a list of containers running on the node managers.
When a Spark application runs on YARN, it has its own implementation of the YARN client and the YARN application master.
With this background, the major difference between the modes is where the driver program runs; the commands after the list illustrate it.
  1. YARN Cluster Mode (also called yarn-standalone): your driver program runs as a thread of the YARN application master, which itself runs on one of the node managers in the cluster. The YARN client just pulls status from the application master. This mode is the same as for a MapReduce job, where the MapReduce application master coordinates the containers that run the map/reduce tasks.
  2. YARN Client Mode: your driver program runs on the YARN client, the machine where you type the command to submit the Spark application (which may not be a machine in the YARN cluster). In this mode, although the driver program runs on the client machine, the tasks are executed on the executors in the node managers of the YARN cluster.
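For illustration, the launch commands for each mode might look like this (the class name MyApp and the jar myapp.jar are hypothetical; the cluster-mode spark-submit example was shown earlier):
# YARN client mode: the driver (e.g. the interactive shell) runs where you type the command
$ spark-shell --master yarn --deploy-mode client
# YARN cluster mode: the driver runs inside the Application Master on a node manager
$ spark-submit --master yarn --deploy-mode cluster --class MyApp myapp.jar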
For more details about it, you can read here.
In the next post, we’ll dive into the Resilient Distributed Dataset (RDD).
Good luck!
If you want, you can contact me to clear up some doubts, make suggestions or criticisms…or to learn about my services - please send me an e-mail, or visit my website for more details.

Sunday, November 11, 2018

(T) Using Apache Spark in ETL tasks - Part I

Using Apache Spark in ETL tasks
In this post, I’ll show you how to use Apache Spark to do a basic ETL (extract, transform and load) operation. Apache Spark™ is a fast and general engine for large-scale data processing, and you can use it in your BI or Big Data projects.
If you want to know more about it, please visit the site.
Let’s start with the Spark Shell in Scala:
$ cd $SPARK_HOME
$ spark-shell
Let’s check if the SparkContext was initialized:
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@733fae8
Using the spark-shell in Scala let’s load a file into a variable with the help of SparkContext:
scala> var lines = sc.textFile("/home/df/Temp/alerts_news.csv")
lines: org.apache.spark.rdd.RDD[String] = /home/df/Temp/alerts_news.csv MapPartitionsRDD[1] at textFile at :24
SparkContext is a class defined in the Spark library. It is the main entry point into the Spark library. It represents a connection to a Spark cluster. It is also required to create other important objects provided by the Spark API. A Spark application must create an instance of the SparkContext class. Currently, an application can have only one active instance of SparkContext. To create another instance, it must first stop the active instance. The SparkContext class provides multiple constructors. The simplest one does not take any arguments.
The textFile method creates an RDD from a text file. It can read a file or multiple files in a directory stored on a local file system, HDFS, Amazon S3, or any other Hadoop-supported storage system. It returns an RDD of Strings, where each element represents a line in the input file.
Next, we’ll split each line into “pieces” of separate words:
scala> var pieces = lines.flatMap(s => s.split(" "))
pieces: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at :26
The flatMap method of a Scala collection is similar to the map. It takes a function as input, applies it to each element in a collection, and returns another collection as a result. However, the function passed to flatMap generates a collection for each element in the original collection. Thus, the result of applying the input function is a collection of collections. If the same input function were passed to the map method, it would return a collection of collections. The flatMap method instead returns a flattened collection.
Append a one to each word:
scala> var tokens = pieces.map(s => (s,1))
tokens: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at :28
The map method of a Scala collection applies its input function to all the elements in the collection and returns another collection. The returned collection has the exact same number of elements as the collection on which map was called. However, the elements in the returned collection need not be of the same type as that in the original collection.
Let’s calculate the frequency of each word by adding up all the ones for each word:
scala> var sumEachWord = tokens.reduceByKey((a, b) => a + b)
sumEachWord: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at :30
The higher-order reduceByKey method takes an associative binary operator as input and reduces values with the same key to a single value using the specified binary operator. A binary operator takes two values as input and returns a single value as output. An associative operator returns the same result regardless of the grouping of the operands. The reduceByKey method can be used for aggregating values by key.
Let’s check out the output:
scala> sumEachWord.collect()
res1: Array[(String, Int)] = Array((PÇA,2), (VERBAIS,1), (TJRJ,4), (PERITOS,1), (11DP,1), (RESISTIU,2), (HAVER,1), (GOVERNO,21), (G1,,1), (ALEGAÇÕES,1), (SOB,2), (POSSE:,1), (ASSEMBLEIA,1), (EXCLUSIVO,1), (INFECÇÃO,1), (AUTORIZOU,1), (RIO,,3), (TUDO,3), (CARGA,1), (APRESENTOU,1), (LAGOA,,1), (AÇÃO,15), (PULMÃO,,1), (DETIDOS,1), (LANÇADO,1), (DIÁRIO,1), (TRATAR,1), (RODOANEL,2), (QUARTO,3), (AULAS,18), (IRMÃO,5), (CONCESSÃO,1), (DELE,2), (INDICIADO,2), (DETRAN,2), (ONIBUS,1), (CONTROLARAM,1), (MÉDIA,2), (AMANHÃ,,12), (-É,1), (AMANHÃ",1), (LACERDA,,1), (IDOSO,1), (7,1,1), (RECONHECER,1), (OCEÂNICO,1), (PF,,1), (INFORMA,7), (ESCORIAÇÕES,1), (31,5), (BARRA,8), (A.I.,1), (ANDERSON,1), (33,2), (CORRIGINDO,1), (RECREATIVO,1), (8H,5), (MARCO,3), (UPA,2), (METROS,2), (FAZENDINHA,,1), (NITERÓI,9)...
The collect method returns the elements in the source RDD as an array. This method should be used with caution since it moves data from all the worker nodes to the driver program. It can crash the driver program if called on a very large RDD.
Instead of printing the output to the console, let’s save it to a file in the local file system:
scala> sumEachWord.saveAsTextFile("/home/df/Temp/scala_news.csv")
The saveAsTextFile method saves the elements of the source RDD in the specified directory on any Hadoop-supported file system. Each RDD element is converted to its string representation and stored as a line of text.
Now let’s check the files in that path to confirm the content:
$ cd /home/df/Temp/scala_news.csv/
$ ls -la
$ head part-00000
$ less part-00000
That’s it!
Well, as you can see, in a few minutes and a few lines we did an ETL task:
  • Extract [E] - we extracted the information from a text file, using a SparkContext.
  • Transform [T] - we transformed the data into a key-value pair using the methods: flatMap, map, and reduceByKey.
  • Load [L] - we displayed the final result on the console and then persisted it, saving it to the local file system, using the methods collect and saveAsTextFile.
Apache Spark is a computing platform designed to be fast, general-purpose, and easy to use. This ease of use enables you to quickly pick it up using simple APIs for Scala, Python, and Java.
Now, let’s do the same operation, but at this time we will use the spark-shell in Python instead of Scala as the programming language.
First of all, let’s start the Spark Shell in Python:
$ cd $SPARK_HOME
$ pyspark
Let’s check if the SparkContext was initialized:
>>> sc

Using the spark-shell let’s load a file into a variable with the help of SparkContext:
>>> lines = sc.textFile("/home/df/Temp/alerts_news.csv")
Next, we’ll split each line into “pieces” of separate words:
>>> pieces = lines.flatMap(lambda s: s.split(" "))
Append a one to each word:
>>> tokens = pieces.map(lambda s: (s,1))
Let’s calculate the frequency of each word by adding up all the ones for each word:
>>> sumEachWord = tokens.reduceByKey(lambda a, b: a + b)
Let’s check out the output:
>>> sumEachWord.collect()
Instead of printing the output to the console, let’s save it to a file in the local file system:
>>> sumEachWord.saveAsTextFile("/home/df/Temp/python_news.csv")
Now let’s check the files in that path to confirm the content:
$ cd /home/df/Temp/python_news.csv/
$ ls -la
$ head part-00000
$ less part-00000
As you can see, both programming languages are very similar for doing the same task. But we can note some differences between using Scala and Python for Spark:
  • In Scala, variables are declared using the var (or val) keyword. In Python, this is not necessary.
  • We write “anonymous functions” in different forms in Python and Scala. A function literal can be used in an application just like a string literal. It can be passed as an input to a higher-order method or function. It can also be assigned to a variable. In Scala, a function literal is defined with input parameters in parentheses, followed by a right arrow and the body of the function. The body of a function literal is enclosed in optional curly braces. An example is shown next.
Scala:
scala> def make_incrementor(n: Int) = (x: Int) => x + n

scala> val f = make_incrementor(42)
Python:
>>> def make_incrementor(n):

...     return lambda x: x + n

...

>>> f = make_incrementor(42)
We can do the same tasks even in batch mode with “spark-submit” script.
A Spark application can be launched using the spark-submit script that comes with Spark. It is available in the SPARK_HOME/bin directory. The spark-submit script can be used for both running a Spark application on your development machine and deploying it on a real Spark cluster. It provides a unified interface for running a Spark application with any Spark-supported cluster manager.
However, when running in batch mode the SparkContext object has to be initialized by the programmer. It is not available by default as it is in the interactive shell.
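For illustration only (the object name, paths and jar name are hypothetical), a minimal standalone Scala application creates its own SparkContext before doing the same word count, and is then launched with spark-submit:
import org.apache.spark.{SparkConf, SparkContext}

object WordCountApp {
  def main(args: Array[String]): Unit = {
    // In batch mode we must create the SparkContext ourselves
    val conf = new SparkConf().setAppName("WordCountApp")
    val sc = new SparkContext(conf)

    val lines = sc.textFile(args(0))   // input path, e.g. /home/df/Temp/alerts_news.csv
    val counts = lines.flatMap(_.split(" "))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)
    counts.saveAsTextFile(args(1))     // output path

    sc.stop()
  }
}
It could then be submitted with something like:
$ spark-submit --class WordCountApp wordcount.jar /home/df/Temp/alerts_news.csv /home/df/Temp/output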
Next steps:
You can extend this tutorial and improve your results by:
  • Load the data into a database, for example MySQL.
  • Explore other transformation methods.
  • Explore other action methods.
For example: which words are the most popular?
With this tool, you can build upon your work and improve your results.
Good luck!
If you want, you can contact me to clear up some doubts, make suggestions or criticisms…or to learn about my services - please send me an e-mail, or visit my website for more details.

Friday, October 6, 2017

(A) Data Science in Practice with Python - Sample 2

In this post, I'll explain what a recommender system is, how it works, and show you some code examples. In my previous post I gave a quick introduction:

Sample 2 - Recommender System

WHAT IS A RECOMMENDER SYSTEM? A model that filters information to present users with a curated subset of options they’re likely to find appealing.
HOW DOES IT WORK? Generally via a collaborative approach (considering the user’s previous behavior) or content-based approach (based on discrete assigned characteristics).

Now I'll get into some very important concepts about recommender systems.

Recommender System in Details:

We can say that the goal of a recommender system is to make product or service recommendations to people. Of course, these recommendations should be for products or services they’re more likely to want to buy or consume.

Recommender systems are active information filtering systems which personalize the information coming to a user based on his interests, relevance of the information etc. Recommender systems are used widely for recommending movies, articles, restaurants, places to visit, items to buy etc.

1. Types of Recommendation Engines

A simple way to think about the different types of recommenders is:
  • Content Filtering: “If you liked this item, you might also like …”
  • Item-Item Collaborative Filtering: “Customers who liked this item also liked …”
  • User-Item Collaborative Filtering: “Customers who are similar to you also liked …”
Confused? So, let's see some cases in practice:

Case 1: Content-based algorithm
Idea: “If you liked this item, you might also like …”
Based on the similarity of the items being recommended. It generally works well when it's easy to determine the context/properties of each item, for instance when we are recommending the same kind of item, like a movie recommendation or song recommendation.

Case 2: Collaborative filtering algorithm
If a person A likes items 1, 2 and 3, and B likes 2, 3 and 4, then they have similar interests, so A should like item 4 and B should like item 1.

This algorithm is entirely based on past behavior and not on the context. This makes it one of the most commonly used algorithms, as it is not dependent on any additional information. For instance: product recommendations by e-commerce players like Amazon and merchant recommendations by banks like American Express.

An important point here is that in this case, there are several types of collaborative filtering algorithms. Let's see the two most important:

Item-Item Collaborative filtering:
Idea: “Customers who liked this item also liked …”
It is quite similar to the previous algorithm, but instead of finding customer look-alikes, we try to find item look-alikes. Once we have the item look-alike matrix, we can easily recommend similar items to a customer who has purchased an item from the store. This algorithm is far less resource consuming than user-user collaborative filtering. Hence, for a new customer, the algorithm takes far less time than user-user collaborative filtering, as we don't need all the similarity scores between customers. And with a fixed number of products, the product-product look-alike matrix is fixed over time.

User-Item Collaborative filtering:
Idea: “Customers who are similar to you also liked …”
Here we find look-alike customers (based on similarity) and offer products which the first customer's look-alikes have chosen in the past. This algorithm is very effective but takes a lot of time and resources, since it requires computing the information for every pair of customers. Therefore, for platforms with a large user base, this algorithm is hard to implement without a very strong parallelization system.

But before we proceed, let me define a couple of terms:
  • Item refers to the content whose attributes are used in the recommendation models. These could be movies, documents, books, etc.
  • Attribute refers to a characteristic of an item. A movie tag or the words in a document are examples.
Recommender System Hands-On:

As I said above, "collaborative filtering" algorithms search large groupings of preference expressions to find similarities to some input preference or preferences. The output from these algorithms is a ranked list of suggestions that is a subset of all possible preferences; hence, it's called "filtering". The "collaborative" part comes from the use of many other people's preferences in order to find suggestions for the user. This can be seen either as a search of the space of preferences (for brute-force techniques), a clustering problem (grouping similarly preferred items), or even some other predictive model.

Many algorithmic attempts have been created in order to optimize or solve this problem across sparse or large datasets, and we will discuss a few of them in this post.

The goals of this post are:
  • Understanding how to model preferences from a variety of sources
  • Learning how to compute similarities using distance metrics
To demonstrate the techniques in this post, I will use the "MovieLens" database from the University of Minnesota that contains star ratings of moviegoers for their preferred movies. You can find the data here.

I will use the smaller MovieLens 100k dataset (4.7 MB in size / ml-100k.zip) in order to load the entire model into memory with ease. Unzip the downloaded data into the directory of your choice.

This dataset consists of:
  • 100,000 ratings (1-5) from 943 users on 1682 movies.
  • Each user has rated at least 20 movies.
  • Simple demographic info for the users (age, gender, occupation, zip)
  • Genre information of movies
In the zipped file, the two main files we will be using are as follows:
  • u.data: This contains the user movie ratings; it is the main file and it is tab delimited.
  • u.item: This contains the movie information and other details; it is pipe delimited.
In the u.data file, the first column is the "user ID", the second column is the "movie ID", the third is the "star rating", and the last is the "timestamp".

The u.item file contains much more information, including the "ID", "title", "release date", and so on. Interestingly, this file also has a Boolean array indicating the genre(s) of each movie, including (in order) action, adventure, animation, children, comedy, crime, documentary, drama, fantasy, film noir, horror, musical, mystery, romance, sci-fi, thriller, war, and western.

Well ... let's see all of this in practice, step-by-step, or better, line-by-line. For this, I prepared a Jupyter notebook. You can find the notebook here.

I hope you enjoy it!

See you again on next post ... Bye, Bye!

Saturday, July 8, 2017

(A) Data Science in Practice with Python - Sample 1


The top trending term on Twitter and other social networks is "data science". But ...
  • What is data science?
  • How do real companies use data science to make products, services and operations better? 
  • How does it work? 
  • What does the data science lifecycle look like? 
This is the buzzword at the moment. A lot of people ask me about it, and there are many questions. I'll try to answer all of them through some samples.

Sample 1 - Regression

WHAT IS A REGRESSION? This is the best definition I found [Source: Wikipedia] - Regression analysis is widely used for prediction and forecasting, where its use has substantial overlap with the field of machine learning.
HOW DOES IT WORK? Regression analysis is also used to understand which among the independent variables are related to the dependent variable, and to explore the forms of these relationships. In restricted circumstances, regression analysis can be used to infer causal relationships between the independent and dependent variables. However this can lead to illusions or false relationships, so caution is advisable; for example, correlation does not imply causation.

Sample 2 - Recommender System

WHAT IS A RECOMMENDER SYSTEM? A model that filters information to present users with a curated subset of options they’re likely to find appealing.
HOW DOES IT WORK? Generally via a collaborative approach (considering user’s previous behavior) or content based approach (based on discrete assigned characteristics).

Sample 3 - Credit Scoring

WHAT IS CREDIT SCORING? A model that determines an applicant’s creditworthiness for a mortgage, loan or credit card.
HOW DOES IT WORK? A set of decision management rules evaluates how likely an applicant is to repay debts.

Sample 4 - Dynamic Pricing

WHAT IS DYNAMIC PRICING? Modeling price as a function of supply, demand, competitor pricing and exogenous factors.
HOW DOES IT WORK? Generalized linear models and classification trees are popular techniques for estimating the “right” price to maximize expected revenue.

Sample 5 - Customer Churn

WHAT IS CUSTOMER CHURN? Predicting which customers are going to abandon a product or service.
HOW DOES IT WORK? Data scientists may consider using support vector machines, random forest or k-nearest-neighbors algorithms.

Sample 6 - Fraud Detection

WHAT IS FRAUD DETECTION? Detecting and preventing fraudulent financial transactions from being processed.
HOW DOES IT WORK? Fraud detection is a binary classification problem: “is this transaction legitimate or not?”

This post will be divided into 5 parts. In each one I'll explain the machine learning techniques mentioned above. This is the first post, and I'll show you how sample 1, regression, works. But first, let's start with the question below:

- What is data science?

In my previous post, "Tucson Best Buy Analysis", you can learn more about it. There I explain "what it is" and show you some examples of the day-to-day work of a Data Scientist.

So, let's talk about sample 1: regression. To explain this, let's start with a simple problem, I could say a "classic problem": predicting house prices in Russia, a Kaggle challenge that closed on 06/29/2017. The goal is to predict the median value of a house in a particular area. As usual, we have some training data, where the answer is known to us. For our study and a better understanding of this topic, I created a Jupyter notebook. To access the notebook, please click here.

Have fun! See you on next post ...
