Spark Performance Tuning

Apache Spark overview

Analytics is increasingly an integral part of day-to-day operations at today's leading
businesses, and transformation is also occurring through huge growth in mobile and digital
channels. Previously acceptable response times and delays for analytic insight are no longer
viable, with more push toward real-time and in-transaction analytics. In addition, data science skills are increasingly in demand. As a result, enterprise organizations are attempting to leverage analytics in new ways and transition existing analytic capability to respond with more flexibility, while making the most efficient use of highly valuable data science skills.

Although the demand for more agile analytics across the enterprise is increasing, many of
today’s solutions are aligned to specific platforms, tied to inflexible programming models,
require vast data movements into data lakes. These lakes quickly become stale and
unmanageable, resulting in pockets of analytics and insight that require ongoing manual
intervention to integrate into coherent analytics solutions.

With all these impending forces converging, organizations are well-poised for a change. The
recent growth and adoption of Apache Spark as an analytics framework and platform is timely and helps meet these challenging demands.

What is Apache Spark

Apache Spark is an open source, in-memory analytics computing framework offered by the
Apache Foundation. It offers high performance for both batch and interactive processing. It exposes APIs for Java, Python, and Scala and consists of Spark core and several related projects.
Hive is not designed for online transaction processing.  It is best used for traditional data warehousing tasks.

• Spark SQL - Module for working with structured data. Allows you to seamlessly mix SQL queries with Spark programs.
• Spark Streaming - API that allows you to build scalable fault-tolerant streaming applications.
• MLlib - API that implements common machine learning algorithms.
• GraphX - API for graphs and graph-parallel computation.
Spark Ecosystem

How spark executes your program

A Spark application consists of a single driver process and a set of executor processes scattered across nodes on the cluster.

The driver is the process that is in charge of the high-level control flow of work that needs to be done. The executor processes are responsible for executing this work, in the form of tasks, as well as for storing any data that the user chooses to cache. Both the driver and the executors typically stick around for the entire time the application is running.

A single executor has a number of slots for running tasks, and will run many concurrently throughout its lifetime.
Spark Architecture

Spark’s performance optimization

When you write Apache Spark code and page through the public APIs, you come across words like transformation, action, and RDD. Similarly, when things start to fail, or when you venture into the web UI to try to understand why your application is taking so long, you’re confronted with a new vocabulary of words like job, stage, and task. Understanding of these words play very important role in performance tuning.

Spark programs can be bottlenecked by any of resource i.e CPU, network bandwidth, or memory.
If the data fits in memory, the bottleneck is network bandwidth, but sometimes, you also need to do some tuning, such as storing RDDs in serialized form, to decrease memory usage.
This guide will cover two main topics:

Data serialization, which is crucial for good network performance and can also reduce memory use, and memory tuning.
Data Serialization

