Wednesday, February 6, 2019

Using shared variables and key-value pairs

In this blog post, I’ll talk about Spark’s shared variables and the types of operations you can do on key-value pairs. Spark provides two limited types of shared variables for common usage patterns: broadcast variables and accumulators. Normally, when a function is passed from the driver to a worker, a separate copy of the variables it uses is shipped to each worker. Broadcast variables instead let each machine work with a read-only variable cached locally, and Spark attempts to distribute broadcast variables using efficient broadcast algorithms. As an example, broadcast variables can be used to give every node a copy of a large input dataset efficiently.
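For instance, here is a minimal spark-shell sketch (the countryNames lookup map and the sample data are my own illustration, not part of any real job):
scala> val countryNames = Map("BR" -> "Brazil", "US" -> "United States") // illustrative lookup table
scala> val bcNames = sc.broadcast(countryNames) // shipped once to each executor, read-only there
scala> val codes = sc.parallelize(Seq("BR", "US", "BR"))
scala> codes.map(c => bcNames.value.getOrElse(c, "unknown")).collect() // Array(Brazil, United States, Brazil)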
The other shared variables are accumulators. These are used for counters or sums that work well in parallel. These variables can only be added to through an associative operation. Only the driver can read an accumulator’s value, not the tasks; the tasks can only add to it. Spark natively supports numeric types, but programmers can add support for new types. As an example, you can use accumulator variables to implement counters or sums, as in MapReduce.
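As a minimal sketch, here is how you might count blank lines with an accumulator (this assumes Spark 2.x, where sc.longAccumulator is available; the sample data is made up):
scala> val blankLines = sc.longAccumulator("blankLines")
scala> val sample = sc.parallelize(Seq("a", "", "b", ""))
scala> sample.foreach(line => if (line.isEmpty) blankLines.add(1)) // tasks can only add to it
scala> blankLines.value // only the driver reads the total; returns 2 here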
Last, but not least, key-value pairs are available in Scala, Python and Java. In Scala, you create a key-value pair by typing:
scala> val pair = ("a", "b")
To access each element, use the “._n” notation. This is not zero-indexed, so “._1” returns the first element and “._2” returns the second.
scala> val pair = ("a", "b")
scala> pair._1 // will return "a"
scala> pair._2 // will return "b"
In Python, indexing is zero-based, so the first element resides at index 0 and the second at index 1.
>>> pair = ("a", "b")
>>> pair[0] # will return "a"
>>> pair[1] # will return "b"
There are special operations available for RDDs of key-value pairs. In an application, remember to import the SparkContext package to use PairRDDFunctions such as reduceByKey(). The most common operations are those that group or aggregate by a key. If you have custom objects as the key inside your key-value pair, remember that you will need to provide your own equals() method to do the comparison, as well as a matching hashCode() method.
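For example, in Scala a case class generates consistent equals() and hashCode() implementations for you, which is what the grouping operations rely on. This is just a sketch with a made-up AlertKey class and sample data:
scala> case class AlertKey(host: String, severity: String) // equals() and hashCode() are generated
scala> val alerts = sc.parallelize(Seq((AlertKey("web01", "HIGH"), 1), (AlertKey("web01", "HIGH"), 1), (AlertKey("db01", "LOW"), 1)))
scala> alerts.reduceByKey(_ + _).collect() // e.g. Array((AlertKey(db01,LOW),1), (AlertKey(web01,HIGH),2))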
So in the example below, textFile() gives you a normal RDD. You then perform some transformations on it, which produce a pair RDD, and that allows you to invoke the reduceByKey() method that is part of the PairRDDFunctions API.
scala> val lines = sc.textFile("/home/df/Temp/alerts_new.csv")
scala> val wordCounts = lines.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_) 
Pay attention to the reduceByKey() method: it simply means that for the values of the same key, add them up together.
Another thing I want to point out is that, for the sake of brevity, all the functions are chained on one line. When you actually code it yourself, you may want to split each of the functions up, as in the sketch below. For example, do the flatMap operation and return it to an RDD. Then from that RDD, do the map operation to create another RDD. Finally, from that last RDD, invoke the reduceByKey() method. That yields multiple lines, but you can test each transformation to see whether it works properly.
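Here is the same word count split into named steps (same file path as above; the intermediate variable names are my own):
scala> val lines = sc.textFile("/home/df/Temp/alerts_new.csv")
scala> val words = lines.flatMap(line => line.split(" ")) // an RDD of individual words
scala> val pairs = words.map(word => (word, 1)) // a pair RDD of (word, 1)
scala> val wordCounts = pairs.reduceByKey(_ + _) // sums the 1s per word
scala> wordCounts.take(5) // sanity-check a small sample of the result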
Next post, let’s understand the purpose and usage of the SparkContext.
Good luck!
If you want, you can contact me to clear up doubts, make suggestions or criticisms…or to learn about my services - please send me an e-mail, or visit my website for more details.

Tuesday, January 29, 2019

Working with RDDs in Apache Spark

In my last blog post I talked about RDDs (Resilient Distributed Datasets). So, let’s recap a few concepts, especially RDD actions. An RDD action returns a value to the driver. Again, you can find more information on Spark’s website. Here is a subset of the actions available (a short sketch after the list shows them in use):
  • The collect() function returns all the elements of the dataset as an array to the driver program. This is usually useful after a filter or another operation that returns a sufficiently small subset of the data, to make sure your filter function works correctly.
  • The count() function returns the number of elements in a dataset and can also be used to check and test transformations.
  • The take() function returns an array with the first n elements. Note that this is currently not executed in parallel. The driver computes all the elements.
  • The foreach() function runs a function func on each element of the dataset.
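As a quick illustration, here is a sketch of these actions in the spark-shell on a small, made-up RDD:
scala> val nums = sc.parallelize(1 to 10)
scala> nums.collect() // Array(1, 2, ..., 10) brought back to the driver
scala> nums.count() // 10
scala> nums.take(3) // Array(1, 2, 3), computed by the driver
scala> nums.foreach(n => println(n)) // runs on the executors, so output may appear in their logs rather than the driver console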
And what’s the difference between Actions and Transformations?
Remember that transformations are essentially lazy evaluations. Nothing is executed until an action is called. Each transformation function basically updates the graph, and when an action is called, the graph is executed. A transformation returns a pointer to the new RDD. Here is a subset of some of the transformations available. The full list can be found on Spark’s website.
  • The flatMap() function is similar to map(), but each input item can be mapped to zero or more output items. This means that the function you pass should return a sequence of objects rather than a single item, and flatMap() flattens those sequences for the operations that follow. Basically, this is used in MapReduce-style operations where you might have a text file, and each time a line is read in, you split it up by spaces to get individual keywords. Each of those lines is ultimately flattened so that you can perform the map operation on it, mapping each keyword to the value one.
  • The join() function combines two sets of key-value pairs and returns a set of keys mapped to pairs of values from the two initial sets. For example, if you have a (K, V) pair and a (K, W) pair, joining them gives you a (K, (V, W)) set, as in the sketch after this list.
  • The reduceByKey() function aggregates on each key by using the given reduce function. This is something you would use in a WordCount to sum up the values for each word to count its occurrences, for example.
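Here is a minimal join() sketch with made-up (K, V) and (K, W) pairs:
scala> val kv = sc.parallelize(Seq(("a", 1), ("b", 2)))
scala> val kw = sc.parallelize(Seq(("a", "x"), ("b", "y")))
scala> kv.join(kw).collect() // e.g. Array((a,(1,x)), (b,(2,y)))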
Now I want to get a bit into RDD persistence. You have seen this used already: the cache() function. The cache() function is actually just the persist() function with the default MEMORY_ONLY storage level.
One of the key capabilities of Spark is its speed through persisting, or caching. Each node stores any partitions of the dataset that it computes in memory. When a subsequent action is called on the same dataset, or a derived dataset, it is read from memory instead of having to be retrieved again. Future actions in such cases are often 10 times faster. The first time an RDD is persisted, it is kept in memory on the node. Caching is fault tolerant because if any of the partitions is lost, it will automatically be recomputed using the transformations that originally created it.
There are two methods to invoke RDD persistence: persist() and cache().
The persist() method allows you to specify a different storage level for caching. For example, you can choose to persist the dataset on disk, persist it in memory but as serialized objects to save space, and so on. Again, the cache() method is just the default way of using persistence: storing deserialized objects in memory.
Here are the different storage levels for caching; let’s see what each one means:
  • MEMORY_ONLY
  • MEMORY_AND_DISK
  • MEMORY_ONLY_SER
  • MEMORY_AND_DISK_SER
  • DISK_ONLY
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc
  • OFF_HEAP
Basically, you can choose to store in memory, or in memory and disk. If a partition does not fit in the specified cache location, it will be recomputed on the fly. You can also decide to serialize the objects before storing them. This is space efficient, but requires the RDD to be deserialized before it can be read, so it adds CPU overhead. There is also the option to replicate each partition on two cluster nodes.
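As a sketch, this is how you would ask for one of these levels explicitly in the spark-shell (the file path and the choice of MEMORY_AND_DISK_SER here are just illustrative):
scala> import org.apache.spark.storage.StorageLevel
scala> val lines = sc.textFile("/home/df/Temp/alerts_new.csv")
scala> lines.persist(StorageLevel.MEMORY_AND_DISK_SER) // serialized in memory, spills to disk if needed
scala> lines.count() // the first action computes and caches the partitions
scala> lines.count() // subsequent actions read from the cache
scala> // lines.cache() would be the MEMORY_ONLY shorthand instead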
There are many different levels, but don’t worry. So, which storage level should you choose?
There are trade-offs between the different storage levels. You should analyze your current situation to decide which level works best. You can find more details about this on Spark’s website.
But, basically if your RDD fits within the default storage level, by all means, use MEMORY_ONLY.
It is the fastest option and takes full advantage of Spark’s design. If not, serialize the RDD and use the MEMORY_ONLY_SER level. Just be sure to choose a fast serialization library to make the objects more space efficient and still reasonably fast to access.
Don’t spill to disk unless the functions that compute your datasets are expensive or they require a large amount of space.
If you want fast recovery, use the replicated storage levels. All levels are fully fault tolerant, but non-replicated levels would still require recomputing the lost data. If you have a replicated copy, you can continue working while Spark reconstructs a lost partition.
As for OFF_HEAP, it allows you to share the same pool of memory and significantly reduces garbage collection costs. Also, the cached data is not lost if individual executors crash.
Next post, I’ll talk about shared variables and key-value pairs.
Good luck!
If you want, you can contact me to clear up doubts, make suggestions or criticisms…or to learn about my services - please send me an e-mail, or visit my website for more details.
