
Hadoop Cluster Maintenance

As Hadoop admins, it's our responsibility to perform Hadoop cluster maintenance regularly. Let's see what we can do to keep our big elephant happy! 😉

 


 

1. FileSystem Checks

We should check the health of HDFS periodically by running the fsck command:

sudo -u hdfs hadoop fsck /

 

This command contacts the Namenode and recursively checks every file under the given path. Below is sample output of the fsck command:

sudo -u hdfs hadoop fsck /
FSCK started by hdfs (auth:SIMPLE) from /10.0.2.15 for path / at Wed Apr 06 18:47:37 UTC 2016
Total size: 1842803118 B
Total dirs: 4612
Total files: 11123
Total symlinks: 0 (Files currently being written: 4)
Total blocks (validated): 11109 (avg. block size 165883 B) (Total open file blocks (not validated): 1)
Minimally replicated blocks: 11109 (100.0 %)
Over-replicated blocks: 0 (0.0 %)
Under-replicated blocks: 11109 (100.0 %)
Mis-replicated blocks: 0 (0.0 %)
Default replication factor: 3
Average block replication: 1.0
Corrupt blocks: 0
Missing replicas: 22232 (66.680664 %)
Number of data-nodes: 1
Number of racks: 1
FSCK ended at Wed Apr 06 18:46:54 UTC 2016 in 1126 milliseconds

The filesystem under path '/' is HEALTHY

We can schedule a weekly cron job on the edge node that runs fsck and emails the output to the Hadoop admin.
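For example, a minimal cron-friendly wrapper could look like the sketch below. The script path, mail recipient, and use of the mailx command are assumptions; adjust them for your environment.

#!/bin/bash
# /opt/scripts/weekly_fsck.sh -- hypothetical path; run HDFS fsck and mail the report
REPORT=/tmp/fsck_report_$(date +%d%m%Y).txt
ADMIN_EMAIL="hadoop-admin@example.com"   # assumed recipient

# Run fsck as the hdfs superuser and capture the full report
sudo -u hdfs hadoop fsck / > "$REPORT" 2>&1

# Mail the report (assumes mailx is installed on the edge node)
mailx -s "Weekly HDFS fsck report - $(hostname)" "$ADMIN_EMAIL" < "$REPORT"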

 

2. HDFS Balancer utility

Over time, data becomes unbalanced across the Datanodes in the cluster. This can be caused by maintenance activity on a specific Datanode, power failures, hardware failures, kernel panics, unexpected reboots, and so on. Because of data locality, Datanodes holding more data get churned harder, and an unbalanced cluster can directly affect your MapReduce job performance.

You can use the command below to run the HDFS balancer:

sudo -u hdfs hdfs balancer -threshold <threshold-value>

By default the threshold value is 10 (percent); we can reduce it to as low as 1. It's better to run the balancer with a lower threshold to get a more evenly balanced cluster, though it may take longer to complete.

Sample output:

[root@sandbox ~]# sudo -u hdfs hdfs balancer -threshold 1
16/04/06 18:57:16 INFO balancer.Balancer: Using a threshold of 1.0
16/04/06 18:57:16 INFO balancer.Balancer: namenodes = [hdfs://sandbox.hortonworks.com:8020]
16/04/06 18:57:16 INFO balancer.Balancer: parameters = Balancer.Parameters [BalancingPolicy.Node, threshold = 1.0, max idle iteration = 5, #excluded nodes = 0, #included nodes = 0, #source nodes = 0, run during upgrade = false]
16/04/06 18:57:16 INFO balancer.Balancer: included nodes = []
16/04/06 18:57:16 INFO balancer.Balancer: excluded nodes = []
16/04/06 18:57:16 INFO balancer.Balancer: source nodes = []
Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved
16/04/06 18:57:17 INFO balancer.KeyManager: Block token params received from NN: update interval=10hrs, 0sec, token lifetime=10hrs, 0sec
16/04/06 18:57:17 INFO block.BlockTokenSecretManager: Setting block keys
16/04/06 18:57:17 INFO balancer.KeyManager: Update block keys every 2hrs, 30mins, 0sec
16/04/06 18:57:17 INFO balancer.Balancer: dfs.balancer.movedWinWidth = 5400000 (default=5400000)
16/04/06 18:57:17 INFO balancer.Balancer: dfs.balancer.moverThreads = 1000 (default=1000)
16/04/06 18:57:17 INFO balancer.Balancer: dfs.balancer.dispatcherThreads = 200 (default=200)
16/04/06 18:57:17 INFO balancer.Balancer: dfs.datanode.balance.max.concurrent.moves = 5 (default=5)
16/04/06 18:57:17 INFO balancer.Balancer: dfs.balancer.getBlocks.size = 2147483648 (default=2147483648)
16/04/06 18:57:17 INFO balancer.Balancer: dfs.balancer.getBlocks.min-block-size = 10485760 (default=10485760)
16/04/06 18:57:17 INFO block.BlockTokenSecretManager: Setting block keys
16/04/06 18:57:17 INFO balancer.Balancer: dfs.balancer.max-size-to-move = 10737418240 (default=10737418240)
16/04/06 18:57:17 INFO balancer.Balancer: dfs.blocksize = 134217728 (default=134217728)
16/04/06 18:57:17 INFO net.NetworkTopology: Adding a new node: /default-rack/10.0.2.15:50010
16/04/06 18:57:17 INFO balancer.Balancer: 0 over-utilized: []
16/04/06 18:57:17 INFO balancer.Balancer: 0 underutilized: []
The cluster is balanced. Exiting...
Apr 6, 2016 6:57:17 PM 0 0 B 0 B -1 B
Apr 6, 2016 6:57:17 PM Balancing took 1.383 seconds

We can schedule a weekly cron job on the edge node that runs the balancer and emails the results to the Hadoop admin.
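Assuming both maintenance wrappers live under a hypothetical /opt/scripts directory (the fsck script above, plus a similar weekly_balancer.sh that runs the balancer and mails its output), the crontab entries could look like this:

# Example schedule: fsck every Sunday at 01:00, balancer every Sunday at 03:00
0 1 * * 0 /opt/scripts/weekly_fsck.sh
0 3 * * 0 /opt/scripts/weekly_balancer.sh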

 

3. Adding new nodes to the cluster

We should always maintain a list of Datanodes that are authorized to communicate with the Namenode. This can be achieved by setting the dfs.hosts property in hdfs-site.xml:

<property>
  <name>dfs.hosts</name>  
  <value>/etc/hadoop/conf/allowed-datanodes.txt</value>
</property>

If we don't set this property, any machine that has the Datanode software installed and a matching hdfs-site.xml can contact the Namenode and become part of the Hadoop cluster.

 

3.1 For NodeManagers

We can add the below property to yarn-site.xml:

<property>
  <name>yarn.resourcemanager.nodes.include-path</name>  
  <value>/etc/hadoop/conf/allowed-nodemanagers.txt</value>
</property>
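With both include files in place, adding a new worker node is just a matter of appending its hostname to each file and asking the Namenode and ResourceManager to re-read them. A minimal sketch (the hostname is an example):

# Append the new worker to the HDFS and YARN include files
echo "datanode05.example.com" >> /etc/hadoop/conf/allowed-datanodes.txt
echo "datanode05.example.com" >> /etc/hadoop/conf/allowed-nodemanagers.txt

# Tell the Namenode and ResourceManager to re-read the include files
sudo -u hdfs hdfs dfsadmin -refreshNodes
sudo -u yarn yarn rmadmin -refreshNodes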

 

4. Decommissioning a node from the cluster

Even though HDFS is fault tolerant, it's a bad idea to simply stop one or more Datanode daemons, even gracefully. The better approach is to add the IP address of the Datanode machine we want to remove to the exclude file referenced by the dfs.hosts.exclude property, and then run the command below:

sudo -u hdfs hdfs dfsadmin -refreshNodes

After this, the Namenode starts replicating that node's blocks to the other Datanodes in the cluster. Once the decommission process is complete, it's safe to shut down the Datanode daemon. You can track the progress of the decommission process on the Namenode web UI.
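Besides the Namenode web UI, you can also watch the decommission state from the command line. The exact report wording can vary between Hadoop versions, but a quick check could be:

# List per-Datanode status; decommissioning nodes show up in the report
sudo -u hdfs hdfs dfsadmin -report | grep -B 3 -A 2 -i "Decommission"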

 

4.1 For YARN:

Add the IP address of the NodeManager machine to the file referenced by the yarn.resourcemanager.nodes.exclude-path property and run the command below:

sudo -u yarn yarn rmadmin -refreshNodes

 

5. Datanode Volume Failures

The Namenode web UI shows information about Datanode volume failures. We should check this periodically, or set up automated monitoring using Nagios, Ambari Metrics (if you are using the Hortonworks Hadoop distribution), JMX monitoring (http://<namenode-host>:50070/jmx), etc. Multiple disk failures on a single Datanode can cause the Datanode daemon to shut down, so check the dfs.datanode.failed.volumes.tolerated property and set it accordingly in hdfs-site.xml.
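As a lightweight alternative to a full monitoring system, you can poll the Namenode JMX endpoint from a script. The sketch below assumes the VolumeFailuresTotal counter is exposed under the FSNamesystemState bean; metric and bean names can vary slightly between Hadoop versions.

# Query the Namenode JMX servlet and pull out the cluster-wide volume failure count
curl -s "http://<namenode-host>:50070/jmx?qry=Hadoop:service=NameNode,name=FSNamesystemState" \
  | grep -i "VolumeFailuresTotal"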

 

6. Database Backups

If you have multiple Hadoop ecosystem components installed, you should schedule a backup script to take database dumps.

For example:

1. Hive metastore database

2. Oozie database

3. Ambari database

4. Ranger database

Create a simple shell script with the backup commands, schedule it for a weekend slot, and add logic to send an email once the backups are done.
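A minimal sketch of such a script is shown below. It assumes the databases live in MySQL; the database names, credentials, backup directory, and mail recipient are all placeholders you would replace for your cluster.

#!/bin/bash
# /opt/scripts/db_backups.sh -- hypothetical weekly database backup script
BACKUP_DIR=/backups/$(date +%d%m%Y)
ADMIN_EMAIL="hadoop-admin@example.com"   # assumed recipient
mkdir -p "$BACKUP_DIR"

# Dump each database (MySQL backend and database names are assumptions)
mysqldump -u backup_user -p'secret' hive   > "$BACKUP_DIR/hive_metastore.sql"
mysqldump -u backup_user -p'secret' oozie  > "$BACKUP_DIR/oozie.sql"
mysqldump -u backup_user -p'secret' ambari > "$BACKUP_DIR/ambari.sql"
mysqldump -u backup_user -p'secret' ranger > "$BACKUP_DIR/ranger.sql"

# Notify the admin once all dumps have completed (assumes mailx is available)
echo "Database backups written to $BACKUP_DIR on $(hostname)" \
  | mailx -s "Weekly Hadoop DB backups done" "$ADMIN_EMAIL"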

 

7. HDFS Metadata backup

The fsimage holds the metadata for your Hadoop filesystem, and if it gets corrupted for some reason your cluster becomes unusable, so it's very important to keep periodic backups of the fsimage.

You can schedule a shell script that uses the command below to take a backup of the fsimage:

hdfs dfsadmin -fetchImage fsimage.backup.ddmmyyyy
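A small wrapper that stamps each backup with the date and prunes old copies could look like the sketch below. The backup location and 30-day retention are assumptions; this version passes a date-stamped directory to -fetchImage, which saves the most recent fsimage into it.

#!/bin/bash
# /opt/scripts/fsimage_backup.sh -- hypothetical fsimage backup wrapper
BACKUP_ROOT=/backups/fsimage
BACKUP_DIR="$BACKUP_ROOT/$(date +%d%m%Y)"
mkdir -p "$BACKUP_DIR"

# Fetch the latest fsimage from the Namenode into the date-stamped directory
hdfs dfsadmin -fetchImage "$BACKUP_DIR"

# Drop backups older than 30 days (retention period is an example)
find "$BACKUP_ROOT" -maxdepth 1 -mindepth 1 -type d -mtime +30 -exec rm -rf {} \;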

 

 

8. Purging older log files

In production clusters, if we don't clean up older Hadoop log files, they can eat the entire disk and daemons can crash with a "no space left on device" error. Always clean up older log files with a cleanup script!
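A simple example of such a cleanup, assuming the usual /var/log/hadoop* log locations and a 30-day retention (both assumptions; adjust paths and retention to match your cluster):

# Delete rotated Hadoop daemon logs older than 30 days (paths and retention are examples)
find /var/log/hadoop/ /var/log/hadoop-yarn/ /var/log/hadoop-mapreduce/ \
  -type f \( -name '*.log.*' -o -name '*.out.*' \) -mtime +30 -delete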

 

Please comment if you have any feedback/questions/suggestions. Happy Hadooping!! :)

 


Tune Hadoop Cluster to get Maximum Performance (Part 2)

In the previous part we saw how we can tune the operating system to get maximum performance for Hadoop; in this article I will focus on how to tune the Hadoop cluster to get a performance boost at the Hadoop level. :-)

 


Tune Hadoop Cluster to get Maximum Performance (Part 2) – http://crazyadmins.com

 

Before I actually start explaining the tuning parameters, let me cover some basic terms that are required to understand Hadoop-level tuning.

 

What is YARN?

YARN (Yet Another Resource Negotiator) is the resource-management layer introduced with MapReduce version 2. It brings many new features, such as dynamic memory allocation for mappers and reducers instead of the fixed slots used in MRv1.

 

What is Container?

A Container represents an allocation of resources (CPU, RAM, etc.) on a worker node. It runs as a JVM process; in YARN, the ApplicationMaster, mappers, and reducers all run inside Containers.

 

Let’s get into the game now:

 

1. Resource Manager (RM) is responsible for allocating resources to mapreduce jobs.

2. On a brand-new Hadoop cluster (without any tuning), the ResourceManager sees only 8192 MB ("yarn.nodemanager.resource.memory-mb") of memory per node.

3. The RM can allocate at most 8192 MB ("yarn.scheduler.maximum-allocation-mb") to any single container, including the Application Master container.

4. Default minimum allocation is 1024 MB (“yarn.scheduler.minimum-allocation-mb”).

5. The AM can only negotiate resources from the ResourceManager in increments of ("yarn.scheduler.minimum-allocation-mb"), and a request cannot exceed ("yarn.scheduler.maximum-allocation-mb").

6. Container requests for ("mapreduce.map.memory.mb") and ("mapreduce.reduce.memory.mb") are rounded up to a value divisible by ("yarn.scheduler.minimum-allocation-mb").
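To make point 6 concrete, here is a small bash sketch of the rounding the scheduler performs (the 1536 MB request is just an example value):

# Round a container request up to the nearest multiple of the scheduler minimum
MIN_ALLOC=1024          # yarn.scheduler.minimum-allocation-mb
REQUEST=1536            # e.g. mapreduce.map.memory.mb (example value)

CONTAINER=$(( ( (REQUEST + MIN_ALLOC - 1) / MIN_ALLOC ) * MIN_ALLOC ))
echo "A ${REQUEST} MB request is granted a ${CONTAINER} MB container"   # prints 2048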

 

What are these properties? What can we tune?

 

yarn.scheduler.minimum-allocation-mb

Default value is 1024m

Sets the minimum container size that YARN will allocate for running MapReduce jobs.

 

 

yarn.scheduler.maximum-allocation-mb

Default value is 8192m

The largest container size that YARN will allow for running MapReduce jobs.

 

 

yarn.nodemanager.resource.memory-mb

Default value is 8 GB (8192 MB).
The total amount of physical memory (RAM) available for Containers on a worker node.

Set this property to: Total RAM - (RAM for OS + Hadoop daemons + other services)
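As a quick worked example (the 64 GB node size and 8 GB reservation are purely illustrative):

# Example budget for yarn.nodemanager.resource.memory-mb on a 64 GB worker node
TOTAL_RAM_MB=65536          # 64 GB node (example)
RESERVED_MB=8192            # OS + DataNode + NodeManager + other services (example)

echo "yarn.nodemanager.resource.memory-mb = $(( TOTAL_RAM_MB - RESERVED_MB ))"   # 57344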

 

 

yarn.nodemanager.vmem-pmem-ratio

Default value is 2.1

The amount of virtual memory that each Container is allowed to use.

It is calculated as: container memory request * vmem-pmem-ratio

 

 

mapreduce.map.memory.mb 
mapreduce.reduce.memory.mb

These are the hard limits enforced by Hadoop on each mapper or reducer task. (Maximum memory that can be assigned to mapper or reducer’s container)

Default value – 1GB

 

 

mapreduce.map.java.opts
mapreduce.reduce.java.opts

The JVM heap size (-Xmx) for the mapper or reducer task.

This value should always be lower than mapreduce.[map|reduce].memory.mb.

The recommended value is 80% of mapreduce.map.memory.mb / mapreduce.reduce.memory.mb.
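For example, to give a single job 4 GB map containers with a matching ~80% heap, you can override these properties at submit time. This assumes the job's driver uses ToolRunner/GenericOptionsParser so that -D options are picked up; the jar name, class name, and paths are placeholders.

# Per-job override: 4096 MB map containers with an ~80% (3276 MB) JVM heap
hadoop jar my-app.jar com.example.MyJob \
  -D mapreduce.map.memory.mb=4096 \
  -D mapreduce.map.java.opts=-Xmx3276m \
  -D mapreduce.reduce.memory.mb=8192 \
  -D mapreduce.reduce.java.opts=-Xmx6553m \
  /input /output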

 

 

yarn.app.mapreduce.am.resource.mb

The amount of memory allocated to the ApplicationMaster container.

 

 

yarn.app.mapreduce.am.command-opts

The JVM heap size (-Xmx) for the ApplicationMaster.

 

 

yarn.nodemanager.resource.cpu-vcores

The number of cores that a node manager can allocate to containers is controlled by the yarn.nodemanager.resource.cpu-vcores property. It should be set to the total number of cores on the machine, minus a core for each daemon process running on the machine (datanode, node manager, and any other long-running processes).

 

 

mapreduce.task.io.sort.mb

Default value – 100MB

This is a very important property to tune. While a map task is running, it writes its output into a circular in-memory buffer. The size of this buffer is fixed and determined by this property (mapreduce.task.io.sort.mb).

When this circular in-memory buffer fills up to the spill threshold (mapreduce.map.sort.spill.percent, 80% by default), spilling to disk starts in parallel using a separate thread. Note that if the spilling thread is too slow and the buffer becomes 100% full, the map task cannot continue and has to wait.

 

 

io.file.buffer.size

Hadoop uses a 4 KB buffer by default for its I/O operations. We can increase it to 128 KB to get better performance by setting io.file.buffer.size=131072 (value in bytes) in core-site.xml.

 

 

dfs.client.read.shortcircuit

Short-circuit reads – When reading a file from HDFS, the client contacts the datanode and the data is sent to the client via a TCP connection. If the block being read is on the same node as the client, then it is more efficient for the client to bypass the network and read the block data directly from the disk.

We can enable short-circuit reads by setting this property to “true”

 

 

mapreduce.task.io.sort.factor

Default value is 10.

Now imagine a running map task: each time the in-memory buffer reaches the spill threshold, a new spill file is created, so by the time the map task has written its last output record there can be several spill files. Before the task finishes, these spill files are merged into a single partitioned and sorted output file.

The configuration property mapreduce.task.io.sort.factor controls the maximum number of streams to merge at once.

 

 

mapreduce.reduce.shuffle.parallelcopies

Default value is 5

The map output files sit on the local disks of the machines that ran the map tasks.

The map tasks may finish at different times, so the reduce task starts copying their outputs as soon as each one completes.

The reduce task has a small pool of copier threads so that it can fetch map outputs in parallel.

The default is five threads, but this number can be changed by setting the mapreduce.reduce.shuffle.parallelcopies property.

 

 

 

I tried my best to cover as much as I could; there are plenty of other things you can tune! I hope this article was helpful. What I recommend is to tune the above properties while keeping the total available memory, total number of cores, etc. in mind, then run benchmarking tools such as TeraGen and TeraSort to measure the results, and keep tuning until you get the best out of your cluster!! :-)
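For reference, a typical TeraGen/TeraSort benchmark run looks roughly like the commands below. The examples-jar path varies by distribution and version, and the 10-million-row data size and output paths are just examples.

# Generate 10 million 100-byte rows (~1 GB), sort them, then validate the result
EXAMPLES_JAR=/usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples.jar  # path is an assumption

hadoop jar "$EXAMPLES_JAR" teragen 10000000 /benchmarks/teragen
hadoop jar "$EXAMPLES_JAR" terasort /benchmarks/teragen /benchmarks/terasort
hadoop jar "$EXAMPLES_JAR" teravalidate /benchmarks/terasort /benchmarks/teravalidate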

 
