Showing posts with label python. Show all posts
Showing posts with label python. Show all posts

Wednesday, February 6, 2019

(T) Using shared variables and key-value pairs

(T) Using shared variables and key-value pairs

Using shared variables and key-value pairs

In this blog post, I’ll talk about Spark’s shared variables and the type of operations you can do on key-value pairs. Spark provides two limited types of shared variables for common usage patterns: broadcast variables and accumulators. Normally, when a function is passed from the driver to a worker, a separate copy of the variables are used for each worker. Broadcast variables allow each machine to work with a read-only variable cached on each machine. Spark attempts to distribute broadcast variables using efficient algorithms. As an example, broadcast variables can be used to give every node a copy of a large dataset efficiently.
The other shared variables are accumulators. These are used for counters in sums that works well in parallel. These variables can only be added through an associated operation. Only the driver can read the accumulators value, not the tasks. The tasks can only add to it. Spark supports numeric types but programmers can add support for new types. As an example, you can use accumulator variables to implement counters or sums, as in MapReduce.
Last, but not least, key-value pairs are available in Scala, Python and Java. In Scala, you create a key-value pair RDD by typing:
 scala> val kvp = ("a", "b")
To access each element, invoke the “._” notation. This is not zero-index, so the “._1” will return the value in the first index and “._2” will return the value in the second index.
scala> val kvp = ("a", "b")
scala> pair._1 // will return "a"
scala> pair._2 // will return "b"
In Python, it is a zero-index notation, so the value of the first index resides in index “0” and the second index is “1”.
>>> kvp = ("a", "b")
>>> pair[0] # will return "a"
>>> pair[1] # will return "b"
There are special operations available to RDDs of key-value pairs. In an application, you must remember to import the SparkContext package to use PairRDD Functions such as reduceByKey(). The most common ones are those that perform grouping or aggregating by a key. If you have custom objects as the key inside your key-value pair, remember that you will need to provide your own equals() method to do the comparison as well as a matching hashCode() method.
So in the example below, you have a textFile that is just a normal RDD. Then you perform some transformations on it and it creates a PairRDD which allows it to invoke the reduceByKey() method that is part of the PairRDD Functions API.
scala> val lines = sc.textFile("/home/df/Temp/alerts_new.csv")
scala> val wordCounts = lines.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_) 
Please attention for the reduceByKey() method. This simply means that for the values of the same key, add them up together.
Another thing I want to point is that for the goal of brevity, all the functions are concatenated on one line. When you actually code it yourself, you may want split each of the functions up. For example, do the flatMap operation and return that to a RDD. Then from that RDD, do the map operation to create another RDD. Then finally, from that last RDD, invoke the reduceByKey method. That would yield multiple lines, but you would be able to test each of the transformation to see if it worked properly.
Next post, let’s understand the purpose and usage of the SparkContext.
Good luck!
If you want, you can contact me to explain some doubts, make suggestions or critical…or to know my services - please send me an e-mail, or visit my website for more details.

Tuesday, January 29, 2019

(T) Working with RDDs in Apache Spark

(T) Working with RDDs in Apache Spark

Working with RDDs in Apache Spark

In my last blog post I talked about RDD (Resilient Distributed Dataset). So, let’s recap a few concepts, in special RDD Actions. RDD Action returns values. Again, you can find more information on Spark’s website. Here is a subset of some of the actions available:
  • The collect() function returns all the elements of the dataset as an array of the driver program. This is usually useful after a filter or another operation that returns a significantly small subset of data to make sure your filter function works correctly.
  • The count() function returns the number of elements in a dataset and can also be used to check and test transformations.
  • The take() function returns an array with the first n elements. Note that this is currently not executed in parallel. The driver computes all the elements.
  • The foreach() function run a function func on each element of the dataset.
And what’s the difference between Actions and Transformations?
Remember that Transformations are essentially lazy evaluations. Nothing is executed until an action is called. Each transformation function basically updates the graph and when an action is called, the graph is executed. Transformation returns a pointer to the new RDD. Here is a subset of some of the transformations available. The full list of them can be found on Spark’s website.
  • The flatMap() function is similar to map, but each input can be mapped to 0 or more output items. What this means is that the returned pointer of the func method, should return a sequence of objects, rather than a single item. It would mean that the flatMap would flatten a list of lists for the operations that follows. Basically this would be used for MapReduce operations where you might have a text file and each time a line is read in, you split that line up by spaces to get individual keywords. Each of those lines ultimately is flatten so that you can perform the map operation on it to map each keyword to the value of one.
  • The join() function combines two sets of key value pairs and return a set of keys to a pair of values from the two initial set. For example, you have a K,V pair and a K,W pair. When you join them together, you will get a K, (V,W) set.
  • The reduceByKey() function aggregates on each key by using the given reduce function. This is something you would use in a WordCount to sum up the values for each word to count its occurrences, for example.
Now I want to get a bit into RDD persistence. You have seen this used already. That is the cache function. The cache function is actually the default of the persist function with the MEMORY_ONLY storage.
One of the key capability of Spark is its speed through persisting or caching. Each node stores any partitions of the cache and computes it in memory. When a subsequent action is called on the same dataset, or a derived dataset, it uses it from memory instead of having to retrieve it again. Future actions in such cases are often 10 times faster. The first time a RDD is persisted, it is kept in memory on the node. Caching is fault tolerant because if it any of the partition is lost, it will automatically be recomputed using the transformations that originally created it.
There are two methods to invoke RDD persistence: persist() and cache().
The persist() method allows you to specify a different storage level of caching. For example, you can choose
to persist the data set on disk, persist it in memory but as serialized objects to save space, etc.
Again the cache() method is just the default way of using persistence by storing deserialized objects in memory.
Here you can see different storage levels of caching and now let’s know what it means each one:
  • MEMORY_ONLY
  • MEMORY_AND_DISK
  • MEMORY_ONLY_SER
  • MEMORY_AND_DISK_SER
  • DISK_ONLY
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc
  • OFF_HEAP
