Written by Pravat Kumar Sutar
on November 12, 2016

Following up on my earlier post on configuration and optimization techniques for HIVE-TEZ, this document describes how to tune Hive queries for better performance. If Hive is not tuned properly, even a simple SELECT on a small table can take minutes to return results; this is why Hive is mainly limited to OLAP-style workloads and is not suitable when instant results are expected. By following the best practices outlined below, however, we can improve query performance by at least 50%.

Avoid setting the number of reducers manually

When Tez executes a query, it initially determines the number of reducers it needs and automatically adjusts as needed based on the number of bytes processed.

The relevant parameter is mapred.reduce.tasks. By default it is set to -1, which lets Tez determine the number of reducers automatically (recommended).

It is better to let Tez determine this and make the proper adjustments within its framework than to force a fixed value.

 

set mapred.reduce.tasks = 38; (Not Recommended)
set mapred.reduce.tasks = -1; (Recommended)
 

We can generate statistics using the "ANALYZE TABLE .. COMPUTE STATISTICS" statement in order to observe how many mappers and reducers Tez creates. These statistics help identify whether the mappers complete quickly or execution gets stuck at the reducer stage.

We can turn CBO and vectorization on and run the EXPLAIN plan to observe the amount of data funneled through the reducers.
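For example, a quick way to inspect the plan with both features turned on (the employee table here is illustrative):

set hive.cbo.enable=true;
set hive.vectorized.execution.enabled=true;
explain select dept, count(*) from employee group by dept;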

If we do not set mapred.reduce.tasks to -1 as stated above, we need to calculate the number of reducers to configure. The formula for the calculation is:

 

Max(1, Min(hive.exec.reducers.max [1099], ReducerStage estimate / hive.exec.reducers.bytes.per.reducer)) x hive.tez.max.partition.factor [2]

 

When the value is -1, Tez uses the above formula to determine the number of reducers before scheduling the Tez DAG.
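For example, assuming a reducer-stage estimate of 100 GB and hive.exec.reducers.bytes.per.reducer at 256 MB (a common default in recent Hive releases), the calculation would be:

Max(1, Min(1099, 100 GB / 256 MB)) x 2 = Max(1, Min(1099, 400)) x 2 = 800 reducers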

Tez settings for select query optimization

This optimizes "select statement with where clause" on ORC tables.

set hive.optimize.index.filter=true;

 

This optimizes "select statement with limit clause;" to run < 1 second

set hive.fetch.task.conversion=more;

 

This optimizes "select count (1) from table;" to run in ~1 second

set hive.compute.query.using.stats=true;
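Putting the three settings together in one interactive session (table name illustrative):

set hive.optimize.index.filter=true;
set hive.fetch.task.conversion=more;
set hive.compute.query.using.stats=true;
select count(1) from employee;  -- answered from table statistics when they are current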

Tez settings for multi tenancy

In HiveServer2, performance can be improved in multi-tenant scenarios by turning off some isolation and sharing sessions between JDBC queries. The relevant Tez setting is:

tez.am.container.session.delay-allocation-millis=1000

 

This means a container idling for more than 1 second will be killed. Reuse still works within a query, but resources are freed up between queries, which works well for multi-tenancy while keeping reuse effective within long-running queries. For a single-query performance run, however, you can raise this to about two minutes so that most queries reuse containers from a previous query in the same session.
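For example, the two-minute setting for a single-query performance run would be:

set tez.am.container.session.delay-allocation-millis=120000;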

Using Vectorization

Vectorized query execution improves the performance of operations like scans, aggregations, filters, and joins by performing them in batches of 1024 rows at a time instead of one row at a time.

Introduced in Hive 0.13, this feature significantly improves query execution time and is easily enabled with two parameter settings:

 

set hive.vectorized.execution.enabled = true;

set hive.vectorized.execution.reduce.enabled = true;
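To confirm vectorization is taking effect, run EXPLAIN and look for stages marked "Execution mode: vectorized" in the plan; for example (table illustrative):

explain select count(1) from employee where salary > 50000;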

 

Using Cost Based Optimization

Hive optimizes each query’s logical and physical execution plan before submitting it for final execution. By default, these optimizations are not based on the cost of the query.

Cost-based optimization, a more recent addition to Hive, performs further optimizations based on query cost, resulting in potentially different decisions: how to order joins, which type of join to perform, the degree of parallelism, and others.

To use cost-based optimization (also known as CBO), set the following parameters at the beginning of the query:

 

set hive.cbo.enable=true;

set hive.compute.query.using.stats=true;

set hive.stats.fetch.column.stats=true;

set hive.stats.fetch.partition.stats=true;

 

Now we can prepare the data for CBO by running Hive’s “analyze” command to collect various statistics on the tables for which we want to use CBO.

 

analyze table table_name compute statistics;

 

Example:

analyze table tweets compute statistics for columns sender, topic;

 

With HIVE 0.14 (on HDP 2.2) the analyze command works much faster, and we don’t need to specify each column, so we can just issue:

analyze table tweets compute statistics for columns;
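We can then verify that column statistics were collected by describing a column; the output includes min/max values, null counts, and distinct-value estimates:

describe formatted tweets sender;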

 

Executing a query against this table should now result in a different, faster execution plan thanks to Hive's cost calculation.

 

Better Workload Management by Using Queues

Queues are the primary method used to manage multiple workloads. Queues can provide workload isolation and can guarantee that capacity is available for different workloads. Queues can also support meeting Service Level Agreements (SLAs) for different workloads. Within each queue, you can allow one or more sessions to live simultaneously. Sessions cooperatively share the resources of the queue.

For example, if you have a queue that is assigned 10% of cluster resources, those cluster resources can be allocated anywhere in the cluster, depending on the query and data placement. Where resources are allocated might change as more queries run.
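For example, a session can be pointed at a specific queue before submitting queries (queue name illustrative):

set tez.queue.name=analytics;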


Tez Container size configuration

Using map joins is very efficient because one table is held in memory as a hash map on every node and the larger fact table is streamed. This minimizes data movement, resulting in very fast joins. However, there must be enough memory for the in-memory table, which may require allocating more memory to a Tez container with the following settings in hive-site.xml:

 

Set the Tez container size to be a larger multiple of the YARN container size (4GB):

SET hive.tez.container.size=4096;    -- value in MB

 

Set how much of this memory can be used for tables stored as the hash map (one-third of the Tez container size is recommended):

SET hive.auto.convert.join.noconditionaltask.size=1436549120;    -- about 1370 MB, expressed in bytes

 

Note: The size is shown in bytes in the hive-site.xml file, but set in MB with Ambari.

 

If we find that we are not getting map joins, we need to check the size of the Tez containers in relation to the YARN containers. The Tez container size must be a multiple of the YARN container size; for example, if the YARN containers are set to 2GB, set the Tez container size to 4GB. Then run the EXPLAIN command with the query and view the execution plan to make sure we are getting map joins instead of shuffle joins. Keep in mind that if the Tez containers are too large, space is wasted. Also, do not configure more than one processor per Tez container, to limit the size of the largest container. For example, with 16 processors and 64GB of memory, configure one Tez container per processor and set their size to 4GB and no larger.
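As a quick sketch of that check (the dept dimension table is illustrative), run EXPLAIN and verify that the plan shows a Map Join Operator rather than a shuffle join:

explain
select e.id, e.name, d.dept_name
from employee e join dept d on (e.dept = d.dept_id);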

 

Enable Compression in Hive

For data-intensive workloads, I/O operations and network data transfer take considerable time to complete. By enabling compression in Hive, we can improve the performance of Hive queries as well as save storage space on the HDFS cluster. Compression can be enabled at various stages: on intermediate data, on final output, and at table creation time.
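A minimal sketch of the session-level settings, assuming the Snappy codec is available on the cluster:

set hive.exec.compress.intermediate=true;  -- compress data passed between stages
set hive.exec.compress.output=true;        -- compress final query output
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;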

 

Avoid global sorting in hive

Global sorting in Hive can be achieved with the ORDER BY clause, but this comes with a drawback: ORDER BY produces the result using a single reducer, which is very inefficient for large datasets. When a globally sorted result is not required, we can use the SORT BY clause instead; SORT BY produces a sorted file per reducer. If we need to control which reducer particular rows go to, we can use the DISTRIBUTE BY clause.

For example:

SELECT id, name, salary, dept FROM employee
DISTRIBUTE BY dept
SORT BY id ASC, name DESC;

Here all records with the same dept go to the same reducer, and within each dept the records are sorted by the id and name fields.

Optimize LIMIT Operator

By default, the LIMIT operator executes the entire query and then returns the limited result, which is wasteful. This can be avoided by setting the properties below:

<property>
<name>hive.limit.optimize.enable</name>
<value>true</value>
<description>Whether to enable the optimization of trying a smaller subset of data for simple LIMIT first.</description>
</property>

<property>
<name>hive.limit.row.max.size</name>
<value>100000</value>
<description>When trying a smaller subset of data for simple LIMIT, the minimum size guaranteed for each row.</description>
</property>

<property>
<name>hive.limit.optimize.limit.file</name>
<value>10</value>
<description>When trying a smaller subset of data for simple LIMIT, maximum number of files we can sample.</description>
</property>

<property>
<name>hive.limit.optimize.fetch.max</name>
<value>50000</value>
<description>Maximum number of rows allowed for a smaller subset of data for simple LIMIT, if it is a fetch query. Insert queries are not restricted by this limit.</description>
</property>
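The same properties can also be set per session instead of in hive-site.xml:

set hive.limit.optimize.enable=true;
set hive.limit.row.max.size=100000;
set hive.limit.optimize.limit.file=10;
set hive.limit.optimize.fetch.max=50000;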

Enable parallel execution

Hive converts a query into one or more stages, such as a MapReduce stage, a merge stage, or a limit stage. By default, Hive executes these stages one at a time. A particular job may consist of stages that are not dependent on each other and could be executed in parallel, helping the overall job complete more quickly. Parallel execution can be enabled using the properties below:

<property>
<name>hive.exec.parallel</name>
<value>true</value>
<description>Whether to execute jobs in parallel</description>
</property>

<property>
<name>hive.exec.parallel.thread.number</name>
<value>8</value>
<description>How many jobs at most can be executed in parallel</description>
</property>
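At the session level this is simply the following; queries with independent branches, such as a UNION ALL, benefit the most:

set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;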

Use ORC file format

Using the Optimized Row Columnar (ORC) file format, we can improve the performance of Hive queries very effectively. The figure below depicts the advantage of ORC files over other formats.

[Figure: ORC file format compared with other file formats]
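For example, an ORC copy of the employee table used earlier (compression codec illustrative):

create table employee_orc (id int, name string, salary float, dept string)
stored as orc tblproperties ("orc.compress"="SNAPPY");

insert into table employee_orc
select id, name, salary, dept from employee;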

Optimize Join

We can improve the performance of joins by enabling auto-convert map join and by enabling optimization of skew joins.

Auto Map Join:

Auto map join is very useful when joining a big table with a small table. With this feature enabled, the small table is saved in the local cache on each node and then joined with the big table in the map phase itself. This feature provides two advantages:

  • Loading the small table into cache saves read time on each data node
  • It avoids skew in the join, since the join has already been done in the map phase for each block of data

<property>
<name>hive.auto.convert.join</name>
<value>true</value>
<description>Whether Hive enables the optimization about converting common join into mapjoin based on the input file size</description>
</property>

<property>
<name>hive.auto.convert.join.noconditionaltask</name>
<value>true</value>
<description>Whether Hive enables the optimization about converting common join into mapjoin based on the input file size. If this parameter is on, and the sum of size for n-1 of the tables/partitions for a n-way join is smaller than the specified size, the join is directly converted to a mapjoin (there is no conditional task).</description>
</property>

<property>
<name>hive.auto.convert.join.noconditionaltask.size</name>
<value>10000000</value>
<description>If hive.auto.convert.join.noconditionaltask is off, this parameter does not take effect. However, if it is on, and the sum of sizes for n-1 of the tables/partitions for an n-way join is smaller than this size, the join is directly converted to a mapjoin (there is no conditional task). The default is 10MB.</description>
</property>

<property>
<name>hive.auto.convert.join.use.nonstaged</name>
<value>false</value>
<description>For conditional joins, if input stream from a small alias can be directly applied to join operator without filtering or projection, the alias need not to be pre-staged in distributed cache via mapred local task. Currently, this is not working with vectorization or tez execution engine.</description>
</property>
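At the session level, the same map join settings look like this:

set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000000;  -- bytes, about 10 MB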

Skew Join

We can enable optimization of skew joins, i.e. imbalanced joins, by setting the hive.optimize.skewjoin property to true.

<property>
<name>hive.optimize.skewjoin</name>
<value>true</value>
<description> The algorithm is as follows: At runtime, detect the keys with a large skew. Instead of processing those keys, store them temporarily in an HDFS directory. In a follow-up map-reduce job, process those skewed keys. The same key need not be skewed for all the tables, and so, the follow-up map-reduce job (for the skewed keys) would be much faster, since it would be a map-join.</description>
</property>

<property>
<name>hive.skewjoin.key</name>
<value>100000</value>
<description>Determines whether a join key is skewed. If more than the specified number of rows with the same key are seen in the join operator, the key is treated as a skew join key.</description>
</property>

<property>
<name>hive.skewjoin.mapjoin.map.tasks</name>
<value>10000</value>
<description>Determine the number of map task used in the follow up map join job for a skew join. It should be used together with hive.skewjoin.mapjoin.min.split to perform a fine grained control.</description>
</property>

<property>
<name>hive.skewjoin.mapjoin.min.split</name>
<value>33554432</value>
<description>Determine the number of map task at most used in the follow up map join job for a skew join by specifying the minimum split size. It should be used together with hive.skewjoin.mapjoin.map.tasks to perform a fine grained control.</description>
</property>

Setting Fair Scheduler ON in the Capacity Scheduler View in Ambari

The "fair scheduling" policy in YARN was introduced in HDP 2.3. Fair scheduling enables all sessions running within a queue to get equal resources. For example, if a query is already running in a queue and taking up all of its resources, when a second session with a query arrives, the sessions eventually end up with equal shares of the queue's resources. There is an initial delay, but if, say, ten queries run concurrently most of the time, the resources are divided roughly equally among them.
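Under the hood this corresponds to the Capacity Scheduler's queue ordering policy, which can also be set directly in capacity-scheduler.xml (the queue path below is illustrative):

yarn.scheduler.capacity.root.default.ordering-policy=fair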

 
