Data compression is generally used to conserve storage space and network bandwidth. Widely used compression tools include Gzip, bzip2, lzop, and 7-Zip. According to performance benchmarks, lzop is one of the fastest compressors, while bzip2 achieves a high compression ratio but is very slow. Gzip falls between the two on both speed and compression ratio. Gzip is based on the DEFLATE algorithm, which combines LZ77 and Huffman coding. The zlib software library can write compressed data with a Gzip wrapper. (Note that in this post we will use Gzip and zlib interchangeably.) With Gzip, compression is as fast as or faster than serial writes to disk, and the compression ratio is far better than with lzop. For decompression as well, Gzip performs better than other algorithms.

These are the reasons why Gzip is one of the most popular and widely used data compression techniques in the industry. However, Gzip has limitations: you cannot randomly access a Gzip file, and you cannot split a Gzip file in the case of Hadoop map-reduce jobs. As a result, Gzip is slower in those scenarios, and it does not leverage the power of parallel processing.

The eBay Cloud Platform team is happy to announce the open-source project GZinga, which aims to provide two additional capabilities for Gzip-compressed files:

  • Seekable: Random search within Gzip files
  • Splittable: Division of a Gzip file into multiple chunks

Motivation

It is common to collect logs from production applications and use them to debug and triage issues. Log files are generally rolled over periodically (hourly, daily) or based on size. To save disk space, log files are stored in a compressed format such as Gzip.

In most outage scenarios, developers are interested in looking at logs for a particular activity or around a certain timestamp. Scanning an entire file to find the log for a particular time period is costly. The data for these logs can be stored on commodity storage like Hadoop. However, to take advantage of Hadoop’s capabilities, small chunks of large Gzip files need to be processed in parallel (for files beyond a few hundred MBs).

The idea of GZinga originated as a means to provide optimal performance for reading/processing Gzip files. Though we have looked at options for log files, the technique we’ve developed can apply to other use cases—textual documents, web crawls, weather data, etc.

Seekable Gzip: write

In this section, we will discuss generating Gzip files with meta information that will be used to perform random access in those files. We use two techniques from the Gzip specification—multiple compressed blocks and the Gzip header flags format—to make files seekable.

Multiple compressed blocks

A Gzip file consists of a series of “members” (compressed data sets). The format of each member is specified as follows:

   <Header>Compressed Data<Footer>

Because a Gzip file can contain a series of such members, an actual Gzip file can look like this:

   <Header>Compressed Data<Footer><Header>Compressed Data<Footer>…

Such multiple compressed blocks can be achieved using the Z_FULL_FLUSH option, according to the zlib manual. These blocks help us in reading a file from any arbitrary location after jumping to its inner header location. The main advantages of this feature are that it is not necessary to uncompress the entire file and it is possible to read from different locations. Only the requested compressed block will be uncompressed, which improves performance considerably.
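The idea of independently readable members can be tried out with the JDK alone. The following is a minimal sketch, not GZinga's code: it starts a new GZIPOutputStream for every chunk (rather than using zlib's flush option) so that each chunk becomes its own member, and it records the byte offset at which each member begins. The class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch: one Gzip member per chunk, with recorded start offsets.
class MultiMemberGzipSketch {

    /** Compresses each chunk as its own member and appends each member's start offset to offsetsOut. */
    static byte[] writeMembers(List<String> chunks, List<Integer> offsetsOut) throws IOException {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        for (String chunk : chunks) {
            offsetsOut.add(file.size()); // the next header starts here
            GZIPOutputStream member = new GZIPOutputStream(file);
            member.write(chunk.getBytes(StandardCharsets.UTF_8));
            member.finish(); // writes the footer but leaves the underlying stream open
        }
        return file.toByteArray();
    }

    /** Decompresses from the member that starts at the given offset through the end of the file. */
    static String readFrom(byte[] file, int offset) throws IOException {
        ByteArrayOutputStream text = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(
                new ByteArrayInputStream(file, offset, file.length - offset))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                text.write(buf, 0, n);
            }
        }
        return new String(text.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

Calling readFrom with the offset of the last member decompresses only that member; nothing before the offset is touched.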

As the deflater gets reset for every block, compression efficiency (both size and processing time) diminishes. However, we've observed that with zlib the impact on the amount of compression is minimal. The following table shows the difference in compression and timing for logs, with and without the multiple-blocks approach. In all of these scenarios, 60 blocks are written, 1 per minute for an hourly log file.

Original size (MB) | Time with blocks | Time without blocks | Δ time | Size with blocks (MB) | Size without blocks (MB) | % size increase
-------------------+------------------+---------------------+--------+-----------------------+--------------------------+----------------
8.12               | 0.41             | 0.387               | 0.023  | 1.078                 | 1.076                    | 0.19
105.5              | 3.685            | 3.49                | 0.195  | 13.982                | 13.981                   | 0.007
527.86             | 14.54            | 14.12               | 0.420  | 69.904                | 69.903                   | 0.001
2111.47            | 53.44            | 52.6                | 0.840  | 279.614               | 279.6                    | 0.0005

It is evident that when the size per block is greater than 1 MB, the processing and space overhead is minimal and can be ignored. (Blocks smaller than this are unlikely to be worthwhile, because a block of about 1 MB already decompresses reasonably fast.)

Header flags format

According to the Gzip specification, the header section has provision for an optional comment. Each header begins with the following fixed fields:

+---+---+---+---+---+---+---+---+---+---+
|ID1|ID2|CM |FLG|     MTIME     |XFL|OS | (more-->)
+---+---+---+---+---+---+---+---+---+---+
  1   2   3   4   5   6   7   8   9   10

The 4th byte of the header represents the flag information:

bit 0   FTEXT
bit 1   FHCRC
bit 2   FEXTRA
bit 3   FNAME
bit 4   FCOMMENT
bit 5   reserved
bit 6   reserved
bit 7   reserved
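As a small illustration of the layout above, the flag byte can be read from the fixed 10-byte header and tested bit by bit. This is a sketch under the assumption that the caller already holds the first 10 bytes of a member; the class and method names are hypothetical.

```java
// Hypothetical sketch: decode the FLG byte (4th header byte) of a Gzip member.
class GzipFlagsSketch {
    static final int FTEXT = 1 << 0;
    static final int FHCRC = 1 << 1;
    static final int FEXTRA = 1 << 2;
    static final int FNAME = 1 << 3;
    static final int FCOMMENT = 1 << 4;

    /** Returns the FLG byte after checking the two magic bytes ID1 and ID2. */
    static int flagByte(byte[] header) {
        if (header.length < 10
                || (header[0] & 0xff) != 0x1f
                || (header[1] & 0xff) != 0x8b) {
            throw new IllegalArgumentException("not a Gzip header");
        }
        return header[3] & 0xff;
    }

    /** True if bit 4 (FCOMMENT) is set, meaning a zero-terminated comment follows in the header. */
    static boolean hasComment(byte[] header) {
        return (flagByte(header) & FCOMMENT) != 0;
    }

    /** True if any of the reserved bits 5 to 7 is set. */
    static boolean hasReservedBits(byte[] header) {
        return (flagByte(header) & 0xE0) != 0;
    }
}
```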

When bit 4 (FCOMMENT) is set, the header carries a zero-terminated file comment. We can use this comment to store an index for the file. The interface for providing the index is shown below in the GZipOutputStreamRandomAccess class:

class GZipOutputStreamRandomAccess extends DeflaterOutputStream {
    /** Maintains the index; each value is the byte offset (in the Gzip file) for the given key. */
    private Map<Long, Long> offsetMap = new LinkedHashMap<Long, Long>();

    /** Adds the current byte offset (in the Gzip file) for the given key. */
    public void addOffset(Long key) {
    }

    /** Writes a header with an extra comment that contains the entries from offsetMap. */
    private void writeHeaderWithComment() {
    }
}

The comment section in the header field will contain the index collected up to that point. Note that comment blocks are ignored by standard libraries that don't look for comments or extra fields. The index entries are added as key:value pairs, as shown below:

+---+---+---+---+---------------+---+---+-----------------------------------+
|ID1|ID2|CM |FLG|     MTIME     |XFL|OS | key1:value1;key2:value2;........;0|
+---+---+---+---+---+---+---+---+---+---+-----------------------------------+
  1   2   3   4   5   6   7   8   9   10  Index in specific format ending with 0
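To make the layout above concrete, here is a minimal sketch that writes a single member whose header carries such a comment, using only JDK classes. It is an illustration under stated assumptions, not GZinga's implementation: the names CommentHeaderSketch, encodeIndex, and writeMember are hypothetical, MTIME is left at 0, and the OS byte is set to 255 (unknown).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.zip.CRC32;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

// Hypothetical sketch: a Gzip member whose header comment holds a key:value index.
class CommentHeaderSketch {

    /** Serializes the index as key1:value1;key2:value2; (the zero terminator is added by the writer). */
    static String encodeIndex(Map<Long, Long> index) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Long, Long> e : index.entrySet()) {
            sb.append(e.getKey()).append(':').append(e.getValue()).append(';');
        }
        return sb.toString();
    }

    /** Writes header (FLG = FCOMMENT), comment, raw DEFLATE data, and the CRC32/ISIZE footer. */
    static byte[] writeMember(byte[] data, Map<Long, Long> index) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // ID1, ID2, CM=8 (deflate), FLG=0x10 (FCOMMENT), MTIME (4 bytes), XFL, OS
        out.write(new byte[] {0x1f, (byte) 0x8b, 8, 0x10, 0, 0, 0, 0, 0, (byte) 0xff});
        out.write(encodeIndex(index).getBytes(StandardCharsets.ISO_8859_1));
        out.write(0); // zero terminator of the comment

        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); // raw DEFLATE, no zlib wrapper
        DeflaterOutputStream body = new DeflaterOutputStream(out, deflater);
        body.write(data);
        body.finish();
        deflater.end();

        CRC32 crc = new CRC32();
        crc.update(data);
        writeIntLE(out, (int) crc.getValue()); // CRC32 of the uncompressed data
        writeIntLE(out, data.length);          // ISIZE: uncompressed length modulo 2^32
        return out.toByteArray();
    }

    private static void writeIntLE(ByteArrayOutputStream out, int v) {
        out.write(v & 0xff);
        out.write((v >>> 8) & 0xff);
        out.write((v >>> 16) & 0xff);
        out.write((v >>> 24) & 0xff);
    }
}
```

A member produced this way can still be read by java.util.zip.GZIPInputStream, which skips the comment field while parsing the header.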

At the time of the file close operation, an extra block is added with a header that contains information about all indices, and a footer without any data. This extra block can be used in locating the header deterministically while reading, as described in the next section. The effective size of the file increases due to the comment block as described below (in a test with 60 blocks):

Original size (MB) | Time with index data (ms) | Time without index data (ms) | Δ time | Size with index data (MB) | Size without index data (MB) | % size increase
-------------------+---------------------------+------------------------------+--------+---------------------------+------------------------------+----------------
8.12               | 0.582                     | 0.387                        | 0.195  | 1.154                     | 1.08                         | 6.85
105.5              | 3.926                     | 3.49                         | 0.436  | 14.09                     | 13.98                        | 0.79
527.86             | 15                        | 14.12                        | 0.880  | 70.02                     | 69.9                         | 0.17
2111.47            | 54.35                     | 52.6                         | 1.750  | 279.72                    | 279.6                        | 0.04

Seekable Gzip: read

To take advantage of the index written in the header, the file is first opened for random access and the last few chunks of data are read. The header is then located by byte comparison (reading in reverse order), and the index is extracted and loaded into memory. Based on the requested index, the read marker jumps to the required location, and the stream is passed to Gzip for decompression of the data. The interface for jumping to the desired location and reading the metadata is shown in the GZipInputStreamRandomAccess class below:

class GZipInputStreamRandomAccess extends GZIPInputStream {

    /** Returns the metadata (index) information for the given file. */
    public Map<Long, Long> getMetadata();

    /** Jumps to the location for the specified key. If the key does not exist, jumps to the beginning of the file. */
    public void jumpToIndex(Long index) throws IOException;

}
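The reverse scan described above can be sketched as follows. This is a simplified illustration, not GZinga's actual reader: it assumes the whole file is already in a byte array, that the index header uses exactly the flag value 0x10 (FCOMMENT only), and that the comment holds key:value pairs separated by semicolons. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: find the last comment-bearing header and parse its index.
class IndexReaderSketch {

    /** Scans backwards for ID1, ID2, CM=8, FLG=FCOMMENT; returns the header offset, or -1 if none is found. */
    static int findLastCommentHeader(byte[] file) {
        for (int i = file.length - 4; i >= 0; i--) {
            if ((file[i] & 0xff) == 0x1f
                    && (file[i + 1] & 0xff) == 0x8b
                    && file[i + 2] == 8
                    && file[i + 3] == 0x10) {
                return i;
            }
        }
        return -1;
    }

    /** Reads the zero-terminated comment that follows the 10 fixed header bytes and parses it. */
    static Map<Long, Long> readIndex(byte[] file, int headerOffset) {
        int start = headerOffset + 10;
        int end = start;
        while (end < file.length && file[end] != 0) {
            end++;
        }
        String comment = new String(file, start, end - start, StandardCharsets.ISO_8859_1);
        Map<Long, Long> index = new LinkedHashMap<>();
        for (String pair : comment.split(";")) {
            if (pair.isEmpty()) {
                continue; // an empty comment yields one empty token
            }
            String[] kv = pair.split(":");
            index.put(Long.parseLong(kv[0]), Long.parseLong(kv[1]));
        }
        return index;
    }
}
```

A four-byte signature can in principle also occur inside compressed data, so a production reader would need to validate a candidate header further; the sketch omits that step.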

Reading the index and then jumping to the desired location improves performance dramatically. The following table provides the numbers for jumping to the last block of data and reading one record from there:

Original size (MB) | Compressed size (MB) | Time with index data (sec) | Time without index data (sec)
-------------------+----------------------+----------------------------+------------------------------
8.12               | 1.08                 | 0.162                      | 0.266
105.5              | 13.98                | 0.166                      | 0.87
527.86             | 69.9                 | 0.167                      | 3.59
2111.47            | 279.6                | 0.17                       | 13.71

We also observed that the time taken is the same regardless of which index entry is jumped to before reading one record. Overall, this approach improves performance significantly, and the gain grows with file size: in the table above, it ranges from about 1.6x for the 8 MB file to about 80x for the 2 GB file.

Random access in Hadoop

SeekableGZipDataInputStream is an extension of Hadoop’s FSDataInputStream class to enable random access for Gzip files that are stored inside Hadoop. Using this extended class, one can jump to any location in a Gzip file stored in Hadoop and read required data from there with much faster performance. Here is an example of using this class to perform random access:

FSDataInputStream fin = fs.open(new Path("testfile"));
long len = fs.getFileStatus(new Path("testfile")).getLen();
SeekableGZipDataInputStream sin = new SeekableGZipDataInputStream(fin, len);
GZipInputStreamRandomAccess gzin = new GZipInputStreamRandomAccess(sin);

Splittable Gzip

In many Hadoop production environments, you get Gzipped files as the raw input. When putting these Gzipped files into Hadoop, an MR job runs with only 1 map task, as a Gzip file is not splittable. This fact is the biggest disadvantage to Gzip, because it defeats the real power of Hadoop. But when we generate Gzip files with the above Seekable approach, then it is possible to split a Gzip file and feed it to an individual map task for processing.

Determining the split

A Gzip file with multiple compressed blocks allows for splitting the file based on the start of a new header. In our approach, we implemented a SplittableCompressionCodec interface for the GzipCodec class. Using this interface, when a split is being invoked with the “start” and “end” position of the split provided, it locates the start of the header from the provided “start” (and “end”) position and sets the new “start” (and “end”) pointer once located. For example, given the following split as the original (or default):

[Figure "gzinga": the original (default) "start" and "end" positions of a split within the Gzip file]

The new “start” and “end” location will be set as shown below:

[Figure "gzinga2": the new "start" and "end" positions, each moved to the start of the next header]

Compressed data before the “start” position will be processed by the previous split (both the “start” and “end” positions will point to the next header).
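The boundary adjustment can be expressed in a few lines. The sketch below assumes the header offsets are already known and sorted, which is a simplification: a codec would have to discover them by scanning the stream. The class and method names are hypothetical.

```java
// Hypothetical sketch: move split boundaries forward to the next member header.
class SplitAlignSketch {

    /** Returns the first header offset at or after pos, or fileLength if no header follows. */
    static long nextHeader(long[] sortedHeaderOffsets, long pos, long fileLength) {
        for (long h : sortedHeaderOffsets) {
            if (h >= pos) {
                return h;
            }
        }
        return fileLength;
    }

    /** Aligns both "start" and "end" of a split to the next header; returns {newStart, newEnd}. */
    static long[] align(long[] sortedHeaderOffsets, long start, long end, long fileLength) {
        return new long[] {
            nextHeader(sortedHeaderOffsets, start, fileLength),
            nextHeader(sortedHeaderOffsets, end, fileLength)
        };
    }
}
```

For example, with headers at offsets 0, 100, 250, and 400 in a 500-byte file, the default splits [0, 128) and [128, 256) become [0, 250) and [250, 400). Because every boundary moves in the same direction, each member is processed by exactly one split.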

Configuration changes

In order to use this feature, one needs to set the io.compression.codecs property in the configuration object before submitting the MR job. In the list of codecs, org.apache.hadoop.io.compress.GzipCodec should be replaced with io.gzinga.hadoop.SplittableGzipCodec for Gzip files:

io.gzinga.hadoop.SplittableGzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec

This value can also be set in the mapred-site.xml file to take effect for all MR jobs.

Another important property to set is the split size. The property mapreduce.input.fileinputformat.split.maxsize specifies the maximum split size. The number of splits depends on the value of this property and the actual Gzip file size. In our tests, a greater number of splits made the MR job faster. Below are performance numbers showing the improvement with respect to splits (the input Gzip file size is 280 MB, and the MR job performs the equivalent of the wc command on the file):

Split size (MB) | # of splits | Avg map time (sec) | Job time (sec)
----------------+-------------+--------------------+---------------
32              | 9           | 24                 | 50
64              | 5           | 36                 | 55
128             | 3           | 52                 | 79
300             | 1           | 127                | 139

As these performance numbers show, the map task took 127 seconds when run with a single split, compared to 52 seconds when run with 3 map tasks. Also note that when the split size decreases (and the number of splits increases), the map task time decreases accordingly.
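The split counts in the table are consistent with a simple ceiling division of the file size by the maximum split size. The sketch below shows only that approximation; the helper name is hypothetical, and it ignores other factors, such as the HDFS block size, that can also influence how Hadoop computes splits.

```java
// Hypothetical sketch: approximate number of splits as ceil(fileSize / maxSplitSize).
class SplitCountSketch {

    /** Both arguments are in bytes and must be positive. */
    static long splitCount(long fileSizeBytes, long maxSplitSizeBytes) {
        return (fileSizeBytes + maxSplitSizeBytes - 1) / maxSplitSizeBytes;
    }
}
```

For the 280 MB input used above, this gives 9, 5, 3, and 1 splits for maximum split sizes of 32, 64, 128, and 300 MB respectively.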

Conclusion

Gzip provides one of the best compression algorithms and is widely used in the industry. But because it lacks support for random access and file splitting, it cannot leverage the power of parallel and distributed processing systems like Hadoop. With the Seekable and Splittable features that GZinga adds, Gzip files can be read from arbitrary indexed positions and processed in parallel on Hadoop with high performance.