Serialization plays an important role in the performance of any distributed application. Formats that are slow to serialize objects into, or consume a large number of bytes, will greatly slow down the computation.
Often, this will be the first thing you should tune to optimize a Spark application. It provides two serialization libraries:
·        Java serialization: By default, Spark serializes objects using Java’s ObjectOutputStream framework, and can work with any class you create that implements java.io.Serializable. You can also control the performance of your serialization more closely by extending java.io.Externalizable.
Java serialization is flexible but often quite slow, and leads to large serialized formats for many classes.
·        Kryo serialization: Spark can also use the Kryo library (version 2) to serialize objects more quickly. Kryo is significantly faster and more compact than Java serialization (often as much as 10x.
You can switch to using Kryo by initializing your job with a SparkConf and calling conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"). 
 This setting configures the serializer used for not only shuffling data between worker nodes but also when serializing RDDs to disk.
Memory Tuning

There are three considerations in tuning memory usage:
·        the amount of memory used by your objects (you may want your entire dataset to fit in memory),
·        the cost of accessing those objectsand
·        the overhead of garbage collection 

By default, Java objects are fast to access, but can easily consume a factor of 2-5x more space than the “raw” data inside their fields. This is due to several reasons:
Each distinct Java object has an “object header”, which is about 16 bytes and contains information such as a pointer to its class.
Java Strings have about 40 bytes of overhead over the raw string data (since they store it in an array of Chars and keep extra data such as the length)
Common collection classes, such as HashMap and LinkedList. This object not only has a header, but also pointers (typically 8 bytes each) to the next object in the list.
How to improve it – either by changing your data structures, or by storing data in a serialized format.
Memory Management Overview

Memory usage in Spark largely falls under one of two categories: execution and storage.
Execution memory refers to that used for computation in shuffles, joins, sorts and aggregations,
Storage memory refers to that used for caching and propagating internal data across the cluster.
In Spark, execution and storage share a unified region (M).
When no execution memory is used, storage can acquire all the available memory and vice versa. Execution may evict storage if necessary, but only until total storage memory usage falls under a certain threshold (R).
In other words, R describes a sub region within M where cached blocks are never evicted.
Storage may not evict execution due to complexities in implementation.
Apache Spark Unified Memory Manager
This design ensures several desirable properties.
First, applications that do not use caching can use the entire space for execution, obviating unnecessary disk spills.
Second, applications that do use caching can reserve a minimum storage space (R) where their data blocks are immune to being evicted.
Lastly, this approach provides reasonable out-of-the-box performance for a variety of workloads without requiring user expertise of how memory is divided internally.
Although there are two relevant configurations, the typical user should not need to adjust them as the default values are applicable to most workloads:
·        spark.memory.fraction expresses the size of M as a fraction of the (JVM heap space - 300MB) (default 0.6). The rest of the space (40%) is reserved for user data structures, internal metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large records.
·        spark.memory.storageFraction expresses the size of R as a fraction of M (default 0.5). R is the storage space within M where cached blocks immune to being evicted by execution.
The value of spark.memory.fraction should be set in order to fit this amount of heap space comfortably within the JVM’s old or “tenured” generation. See the discussion of advanced GC tuning below for details.
Determining Memory Consumption

The best way to size the amount of memory consumption a dataset will require is to create an RDD, put it into cache, and look at the “Storage” page in the web UI. The page will tell you how much memory the RDD is occupying.
Tuning Data Structures
The first way to reduce memory consumption is to avoid the Java features that add overhead, such as pointer-based data structures and wrapper objects. There are several ways to do this:
1.      Design your data structures to prefer arrays of objects, and primitive types, instead of the standard Java or Scala collection classes (e.g. HashMap). The fastutil library provides convenient collection classes for primitive types that are compatible with the Java standard library.
2.      Avoid nested structures with a lot of small objects and pointers when possible.
3.      Consider using numeric IDs or enumeration objects instead of strings for keys.
4.      If you have less than 32 GB of RAM, set the JVM flag -XX:+UseCompressedOops to make pointers be four bytes instead of eight. You can add these options in spark-env.sh.

Serialized RDD Storage

When your objects are still too large to efficiently store despite this tuning, a much simpler way to reduce memory usage is to store them in serialized form, using the serialized StorageLevels in the RDD persistence API, such as MEMORY_ONLY_SER. Spark will then store each RDD partition as one large byte array.
The only downside of storing data in serialized form is slower access times, due to having to deserialize each object on the fly. We highly recommend using Kryo if you want to cache data in serialized form, as it leads to much smaller sizes than Java serialization (and certainly than raw Java objects).
When caching in Spark, there are two options
1.      Raw storage
2.      Serialized

Here are some differences between the two options
Raw caching
Serialized Caching
Pretty fast to process
Slower processing than raw caching
Can take up 2x-4x more spaceFor example, 100MB data cached could   consume 350MB memory
Overhead is minimal
can put pressure in JVM and JVM garbage collection
less pressure
usage:rdd.persist(
StorageLevel.MEMORY_ONLY)  or  rdd.cache()
usage:rdd.persist(
StorageLevel.MEMORY_ONLY_SER)

Garbage Collection Tuning

JVM garbage collection can be a problem when you have large “churn” in terms of the RDDs stored by your program. (It is usually not a problem in programs that just read an RDD once and then run many operations on it.)
When Java needs to evict old objects to make room for new ones, it will need to trace through all your Java objects and find the unused ones.
The main point to remember here is that the cost of garbage collection is proportional to the number of Java objects, so using data structures with fewer objects (e.g. an array of Ints instead of a LinkedList) greatly lowers this cost. An even better method is to persist objects in serialized form, as described above: now there will be only one object (a byte array) per RDD partition. Before trying other techniques, the first thing to try if GC is a problem is to use serialized caching.
GC can also be a problem due to interference between your tasks’ working memory (the amount of space needed to run the task) and the RDDs cached on your nodes. We will discuss how to control the space allocated to the RDD cache to mitigate this.
Broadcasting Large Variables

Using the broadcast functionality available in SparkContext can greatly reduce the size of each serialized task, and the cost of launching a job over a cluster. If your tasks use any large object from the driver program inside of them (e.g. a static lookup table), consider turning it into a broadcast variable. Spark prints the serialized size of each task on the master, so you can look at that to decide whether your tasks are too large; in general tasks larger than about 20 KB are probably worth optimizing.
Data Locality

Data locality can have a major impact on the performance of Spark jobs. If data and the code that operates on it are together then computation tends to be fast. But if code and data are separated, one must move to the other. Typically it is faster to ship serialized code from place to place than a chunk of data because code size is much smaller than data. Spark builds its scheduling around this general principle of data locality.
Data locality is how close data is to the code processing it. There are several levels of locality based on the data’s current location. In order from closest to farthest:
·        PROCESS_LOCAL data is in the same JVM as the running code. This is the best locality possible
·        NODE_LOCAL data is on the same node. Examples might be in HDFS on the same node, or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes
·        NO_PREF data is accessed equally quickly from anywhere and has no locality preference
·        RACK_LOCAL data is on the same rack of servers. Data is on a different server on the same rack so needs to be sent over the network, typically through a single switch
·        ANY data is elsewhere on the network and not in the same rack

Spark prefers to schedule all tasks at the best locality level, but this is not always possible. In situations where there is no unprocessed data on any idle executor, Spark switches to lower locality levels.
There are two options: a) wait until a busy CPU frees up to start a task on data on the same server, or b) immediately start a new task in a farther away place that requires moving data there.
What Spark typically does is wait a bit in the hopes that a busy CPU frees up. Once that timeout expires, it starts moving the data from far away to the free CPU. The wait timeout for fallback between each level can be configured individually or all together in one parameter; see the spark.locality parameters on the configuration page for details. You should increase these settings if your tasks are long and see poor locality, but the default usually works well.
Spark’s performance tuning best practices

1.      Monitor job stages by Spark UI
2.      Use the right level of parallelism for distributed shuffles, such as groupByKey and reduceByKey.
3.      Reduce working set size
4.      Avoid groupByKey for associative operations, use reduceByKey instead.
5.      Avoid reduceByKey when the input and output value types are different
6.      Avoid the flatMap-join-groupBy pattern
7.      Use broadcast variables
8.      Play around with executor, core and memory
9.      Cache judiciously
10.   Use write StorageLevel as per RDD Size
11.   Don’t collect large RDDs
12.   Minimize amount of data shuffled
13.   Avoid data shuffling by Data Serialization.
14.   Monitor Garbage Collection

1 comments:

  1. Hi thanks to share ur knowledge. yes i agree with you if applied all these 14 optimization techniques easily spark optimize performance.

    Venu spark training in Hyderabad

    bigdata training in Hyderabad

    ReplyDelete