Distribution of Executors, Cores and Memory for a Spark Application
Resource allocation is an important aspect of running any Spark job. If it is not configured correctly, a Spark job can consume the entire cluster's resources and starve other applications.
Let’s start with some basic definitions of the terms used in handling Spark applications.
Partitions : A partition is a small chunk of a large distributed data set. Spark manages data in partitions, which lets it parallelize processing with minimal data shuffle across the executors.
Task : A task is a unit of work that runs on one partition of a distributed dataset and is executed on a single executor. The task is the unit of parallel execution: all the tasks within a single stage can be executed in parallel.
Executor : An executor is a single JVM process launched for an application on a worker node. An executor runs tasks and keeps data in memory or on disk across them. Each application has its own executors. A single node can run multiple executors, and the executors for an application can span multiple worker nodes. An executor stays up for the duration of the Spark application and runs its tasks in multiple threads. The number of executors for a Spark application can be specified inside the SparkConf or via the --num-executors flag on the command line.
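Each executor sizing flag of spark-submit has an equivalent configuration property, so the same value can be supplied either way. The sketch below shows that correspondence. The helper name `as_conf_args` is hypothetical and exists only for illustration; the three property names are the standard Spark settings.

```python
# Mapping from spark-submit sizing flags to the equivalent Spark properties.
FLAG_TO_PROPERTY = {
    "--num-executors": "spark.executor.instances",
    "--executor-cores": "spark.executor.cores",
    "--executor-memory": "spark.executor.memory",
}


def as_conf_args(flags):
    """Rewrite {flag: value} as a list of `--conf property=value` arguments.

    `as_conf_args` is a hypothetical helper, not part of Spark.
    """
    args = []
    for flag, value in flags.items():
        args += ["--conf", f"{FLAG_TO_PROPERTY[flag]}={value}"]
    return args


if __name__ == "__main__":
    # Illustrative values only.
    print(" ".join(as_conf_args({"--num-executors": 4, "--executor-cores": 2})))
```

The same property names are what one would set on a SparkConf inside the application.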
Cluster Manager : An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN). Spark is agnostic to the cluster manager as long as it can acquire executor processes and those can communicate with each other. We are primarily interested in YARN as the cluster manager. A Spark application on YARN can run in either yarn-client or yarn-cluster mode:
yarn-client mode – The driver runs in the client process; the Application Master is only used for requesting resources from YARN.
yarn-cluster mode – The driver runs inside the Application Master process; the client goes away once the application is initialized.
Cores : A core is a basic computation unit of a CPU, and a CPU may have one or more cores to perform tasks at a given time. The more cores we have, the more work we can do. In Spark, this controls the number of parallel tasks an executor can run.
Distribution of Executors, Cores and Memory for a Spark Application Running on YARN:
spark-submit --class --num-executors ? --executor-cores ? --executor-memory ? ….
Ever wondered how to configure the --num-executors, --executor-memory and --executor-cores Spark config params for your cluster?
The following list captures some recommendations to keep in mind while configuring them:
- Hadoop/YARN/OS Daemons: When we run a Spark application using a cluster manager like YARN, several daemons run in the background, such as NameNode, Secondary NameNode, DataNode, ResourceManager and NodeManager (JobTracker and TaskTracker on older Hadoop 1 clusters). So, while specifying num-executors, we need to make sure that we leave aside enough cores (~1 core per node) for these daemons to run smoothly.
- Yarn ApplicationMaster (AM): ApplicationMaster is responsible for negotiating resources from the ResourceManager and working with the NodeManagers to execute and monitor the containers and their resource consumption. If we are running spark on yarn, then we need to budget in the resources that AM would need (~1024MB and 1 Executor).
- HDFS Throughput: The HDFS client has trouble with tons of concurrent threads. It was observed that HDFS achieves full write throughput with ~5 tasks per executor. So it's good to keep the number of cores per executor at or below that number.
- MemoryOverhead: Following picture depicts spark-yarn-memory-usage.
Two things to note from this picture:
Full memory requested from YARN per executor = spark.executor.memory + spark.yarn.executor.memoryOverhead
spark.yarn.executor.memoryOverhead = max(384MB, 7% of spark.executor.memory)
So, if we request 20GB per executor, the AM will actually ask YARN for 20GB + memoryOverhead = 20GB + 7% of 20GB = ~21.4GB of memory for each executor.
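The container size can be checked with a few lines of arithmetic. This is a minimal sketch of the formula as stated here, using the 7% fraction and 384MB floor from this article. The fraction is left as a parameter because the default may differ in your Spark version, and the function name `yarn_container_mb` is made up for this example.

```python
import math


def yarn_container_mb(executor_memory_mb, overhead_fraction=0.07, min_overhead_mb=384):
    """Memory requested from YARN per executor: heap plus off-heap overhead, in MB."""
    overhead_mb = max(min_overhead_mb, math.ceil(executor_memory_mb * overhead_fraction))
    return executor_memory_mb + overhead_mb


if __name__ == "__main__":
    for gb in (1, 4, 20):
        total = yarn_container_mb(gb * 1024)
        print(f"{gb}GB executor -> {total}MB container ({total / 1024:.1f}GB)")
```

For small executors the 384MB floor dominates, while for a 20GB request the 7% term applies and the container comes to roughly 21.4GB.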
- Running executors with too much memory often results in excessive garbage collection delays.
- Running tiny executors (with a single core and just enough memory needed to run a single task, for example) throws away the benefits that come from running multiple tasks in a single JVM.
There are two ways in which we configure the executor and core details to the Spark job. They are:
- Static Allocation – The values are given as part of spark-submit
- Dynamic Allocation – The values are picked up based on the requirement (size of data, amount of computations needed) and released after use. This helps the resources to be re-used for other applications.
Now, let's consider a 10 node cluster with the following config and analyse different possibilities of executors-core-memory distribution:
16 cores per Node
64GB RAM per Node
First Approach: Tiny executors [One Executor per core]:
Tiny executors essentially means one executor per core. The following table depicts the values of our spark-config params with this approach:
--num-executors = In this approach, we'll assign one executor per core
= num-cores-per-node * total-nodes-in-cluster
= 16 x 10 = 160
--executor-cores = 1 (one executor per core)
--executor-memory = amount of memory per executor
= 64GB/16 = 4GB
Analysis: With only one executor per core, as we discussed above, we'll not be able to take advantage of running multiple tasks in the same JVM. Also, shared/cached variables like broadcast variables and accumulators will be replicated in every executor, which is 16 times per node. Also, we are not leaving enough memory overhead for Hadoop/YARN daemon processes and we are not accounting for the ApplicationMaster. NOT GOOD!
Second Approach: Fat executors (One Executor per node):
Fat executors essentially means one executor per node. The following table depicts the values of our spark-config params with this approach:
--num-executors = In this approach, we'll assign one executor per node
= total-nodes-in-cluster
= 10
--executor-cores = one executor per node means all the cores of the node are assigned to one executor
= total-cores-in-a-node
= 16
--executor-memory = amount of memory per executor
= 64GB/1 = 64GB
Analysis: With all 16 cores per executor, and with the ApplicationMaster and daemon processes not accounted for, HDFS throughput will suffer and the large heap will result in excessive garbage collection delays. Also NOT GOOD!
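Both extremes can be checked against what a single node can actually supply. The sketch below is a rough check, assuming the 16-core, 64GB node from this example, an even split of node memory between executors, and the 7% / 384MB overhead rule quoted earlier. The function names are hypothetical.

```python
import math

NODE_CORES = 16
NODE_MEMORY_MB = 64 * 1024


def container_mb(executor_memory_mb, overhead_fraction=0.07, min_overhead_mb=384):
    """Executor heap plus off-heap overhead, in MB."""
    overhead = max(min_overhead_mb, math.ceil(executor_memory_mb * overhead_fraction))
    return executor_memory_mb + overhead


def node_demand(executors_per_node, cores_per_executor):
    """Return (cores, memory_mb) one node must supply if memory is split evenly."""
    executor_memory_mb = NODE_MEMORY_MB // executors_per_node
    cores = executors_per_node * cores_per_executor
    memory_mb = executors_per_node * container_mb(executor_memory_mb)
    return cores, memory_mb


if __name__ == "__main__":
    for name, per_node, cores in (("tiny", 16, 1), ("fat", 1, 16)):
        used_cores, used_mb = node_demand(per_node, cores)
        print(f"{name}: {used_cores}/{NODE_CORES} cores, "
              f"{used_mb}MB requested of {NODE_MEMORY_MB}MB")
```

In both cases every core is claimed and the memory requested exceeds the node's 64GB once overhead is added, which leaves nothing for the daemons or the ApplicationMaster.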
Third Approach: Balance between Fat (vs) Tiny
According to the recommendations which we discussed above:
- Let's assign 5 cores per executor =>
--executor-cores = 5 (for good HDFS throughput)
- Leave 1 core per node for Hadoop/Yarn daemons => Num cores available per node = 16-1 = 15
- So, total cores available in the cluster = 15 x 10 = 150
- Number of available executors = (total cores/num-cores-per-executor) = 150/5 = 30
- Leaving 1 executor for the ApplicationMaster =>
--num-executors = 29
- Number of executors per node = 30/10 = 3
- Memory per executor = 64GB/3 = ~21GB
- Counting off-heap overhead = 7% of 21GB = ~1.5GB. So, actual
--executor-memory = 21 - 1.5 = ~19GB
So, recommended config is: 29 executors, 19GB memory each and 5 cores each!!
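The steps above can be sketched as a small calculator. This is an illustration under the assumptions stated in this article (1 core per node reserved for daemons, 5 cores per executor, 1 executor given up for the ApplicationMaster, 7% overhead with a 384MB floor), not a definitive sizing tool. The function name `size_executors` is made up. Note that taking 7% literally, the overhead on ~21GB is about 1.5GB, so the sketch arrives at 19G of executor memory.

```python
def size_executors(nodes, cores_per_node, memory_gb_per_node,
                   cores_per_executor=5, reserved_cores_per_node=1,
                   overhead_fraction=0.07, min_overhead_gb=0.375):
    """Walk through the balanced sizing steps and return spark-submit values."""
    usable_cores = (cores_per_node - reserved_cores_per_node) * nodes
    total_executors = usable_cores // cores_per_executor
    executors_per_node = total_executors // nodes
    share_gb = memory_gb_per_node / executors_per_node
    # Carve the off-heap overhead (at least 384MB) out of each executor's share.
    overhead_gb = max(min_overhead_gb, share_gb * overhead_fraction)
    heap_gb = int(share_gb - overhead_gb)  # round down to whole GB
    return {
        "--num-executors": total_executors - 1,  # one slot left for the ApplicationMaster
        "--executor-cores": cores_per_executor,
        "--executor-memory": f"{heap_gb}G",
    }


if __name__ == "__main__":
    # The 10-node, 16-core, 64GB cluster used in this example.
    for flag, value in size_executors(10, 16, 64).items():
        print(flag, value)
```

Rounding the heap down to whole gigabytes keeps heap plus overhead within each executor's share of the node.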
Analysis: It is clear how this third approach strikes the right balance between the Fat and Tiny approaches. Needless to say, it achieves the parallelism of a fat executor and the throughput of a tiny executor!!
Note: If dynamic allocation is enabled, the upper bound for the number of executors is infinity. This means a Spark application can eat away all the cluster's resources if needed. In a cluster where other applications are running and also need cores for their tasks, we need to make sure that cores are assigned at the cluster level.
This means that we can allocate a specific number of cores to YARN-based applications based on user access. So we can create a spark_user and then set min/max cores for that user. These limits govern sharing between Spark and the other applications that run on YARN.
To understand dynamic allocation, we need to know the following properties:
spark.dynamicAllocation.enabled – when this is set to true we need not specify the number of executors. The reason is as follows:
The static numbers we give at spark-submit apply for the entire job duration. With dynamic allocation, however, the number of executors is decided in stages:
- What is the number of executors to start with: the initial number of executors (spark.dynamicAllocation.initialExecutors).
- Controlling the number of executors dynamically: based on load (pending tasks), Spark decides how many executors to request. This takes the place of the number we would give at spark-submit in the static way. Once the initial number is set, the count moves between the min (spark.dynamicAllocation.minExecutors) and max (spark.dynamicAllocation.maxExecutors) numbers.
- When to request new executors (spark.dynamicAllocation.schedulerBacklogTimeout): new executors are requested once tasks have been pending for this duration. The number requested in each round increases exponentially from the previous round. For instance, an application will add 1 executor in the first round, and then 2, 4, 8 and so on in the subsequent rounds. At some point, the max property above caps the total.
- When to give away an executor: set using spark.dynamicAllocation.executorIdleTimeout.
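The exponential ramp-up is easier to see with numbers. The sketch below is a simplified model of the behaviour described above, not Spark's actual scheduler code: it assumes tasks stay pending in every round, doubles the request each time, and clamps the total between the min and max settings. The function and parameter names are hypothetical.

```python
def ramp_up(initial, minimum, maximum, rounds):
    """Executor totals after each backlog round, doubling the request each time."""
    current = max(minimum, min(initial, maximum))
    totals = [current]
    request = 1
    for _ in range(rounds):
        # The total never exceeds the configured maximum.
        current = min(maximum, current + request)
        totals.append(current)
        request *= 2  # 1, 2, 4, 8, ... while tasks remain pending
    return totals


if __name__ == "__main__":
    # Illustrative settings: start at 2 executors, allow between 1 and 20.
    print(ramp_up(initial=2, minimum=1, maximum=20, rounds=5))
```

With these illustrative settings the total grows 2, 3, 5, 9, 17 and is then held at the maximum of 20, which is why setting a max matters on a shared cluster.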
To conclude, if we need more control over job execution time, or need to monitor the job for unexpected data volumes, the static numbers help. With dynamic allocation, resources are acquired in the background, and jobs that hit unexpected volumes might affect other applications.