Apache Spark overview
Analytics is increasingly an integral part of day-to-day operations at today's leading businesses, and transformation is also occurring through huge growth in mobile and digital channels. Previously acceptable response times and delays for analytic insight are no longer viable, and there is a growing push toward real-time and in-transaction analytics. In addition, data science skills are increasingly in demand. As a result, enterprise organizations are attempting to leverage analytics in new ways and to transition existing analytic capability to respond with more flexibility, while making the most efficient use of highly valuable data science skills.
Although the demand for more agile analytics across the enterprise is increasing, many of today's solutions are aligned to specific platforms, tied to inflexible programming models, and require vast data movements into data lakes. These lakes quickly become stale and unmanageable, resulting in pockets of analytics and insight that require ongoing manual intervention to integrate into coherent analytics solutions.
With all these forces converging, organizations are well positioned for a change. The recent growth and adoption of Apache Spark as an analytics framework and platform is timely and helps meet these challenging demands.
What is Apache Spark?
Apache Spark is an open source, in-memory analytics computing framework from the Apache Software Foundation. It offers high performance for both batch and interactive processing. It exposes APIs for Java, Python, and Scala and consists of Spark core and several related projects:
• Spark SQL - Module for working with structured data. It allows you to seamlessly mix SQL queries with Spark programs (see the sketch after this list).
• Spark Streaming - API that allows you to build scalable, fault-tolerant streaming applications.
• MLlib - API that implements common machine learning algorithms.
• GraphX - API for graphs and graph-parallel computation.
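As a minimal sketch of what "mixing SQL queries with Spark programs" looks like in practice (assuming the Spark 2.x SparkSession API; the table name and sample rows below are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("SqlExample").getOrCreate()
import spark.implicits._

// Build a small DataFrame from a local collection (made-up sample rows).
val people = Seq(("alice", 34), ("bob", 45)).toDF("name", "age")
people.createOrReplaceTempView("people")

// Mix a SQL query with ordinary Spark code on the result.
val adults = spark.sql("SELECT name FROM people WHERE age > 40")
adults.show()
```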
How Spark executes your program
A Spark
application consists of a single driver process and a set
of executor processes scattered across nodes on the cluster.
The driver
is the process that is in charge of the high-level control flow of work that
needs to be done. The executor processes are responsible for executing this
work, in the form of tasks, as well as for storing any data that the user
chooses to cache. Both the driver and the executors typically stick around for
the entire time the application is running.
A single
executor has a number of slots for running tasks, and will run many
concurrently throughout its lifetime.
[Figure: Spark architecture]
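As a rough illustration of how this architecture is sized in practice, the sketch below sets the number of executors, the task slots (cores) per executor, and the heap per executor through SparkConf. The specific values are arbitrary, and spark.executor.instances only applies on cluster managers such as YARN or Kubernetes:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Arbitrary sizing: 4 executors, each with 2 task slots (cores) and 4 GB of heap,
// so the application can run up to 8 tasks concurrently.
val conf = new SparkConf()
  .setAppName("ExecutorSizingExample")
  .set("spark.executor.instances", "4") // number of executor processes (YARN/Kubernetes)
  .set("spark.executor.cores", "2")     // task slots per executor
  .set("spark.executor.memory", "4g")   // heap per executor process

val sc = new SparkContext(conf)
```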
Spark’s performance optimization
When you write Apache Spark code and page through the public APIs, you come across words like transformation, action, and RDD. Similarly, when things start to fail, or when you venture into the web UI to try to understand why your application is taking so long, you are confronted with a new vocabulary of words like job, stage, and task. Understanding these terms plays a very important role in performance tuning.
Spark programs can be bottlenecked by any resource: CPU, network bandwidth, or memory. If the data fits in memory, the bottleneck is usually network bandwidth, but sometimes you also need to do some tuning, such as storing RDDs in serialized form, to decrease memory usage.
This guide covers two main topics: data serialization, which is crucial for good network performance and can also reduce memory use, and memory tuning.
Data Serialization
Serialization plays an important role in the performance of any distributed application. Formats that are slow to serialize objects into, or that consume a large number of bytes, will greatly slow down the computation. Often, this will be the first thing you should tune to optimize a Spark application. Spark provides two serialization libraries:
· Java serialization: By default, Spark serializes objects using Java's ObjectOutputStream framework, and it can work with any class you create that implements java.io.Serializable. You can also control the performance of your serialization more closely by extending java.io.Externalizable. Java serialization is flexible but often quite slow, and it leads to large serialized formats for many classes.
· Kryo serialization: Spark can also use the Kryo library (version 2) to serialize objects more quickly. Kryo is significantly faster and more compact than Java serialization (often as much as 10x). You can switch to Kryo by initializing your job with a SparkConf and calling conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"). This setting configures the serializer used not only for shuffling data between worker nodes but also when serializing RDDs to disk.
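A minimal sketch of switching a job to Kryo; the MyRecord class is a hypothetical type used only to show class registration, which lets Kryo avoid writing full class names with every object:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical record type that the job shuffles or caches.
case class MyRecord(id: Long, name: String)

val conf = new SparkConf()
  .setAppName("KryoExample")
  // Replace the default Java serializer with Kryo.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering classes up front lets Kryo write a small numeric ID
  // instead of the full class name with every object.
  .registerKryoClasses(Array(classOf[MyRecord]))

val sc = new SparkContext(conf)
```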
Memory Tuning
There are three considerations in tuning memory usage:
· the amount of memory used by your objects (you may want your entire dataset to fit in memory),
· the cost of accessing those objects, and
· the overhead of garbage collection.
By default, Java objects are fast to access, but they can easily consume a factor of 2-5x more space than the "raw" data inside their fields. This is due to several reasons:
· Each distinct Java object has an "object header", which is about 16 bytes and contains information such as a pointer to its class.
· Java Strings have about 40 bytes of overhead over the raw string data (since they store it in an array of Chars and keep extra data such as the length).
· Common collection classes, such as HashMap and LinkedList, wrap each entry in an object that has not only a header but also pointers (typically 8 bytes each) to the next object in the list.
You can improve this either by changing your data structures or by storing data in a serialized format.
Memory Management Overview
Memory usage in Spark largely falls under one of two categories: execution and storage.
Execution memory refers to memory used for computation in shuffles, joins, sorts, and aggregations.
Storage memory refers to memory used for caching and propagating internal data across the cluster.
In Spark, execution and storage share a unified region (M).
When no execution memory is used, storage can acquire all the available
memory and vice versa. Execution may evict storage if necessary, but only until
total storage memory usage falls under a certain threshold (R).
In other words, R describes a sub region within M where cached blocks
are never evicted.
Storage may not evict execution due to complexities in implementation.
This design ensures several desirable properties.
First, applications that do not use caching can use the entire space for
execution, obviating unnecessary disk spills.
Second, applications that do use caching can reserve a minimum storage
space (R) where their data blocks are immune to being evicted.
Lastly, this approach provides reasonable out-of-the-box performance for
a variety of workloads without requiring user expertise of how memory is
divided internally.
Although there are two relevant configurations, the typical user should
not need to adjust them as the default values are applicable to most workloads:
· spark.memory.fraction expresses the size of M as a fraction of the (JVM heap space - 300MB) (default 0.6). The rest of the space (40%) is reserved for user data structures, internal metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large records.
· spark.memory.storageFraction expresses the size of R as a fraction of M (default 0.5). R is the storage space within M where cached blocks are immune to being evicted by execution.
The value of spark.memory.fraction should be set so that this amount of heap space fits comfortably within the JVM's old or "tenured" generation. See the discussion of garbage collection tuning below for details.
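For illustration, this is where the two knobs live; the values shown are simply the defaults, and most applications should not need to change them:

```scala
import org.apache.spark.SparkConf

// The values below are simply the defaults; most workloads should leave them alone.
val conf = new SparkConf()
  .setAppName("MemoryConfigExample")
  // M: fraction of (JVM heap - 300MB) shared by execution and storage (default 0.6).
  .set("spark.memory.fraction", "0.6")
  // R: fraction of M reserved for cached blocks that execution may not evict (default 0.5).
  .set("spark.memory.storageFraction", "0.5")
```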
Determining Memory Consumption
The best way to size the amount of memory consumption a dataset will
require is to create an RDD, put it into cache, and look at the “Storage” page
in the web UI. The page will tell you how much memory the RDD is occupying.
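A small sketch of that workflow, assuming an existing SparkContext sc; the HDFS path is hypothetical:

```scala
// Cache a dataset, force it to materialize with an action, then open the
// "Storage" page of the web UI (http://<driver-host>:4040 by default)
// to see how much memory the RDD occupies.
val events = sc.textFile("hdfs:///data/events.log").cache()
events.count()
```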
Tuning Data Structures
The first way to reduce memory consumption is to avoid the Java features
that add overhead, such as pointer-based data structures and wrapper objects.
There are several ways to do this:
1. Design your data structures to prefer arrays of objects, and primitive types, instead of the standard Java or Scala collection classes (e.g. HashMap). The fastutil library provides convenient collection classes for primitive types that are compatible with the Java standard library (see the sketch after this list).
2. Avoid nested structures with a lot of small objects and pointers when possible.
3. Consider using numeric IDs or enumeration objects instead of strings for keys.
4. If you have less than 32 GB of RAM, set the JVM flag -XX:+UseCompressedOops to make pointers four bytes instead of eight. You can add these options in spark-env.sh.
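A small sketch of the idea behind items 1 and 3, contrasting a boxed, string-keyed collection with a primitive array indexed by a numeric ID; the sizes and IDs are made up:

```scala
// Made-up sizes and IDs, purely for illustration.
val numUsers = 1000
val userId = 42

// A string-keyed collection of boxed values: every entry carries an object
// header, a boxed Integer, and pointers for the GC to trace.
val byName = new java.util.HashMap[String, Integer]()
byName.put("user-42", 1)

// A primitive array indexed by a numeric ID: no per-entry wrapper objects,
// no boxing, and a single contiguous allocation.
val counts = new Array[Int](numUsers)
counts(userId) += 1
```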
Serialized RDD Storage
When your objects are still too large to efficiently store despite this
tuning, a much simpler way to reduce memory usage is to store them
in serialized form, using the serialized StorageLevels in
the RDD persistence API, such as MEMORY_ONLY_SER. Spark will then
store each RDD partition as one large byte array.
The only downside of storing data in serialized form is slower access
times, due to having to deserialize each object on the fly. We highly
recommend using Kryo if you want to cache data in serialized form, as
it leads to much smaller sizes than Java serialization (and certainly than raw
Java objects).
When caching in Spark, there are two options: raw (deserialized) storage and serialized storage. Here are the main differences between the two:
Raw caching
· Pretty fast to process
· Can take up 2x-4x more space (for example, 100MB of data cached could consume 350MB of memory)
· Can put pressure on the JVM and JVM garbage collection
· Usage: rdd.persist(StorageLevel.MEMORY_ONLY) or rdd.cache()
Serialized caching
· Slower processing than raw caching
· Memory overhead is minimal
· Less pressure on the JVM and garbage collection
· Usage: rdd.persist(StorageLevel.MEMORY_ONLY_SER)
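A short sketch of both levels on the same dataset, assuming an existing SparkContext sc; the path is hypothetical:

```scala
import org.apache.spark.storage.StorageLevel

val logs = sc.textFile("hdfs:///data/events.log") // hypothetical path

// Raw (deserialized) caching: fastest to read back, but the largest footprint.
logs.persist(StorageLevel.MEMORY_ONLY) // equivalent to logs.cache()
logs.count()

// Serialized caching: each partition is stored as one large byte array,
// at the cost of deserializing objects on access. Pair this with Kryo.
logs.unpersist()
logs.persist(StorageLevel.MEMORY_ONLY_SER)
logs.count()
```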
Garbage Collection Tuning
JVM garbage collection can be a problem when you have large “churn” in
terms of the RDDs stored by your program. (It is usually not a problem in
programs that just read an RDD once and then run many operations on it.)
When Java needs to evict old objects to make room for new ones, it will
need to trace through all your Java objects and find the unused ones.
The main point to remember here is that the cost of garbage
collection is proportional to the number of Java objects, so using data
structures with fewer objects (e.g. an array of Ints instead of
a LinkedList) greatly lowers this cost. An even better method is to
persist objects in serialized form, as described above: now there will be
only one object (a byte array) per RDD partition. Before trying other
techniques, the first thing to try if GC is a problem is to use serialized
caching.
GC can also be a problem due to interference between your tasks' working memory (the amount of space needed to run the task) and the RDDs cached on your nodes. Controlling the space allocated to the RDD cache, as described in the memory management section above, helps mitigate this.
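One common way to keep an eye on GC behaviour is to turn on verbose GC logging in the executor JVMs; a sketch assuming a Java 8-era JVM (newer JDKs replace the Print* flags with -Xlog:gc):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("GcLoggingExample")
  // Print GC events in the executor logs (flags for a Java 8-era JVM).
  .set("spark.executor.extraJavaOptions",
       "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
```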
Broadcasting Large Variables
Using
the broadcast
functionality available in SparkContext can greatly reduce the size of each serialized
task, and the cost of launching a job over a cluster. If your tasks use any
large object from the driver program inside of them (e.g. a static lookup
table), consider turning it into a broadcast variable. Spark prints the
serialized size of each task on the master, so you can look at that to decide
whether your tasks are too large; in general tasks larger than about 20 KB are
probably worth optimizing.
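A small sketch with a made-up lookup table, assuming an existing SparkContext sc:

```scala
// A made-up static lookup table that lives on the driver.
val countryNames = Map("IN" -> "India", "US" -> "United States", "DE" -> "Germany")

// Ship the table to each executor once, instead of once per task.
val countryNamesBc = sc.broadcast(countryNames)

val codes = sc.parallelize(Seq("IN", "US", "IN", "DE"))
val named = codes.map(code => countryNamesBc.value.getOrElse(code, "unknown"))
named.collect() // Array(India, United States, India, Germany)
```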
Data Locality
Data
locality can have a major impact on the performance of Spark jobs. If data and
the code that operates on it are together then computation tends to be fast.
But if code and data are separated, one must move to the other. Typically it is
faster to ship serialized code from place to place than a chunk of data because
code size is much smaller than data. Spark builds its scheduling around this
general principle of data locality.
Data
locality is how close data is to the code processing it. There are several
levels of locality based on the data’s current location. In order from closest
to farthest:
· PROCESS_LOCAL - data is in the same JVM as the running code. This is the best locality possible.
· NODE_LOCAL - data is on the same node. Examples might be in HDFS on the same node, or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes.
· NO_PREF - data is accessed equally quickly from anywhere and has no locality preference.
· RACK_LOCAL - data is on the same rack of servers. The data is on a different server on the same rack, so it needs to be sent over the network, typically through a single switch.
· ANY - data is elsewhere on the network and not in the same rack.
Spark
prefers to schedule all tasks at the best locality level, but this is not
always possible. In situations where there is no unprocessed data on any idle
executor, Spark switches to lower locality levels.
There
are two options: a) wait until a busy CPU frees up to start a task on data on
the same server, or b) immediately start a new task in a farther away place
that requires moving data there.
What Spark typically does is wait a bit in the hope that a busy CPU frees up. Once that timeout expires, it starts moving the data from far away to the free CPU. The wait timeout for fallback between each level can be configured individually or all together in one parameter; see the spark.locality parameters on the configuration page for details. You should increase these settings if your tasks are long and you see poor locality, but the defaults usually work well.
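For illustration, the main knob looks like this; 10s is an arbitrary example value, and the default of 3s is usually fine:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("LocalityExample")
  // How long to wait for a better locality level before falling back (default 3s).
  // Per-level overrides also exist: spark.locality.wait.process, .node, .rack.
  .set("spark.locality.wait", "10s")
```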
Spark’s performance tuning best practices
1. Monitor job stages in the Spark UI.
2. Use the right level of parallelism for distributed shuffles such as groupByKey and reduceByKey.
3. Reduce the working set size.
4. Avoid groupByKey for associative operations; use reduceByKey instead (see the sketch after this list).
5. Avoid reduceByKey when the input and output value types are different.
6. Avoid the flatMap-join-groupBy pattern.
7. Use broadcast variables.
8. Experiment with the number of executors, cores per executor, and executor memory.
9. Cache judiciously.
10. Choose the StorageLevel according to the RDD size.
11. Don't collect large RDDs.
12. Minimize the amount of data shuffled.
13. Reduce the cost of shuffles with efficient data serialization.
14. Monitor garbage collection.
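The sketch below illustrates best practice 4 with a simple word-count-style aggregation on made-up data, assuming an existing SparkContext sc. Both lines produce the same result, but reduceByKey combines values within each partition before shuffling:

```scala
// Made-up (key, value) pairs.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

// groupByKey ships every value across the network before the values are summed.
val countsViaGroup = pairs.groupByKey().mapValues(_.sum)

// reduceByKey combines values within each partition first, so far less data is shuffled.
val countsViaReduce = pairs.reduceByKey(_ + _)

countsViaReduce.collect() // Array((a,2), (b,1))
```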