Basically, you can choose to store in memory or memory and disk. If a partition does not fit in the specified cache location, then it will be recomputed on the fly. You can also decide to serialized the objects before storing this. This is space efficient, but will require the RDD to deserialized before it can be read, so it takes up more CPU workload. There’s also the option to replicate each partition on two cluster nodes.
There are many different levels, but don’t worry. Then, which kind of storage level to choose?
There are trade-offs between the different storage levels. You should analyze your current situation to decide which level works best. You can find more details about this on Spark’s website.
But, basically if your RDD fits within the default storage level, by all means, use MEMORY_ONLY.
It is the fastest option to fully take advantage of Spark’s design. If not, you can serialized the RDD and use the MEMORY_ONLY_SER level. Just be sure to choose a fast serialization library to make the objects more space efficient and still reasonably fast to access.
Don’t spill to disk unless the functions that compute your datasets are expensive or it requires a large amount of space.
If you want fast recovery, use the replicated storage levels. All levels are fully fault tolerant, but would still require the recomputing of the data. If you have a replicated copy, you can continue to work while Spark is reconstruction a lost partition.
It allows you to share the same pool of memory and significantly reduces garbage collection costs. Also, the cached data is not lost if the individual executors crash.
Next post, I’ll talk about shared variables and key-value pairs.
Good luck!
If you want you can contact me to explain some doubts, make suggestions or critical…or to know my services - please send me an e-mail, or visit my website for more details.

Saturday, December 15, 2018

(T) Resilient Distributed Dataset - Part II

(T) Resilient Distributed Dataset - Part II

Resilient Distributed Dataset - Part II

Understanding actions

