YARN Resource Management

Deployment Considerations

Memory Considerations for Running One Component on a Node

You can adjust the amount of memory requested by a component so that components are mutually exclusive on a node, depending on the NodeManager configuration of each node. Typically, all nodes are configured with the same value, independent of their actual physical memory.

Assuming the memory capacity of each NodeManager is known (yarn.nodemanager.resource.memory-mb), you can configure the component to request 51% (just over half) of that capacity, which guarantees that no second such component fits on the same node. You must also ensure that the maximum possible memory allocation (yarn.scheduler.maximum-allocation-mb) permits that value.

For example, if yarn.nodemanager.resource.memory-mb = yarn.scheduler.maximum-allocation-mb = 2048, set yarn.memory = 1280 for the NIMBUS component, and set the nimbus.childopts property in the appConfig.json file to a heap size that fits within that container.

For the SUPERVISOR component, allocate roughly one quarter of the 1280 MB to supervisor.childopts. Then, depending on how many workers you plan to run, divide the remaining container memory by the number of workers and adjust worker.childopts accordingly.
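Putting these numbers together, the relevant excerpts might look like the following sketch. The yarn.memory values go in resources.json; the childopts heap sizes (1024 MB for Nimbus, 320 MB for the supervisor, and two workers at 480 MB each) are illustrative assumptions chosen to fit the 1280 MB containers, and the exact property names depend on the appConfig.json of your Storm app package.

In resources.json:

"NIMBUS": {
    "yarn.memory": "1280"
},
"SUPERVISOR": {
    "yarn.memory": "1280"
}

In appConfig.json:

"nimbus.childopts": "-Xmx1024m",
"supervisor.childopts": "-Xmx320m",
"worker.childopts": "-Xmx480m"

With two workers per supervisor, 320 MB + 2 x 480 MB uses the full 1280 MB container; reduce the worker heaps if you need headroom for off-heap memory.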

Log Aggregation

This feature is backed by https://issues.apache.org/jira/browse/YARN-2468.

Log aggregation is specified in the global section of resources.json:

"global": { "yarn.log.include.patterns": "", "yarn.log.exclude.patterns": "", "yarn.log.interval": "0"
 },

The yarn.log.interval unit is seconds.

You can specify the name(s) of log files (for example, agent.log) that you do not want to aggregate using yarn.log.exclude.patterns.

The aggregated logs are stored in the HDFS /app-logs/ directory.

The following command can be used to retrieve the logs:

yarn logs -applicationId <app_id>

For Storm, you should exclude all active logs: any file matching storm.log.dir/*.log, as well as the storm.log.dir/metadata directory, should be excluded. Collect only the rolled-over logs; any file matching *.log.%i is ready to be collected for log aggregation.
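For example, the global section for a Storm application could express this as follows; the pattern values and the 600-second interval are illustrative assumptions, not required settings:

"global": {
    "yarn.log.include.patterns": "*.log.*",
    "yarn.log.exclude.patterns": "agent.log",
    "yarn.log.interval": "600"
},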

The following is an example of a Storm logback configuration file:

<configuration scan="true" scanPeriod="60 seconds">
  <appender name="A1" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>${storm.log.dir}/${logfile.name}</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
      <fileNamePattern>${storm.log.dir}/${logfile.name}.%i</fileNamePattern>
      <minIndex>1</minIndex>
      <maxIndex>9</maxIndex>
    </rollingPolicy>
    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
      <maxFileSize>100MB</maxFileSize>
    </triggeringPolicy>
    <encoder>
      <pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
    </encoder>
  </appender>

  <appender name="ACCESS" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>${storm.log.dir}/access.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
      <fileNamePattern>${storm.log.dir}/access.log.%i</fileNamePattern>
      <minIndex>1</minIndex>
      <maxIndex>9</maxIndex>
    </rollingPolicy>
    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
      <maxFileSize>100MB</maxFileSize>
    </triggeringPolicy>
    <encoder>
      <pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
    </encoder>
  </appender>

  <appender name="METRICS" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>${storm.log.dir}/metrics.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
      <fileNamePattern>${storm.log.dir}/logs/metrics.log.%i</fileNamePattern>
      <minIndex>1</minIndex>
      <maxIndex>9</maxIndex>
    </rollingPolicy>
    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
      <maxFileSize>2MB</maxFileSize>
    </triggeringPolicy>
    <encoder>
      <pattern>%d %-8r %m%n</pattern>
    </encoder>
  </appender>

  <root level="INFO">
    <appender-ref ref="A1"/>
  </root>

  <logger name="backtype.storm.security.auth.authorizer" additivity="false">
    <level value="INFO"/>
    <appender-ref ref="ACCESS"/>
  </logger>

  <logger name="backtype.storm.metric.LoggingMetricsConsumer" additivity="false">
    <level value="INFO"/>
    <appender-ref ref="METRICS"/>
  </logger>
</configuration>

Reserving Nodes for Storm

You can use YARN node labels to reserve cluster nodes for applications and their components. For example, you could reserve nodes for Storm to ensure that NIMBUS and SUPERVISOR deliver a consistent level of performance.

Node labels are specified with the yarn.label.expression property. If no label is specified, only non-labeled nodes are used when allocating containers for component instances.

In brief, you could label particular YARN nodes for Storm, for example with a storm1_nimbus label for NIMBUS and a storm1_supervisor label for SUPERVISOR, and create a separate queue for assigning containers to these nodes. STORM_UI_SERVER can run under the same storm1_nimbus label, and a storm1_drpc label can be used for DRPC. To use these labeled nodes, add a yarn.label.expression parameter to each Storm component in your resources.json file (including slider-appmaster), for example "yarn.label.expression": "storm1_supervisor". Then, when you run the slider create command for your Storm cluster, add the --queue <queue_name> parameter, as shown in the example below.
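For example, the component entries in resources.json might carry the label expressions like this; the label values and the storm1 cluster name are illustrative assumptions that must match the node labels and capacity-scheduler queue you actually configured:

"slider-appmaster": {
    "yarn.label.expression": "storm1_nimbus"
},
"NIMBUS": {
    "yarn.label.expression": "storm1_nimbus"
},
"STORM_UI_SERVER": {
    "yarn.label.expression": "storm1_nimbus"
},
"SUPERVISOR": {
    "yarn.label.expression": "storm1_supervisor"
}

The create command then submits the application to the queue associated with those labeled nodes (the slider path follows the SLIDER_HOME setting used later in this section):

/usr/hdp/current/slider/bin/slider create storm1 --template appConfig.json --resources resources.json --queue <queue_name>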

Running Example Topologies on Storm-on-YARN

Installing the Storm-Slider Client

  • Make sure you are using HDP 2.2.

  • Run yum search storm and look for the storm_<latest_version>-slider-client package.

  • Pick the latest package version.

  • Install it: yum install storm_<latest_version>-slider-client

  • Edit /etc/storm-slider-client/conf/storm-slider-env.sh and set the following (a consolidated command sequence is sketched after this list):

  • export SLIDER_HOME=/usr/hdp/current/slider

  • export JAVA_HOME=/usr/jdk64/jdk1.7
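Putting the steps together, the sequence looks roughly like the following; the package name placeholder and paths are taken from the list above and may differ on your system:

yum search storm
# pick the latest storm_<latest_version>-slider-client listed, then install it
yum install storm_<latest_version>-slider-client

# in /etc/storm-slider-client/conf/storm-slider-env.sh
export SLIDER_HOME=/usr/hdp/current/slider
export JAVA_HOME=/usr/jdk64/jdk1.7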

Using Quicklinks to Access the Storm UI

Use the following command format:

storm-slider --app <yarn-app-name> quicklinks

For example:

/usr/hdp/<hdp_version>/storm-slider-client/bin/storm-slider --app cl2 quicklinks

Example output:

{ "org.apache.slider.jmx" : "http://ec2-54-172-39-242.compute-1.amazonaws.com:34579/api/v1/cluster/summary", "org.apache.slider.metrics" : "http://ec2-54-172-207-11.compute-1.amazonaws.com/cgi-bin/rrd.py?c=Application2", "nimbus.host_port" : "http://ec2-54-172-39-242.compute-1.amazonaws.com:38818", "org.apache.slider.monitor" : "http://ec2-54-172-39-242.compute-1.amazonaws.com:34579", "org.apache.slider.metrics.ui" : "http://ec2-54-172-207-11.compute-1.amazonaws.com/ganglia?c=Application2"
}

The org.apache.slider.monitor URL points to the Storm UI.

Deploy the Storm Word Count Topology

Use the following command to deploy the Storm wordcount topology:

/usr/hdp/current/storm-slider-client/bin/storm-slider --app <cluster_name> jar /usr/hdp/current/storm-slider-client/contrib/storm-starter/storm-starter-topologies-*.jar storm.starter.WordCountTopology wordcount

Replace <cluster_name> with the name of the cluster you deployed using Slider.
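For example, with the cl2 cluster from the quicklinks example above:

/usr/hdp/current/storm-slider-client/bin/storm-slider --app cl2 jar /usr/hdp/current/storm-slider-client/contrib/storm-starter/storm-starter-topologies-*.jar storm.starter.WordCountTopology wordcount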

Tests

  • Deploy the wordcount topology and go to the Storm UI. Verify that the wordcount topology is listed under Topologies, then click it and confirm that the emitted and transferred counts are increasing.

  • Start multiple Storm clusters, deploy a wordcount topology on each, and verify that all of them are running.

  • Verify that the labeling of Storm components is working by listing the NIMBUS process on the node labeled storm1_nimbus, as sketched below.
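One way to perform that last check is to log in to the labeled node and look for the Nimbus JVM. This is only a sketch; the host name is a placeholder for your storm1_nimbus node:

ssh <storm1_nimbus_node>
ps -ef | grep -i nimbus    # a running NIMBUS container should show a java process with the nimbus daemon in its arguments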