Friday, June 23, 2017

Job not Failing/Killed

Recently we saw an issue in a job with major data skew, due to which only 2 reducers were doing all the work. Usually this just delays job completion, but in this case the odd observation was that the job kept spinning up reducer attempts on different nodes, which would do some work and then get killed. So we wanted to understand why the job itself never got killed even though its attempts were getting killed on different nodes in a loop with the message below.

2017-06-12 06:48:27,282 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Killing taskAttempt:attempt_1496463308975_228648_r_000481_1000 because it is running on unusable node:A:8091 
2017-06-12 06:50:27,594 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Killing taskAttempt:attempt_1496463308975_228648_r_000481_1001 because it is running on unusable node:B:8091 
2017-06-12 07:56:21,369 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Killing taskAttempt:attempt_1496463308975_228648_r_000481_1002 because it is running on unusable node:C:8091 
2017-06-12 08:15:15,669 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Killing taskAttempt:attempt_1496463308975_228648_r_000481_1003 because it is running on unusable node:D:8091 
2017-06-12 08:21:20,158 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Killing taskAttempt:attempt_1496463308975_228648_r_000481_1005 because it is running on unusable node:E:8091 
2017-06-12 09:11:41,232 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Killing taskAttempt:attempt_1496463308975_228648_r_000481_1004 because it is running on unusable node:F:8091 
2017-06-12 09:36:21,470 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Killing taskAttempt:attempt_1496463308975_228648_r_000481_1006 because it is running on unusable node:G:8091 


When I checked the NM logs, they showed that the NM local dir was full, so the directory was marked bad and the container was killed on the assumption that it was running on a bad node.

2017-06-12 07:01:15,487 WARN org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection: Directory /opt/mapr/tmp/hadoop-mapr/nm-local-dir error, used space above threshold of 90.0%, removing from list of valid directories 
2017-06-12 07:01:15,487 INFO org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService: Disk(s) failed: 1/1 local-dirs are bad: /opt/mapr/tmp/hadoop-mapr/nm-local-dir; 
2017-06-12 07:01:15,488 ERROR org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService: Most of the disks failed. 1/1 local-dirs are bad: /opt/mapr/tmp/hadoop-mapr/nm-local-dir; 

After a minute or two, once the containers on the node had failed and their application directories were cleaned up, utilization dropped below 90% and the NM became healthy again.

2017-06-12 07:02:41,737 INFO org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor: Deleting absolute path : /opt/mapr/tmp/hadoop-mapr/nm-local-dir/usercache/fstaladm/appcache/application_1496463308975_240690 
2017-06-12 07:03:15,487 INFO org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection: Directory /opt/mapr/tmp/hadoop-mapr/nm-local-dir passed disk check, adding to list of valid directories. 
2017-06-12 07:03:15,487 INFO org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService: Disk(s) turned good: 1/1 local-dirs are good: /opt/mapr/tmp/hadoop-mapr/nm-local-dir; 1/1 log-dirs are good: /opt/mapr/hadoop/hadoop-2.7.0/logs/userlogs 
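The 90% cut-off here is the NodeManager disk health checker's utilization threshold (yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage, default 90.0). Conceptually the check is just a used-space-versus-threshold comparison on each local dir; below is a minimal illustrative sketch of that comparison, not the actual DirectoryCollection code:

import java.io.File;

public class DiskUsageCheckSketch {

  // Illustrative threshold; in YARN this comes from
  // yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage (default 90.0)
  private static final float MAX_USED_PERCENT = 90.0f;

  // Returns true if the directory would still be treated as a valid local-dir.
  static boolean isDirUsable(File dir) {
    long total = dir.getTotalSpace();
    if (total == 0) {
      return false;                        // capacity unknown, treat as bad
    }
    long used = total - dir.getUsableSpace();
    float usedPercent = (used * 100.0f) / total;
    return usedPercent < MAX_USED_PERCENT; // above the threshold => removed from valid dirs
  }

  public static void main(String[] args) {
    File localDir = new File(args.length > 0 ? args[0]
        : "/opt/mapr/tmp/hadoop-mapr/nm-local-dir");
    System.out.println(localDir + " usable: " + isDirUsable(localDir));
  }
}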


So due to the above issue the job will continuously spin up attempts on different nodes, one after the other, as each attempt fails without doing any useful work. Worse, admins will see no issue with the NM, since it appears to be self-healing, so the problem might not even be noticed.

Now, to find out why these killed attempts, well past 4 of them, didn't cause the job to fail: the attempt iterator does not take into account task preemption, disk failure (NM error or lack of space) or RM restart/failover.

  public boolean shouldCountTowardsMaxAttemptRetry() { 
    try { 
      this.readLock.lock(); 
      int exitStatus = getAMContainerExitStatus(); 
      return !(exitStatus == ContainerExitStatus.PREEMPTED 
          || exitStatus == ContainerExitStatus.ABORTED 
          || exitStatus == ContainerExitStatus.DISKS_FAILED 
          || exitStatus == ContainerExitStatus.KILLED_BY_RESOURCEMANAGER); 
    } finally { 
      this.readLock.unlock(); 
    } 
  } 

Yes, the definitions "disk failure" and "NM local dir utilization above 90%" are not the same, but both lead to the node being marked unusable, hence these failures are not counted towards the attempt limits.
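For illustration, the same principle shows up at the MapReduce task level: attempts that end as KILLED (which is what "Killing taskAttempt ... because it is running on unusable node" produces) do not count against mapreduce.map/reduce.maxattempts; only FAILED attempts do. The sketch below is a made-up illustration of that counting behaviour, not the actual Hadoop source:

public class AttemptCountingSketch {

  enum AttemptEndState { FAILED, KILLED }  // KILLED covers unusable node, preemption, etc.

  private final int maxAttempts;           // mirrors mapreduce.reduce.maxattempts (default 4)
  private int failedAttempts = 0;

  AttemptCountingSketch(int maxAttempts) {
    this.maxAttempts = maxAttempts;
  }

  // Only FAILED attempts move the task (and hence the job) towards failure.
  boolean attemptFinished(AttemptEndState endState) {
    if (endState == AttemptEndState.FAILED) {
      failedAttempts++;
    }
    return failedAttempts >= maxAttempts;  // true => task fails, job fails
  }

  public static void main(String[] args) {
    AttemptCountingSketch task = new AttemptCountingSketch(4);
    boolean jobFails = false;
    // Attempts killed on unusable nodes can loop forever without failing the job.
    for (int i = 0; i < 10 && !jobFails; i++) {
      jobFails = task.attemptFinished(AttemptEndState.KILLED);
    }
    System.out.println("Job failed after 10 killed attempts? " + jobFails);
  }
}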

Currently the iterator does not count three scenarios:
i) Task preempted by the scheduler
ii) Hardware failures, such as NM failing, lost NM and NM disk errors
iii) Task killed by RM because of RM restart or failover

  // Do not count AM preemption, hardware failures or NM resync
  // as attempt failure.
  for (RMAppAttempt attempt : attempts.values()) {
    if (attempt.shouldCountTowardsMaxAttemptRetry()) {
      if (this.attemptFailuresValidityInterval <= 0
          || (attempt.getFinishTime() > endTime
              - this.attemptFailuresValidityInterval)) {
        completedAttempts++;
      }
    }
  }

Currently, as of Hadoop 2.7 (YARN ), there is no good way to detect this corner case and fail the job.

There are a few possibilities, such as comparing the number of launches of the same task against the value of mapreduce.map/reduce.maxattempts and killing the application if the first exceeds the second, but this would break other features in YARN. For example, if lots of applications are running and intensive preemption happens, some applications might be killed only because their tasks were preempted a few times, and that is broken behavior.
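Purely as an illustration of that idea, a hypothetical check could look like the sketch below (the class and method names are made up; nothing like this exists in Hadoop). It fails the application once any single task has been launched more times than the configured max attempts, and because it counts preempted/killed launches as well, it would wrongly kill heavily preempted applications, which is exactly the problem described above:

import java.util.HashMap;
import java.util.Map;

public class LaunchCountCheckSketch {

  // Hypothetical limit; would mirror mapreduce.reduce.maxattempts (default 4)
  private final int maxAttempts;
  // Launches seen per task id, whether they failed, were killed or were preempted
  private final Map<String, Integer> launchesPerTask = new HashMap<>();

  LaunchCountCheckSketch(int maxAttempts) {
    this.maxAttempts = maxAttempts;
  }

  // Record a new attempt launch and report whether the job should be failed.
  boolean shouldFailJob(String taskId) {
    int launches = launchesPerTask.merge(taskId, 1, Integer::sum);
    return launches > maxAttempts;
  }

  public static void main(String[] args) {
    LaunchCountCheckSketch check = new LaunchCountCheckSketch(4);
    for (int i = 0; i < 6; i++) {
      boolean fail = check.shouldFailJob("task_1496463308975_228648_r_000481");
      System.out.println("launch " + (i + 1) + " -> fail job: " + fail);
    }
  }
}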

To fix the issue temporarily I killed the job; to fix it permanently we broke the job into multiple jobs over smaller datasets and also optimized the query (joins) so that all reducers do work, avoiding the data skew.
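On the skew side, one common mitigation (not necessarily what we did here; the helper names below are made up) is to salt the hot join key so its records spread across several reducers and are re-aggregated in a second pass:

import java.util.Random;

public class SaltedKeySketch {

  private static final int SALT_BUCKETS = 32;   // hypothetical fan-out across reducers
  private static final Random RANDOM = new Random();

  // Map side: append a random salt so one hot key hashes to many reducers.
  static String saltKey(String hotKey) {
    return hotKey + "#" + RANDOM.nextInt(SALT_BUCKETS);
  }

  // Second pass / reduce side: strip the salt before the final aggregation.
  static String unsaltKey(String saltedKey) {
    int idx = saltedKey.lastIndexOf('#');
    return idx < 0 ? saltedKey : saltedKey.substring(0, idx);
  }

  public static void main(String[] args) {
    String salted = saltKey("skewed_join_key");
    System.out.println(salted + " -> " + unsaltKey(salted));
  }
}

The trade-off is an extra aggregation step to merge the salted partial results back into a single result per original key.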