Let’s start this lesson looking at this code first.
//Creating the RDD
scala> val logFile = sc.textFile("/home/df/Temp/alerts_news.csv")
//Transformations
scala> val errors = logFile.filter(_.startsWith("ERROR"))
scala> val messages = errors.map(_.split(("\t")).map(r => r(1))
//Caching
scala> messages.cache()
//Actions
scala> messages.filter((_.contains("mysql")).count()
scala> messages.filter((_.contains("php")).count()
The goal here is to analyze some log files. The first line you load the log from the hadoop file system. The next two lines you filter out the messages within the log errors. Before you invoke some action on it, you tell it to cache the filtered dataset - it doesn’t actually cache it yet as nothing has been done up until this point. Then you do more filters to get specific error messages relating to mysql and php followed by the count action to find out how many errors were related to each of those filters.
Now let’s walk through each of the steps. The first thing that happens when you load in the text file is the data is partitioned into different blocks across the cluster. Then the driver sends the code to be executed on each block. In the example, it would be the various transformations and actions that will be sent out to the workers. Actually, it is the executor on each workers that is going to be performing the work on each block.
Then the executors read the HDFS blocks to prepare the data for the operations in parallel. After a series of transformations, you want to cache the results up until that point into memory. A cache is created. After the first action completes, the results are sent back to the driver. In this case, we’re looking for messages that relate to mysql. This is then returned back to the driver. To process the second action, Spark will use the data on the cache – it doesn’t need to go to the HDFS data again. It just reads it from the cache and processes the data from there.
Finally the results go back to the driver and we have completed a full cycle.
Here is a screencast with a bit demonstration of this explanation above.
Next post, we’ll deep in RDD Transformations, RDD Actions, and RDD Persistence.
Good luck!
If you want you can contact me to explain some doubts, make suggestions or critical…or to know my services - please send me an e-mail, or visit my website for more details.

(T) Resilient Distributed Dataset - Part I

(T) Resilient Distributed Dataset - Part I

Resilient Distributed Dataset - Part I

This post will cover Resilient Distributed Dataset. After completing this lesson, you should be able to understand and describe Spark’s primary data abstraction, the RDD. You should know how to create parallelized collections from internal and external datasets. You should be able to use RDD opeartions such as Transformations and Actions. Finally, I’ll show you how to take advantage of Spark’s shared variables and key-value pairs.
Resilient Distributed Dataset (RDD) is Spark’s primary abstraction. RDD is a fault tolerant collection of elements that can be parallelized. In other words, they can be made to be operated on in parallel. They are also immutable. These are the fundamental primary units of data in Spark. When RDDs are created, a direct acyclic graph (DAG) is created. This type of operation is called transformations. Transformations makes updates to that graph, but nothing actually happens until some action is called. Actions are another type of operations.
We’ll talk more about this shortly. The notion here is that the graphs can be replayed on nodes that need to get back to the state it was before it went offline - thus providing fault tolerance. The elements of the RDD can be operated on in parallel across the cluster. Remember, transformations return a pointer to the RDD created and actions return values that comes from the action.
There are three methods for creating a RDD. You can parallelize an existing collection. This means that the data already resides within Spark and can now be operated on in parallel.
As an example, if you have an array of data, you can create a RDD out of it by calling the parallelized method. This method returns a pointer to the RDD. So this new distributed dataset can now be operated upon in parallel throughout the cluster.
The second method to create a RDD, is to reference a dataset. This dataset can come from any storage source supported by Hadoop such as HDFS, Cassandra, HBase, Amazon S3, etc.
The third method to create a RDD is from transforming an existing RDD to create a new RDD. In other
words, let’s say you have the array of data that you parallelized earlier. Now you want to filter out strings that are shorter than 20 characters. A new RDD is created using the filter method. And it’s important remember you that Spark supports text files, SequenceFiles and any other Hadoop InputFormat.
Here is a quick example of how to create an RDD from an existing collection of data.
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@525b8922

scala> val myNumbers = (1 to 20000).toList
scala> val myRDD = sc.parallelize(myNumbers)
myRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> myRDD.filter(x => x % 1000 == 0)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at filter at <console>:29
In the examples throughout the course, unless otherwise indicated, we’re going to be using Scala to show how Spark works. In the lab exercises, you will get to work with Python. So the first thing is to launch the Spark shell. This command is located under the $SPARK_HOME/bin directory.
Once the shell is up, let’s create some data with values from 1 to 20,000. Then, create an RDD from that data using the parallelize method from the SparkContext, shown as sc on the example above. This means that the data can now be operated on in parallel.
We will cover more on the SparkContext, the sc object that is invoking the parallelized function, in our programming lesson, so for now, just know that when you initialize a shell, the SparkContext, sc, is initialized for you to use.
The parallelize method returns a pointer to the RDD. Remember, transformations operations such as parallelize, only returns a pointer to the RDD. It actually won’t create that RDD until some action is invoked on it. With this new RDD, you can perform additional transformations or actions on it such as the filter transformation.
Another way to create a RDD is from an external dataset. In the example bellow, we are creating a RDD from a text file using the textFile method of the SparkContext object. You will see plenty more examples of how to create RDD throughout this course.
scala> sc
res4: org.apache.spark.SparkContext = org.apache.spark.SparkContext@525b8922

scala> val lines = sc.textFile(/home/lserra/Temp/alerts_news.csv”)
lines: org.apache.spark.rdd.RDD[String] = /home/lserra/Temp/alerts_news.csv MapPartitionsRDD[3] at textFile at <console>:24

scala> val lineLengths = lines.map(s => s.length)
lineLengths: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:26

scala> val totalLengths = lineLengths.reduce((a, b) => a + b)
totalLengths: Int = 86954

scala> val wordCounts = lines.flatMap(line => line.split(“ “)).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:26

scala> wordCounts.collect()
Here we go over some basic operations. You have seen how to load a file from an external dataset. This time, however, we are loading a file from the HDFS. Loading the file creates a RDD, which is only a pointer to the file. The dataset is not loaded into memory yet.
Nothing will happen until some action is called. The transformation basically updates the direct acyclic graph (DAG). So the transformation here is saying map each line s, to the length of that line. Then, the action operation is reducing it to get the total length of all the lines. When the action is called, Spark goes through the DAG and applies all the transformation up until that point, followed by the action and then a value is returned back to the caller.
A common example is a MapReduce word count. You first split up the file by words and then map each word into a key value pair with the word as the key, and the value of 1. Then you reduce by the key, which adds up all the value of the same key, effectively, counting the number of occurrences of that key. Finally, you call the collect() function, which is an action, to have it print out all the words and its occurrences.
Next post, you will see at a high level what happens when an action is executed.
Good luck!
If you want you can contact me to explain some doubts, make suggestions or critical…or to know my services - please send me an e-mail, or visit my website for more details.

(T) Using Apache Spark in ETL tasks - Part II

(T) Using Apache Spark in ETL tasks - Part II

Why should I use Apache Spark?

In the last post I showed you how we can use Apache Spark in ETL’s tasks. And in this post I’ll try show you how can we use Apache Spark.
Hadoop ecosystem provide abstractions that allow users to treat a cluster of computers more like a single computer - to automatically split up files and distribute storage over many machines, to automatically divide work into smaller tasks and execute them in a distributed manner, and to automatically recover from failures.
MapReduce did a revolution computation over huge data sets by offering a simple model for writing programs that could execute in parellel across hundreds to thousands of machines. The MapReduce engine achieves near linear scalability - as the data size increases, we can throw more computers at it and see jobs complete in the same amount of time - and is resilient to the fact that failures that occur rarely on a single machine occur all the time on clusters of thousands. It breaks up work into small tasks and can gracefully accommodate task failures without compromising the job to which they belong.
Apache Spark maintains MapReduce’s linear scalability and fault tolerance, but extends it in three important ways:
  • rather than relying on a rigid map-then-reduce format, its engine can execute a more general directed acyclic graph (DAG) of operators. This means that, in situations where MapReduce must write out intermediate results to the distributed filesystem, Spark can pass them directly to the next step in the pipeline.
  • it complements this capability with a rich set of transformations that enable users to express computation more naturally. It has a strong developer focus and streamlined API that can represent complex pipelines in a few lines of code.
  • extends its predecessors with in-memory processing. Its Resilient Distrituted Dataset (RDD) abstraction enables developers to materialize any point in a processing pipeline into memory across the cluster, meaning that future steps that want to deal with the same data set need not recompute it or reloaded it from disk. This capability opens up use cases that distribuited processing engines could not previously approach. Spark is well suited for highly iterative algorithms that require multiple passes over data set, as well as reactive applications that quickly respond to user queries by scanning large in-memory data sets.
Spark boasts strong integration with the variety of tools in the Hadoop ecosystem. It can read and write data in all of the data formats supported by MapReduce, allowing it to interact with the formats commonly used to store data on Hadoop like Avro and Parquet and good old CSV. It can read from and write to NoSQL databases like Hbase and Cassandra. Its stream processing library, Spark Streaming, can ingest data continuously from systems like Flume and Kafka. Its SQL library, SparkSQL, can interact with the Hive Metastore. It can run inside YARN, Hadoop’s scheduler and resource manager, allowing it to share cluster resources dynamically and to be managed with the same policies as other processing engines like MapReduce and Impala.

The Spark Programming Model

Spark programming starts with a data set or few, usually residing in some form of distributed, persistent storage like Hadoop Distributed File System (HDFS). Writing a Spark program typically consists of a few related steps:
  • defining a set of transformations on input data sets;
  • invoking actions that output the transformed data sets to persistent storage or return results to the driver’s local memory and
  • running local computations that operate on the results computed in a distributed fashion. These can help you decide what transformations and actions to undertake next.

The Spark Shell and SparkContext

In my last post I forgot to tell you that if you have a Hadoop cluster that runs a versions of Hadoop that supports YARN, you can launch the Spark jobs on the cluster by using the value of yarn-client for the Spark master:
$ spark-shell --master yarn-client
But if you’re just running on your personal computer, you can launch a local Spark cluster by specifying local[N], where N is the number of threads to run. Or you can use the symbol * to match the number of cores available on your machine.
$ spark-shell --master local[*]
Also, I can specify additional arguments to make Spark shell fully utilize your YARN resources. For example:

$ spark-shell \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue thequeue \
    examples/jars/spark-examples*.jar \
    10
The example above starts a YARN client program which starts the default Application Master. Then the shell will be run as a child thread of Application Master. The client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running.
For more details about it, you can read here.

What is yarn-client mode in Spark?

A Spark application consists of a driver and one or many executors. The driver program is the main program (where you instantiate SparkContext), which coordinates the executors to run the Spark application. The executors run tasks assigned by the driver.
A YARN application has the following roles: yarn client, yarn application master and list of containers running on the node managers.
When Spark application runs on YARN, it has its own implementation of yarn client and yarn application master.
With those background, the major difference is where the driver program runs.
  1. YARN Standalone Mode_: your driver program is running as a thread of the yarn application master, which itself runs on one of the node managers in the cluster. The Yarn client just pulls status from the application master. This mode is same as a mapreduce job, where the MapReduce application master coordinates the containers to run the map/reduce tasks.
  2. YARN Client Mode: your driver program is running on the yarn client where you type the command to submit the spark application (may not be a machine in the yarn cluster). In this mode, although the drive program is running on the client machine, the tasks are executed on the executors in the node managers of the YARN cluster.
For more details about it, you can read here
From the next post, we’ll deep in Resilient Distributed Dataset (RDD).
Good luck!
If you want you can contact me to explain some doubts, make suggestions or critical…or to know my services - please send me an e-mail, or visit my website for more details.

Sunday, November 11, 2018

(T) Using Apache Spark in ETL tasks - Part I

Using Apache Spark in ETL's tasks
In this post, I’ll show you how to use Apache Spark to do a basic ETL (extract, load and transform) operation. Apache Spark™ is a fast and general engine for large-scale data processing. And you can use it in your BI or Big Data Projects.
If you want to know more about it, please visit the site.
Let’s start with the Spark Shell in Scala:
$ cd $SPARK_HOME
$ spark-shell
Let’s check if the SparkContext was initialized:
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@733fae8
Using the spark-shell in Scala let’s load a file into a variable with the help of SparkContext:
scala> var lines = sc.textFile("/home/df/Temp/alerts_news.csv")
lines: org.apache.spark.rdd.RDD[String] = /home/df/Temp/alerts_news.csv MapPartitionsRDD[1] at textFile at :24
SparkContext is a class defined in the Spark library. It is the main entry point into the Spark library. It represents a connection to a Spark cluster. It is also required to create other important objects provided by the Spark API. A Spark application must create an instance of the SparkContext class. Currently, an application can have only one active instance of SparkContext. To create another instance, it must first stop the active instance. The SparkContext class provides multiple constructors. The simplest one does not take any arguments.
The textFile method creates an RDD from a text file. It can read a file or multiple files in a directory stored on a local file system, HDFS, Amazon S3, or any other Hadoop-supported storage system. It returns an RDD of Strings, where each element represents a line in the input file.
After we’ll split each line into “pieces” of separated word:
scala> var pieces = lines.flatMap(s => s.split(" "))
pieces: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at :26
The flatMap method of a Scala collection is similar to the map. It takes a function as input, applies it to each element in a collection, and returns another collection as a result. However, the function passed to flatMap generates a collection for each element in the original collection. Thus, the result of applying the input function is a collection of collections. If the same input function were passed to the map method, it would return a collection of collections. The flatMap method instead returns a flattened collection.
Append one with each word:
scala> var tokens = pieces.map(s => (s,1))
tokens: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at :28
The map method of a Scala collection applies its input function to all the elements in the collection and returns another collection. The returned collection has the exact same number of elements as the collection on which map was called. However, the elements in the returned collection need not be of the same type as that in the original collection.
Let’s calculate the frequency of each word by adding all the one’s against one word:
scala> var sumEachWord = tokens.reduceByKey((a, b) => a + b)
sumEachWord: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at :30
The higher-order reduceByKey method takes an associative binary operator as input and reduces values with the same key to a single value using the specified binary operator. A binary operator takes two values as input and returns a single value as output. An associative operator returns the same result regardless of the grouping of the operands. The reduceByKey method can be used for aggregating values by key.
Let’s check out the output:
scala> sumEachWord.collect()
res1: Array[(String, Int)] = Array((PÇA,2), (VERBAIS,1), (TJRJ,4), (PERITOS,1), (11DP,1), (RESISTIU,2), (HAVER,1), (GOVERNO,21), (G1,,1), (ALEGAÇÕES,1), (SOB,2), (POSSE:,1), (ASSEMBLEIA,1), (EXCLUSIVO,1), (INFECÇÃO,1), (AUTORIZOU,1), (RIO,,3), (TUDO,3), (CARGA,1), (APRESENTOU,1), (LAGOA,,1), (AÇÃO,15), (PULMÃO,,1), (DETIDOS,1), (LANÇADO,1), (DIÁRIO,1), (TRATAR,1), (RODOANEL,2), (QUARTO,3), (AULAS,18), (IRMÃO,5), (CONCESSÃO,1), (DELE,2), (INDICIADO,2), (DETRAN,2), (ONIBUS,1), (CONTROLARAM,1), (MÉDIA,2), (AMANHÃ,,12), (-É,1), (AMANHÃ",1), (LACERDA,,1), (IDOSO,1), (7,1,1), (RECONHECER,1), (OCEÂNICO,1), (PF,,1), (INFORMA,7), (ESCORIAÇÕES,1), (31,5), (BARRA,8), (A.I.,1), (ANDERSON,1), (33,2), (CORRIGINDO,1), (RECREATIVO,1), (8H,5), (MARCO,3), (UPA,2), (METROS,2), (FAZENDINHA,,1), (NITERÓI,9)...
The collect method returns the elements in the source RDD as an array. This method should be used with caution since it moves data from all the worker nodes to the driver program. It can crash the driver program if called on a very large RDD.
Instead of print output on console, let’s save the output to a file in the local file system:
scala> sumEachWord.saveAsTextFile("/home/df/Temp/scala_news.csv")
The saveAsTextFile method saves the elements of the source RDD in the specified directory on any Hadoop-supported file system. Each RDD element is converted to its string representation and stored as a line of text.
Now let’s check out the file in the path and to confirm the content:
$ cd /home/df/Temp/scala_news.csv/
$ ls -la
$ head part-00000
$ less part-00000
That’s it!
Well, as you can see in a few minutes and some lines we did an ETL task:
  • Extract [E] - we extracted the information from a text file, using a SparkContext.
  • Transform [T] - we transformed the data into a key-value pair using the methods: flatMap, map, and reduceByKey.
  • Load [L] - we loaded the final result on the console and then we did the persistence, saving it to a local file system, using the methods: collect and saveAsTextFile.
Apache Spark is a computer platform designed to be fast and general-purpose, and easy to use. The ease of use with Spark enables you to quickly pick it up using simple APIs for Scala, Python, and Java.
Now, let’s do the same operation, but at this time we will use the spark-shell in Python instead of Scala as the programming language.
First of all, let’s start the Spark Shell in Python:
$ cd $SPARK_HOME
$ pyspark
Let’s check if the SparkContext was initialized:
>>> sc

Using the spark-shell let’s load a file into a variable with the help of SparkContext:
>>> lines = sc.textFile("/home/df/Temp/alerts_news.csv")
After we’ll split each line into “pieces” of separated word:
>>> pieces = lines.flatMap(lambda s: s.split(" "))
Append one with each word:
>>> tokens = pieces.map(lambda s: (s,1))
Let’s calculate the frequency of each word by adding all the ones against one word:
>>> sumEachWord = tokens.reduceByKey(lambda a, b: a + b)
Let’s check out the output:
>>> sumEachWord.collect()
Instead of print output on console, let’s save the output to a file in the local file system:
>>> sumEachWord.saveAsTextFile("/home/df/Temp/python_news.csv")
Now let’s check out the file in the path and to confirm the content:
$ cd /home/df/Temp/python_news.csv/
$ ls -la
$ head part-00000
$ less part-00000
As you can see, both programming languages are very similar in doing the same task. But, we can behold some differences between usage of Scala and Python for Spark, are:
  • In Scala, variables are initialized using var keyword. In Python is not necessary.
  • We use “anonymous functions” the different forms either in Python or Scala. It can be used in an application just like a string literal. It can be passed as an input to a higher-order method or function. It can also be assigned to a variable. A function literal is defined with input parameters in parenthesis, followed by a right arrow and the body of the function. The body of a function literal is enclosed in optional curly braces. An example is shown next.
Scala:
scala> def make_incrementor(n: Int) = (x: Int) => x + n

scala> val f = make_incrementor(42)
Python:
>>> def make_incrementor(n):

... return lambda x: x + n

...

>>> f = make_incrementor(42)
We can do the same tasks even in batch mode with “spark-submit” script.
A Spark application can be launched using the spark-submit script that comes with Spark. It is available in the SPARK_HOME/bin directory. The spark-submit script can be used for both running a Spark application on your development machine and deploying it on a real Spark cluster. It provides a unified interface for running a Spark application with any Spark-supported cluster manager.
However, when running in batch mode the SparkContext object will have to be initialized by the programmer. It is not available by default as it is an interactive shell.
Next steps:
You can extend this tutorial and improve your results by:
  • Load data to the database, for example, MySQL.
  • Explore others transformation methods.
  • Explore others actions methods.
For example, what are the words most popular?
With this tool, you can build upon your work and improve your results.
Good luck!
If you want you can contact me to explain some doubts, make suggestions or critical…or to know my services - please send me an e-mail, or visit my website for more details.

Friday, October 6, 2017

(A) Data Science in Practice with Python - Sample 2

In this post, I'll explain what is a recommender system, how to work it and show you some code examples. In my previous post I did a quick introduction:

Sample 2 - Recommender System

WHAT IS A RECOMMENDER SYSTEM? A model that filters information to present users with a curated subset of options they’re likely to find appealing.
HOW DOES IT WORK? Generally via a collaborative approach (considering the user’s previous behavior) or content-based approach (based on discrete assigned characteristics).

Now I'll get into in some concepts very important about recommender systems.

Recommender System in Details:

We can say that the goal of a recommender system is to make product or service recommendations to people. Of course, these recommendations should be for products or services they’re more likely to want to buy or consume.

Recommender systems are active information filtering systems which personalize the information coming to a user based on his interests, relevance of the information etc. Recommender systems are used widely for recommending movies, articles, restaurants, places to visit, items to buy etc.

1. Type of Recommendation Engines

A simple way to think about the different types of recommender are:
  • Content Filtering: “If you liked this item, you might also like …”
  • Item-Item Collaborative Filtering: “Customers who liked this item also liked …”
  • User-Item Collaborative Filtering: “Customers who are similar to you also liked …”
Confused...So, let's see in practice some cases: 

Case 1: Content-based algorithm
Idea: “If you liked this item, you might also like …”
Based on similarity of the items being recommended. It generally works well when its easy to determine the context/properties of each item. For instance when we are recommending the same kind of item like a movie recommendation or song recommendation.

Case 2: Collaborative filtering algorithm
If a person A likes item 1, 2, 3 and B like 2, 3, 4 then they have similar interests and A should like item 4 and B should like item 1.

This algorithm is entirely based on past behavior and not on the context. This makes it one of the most commonly used algorithms as it is not dependent on any additional information. For instance: product recommendations by e-commerce player like Amazon and merchant recommendations by banks like American Express.

An important point here is that in this case, there are several types of collaborative filtering algorithms. Let's see the two most important:

Item-Item Collaborative filtering:
Idea: “Customers who liked this item also liked …”
It is quite similar to the previous algorithm, but instead of finding customer look alike, we try finding item look alike. Once we have item look alike matrix, we can easily recommend alike items to a customer who has purchased an item from the store. This algorithm is far less resource consuming than user-user collaborative filtering. Hence, for a new customer, the algorithm takes far lesser time than user-user collaborate as we don’t need all similarity scores between customers. And with a fixed number of products, product-product look alike matrix is fixed over time.

User-Item Collaborative filtering:
Idea: “Customers who are similar to you also liked …
Here we find look-alike customers (based on similarity) and offer products which first customer’s look-alike has chosen in past. This algorithm is very effective but takes a lot of time and resources. It requires to compute every customer pair information which takes time. Therefore, for big base platforms, this algorithm is hard to implement without a very strong parallelization system.

But before we proceed, let me define a couple of terms:
  • Item would refer to content whose attributes are used in the recommended models. These could be movies, documents, book etc.
  • Attribute refers to the characteristic of an item. A movie tag, words in a document are examples.
Recommender System Hands-On:

As I said above, "Collaborative filtering" algorithms search large groupings of preference expressions to find similarities to some input preference or preferences. The output from these algorithms is a ranked list of suggestions that is a subset of all possible preferences, and hence, it's called "filtering". The "collaborative" comes from the use of many other peoples' preferences in order to find suggestions for themselves. This can be seen either as a search of the space of preferences (for brute-force techniques), a clustering problem (grouping similarly preferred items), or even some other predictive model.

Many algorithmic attempts have been created in order to optimize or solve this problem across sparse or large datasets, and we will discuss a few of them in this post.

The goals of this post are:
  • Understanding how to model preferences from a variety of sources
  • Learning how to compute similarities using distance metrics
To demonstrate the techniques in this post, I will use the "MovieLens" database from the University of Minnesota that contains star ratings of moviegoers for their preferred movies. You can find the data here.

I will use the smaller MoveLens 100k dataset (4.7 MB in size / ml-100k.zip) in order to load the entire model into the memory with ease. Unzip the downloaded data into the directory of your choice.

These data consists of:
  • 100,000 ratings (1-5) from 943 users on 1682 movies.
  • Each user has rated at least 20 movies.
  • Simple demographic info for the users (age, gender, occupation, zip)
  • Genre information of movies
In a zipped file, there are two main files that we will be using are as follows:
  • u.data: This contains the user moving ratings, it is the main file and it is tab delimited
  • u.item: This contains the movie information and other details, it is pipe delimited
The u.data file, the first column is the "user ID", the second column is the "movie ID", the third is the "star rating", and the last is the "timestamp".

The u.item file contains much more information, including the "ID", "title", "release date", and so on. Interestingly, this file also has a Boolean array indicating the genre(s) of each movie, including (in order) action, adventure, animation, children, comedy, crime, documentary, drama, fantasy, film noir, horror, musical, mystery, romance, sci-fi, thriller, war, and western.

Well ... let's see all this in the practice, step-by-step or better line-by-line. For this, I prepared a Jupyter notebook. You can find the notebook here.

I hope you enjoy it!

See you again on next post ... Bye, Bye!

Saturday, July 8, 2017

(A) Data Science in Practice with Python - Sample 1


The top trending in Twitter or other social network is the term “data science”. But ...
  • What’s the data science? 
  • How do real companies use data science to make products, services and operations better? 
  • How does it work? 
  • What does the data science lifecycle look like? 
This is the buzzword at the moment. A lot of people ask me about it. Are many questions. I’ll try answer all of these questions through of some samples.

Sample 1 - Regression

WHAT IS A REGRESSION? This is the better definition what I found [Source: Wikipedia] - Regression analysis is widely used for prediction and forecasting, where its use has substantial overlap with the field of machine learning.
HOW DOES IT WORK? Regression analysis is also used to understand which among the independent variables are related to the dependent variable, and to explore the forms of these relationships. In restricted circumstances, regression analysis can be used to infer causal relationships between the independent and dependent variables. However this can lead to illusions or false relationships, so caution is advisable; for example, correlation does not imply causation.

Sample 2 - Recommender System

WHAT IS A RECOMMENDER SYSTEM? A model that filters information to present users with a curated subset of options they’re likely to find appealing.
HOW DOES IT WORK? Generally via a collaborative approach (considering user’s previous behavior) or content based approach (based on discrete assigned characteristics).

Sample 3 - Credit Scoring

WHAT IS CREDIT SCORING? A model that determines an applicant’s creditworthiness for a mortgage, loan or credit card.
HOW DOES IT WORK? A set of decision management rules evaluates how likely an applicant is to repay debts.

Sample 4 - Dynamic Pricing

WHAT IS DYNAMIC PRICING? Modeling price as a function of supply, demand, competitor pricing and exogenous factors.
HOW DOES IT WORK? Generalized linear models and classification trees are popular techniques for estimating the “right” price to maximize expected revenue.

Sample 5 - Customer Churn

WHAT IS CUSTOMER CHURN? Predicting which customers are going to abandon a product or service.
HOW DOES IT WORK? Data scientists may consider using support vector machines, random forest or k-nearest-neighbors algorithms.

Sample 6 - Fraud Detection

WHAT IS FRAUD DETECTION? Detecting and preventing fraudulent financial transactions from being processed.
HOW DOES IT WORK? Fraud detection is a binary classification problem: “is this transaction legitimate or not?”

This post will be divided in 5 parts. In each one I’ll explain the machine learning techniques mentioned above. This is the first post and I’ll show you how work the sample 1: regression. But, first let’s start with the question (below):

- What’s the data science?

In my previous post “Tucson Best Buy Analysis”, you can know more about it. There I explain “what is” and show you some examples of the day-by-day of a Data Scientist.

Being so, let’s talk about sample 1: regression. To explain this let’s start with a simple problem, I could say “classic problem”, predicting house prices in Russia. A Kaggle's challenge what was closed on 06/29/2017. The goal is to predict the median value of a house in a particular area. As usual, we have some training data, where the answer is known to us. To our study and a better comprehension about this topic I created a notebook on Jupyter tool. To access the notebook, please click here.

Have fun! See you on next post ...

Sunday, June 11, 2017

(A) Tucson Best Buy Analysis

“Data! Data! Data!” he cried impatiently. 
“I can’t make bricks without clay.”
—Arthur Conan Doyle

The Ascendance of Data

We live in a world that’s drowning in data. Websites track every user’s every click. Your smartphone is building up a record of your location and speed every second of every day. “Quantified selfers” wear pedometers-on-steroids that are ever recording their heart rates, movement habits, diet, and sleep patterns. Smart cars collect driving habits, smart homes collect living habits, and smart marketers collect purchasing habits. The Internet itself represents a huge graph of knowledge that contains (among other things) an enormous cross-referenced encyclopedia; domain-specific databases
about movies, music, sports results, pinball machines, memes, and cocktails; and too many government statistics (some of them nearly true!) from too many governments to wrap your head around.

Buried in these data are answers to countless questions that no one’s ever thought to ask. In this post, we’ll talk about how to find them.

What is Data Science?

There’s a joke that says a data scientist is someone who knows more statistics than a computer scientist and more computer science than a statistician. (I didn’t say it was a good joke.) In fact, some data scientists are—for all practical purposes—statisticians, while others are pretty much indistinguishable from software engineers. Some are machine-learning experts, while others couldn’t machine-learn their way out of kin‐dergarten. Some are PhDs with impressive publication records, while others have never read an academic paper (shame on them, though). In short, pretty much no matter how you define data science, you’ll find practitioners for whom the definition is totally, absolutely wrong.

Nonetheless, we won’t let that stop us from trying. We’ll say that a data scientist is someone who extracts insights from messy data. Today’s world is full of people trying to turn data into insight.

For instance, the dating site in USA called OkCupid asks its members to answer thousands of questions in order to find the most appropriate matches for them. But it also analyzes these results to figure out innocuous-sounding questions you can ask someone to find out how likely someone is to sleep with you on the first date.

Facebook asks you to list your hometown and your current location, ostensibly to make it easier for your friends to find and connect with you. But it also analyzes these locations to identify global migration patterns and where the fanbases of different football teams live.

As a large retailer in Brazil, B2W (owner of the brands Americanas.com, Shoptime.com, Submarino.com and Soubarato.com) tracks your purchases and interactions online. And it uses the data to predictively model which of its customers are pregnant, to better market baby-related purchases to them.

In 2012, the Obama campaign employed dozens of data scientists who data-mined and experimented their way to identifying voters who needed extra attention, choosing optimal donor-specific fundraising appeals and programs, and focusing get-out-the-vote efforts where they were most likely to be useful. It is generally agreed that these efforts played an important role in the president’s re-election, which means it is a safe bet that political campaigns of the future will become more and more data-driven, resulting in a never-ending arms race of data science and data collection.

Now, before you start feeling too jaded: some data scientists also occasionally use their skills for good—using data to make government more effective, to help the homeless, and to improve public health. But it certainly won’t hurt your career if you like figuring out the best way to get people to click on advertisements.

Data Science in pratice

So, let's play with some data and to understand some activities performed by a Data Scientist.
In this exercise, the idea is to find some options to buy a Hyundai Tucson car. There is a specialized site to buy and sell cars and motorbikes (http://www.webmotors.com.br). The cars/motorbikes can be new or used. There are many advertisements that are published every day in this place. So many options: color, year, model, price, etc. And I visit this site every day to find out a good opportunity.

How work: I created an app in python to extract (crawler) these data from this site. Then I saved these data in a sqlite database, called 'crawler-motors.db'. The next step was to analyze these data. So, I created this notebook in Jupyter tool. This notebook has seven parts. In general I use pandas for data manipulation, some statistical analysis and matplotlib to create some graphics. Also, I am using approaches of machine learning to do a prediction of the sale's price throught by a linear regression model.

Tuesday, April 25, 2017

(T) Statistical Computing with Python - Part Two

This is a quick tutorial and here I'll show you “how-to do” some statistical programming tasks using python. For that, is necessary to have some basic knowledge with python and be familiar with statistical programming in a language like R, Stata, SAS, SPSS or Matlab.

Please click here to see this post in pdf.

Friday, September 16, 2016

(T) Statistical Computing with Python - Part One



This is a quick tutorial and here I'll show you “how-to do” some statistical programming tasks using python. For that, is necessary to have some basic knowledge with python and be familiar with statistical programming in a language like R, Stata, SAS, SPSS or Matlab.

Please click here to see this post in pdf.

Tuesday, June 9, 2015

(T) NumPy in Detail

In my post previous there was some examples contained matrices or other data structures of higher dimensionality—just one-dimensional vectors. To understand how NumPy treats objects with dimensions greater than one, we need to develop at least a superficial understanding for the way NumPy is implemented. It is misleading to think of NumPy as a “matrix package for Python” (although it’s commonly used as such). I find it more helpful to think of NumPy as a wrapper and access layer for underlying C buffers. These buffers are contiguous blocks of C memory, which—by their nature—are one-dimensional data structures. All elements in those data structures must be of the same size, and we can specify almost any native C type (including C structs) as the type of the individual elements. The default type corresponds to a C double and that is what we use in the examples that follow, but keep in mind that other choices are possible. All operations that apply to the data overall are performed in C and are therefore very fast.

To interpret the data as a matrix or other multidimensional data structure, the shape or layout is imposed during element access. The same 12-element data structure can therefore be interpreted as a 12-element vector or a 3×4 matrix or a 2×2×3 tensor—the shape comes into play only through the way we access the individual elements. (Keep in mind that although reshaping a data structure is very easy, resizing is not.) The encapsulation of the underlying C data structures is not perfect: when choosing the
types of the atomic elements, we specify C data types not Python types. Similarly, some features provided by NumPy allow us to manage memory manually, rather than have the memory be managed transparently by the Python runtime. This is an intentional design decision, because NumPy has been designed to accommodate large data structures—large enough that you might want (or need) to exercise a greater degree of control over the way memory is managed. For this reason, you have the ability to choose types that take up less space as elements in a collection (e.g., C float elements rather than the default double). For the same reason, all ufuncs accept an optional argument pointing to an (already allocated) location where the results will be placed, thereby avoiding the need to claim additional memory themselves. Finally, several access and structuring routines return a view (not a copy!) of the same underlying data. This does pose an aliasing problem that you need to watch out for.

The next listing quickly demonstrates the concepts of shape and views. Here, I assume that the commands are entered at an interactive Python prompt (shown as >>> in the listing). Output generated by Python is shown without a prompt:



Let’s step through this. We create two vectors of 12 elements each. Then we reshape the first one into a 3 × 4 matrix. Note that the shape property is a data member—not an accessory function! For the second vector, we create a view in the form of a 3 × 4 matrix. Now d1 and the newly created view of d2 have the same shape, so we can combine them (by forming their sum, in this case). Note that even though reshape() is a member function, it does not change the shape of the instance itself but instead returns a new view object: d2 is still a one-dimensional vector. (There is also a standalone version of this function, so we could also have written view = np.reshape( d2, (3,4) ). The presence of such redundant functionality is due to the desire to maintain backward compatibility with both of NumPy’s ancestors.)

We can now access individual elements of the data structures, depending on their shape. Since both d1 and view are matrices, they are indexed by a pair of indices (in the order [row,col]). However, d2 is still a one-dimensional vector and thus takes only a single index. (We will have more to say about indexing in a moment.) Finally, we examine some diagnostics regarding the shape of the data structures, emphasizing their precise semantics. The shape is a tuple, giving the number of elements in each dimension. The size is the total number of elements and corresponds to the value returned by len() for the entire data structure. Finally, ndim gives the number of dimensions (i.e., d.ndim == len(d.shape)) and is equivalent to the “rank” of the entire data structure. (Again, the redundant functionality exists to maintain backward compatibility.)

Finally, let’s take a closer look at the ways in which we can access elements or larger subsets of an ndarray. In the previous listing we saw how to access an individual element by fully specifying an index for each dimension. We can also specify larger sub-arrays of a data structure using two additional techniques, known as slicing and advanced indexing. The following listing shows some representative examples. (Again, consider this an interactive Python session.)


We first create a 12-element vector and reshape it into a 3 × 4 matrix as before. Slicing uses the standard Python slicing syntax start:stop:step, where the start position is inclusive but the stopping position is exclusive. (In the listing, I use only the simplest form of slicing, selecting all available elements.) There are two potential “gotchas” with slicing. First of all, specifying an explicit subscripting index (not a slice!) reduces the corresponding dimension to a scalar. Slicing, though, does not reduce the dimensionality of the data structure. Consider the two extreme cases: in the expression d[0,1], indices for both dimensions are fully specified, and so we are left with a scalar. In contrast, d[0:1,1:2] is sliced in both dimensions. Neither dimension is removed, and the resulting object is still a (two-dimensional) matrix but of smaller size: it has shape 1 × 1. The second issue to watch out for is that slices return views, not copies. Besides slicing, we can also index an ndarray with a vector of indices, by an operation called “advanced indexing.” The previous listing showed two simple examples. In the first we use a Python list object, which contains the integer indices (i.e., the positions) of the desired columns and in the desired order, to select a subset of columns. In the second example, we form an ndarray of Boolean entries to select only those rows for which the Boolean evaluates to True. In contrast to slicing, advanced indexing returns copies, not views. This completes our overview of the basic capabilities of the NumPy module. NumPy is easy and convenient to use for simple use cases but can get very confusing otherwise.

Monday, June 1, 2015

(T) NumPy in Action

The NumPy module provides effecient and convenient handling of large numerical arrays in Python. This module is used by many other libraries and projects and in this sense is a "base" technology. Let's look at some quick examples.

NumPy objects are of type ndarray. There are different ways of creating then. We can create an ndarray by:

  • Converting a Python list
  • Using a library function that returns a populated vector
  • Reading data from a file directly into a NumPy object
The listing that follows shows five different ways to create NumPy objects. First we create one by converting a Python list. Then we show two different factory routines that generate equally spaced grid points. These routines differ in how they interpret the provided boundary values: one routine includes both boundary values, and the other includes one and excludes the other. Next we create a vector filled with zeros and set each element in a loop. Finally, we read data from a text file. (I am showing only the simplest or default cases here—all these routines have many more options that can be used to influence their behavior.)



In the end, all five vectors contain identical data. You should observe that the values in the Python list used to initialize vec1 are floating-point values and that we specified the type desired for the vector elements explicitly when using the arange() function to create vec2. Now that we have created these objects, we can operate with them (see the next listing). One of the major conveniences provided by NumPy is that we can operate with NumPy objects as if they were atomic data types: we can add, subtract, and multiply them (and so forth) without the need for explicit loops. Avoiding explicit loops makes our code clearer. It also makes it faster.





All operations are performed element by element: if we add two vectors, then the corresponding elements from each vector are combined to give the element in the resulting vector. In other words, the compact expression vec1 + vec2 for v1 in the listing is equivalent to the explicit loop construction used to calculate v2. This is true even for multiplication: vec1 * vec2 will result in a vector in which the corresponding elements of both operands have been multiplied element by element. (If you want a true vector or “dot” product, you must use the dot() function instead.) Obviously, this requires that all operands have the same number of elements!

Now we shall demonstrate two further convenience features that in the NumPy documentation are referred to as broadcasting and ufuncs (short for “universal functions”). The term “broadcasting” in this context has nothing to do with messaging. Instead, it means that if you try to combine two arguments of different shapes, then the smaller one will be extended (“cast broader”) to match the larger one. This is especially useful when combining scalars with vectors: the scalar is expanded to a vector of appropriate size and whose elements all have the value given by the scalar; then the operation proceeds, element by element, as before. The term “ufunc” refers to a scalar function that can be applied to a NumPy object. The function is applied, element by element, to all entries in the NumPy object, and the result is a new NumPy object with the same shape as the original one.

Lastest Posts

(T) Using shared variables and key-value pairs