# Optimizing CAL Report Hadoop MapReduce Jobs

#### eBay's Central Application Logging system (CAL) collects log data from all kinds of applications. The summary reports for log data are created using Hadoop MapReduce jobs. This article discusses our experiences optimizing these jobs.

eBay's Central Application Logging (CAL) collects log data from all kind of applications to provide a log viewer and summary report service. Application owners can use logging summary reports to find out the following:

• Percentile of time spent on each transaction
• Service transaction calls
• DB operations for an application

CAL uses Hadoop MapReduce jobs to generate reports for each kind of application.

This article discusses about our experiences optimizing Hadoop jobs. We hope to provide developers some insight to the identification and solutions/options to optimize and resolve Hadoop MapReduce (MR) problems, including the reduced consumption of resources.

### Problem statement

There are three issues that need to be addressed:

• Data Set—The volume of CAL logs is increasing 70% YoY. Also, the CAL upstream applications log patterns are different. Some are database operation intensive and some contain complex nested transactions, and the log volume of each application varies widely.
• Computing Resource—CAL uses a shared Hadoop Cluster that can only use 19% capacity to run jobs during the hours of 23:00-8:00 GMT-7. Additionally, jobs running more than 1 hour are killed. A current job takes ~50% resources of all clusters to complete.
• Success Rate—The success rate is 92.5% for current MapReduce jobs. Also, some jobs take up to 6 hours to complete.

### MapReduce jobs

Before we go into the solution, let’s briefly visit the Hadoop MapReduce phases.

There are five phases in the CAL Hadoop MR job: CombineFileInputFormat, Mapper, Combiner, Partitioner, and Reducer.

Figure 1: Hadoop MapReduce Job Phases

In CombineFileInputFormat, small files are combined into a split, and the split is assigned to a mapper to be processed. The split is a logical representation of the data stored in file blocks on Hadoop.

Mapper transforms input records into intermediate records, which are then processed later in the reducer.

The intermediate records are transferred from the mapper node to the reducer node through the network. If the intermediate record set is too large, the effort would be expensive. The combiner is a semi-reducer in MR that preprocesses intermediate results and delivers them to the reducer.

The partition assigns intermediate results to different reducers, according to the key of the intermediate results. Bad practices can result in data skew in the reducer.

Reducer reduces a set of intermediate values that share a key to values you would expect.

The duration of a Hadoop job depends on the slowest map task and the slowest reduce task.

Given:

• $T_{Map}$ stand for Map task duration
• $Count_{Map}$ stand for Map task number
• $T_{Reduce}$ stand for Reduce task duration
• $Count_{Reduce}$ stand for Map task number

Then $T$, which is the MR job duration, could be expressed as:

$$T = \max_{Count_{Map}}T_{Map} + \max_{Count_{Reduce}}T_{Reduce}$$

Map time $T_{Map}$ or reduce time $T_{Map}$ is proportional to the input record number $N_{input}$ and its output record number $N_{output}$.

