Why do Spark jobs fail with org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 in speculation mode?

Apache Spark Problem Overview


I'm running a Spark job in speculation mode. I have around 500 tasks and around 500 gzip-compressed files of 1 GB each. In every job, 1-2 tasks fail with the error below and are then rerun dozens of times, which prevents the job from completing.

> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

Any idea what this error means and how to overcome it?

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)

Apache Spark Solutions


Solution 1 - Apache Spark

This happened to me when I gave the worker node more memory than it actually has. Since it had no swap, Spark crashed while trying to store objects for shuffling with no memory left.

The solution was to either add swap, or configure the worker/executor to use less memory, in addition to using the MEMORY_AND_DISK storage level for several persists.

Solution 2 - Apache Spark

We had a similar error with Spark, but I'm not sure it's related to your issue.

We used JavaPairRDD.repartitionAndSortWithinPartitions on 100 GB of data and it kept failing in a similar way to your app. We then looked at the YARN logs on the specific nodes and found an out-of-memory problem, which caused YARN to interrupt the execution. Our solution was to add (or change) the line spark.shuffle.memoryFraction 0 in .../spark/conf/spark-defaults.conf. That allowed us to handle a much larger (but unfortunately not infinite) amount of data.

Solution 3 - Apache Spark

I got the same issue on my 3-machine YARN cluster. I kept changing the RAM settings but the issue persisted. Finally I saw the following messages in the logs:

17/02/20 13:11:02 WARN spark.HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 1006275 ms exceeds timeout 1000000 ms
17/02/20 13:11:02 ERROR cluster.YarnScheduler: Lost executor 2 on 1worker.com: Executor heartbeat timed out after 1006275 ms

and after this, there was this message:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 67

I modified the properties in spark-defaults.conf as follows:

spark.yarn.scheduler.heartbeat.interval-ms 7200000
spark.executor.heartbeatInterval 7200000
spark.network.timeout 7200000

That's it! My job completed successfully after this.

Solution 4 - Apache Spark

The error arises when a particular Spark partition holds too much data. To solve it, take the following steps:

  1. Increase the number of shuffle partitions: --conf spark.sql.shuffle.partitions=

  2. Normally the number of partitions is set to (number of executors * number of cores per executor). This scheme becomes problematic when there is a huge amount of data, as the following example shows.

     Suppose we have the following data and three executors with 1 core each, so the number of (physical) partitions is 3.

       Data:        1,2,3,4,5,6,7,8,9,13,16,19,22
       Partitions:  1,2,3

     Distribution of data across partitions (partitioning by key modulo 3):

       1 -> 1,4,7,13,16,19,22
       2 -> 2,5,8
       3 -> 3,6,9

     The data is skewed: partition 1 holds more data than the others.

     Now increase the number of partitions to (number of executors * number of cores per executor * 2), which is 6 in our example. These 6 partitions are logical partitions: each executor now holds 2 logical partitions instead of 1, and the data is partitioned by key modulo 6 instead of 3.

     Data in each executor (executor -> logical partitions -> data):

       1 -> (0,1) -> 1,6,7,13,19
       2 -> (2,3) -> 2,3,8,9
       3 -> (4,5) -> 4,5,16,22

     Increasing the number of logical partitions leads to a fairer distribution.

  3. After increasing the number of shuffle partitions, you can decrease the storage part of the Spark memory if you are not persisting or caching any DataFrame. By default the storage part is 0.5 and the execution part is also 0.5. To reduce the storage part, add the following configuration to your spark-submit command:

       --conf spark.memory.storageFraction=0.3

  4. You can also set the executor overhead memory:

       --conf spark.executor.memoryOverhead=2g

     This is off-heap memory that is used for virtual machine overheads, interned strings, etc.

  5. Finally, you can limit the number of files processed in a single micro-batch by setting maxFilesPerTrigger to a smaller value, say 10.
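The modulo example in step 2 can be reproduced with a few lines of plain Python. This is only a sketch of the idea, not Spark's actual partitioner: the helper names are made up for illustration, partitions are numbered from 0 (so the partition labelled 3 in the modulo-3 example is partition 0 here), and it assumes consecutive logical partitions land on the same executor, as in the example.

```python
from collections import defaultdict


def partition_by_modulo(keys, num_partitions):
    """Group keys by key % num_partitions (a toy stand-in for a hash partitioner)."""
    partitions = defaultdict(list)
    for key in keys:
        partitions[key % num_partitions].append(key)
    return dict(partitions)


def records_per_executor(partitions, num_partitions, num_executors):
    """Count records per executor, assuming consecutive partition ids share an executor."""
    partitions_per_executor = num_partitions // num_executors
    counts = [0] * num_executors
    for partition_id, keys in partitions.items():
        counts[partition_id // partitions_per_executor] += len(keys)
    return counts


data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 13, 16, 19, 22]

three = partition_by_modulo(data, 3)
six = partition_by_modulo(data, 6)

# One partition per executor: sizes are skewed.
print(records_per_executor(three, 3, 3))  # [3, 7, 3]
# Two logical partitions per executor: sizes are closer together.
print(records_per_executor(six, 6, 3))    # [5, 4, 4]
```

With 3 partitions the largest executor holds 7 of the 13 records; with 6 logical partitions the largest holds 5, which matches the distribution listed in the example.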

Solution 5 - Apache Spark

I solved this error by increasing the memory allocated through executorMemory and driverMemory. In HUE, select the Spark program that is causing the problem and, under Properties -> Option list, add something like this:

--driver-memory 10G --executor-memory 10G --num-executors 50 --executor-cores 2

Of course, the values of the parameters will vary depending on your cluster's size and your needs.

Solution 6 - Apache Spark

In my case, I was doing some windowing on large data (about 50B rows) and getting a boatload of

> ExternalAppendOnlyUnsafeRowArray:54 - Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

in my logs. Obviously 4096 can be small for data of this size. This led me to the following JIRA:

https://issues.apache.org/jira/browse/SPARK-21595

And ultimately to the following two config options:

  • spark.sql.windowExec.buffer.spill.threshold
  • spark.sql.windowExec.buffer.in.memory.threshold

Both default to 4096; I raised them much higher (2097152) and things now seem to do well. I'm not 100% sure this is the same as the issue raised here, but it's another thing to try.
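If you pass these settings on the command line, a small helper can turn them into spark-submit --conf arguments. This is just a sketch: to_conf_flags is a made-up name, not part of Spark, and the value 2097152 is simply the one I used above.

```python
def to_conf_flags(settings):
    """Render a dict of Spark properties as a list of spark-submit --conf arguments."""
    flags = []
    for key, value in settings.items():
        flags.extend(["--conf", f"{key}={value}"])
    return flags


# Values taken from the answer above; tune them for your own data size.
window_settings = {
    "spark.sql.windowExec.buffer.spill.threshold": 2097152,
    "spark.sql.windowExec.buffer.in.memory.threshold": 2097152,
}

print(" ".join(to_conf_flags(window_settings)))
```

The resulting list can be appended to the rest of your spark-submit arguments.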

Solution 7 - Apache Spark

If the Spark Web UI shows information such as lost executors, check the YARN logs to find out whether your container has been killed.

If the container was killed, it is probably due to a lack of memory.

How do you find the key information in the YARN logs? For example, there might be warnings like this:

Container killed by YARN for exceeding memory limits. 2.5 GB of 2.5 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead.

In this case, it suggests you should increase spark.yarn.executor.memoryOverhead.
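When the logs are large, a short script can pick out these warnings for you. The sketch below is plain Python and assumes the message looks like the one quoted above; the exact wording may differ between Spark versions, and find_memory_kills is a hypothetical helper name.

```python
import re

# Pattern based on the warning quoted above; adjust it if your log lines differ.
KILLED_BY_YARN = re.compile(
    r"Container killed by YARN for exceeding memory limits\.\s+"
    r"(?P<used>[\d.]+ [KMGT]B) of (?P<limit>[\d.]+ [KMGT]B)"
)


def find_memory_kills(log_lines):
    """Return a (used, limit) pair for every line that reports a YARN memory kill."""
    hits = []
    for line in log_lines:
        match = KILLED_BY_YARN.search(line)
        if match:
            hits.append((match["used"], match["limit"]))
    return hits


# Sample lines copied from the log excerpts on this page.
sample = [
    "17/02/20 13:11:02 WARN spark.HeartbeatReceiver: Removing executor 2 "
    "with no recent heartbeats: 1006275 ms exceeds timeout 1000000 ms",
    "Container killed by YARN for exceeding memory limits. "
    "2.5 GB of 2.5 GB physical memory used. "
    "Consider boosting spark.yarn.executor.memoryOverhead.",
]

print(find_memory_kills(sample))  # [('2.5 GB', '2.5 GB')]
```

To run it on real logs, pass the lines of a log file you have saved (for example, the output of yarn logs -applicationId for your application) instead of the sample list.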

Solution 8 - Apache Spark

In my case (standalone cluster) the exception was thrown because the file system of some Spark slaves was 100% full. Deleting everything in the spark/work folders of the slaves solved the issue.

Solution 9 - Apache Spark

I had the same problem, and none of the many answers I found solved it. Eventually I debugged my code step by step and found that the problem was caused by data that was not evenly balanced across partitions, which led to the MetadataFetchFailedException in the map stage rather than the reduce stage. Calling df_rdd.repartition(nums) before reduceByKey() fixed it.

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type               Original Author      Original Content on Stackoverflow
Question                   dotan                View Question on Stackoverflow
Solution 1 - Apache Spark  Joren Van Severen    View Answer on Stackoverflow
Solution 2 - Apache Spark  Notinlist            View Answer on Stackoverflow
Solution 3 - Apache Spark  xplorerdev           View Answer on Stackoverflow
Solution 4 - Apache Spark  kushagra deep        View Answer on Stackoverflow
Solution 5 - Apache Spark  Ignacio Alorre       View Answer on Stackoverflow
Solution 6 - Apache Spark  MichaelChirico       View Answer on Stackoverflow
Solution 7 - Apache Spark  DennisLi             View Answer on Stackoverflow
Solution 8 - Apache Spark  i000174              View Answer on Stackoverflow
Solution 9 - Apache Spark  DoG                  View Answer on Stackoverflow