<div id="bodyColumn">
<div id="contentBox">
<!-- Licensed under the Apache License, Version 2.0 (the "License"); --><!-- you may not use this file except in compliance with the License. --><!-- You may obtain a copy of the License at --><!-- --><!-- http://www.apache.org/licenses/LICENSE-2.0 --><!-- --><!-- Unless required by applicable law or agreed to in writing, software --><!-- distributed under the License is distributed on an "AS IS" BASIS, --><!-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --><!-- See the License for the specific language governing permissions and --><!-- limitations under the License. See accompanying LICENSE file. --><div class="section">
<h2>Hadoop MapReduce Next Generation - Writing YARN Applications<a name="Hadoop_MapReduce_Next_Generation_-_Writing_YARN_Applications"></a></h2>
<p>[ <a href="./index.html">Go Back</a> ]</p>
<ul>
<li><a href="#Hadoop_MapReduce_Next_Generation_-_Writing_YARN_Applications">Hadoop MapReduce Next Generation - Writing YARN Applications</a>
<ul>
<li><a href="#Purpose">Purpose </a></li>
<li><a href="#Concepts_and_Flow">Concepts and Flow</a></li>
<li><a href="#Interfaces">Interfaces </a></li>
<li><a href="#Writing_a_Simple_Yarn_Application">Writing a Simple Yarn Application</a>
<ul>
<li><a href="#Writing_a_simple_Client">Writing a simple Client</a></li>
<li><a href="#Writing_an_ApplicationMaster">Writing an ApplicationMaster</a></li></ul></li>
<li><a href="#FAQ">FAQ </a>
<ul>
<li><a href="#How_can_I_distribute_my_applications_jars_to_all_of_the_nodes_in_the_YARN_cluster_that_need_it">How can I distribute my application's jars to all of the nodes in the YARN cluster that need it?</a></li>
<li><a href="#How_do_I_get_the_ApplicationMasters_ApplicationAttemptId">How do I get the ApplicationMaster's ApplicationAttemptId? </a></li>
<li><a href="#My_container_is_being_killed_by_the_Node_Manager">My container is being killed by the Node Manager</a></li>
<li><a href="#How_do_I_include_native_libraries">How do I include native libraries?</a></li></ul></li>
<p>This document describes, at a high-level, the way to implement new Applications for YARN.</p></div>
<div class="section">
<h3>Concepts and Flow<a name="Concepts_and_Flow"></a></h3>
<p>The general concept is that an 'Application Submission Client' submits an 'Application' to the YARN Resource Manager. The client communicates with the ResourceManager using the 'ApplicationClientProtocol' to first acquire a new 'ApplicationId' if needed via ApplicationClientProtocol#getNewApplication and then submit the 'Application' to be run via ApplicationClientProtocol#submitApplication. As part of the ApplicationClientProtocol#submitApplication call, the client needs to provide sufficient information to the ResourceManager to 'launch' the application's first container i.e. the ApplicationMaster. You need to provide information such as the details about the local files/jars that need to be available for your application to run, the actual command that needs to be executed (with the necessary command line arguments), any Unix environment settings (optional), etc. Effectively, you need to describe the Unix process(es) that needs to be launched for your ApplicationMaster. </p>
<p>The YARN ResourceManager will then launch the ApplicationMaster (as specified) on an allocated container. The ApplicationMaster is then expected to communicate with the ResourceManager using the 'ApplicationMasterProtocol'. Firstly, the ApplicationMaster needs to register itself with the ResourceManager. To complete the task assigned to it, the ApplicationMaster can then request for and receive containers via ApplicationMasterProtocol#allocate. After a container is allocated to it, the ApplicationMaster communicates with the NodeManager using ContainerManager#startContainer to launch the container for its task. As part of launching this container, the ApplicationMaster has to specify the ContainerLaunchContext which, similar to the ApplicationSubmissionContext, has the launch information such as command line specification, environment, etc. Once the task is completed, the ApplicationMaster has to signal the ResourceManager of its completion via the ApplicationMasterProtocol#finishApplicationMaster. </p>
<p>Meanwhile, the client can monitor the application's status by querying the ResourceManager or by directly querying the ApplicationMaster if it supports such a service. If needed, it can also kill the application via ApplicationClientProtocol#forceKillApplication. </p></div>
<div class="section">
<h3>Interfaces <a name="Interfaces"></a></h3>
<p>The interfaces you'd most likely be concerned with are:</p>
<ul>
<li>ApplicationClientProtocol - Client<-->ResourceManager<br />The protocol for a client that wishes to communicate with the ResourceManager to launch a new application (i.e. the ApplicationMaster), check on the status of the application or kill the application. For example, a job-client (a job launching program from the gateway) would use this protocol. </li>
<li>ApplicationMasterProtocol - ApplicationMaster<-->ResourceManager<br />The protocol used by the ApplicationMaster to register/unregister itself to/from the ResourceManager as well as to request for resources from the Scheduler to complete its tasks. </li>
<li>ContainerManager - ApplicationMaster<-->NodeManager<br />The protocol used by the ApplicationMaster to talk to the NodeManager to start/stop containers and get status updates on the containers if needed. </li></ul></div>
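<p>A minimal sketch, assuming an unsecured cluster, of how a client-side proxy for one of these protocols can be obtained (the snippets below assume the <tt>rpc</tt> and <tt>applicationsManager</tt> handles created here; a real client would also have to deal with security tokens):</p>
<div class="source">
<pre>    // Create the YARN RPC factory and a proxy to the client-facing
    // (ApplicationsManager) interface of the ResourceManager.
    Configuration conf = new Configuration();
    YarnConfiguration yarnConf = new YarnConfiguration(conf);
    YarnRPC rpc = YarnRPC.create(yarnConf);
    InetSocketAddress rmAddress =
        NetUtils.createSocketAddr(yarnConf.get(
            YarnConfiguration.RM_ADDRESS,
            YarnConfiguration.DEFAULT_RM_ADDRESS));
    ApplicationClientProtocol applicationsManager =
        (ApplicationClientProtocol) rpc.getProxy(
            ApplicationClientProtocol.class, rmAddress, yarnConf);</pre></div>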
<div class="section">
<h3>Writing a Simple Yarn Application<a name="Writing_a_Simple_Yarn_Application"></a></h3>
<div class="section">
<h4>Writing a simple Client<a name="Writing_a_simple_Client"></a></h4>
<ul>
<li>The first step that a client needs to take is to connect to the ResourceManager or, to be more specific, the ApplicationsManager (ASM) interface of the ResourceManager.
<div class="source">
<pre>    // applicationsManager is an ApplicationClientProtocol proxy to the
    // ResourceManager (see the proxy sketch in the Interfaces section).
    // Ask the ASM for a new ApplicationId.
    GetNewApplicationRequest request =
        Records.newRecord(GetNewApplicationRequest.class);
    GetNewApplicationResponse response =
        applicationsManager.getNewApplication(request);
    LOG.info("Got new ApplicationId=" + response.getApplicationId());</pre></div></li>
<li>The response from the ASM for a new application also contains information about the cluster such as the minimum/maximum resource capabilities of the cluster. This is required to ensure that you can correctly set the specifications of the container in which the ApplicationMaster will be launched. Please refer to GetNewApplicationResponse for more details. </li>
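<li>For example, a minimal sketch of using the response to cap the memory asked for the ApplicationMaster container (<tt>amMemory</tt> is an illustrative client-side variable holding the desired AM memory in MB):
<div class="source">
<pre>    // Cluster-wide maximum container capability reported by the ResourceManager
    int maxMem = response.getMaximumResourceCapability().getMemory();
    // The AM container request must not exceed the cluster maximum (and, on
    // releases that report it, should be a multiple of the minimum capability).
    if (amMemory > maxMem) {
      amMemory = maxMem;
    }</pre></div></li>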
<li>The main crux of a client is to set up the ApplicationSubmissionContext which defines all the information needed by the ResourceManager to launch the ApplicationMaster. A client needs to set the following into the context:
<ul>
<li>Application Info: id, name</li>
<li>Queue, Priority info: Queue to which the application will be submitted, the priority to be assigned for the application. </li>
<li>User: The user submitting the application </li>
<li>ContainerLaunchContext: The information defining the container in which the ApplicationMaster will be launched and run. The ContainerLaunchContext, as mentioned previously, defines all the required information needed to run the ApplicationMaster such as the local resources (binaries, jars, files etc.), security tokens, environment settings (CLASSPATH etc.) and the command to be executed. </li></ul>
<div class="source">
<pre> // Create a new ApplicationSubmissionContext
<li>At this point, the ResourceManager will have accepted the application and in the background, will go through the process of allocating a container with the required specifications and then eventually setting up and launching the ApplicationMaster on the allocated container. </li>
<li>There are multiple ways a client can track progress of the actual task.
<ul>
<li>It can communicate with the ResourceManager and request for a report of the application via ApplicationClientProtocol#getApplicationReport.
<p>The ApplicationReport received from the ResourceManager consists of the following: </p>
<ul>
<li>General application information: ApplicationId, queue to which the application was submitted, user who submitted the application and the start time for the application. </li>
<li>ApplicationMaster details: the host on which the ApplicationMaster is running, the rpc port (if any) on which it is listening for requests from clients and a token that the client needs to communicate with the ApplicationMaster. </li>
<li>Application tracking information: If the application supports some form of progress tracking, it can set a tracking url which is available via ApplicationReport#getTrackingUrl that a client can look at to monitor progress. </li>
<li>ApplicationStatus: The state of the application as seen by the ResourceManager is available via ApplicationReport#getYarnApplicationState. If the YarnApplicationState is set to FINISHED, the client should refer to ApplicationReport#getFinalApplicationStatus to check for the actual success/failure of the application task itself. In case of failures, ApplicationReport#getDiagnostics may be useful to shed some more light on the failure. </li></ul></li>
<li>If the ApplicationMaster supports it, a client can directly query the ApplicationMaster itself for progress updates via the host:rpcport information obtained from the ApplicationReport. It can also use the tracking url obtained from the report if available.</li></ul></li>
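<li>For example, a minimal sketch of polling the ResourceManager for the report, assuming the <tt>applicationsManager</tt> proxy and the <tt>appId</tt> obtained earlier:
<div class="source">
<pre>    GetApplicationReportRequest reportRequest =
        Records.newRecord(GetApplicationReportRequest.class);
    reportRequest.setApplicationId(appId);
    GetApplicationReportResponse reportResponse =
        applicationsManager.getApplicationReport(reportRequest);
    ApplicationReport report = reportResponse.getApplicationReport();
    LOG.info("Got application report"
        + ", state=" + report.getYarnApplicationState()
        + ", trackingUrl=" + report.getTrackingUrl());</pre></div></li>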
<li>In certain situations, for example if the application is taking too long to make progress, the client may wish to kill the application. The ApplicationClientProtocol supports the forceKillApplication call that allows a client to send a kill signal to the ApplicationMaster via the ResourceManager, as sketched below. An ApplicationMaster, if so designed, may also support an abort call via its rpc layer that a client could leverage.</li></ul>
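<p>A minimal sketch of the kill path, again assuming the <tt>applicationsManager</tt> proxy and <tt>appId</tt> from the earlier steps:</p>
<div class="source">
<pre>    // Ask the ResourceManager to kill the application
    KillApplicationRequest killRequest =
        Records.newRecord(KillApplicationRequest.class);
    killRequest.setApplicationId(appId);
    applicationsManager.forceKillApplication(killRequest);</pre></div>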
</div>
<div class="section">
<h4>Writing an ApplicationMaster<a name="Writing_an_ApplicationMaster"></a></h4>
<ul>
<li>The ApplicationMaster is the actual owner of the job. It will be launched by the ResourceManager and, via the client, will be provided with all the necessary information and resources about the job that it has been tasked with overseeing and completing. </li>
<li>As the ApplicationMaster is launched within a container that may (and likely will) be sharing a physical host with other containers, given the multi-tenancy nature, amongst other issues, it cannot make any assumptions about things like pre-configured ports that it can listen on. </li>
<li>When the ApplicationMaster starts up, several parameters are made available to it via the environment. These include the ContainerId for the ApplicationMaster container, the application submission time and details about the NodeManager host running the ApplicationMaster. Refer to ApplicationConstants for parameter names.</li>
<li>All interactions with the ResourceManager require an ApplicationAttemptId (there can be multiple attempts per application in case of failures). The ApplicationAttemptId can be obtained from the ApplicationMaster's containerId. There are helper APIs to convert the value obtained from the environment into objects.</li>
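<li>For example, a minimal sketch of recovering the ApplicationAttemptId from the container id passed in the environment (the exact environment variable name is defined in ApplicationConstants and has varied across releases; CONTAINER_ID is assumed here):
<div class="source">
<pre>    // The NodeManager puts the ApplicationMaster container's id into its environment
    String containerIdStr =
        System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
    ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();</pre></div></li>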
<li>After an ApplicationMaster has initialized itself completely, it needs to register with the ResourceManager via ApplicationMasterProtocol#registerApplicationMaster. The ApplicationMaster always communicates via the Scheduler interface of the ResourceManager.
<div class="source">
<pre> // Connect to the Scheduler of the ResourceManager.
YarnConfiguration yarnConf = new YarnConfiguration(conf);
InetSocketAddress rmAddress =
NetUtils.createSocketAddr(yarnConf.get(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
LOG.info("Connecting to ResourceManager at " + rmAddress);
<li>The ApplicationMaster has to emit heartbeats to the ResourceManager to keep it informed that the ApplicationMaster is alive and still running. The timeout expiry interval at the ResourceManager is defined by a config setting accessible via YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS with the default being defined by YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS. The ApplicationMasterProtocol#allocate calls to the ResourceManager count as heartbeats as it also supports sending progress update information. Therefore, an allocate call with no containers requested and progress information updated if any is a valid way for making heartbeat calls to the ResourceManager. </li>
<li>Based on the task requirements, the ApplicationMaster can ask for a set of containers to run its tasks on. The ApplicationMaster has to use the ResourceRequest class to define the following container specifications:
<ul>
<li>Hostname: If containers are required to be hosted on a particular rack or a specific host. '*' is a special value that implies any host will do. </li>
<li>Resource capability: Currently, YARN only supports memory-based resource requirements so the request should define how much memory is needed. The value is specified in MB and has to be less than the max capability of the cluster and an exact multiple of the min capability. Memory resources correspond to physical memory limits imposed on the task containers.</li>
<li>Priority: When asking for sets of containers, an ApplicationMaster may define different priorities for each set. For example, the Map-Reduce ApplicationMaster may assign a higher priority to containers needed for the Map tasks and a lower priority to the Reduce tasks' containers.</li></ul></li>
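<li>A minimal sketch of building one such request with the records API (setter names such as setHostName have been renamed in later releases and are illustrative here; <tt>requestPriority</tt>, <tt>containerMemory</tt> and <tt>numContainers</tt> are the ApplicationMaster's own values):
<div class="source">
<pre>    // Request 'numContainers' containers of 'containerMemory' MB each,
    // anywhere in the cluster, at priority 'requestPriority'.
    ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
    rsrcRequest.setHostName("*");   // '*' => any host/rack will do

    Priority pri = Records.newRecord(Priority.class);
    pri.setPriority(requestPriority);
    rsrcRequest.setPriority(pri);

    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(containerMemory);
    rsrcRequest.setCapability(capability);

    rsrcRequest.setNumContainers(numContainers);</pre></div></li>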
<li>After defining the container requirements, the ApplicationMaster has to construct an AllocateRequest to send to the ResourceManager. The AllocateRequest consists of:
<ul>
<li>Requested containers: The container specifications and the no. of containers being requested for by the ApplicationMaster from the ResourceManager. </li>
<li>Released containers: There may be situations when the ApplicationMaster may have requested more containers than it needs or, due to failures, may no longer need containers that were allocated to it. In all such situations, it is beneficial to the cluster if the ApplicationMaster releases these containers back to the ResourceManager so that they can be re-allocated to other applications. </li>
<li>ResponseId: The response id that will be sent back in the response from the allocate call. </li>
<li>Progress update information: The ApplicationMaster can send its progress update (a value between 0 and 1) to the ResourceManager. </li></ul></li>
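<li>A minimal sketch of assembling and sending the request (the ask/release setters have been renamed across releases, e.g. addAllAsks vs setAskList; the names below follow the era of the API described here and should be treated as illustrative):
<div class="source">
<pre>    // requestedContainers: the list of ResourceRequests built as above
    // releasedContainers: the list of ContainerIds the AM no longer needs
    AllocateRequest req = Records.newRecord(AllocateRequest.class);
    req.setResponseId(rmRequestId++);          // monotonically increasing id
    req.setApplicationAttemptId(appAttemptId);
    req.addAllAsks(requestedContainers);
    req.addAllReleases(releasedContainers);
    req.setProgress(currentProgress);          // between 0 and 1

    AllocateResponse allocateResponse = resourceManager.allocate(req);</pre></div></li>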
<li>The AllocateResponse sent back from the ResourceManager provides the following information:
<ul>
<li>Reboot flag: For scenarios when the ApplicationMaster may get out of sync with the ResourceManager. </li>
<li>Allocated containers: The containers that have been allocated to the ApplicationMaster.</li>
<li>Headroom: Headroom for resources in the cluster. Based on this information and knowing its needs, an ApplicationMaster can make intelligent decisions such as re-prioritizing sub-tasks to take advantage of currently allocated containers, bailing out faster if resources are not becoming available etc. </li>
<li>Completed containers: Once an ApplicationMaster triggers a launch of an allocated container, it will receive an update from the ResourceManager when the container completes. The ApplicationMaster can look into the status of the completed container and take appropriate actions such as re-trying a particular sub-task in case of a failure.</li>
<li>Number of cluster nodes: The number of hosts available on the cluster.</li></ul></li>
<p>One thing to note is that containers will not be immediately allocated to the ApplicationMaster. This does not imply that the ApplicationMaster should keep asking for the pending count of required containers. Once an allocate request has been sent, the ApplicationMaster will eventually be allocated the containers based on cluster capacity, priorities and the scheduling policy in place. The ApplicationMaster should request containers again only if its original estimate changed and it needs additional containers. </p>
<div class="source">
<pre>
// Retrieve list of allocated containers from the response
// and on each allocated container, lets assume we are launching
// application job on container returned a non-zero exit code
// counts as completed
numCompletedContainers.incrementAndGet();
numFailedContainers.incrementAndGet();
}
else {
// something else bad happened
// app job did not complete for some reason
// we should re-try as the container was lost for some reason
// decrementing the requested count so that we ask for an
// additional one in the next allocate call.
numRequestedContainers.decrementAndGet();
// we do not need to release the container as that has already
// been done by the ResourceManager/NodeManager.
}
}
else {
// nothing to do
// container completed successfully
numCompletedContainers.incrementAndGet();
numSuccessfulContainers.incrementAndGet();
}
}
}</pre></div></li>
<li>After a container has been allocated to the ApplicationMaster, it needs to follow a process similar to the one the Client followed when setting up the ContainerLaunchContext for the eventual task that is going to run on the allocated Container. Once the ContainerLaunchContext is defined, the ApplicationMaster can then communicate with the ContainerManager to start its allocated container.
<div class="source">
<pre>
    // Assuming an allocated Container obtained from the AllocateResponse
    Container container;
    // Connect to the ContainerManager on the NodeManager hosting the
    // allocated container
    String cmIpPortStr = container.getNodeId().getHost() + ":"
        + container.getNodeId().getPort();
    InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
    ContainerManager cm =
        (ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf);

    // Set up the ContainerLaunchContext for the task: local resources,
    // environment and the command to run (details elided), then ask the
    // ContainerManager to start the container.
    ContainerLaunchContext ctx =
        Records.newRecord(ContainerLaunchContext.class);
    StartContainerRequest startReq =
        Records.newRecord(StartContainerRequest.class);
    startReq.setContainerLaunchContext(ctx);
    cm.startContainer(startReq);</pre></div></li>
<li>The ApplicationMaster, as mentioned previously, will get updates on completed containers as part of the response to ApplicationMasterProtocol#allocate calls. It can also monitor its launched containers pro-actively by querying the ContainerManager for their status.
<div class="source">
<pre>    GetContainerStatusRequest statusReq =
        Records.newRecord(GetContainerStatusRequest.class);
    statusReq.setContainerId(container.getId());
    GetContainerStatusResponse statusResp = cm.getContainerStatus(statusReq);
    LOG.info("Container Status"
        + ", id=" + container.getId()
        + ", status=" + statusResp.getStatus());</pre></div></li></ul><!-- ** Defining the context in which your code runs --><!-- *** Container Resource Requests --><!-- *** Local Resources --><!-- *** Environment --><!-- **** Managing the CLASSPATH --><!-- ** Security --></div></div>
<div class="section">
<h3>FAQ <a name="FAQ"></a></h3>
<div class="section">
<h4>How can I distribute my application's jars to all of the nodes in the YARN cluster that need it?<a name="How_can_I_distribute_my_applications_jars_to_all_of_the_nodes_in_the_YARN_cluster_that_need_it"></a></h4>
<p>You can use the LocalResource to add resources to your application request. This will cause YARN to distribute the resource to the ApplicationMaster node. If the resource is a tgz, zip, or jar - you can have YARN unzip it. Then, all you need to do is add the unzipped folder to your classpath. For example, when creating your application request:</p>
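<p>A sketch of what that can look like (the package path, the "package" resource name and the classpath entry are placeholders; <tt>fs</tt> is a FileSystem handle and <tt>amContainer</tt> the ContainerLaunchContext being built for the request):</p>
<div class="source">
<pre>    // Describe the package (e.g. a tgz on HDFS) as a LocalResource
    Path packagePath = new Path("hdfs:///apps/myapp/package.tgz");
    FileStatus packageStatus = fs.getFileStatus(packagePath);

    LocalResource packageResource = Records.newRecord(LocalResource.class);
    packageResource.setResource(ConverterUtils.getYarnUrlFromPath(packagePath));
    packageResource.setSize(packageStatus.getLen());
    packageResource.setTimestamp(packageStatus.getModificationTime());
    packageResource.setType(LocalResourceType.ARCHIVE);   // YARN will unpack it
    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);

    // The map key ("package") becomes a symlink in the container's working directory
    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
    localResources.put("package", packageResource);
    amContainer.setLocalResources(localResources);

    // Put the unpacked folder on the classpath of the launched command
    Map<String, String> env = new HashMap<String, String>();
    env.put("CLASSPATH", "./package/*");
    amContainer.setEnvironment(env);</pre></div>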
<p>As you can see, the setLocalResources command takes a map of names to resources. The name becomes a sym link in your application's cwd, so you can just refer to the artifacts inside by using ./package/*. </p>
<p>Note: Java's classpath (cp) argument is VERY sensitive. Make sure you get the syntax EXACTLY correct.</p>
<p>Once your package is distributed to your ApplicationMaster, you'll need to follow the same process whenever your ApplicationMaster starts a new container (assuming you want the resources to be sent to your container). The code for this is the same. You just need to make sure that you give your ApplicationMaster the package path (either HDFS, or local), so that it can send the resource URL along with the container ctx.</p></div>
<div class="section">
<h4>How do I get the ApplicationMaster's ApplicationAttemptId? <a name="How_do_I_get_the_ApplicationMasters_ApplicationAttemptId"></a></h4>
<p>The ApplicationAttemptId will be passed to the ApplicationMaster via the environment and the value from the environment can be converted into an ApplicationAttemptId object via the ConverterUtils helper function.</p></div>
<div class="section">
<h4>My container is being killed by the Node Manager<a name="My_container_is_being_killed_by_the_Node_Manager"></a></h4>
<p>This is likely due to high memory usage exceeding your requested container memory size. There are a number of reasons that can cause this. First, look at the process tree that the NodeManager dumps when it kills your container. The two things you're interested in are physical memory and virtual memory. If you have exceeded the physical memory limit, your app is using too much physical memory. If you're running a Java app, you can use a heap profiler such as hprof to look at what is taking up space in the heap. If you have exceeded virtual memory, you may need to increase the value of the cluster-wide configuration variable <tt>yarn.nodemanager.vmem-pmem-ratio</tt>.</p></div>
<div class="section">
<h4>How do I include native libraries?<a name="How_do_I_include_native_libraries"></a></h4>
<p>Setting -Djava.library.path on the command line while launching a container can cause native libraries used by Hadoop to not be loaded correctly and can result in errors. It is cleaner to use LD_LIBRARY_PATH instead.</p></div></div>
<div class="section">
<h3>Useful Links<a name="Useful_Links"></a></h3>
<ul>
<li><a class="externalLink" href="https://issues.apache.org/jira/secure/attachment/12486023/MapReduce_NextGen_Architecture.pdf">Map Reduce Next Generation Architecture</a></li>
<li><a class="externalLink" href="http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/">Map Reduce Next Generation Scheduler</a></li></ul></div></div>