$$T_{Map} = { \theta * (\alpha_{0} * N_{input}+ (\alpha_{1} * N_{output}) + T_{GC}}$$

$$T_{Reduce} = { \theta * (\alpha_{0} * N_{input}+ (\alpha_{1} * N_{output}) + T_{GC}}$$

Also, the computation complexity $\theta$ has an impact on the Hadoop job duration. For some abnormal jobs, garbage collection (GC) time $T_{GC}$ should be take into consideration.

In order to control the Hadoop job duration, we need to work on three aspects:

1. GC, which can reduce $T_{GC}$
2. Data skew, aimed at reducing max $T_{Map}$ and max $T_{Reduce}$
3. Computation, which is about reducing $\theta$ in the expression

#### Resource usage

Given:

• $Size_{Map}$ stands for the Mapper container memory size in an MR job
• $Size_{Reduce}$ is the container memory size for reducer
• $Size_{AM}$ stands for the application master container memory size in an MR job
• $Count_{MAP}$ stands for the number of mappers in each MR job
• $Count_{Reduce}$ is for the reducer

Then

• Total memory resource usage as R should be related to container size and container holding time, which can be expressed as:

$$R = \sum_{Count_{Map}}Size_{Map}* T_{Map} + \sum_{Count_{Reduce}}Size_{Reduce}* T_{Reduce} + T_{job} * Size_{AM}$$

Therefore, to reduce resource usage, we need to do the following:

1. Reduce Map or Reduce task number: $Count_{Map}$
2. Decrease Map or Reduce task container size: $Size_{Map}$, $Size_{Reduce}$
3. Optimize time cost: $T_{Map}$, $T_{Reduce}$, $T_{job}$

### Solution

#### Garbage collection problem

Garbage collection is the standout problem in CAL. The reason for job failure usually is “GC overhead” or “Out of Memory.” Even for successful jobs, GC times are high. At the same time, jobs are using the CPU of the Hadoop node during GC.

##### Garbage collection in Mapper

In CAL, the logging event is represented as a CAL record. A CAL transaction is a logical concept that is composed of multiple records.

A transaction logs events related to a user request. Each transaction might involve other transactions in response to its user request. That is, transactions are organized in a tree-like hierarchical structure. Each transaction only knows its parent transaction, but the child transaction wants to know its root transaction. However, a transaction is not always processed before its children. Hence, all transaction information needs to be kept for traceability. And for any transaction, we have no idea when its children's transactions have been totally received. We need keep all those transactions in order to give the upcoming transaction information about its parent transaction or its children's transactions.

To summarize, Figure 2 shows a typical tree-like hierarchy of transactions. Transaction F is the root transaction. It will involve transaction B and G to handle a user request. If we already have transaction F, then transaction B, and finally transaction C, we can’t help C find its root transaction F until the transaction D comes in. Transaction B should always be kept in memory so that its child transaction A can be reached, because B has no idea how many children it has, same as other transactions.

Figure 2: Transaction Hierarchy

Unfortunately, this situation will cause out of memory (OOM). A transaction should have a time window property. If the time window is t, and the start timestamp of transaction is ts, all child transactions should happen before ts + t.

In my experiment, the time window was set at 5 min. We verified this assumption with logging data from 12 applications with the largest log volume. By discarding transactions that are out of this time window, 10 of 12 applications could promise almost 100% accuracy. At the same time, we set a whitelist for the remaining two applications, disabling this function for them.

The time window reduces the Hadoop job duration effectively and increases success rate from 93% to 97%.

Figure 3: Transaction and Metrics

Previously, mapper would read all records for that transaction in memory, then extract useful metrics once we got entire transaction, as shown in Figure 3. However, we can get metrics while reading records; whole transaction information is not required.

The combiner could reduce data transfers between the mapper and reducer tasks. But, when the output of mapper is even larger, GC time would be high because of sorting. So, we pre-aggregate in mapper to decrease output intermediate record numbers.

##### Garbage collection in reducer

Reducer was facing a GC issue similar to the Mapper's issue.

In MR, metrics are extracted from raw logs with its timestamp. Time granularity for a CAL report is 15 minutes, so the timestamp could round up to 4 timestamp values in an hour. The reducer of a CAL report job outputs two kinds of files—a base file that is used to store metrics in a 15-minute time granularity and an aggregate file that is used to store metrics in 1 hour time granularity.

In Reducer, the input record is sorted according to its key. The previous key format is on the left in Figure 4. Aggregation keeps records until records for last timestamp comes in. We changed the key of input of reducer to the format on the right in Figure 4. Memory usage in reducer was reduced effectively.

Optimization shouldn’t change the output of a job, even data order. In the base file, a record is sorted by timestamp. So we write records for 15-minutes granularity into different temporary files according to its timestamp, and merge them together when reducer has completed.

Figure 4: Aggregation in Reducer

This solution solves failure in Reducer and makes Reducer scalable for increasing input.

#### Data skew

Another obvious problem is data skew. While checking the map task and reduce task in a Hadoop Job, we learned that a map task executed time differences from 3s to more than 1 hour. The reduce task experiences a similar problem.

According to the formula, assigning even input records to mapper or reducer, max $T_{Map}$and max $T_{Reduce}$can be minimized.

For data skew in Mapper, CombineFileInputFormat can help.

In previous CAL MR jobs, CombineFileInputFormat was not enabled. But now, combining log files from same application and setting the max input file size of each mapper to 256MB in CombineFileInputFormat of MR job decreases the mapper task number to around half of what was before.

For data skew in Reducer, Partition takes over. In a CAL report, there are two concepts, report name and metrics name. For each kind of report, there are multiple metrics. Previously, the partition strategy used a hash of the report name. Now, it’s changed to a hash of the report name and metrics name. Data skew is much better.

#### Repetitive computation

In the expression in the Hadoop Job duration section, the job duration should be proportional to the input record number. In my experiment, there are two data sets. Data set A is 20MB and data set B is 100MB. An MR job with input set A takes 90 minutes to complete. A job with B as input takes only 8 minutes. To figure out what is going on, we need to look into the log data.

In a CAL log, there are two types of logs: SQL and general logs. The general log is an event log. The SQL log is related to DB operations like SQL statements. The general log might reference the SQL log. Parsing the SQL log is more time consuming.

Accordingly, we counted the SQL logs in data sets A and B. They are almost equal. Counting the general log that referenced SQL log provides some idea; the number for B is much larger than A.

The implementation suffers some repetitive computation on the reference SQL log part. Each SQL would be parsed every time, if referenced. To solve this issue, we cached the parsed result of the SQL log and reused it once referenced. The job for A can now complete in 4 minutes.

#### Resource usage

After the optimization of memory for mapper, reducer, the data skew problem, and the repetitive computation issue, we also need to also look at resource usage.

The action item to reduce resource usage includes:

1. Combining small files to reduce mapper number using CombineFileInputFormat.
2. Reducing memory usage.
3. Optimizing time cost.

Item 1 was addressed in the Data skew section. The solution for item 2 was addressed in the GC problem section. Item 3 was addressed in the repetitive computation chapter. With all these items addressed, the usage of Hadoop cluster decreased from about 50% to less than 19%.

### Job verification and monitoring

Optimization work should always care about data quality. The result for a user shouldn't be changed. Before we do this work, the verification plan should come first. We can use a metric with the idempotent property for evaluation.

Another important issue is monitoring—bringing the KPI we care about, the success rate, resource usage, and job duration into a dashboard to observe trends during optimization.

### Optimization results

Before optimization, a CAL report job required 50% resources in the Hadoop cluster to complete. Now, it can be completed with 19% capacity. Figure 5 shows the memory resource usage on Hadoop Cluster. The left side shows the situation before optimization, the right is the current situation.

Figure 5: Queue Resource Usage in Hadoop Eagle

We can save more than 50% relative computing resources, which equates to nearly ~200 Hadoop nodes in the Hadoop Cluster.

Figure 6: Compute Usage in Hadoop Eagle

The success rate has increased from 92.5% to about 99.9%. A failure rate of 00.1% is due to jobs running more than 1 hour. These jobs are killed because they need more complex computing.

95% of the Hadoop jobs now have a running time of about 40 minutes, down from 90 minutes before optimization.

Figure 7: CAL Report MR Job Running Time Trend

Because the CAL log volume increases 70% YoY, the volume will continue to increase 70% each year. We need to use CombineFileInputFormat and Partition to make MR scale.

### Conclusion

Data volumes increase with each passing day. Hadoop MR is a straightforward solution for big data analysis. Coupled with increasing computing resources to handle data volume increases, optimization can be lead to major cost savings. These optimizations for MR jobs have helped save more than 50% resources than before and have increased success rates to 99.9%.