Recently, eBay completed the migration of more than 20 petabytes of data from a vendor’s analytics platform to an open-source-based Hadoop system built in house. This transition uncoupled eBay’s tech-led reimagination from a third-party service provider. At the same time, it also offered the opportunity for eBay to build a suite of complementary open-source-based systems supporting the analytics user experience.
One challenge we confronted with the migration was designing a SQL execution engine that mirrored the previous platform's speed, stability and scalability. The customized Spark SQL engine had a performance gap, especially in SQL execution speed at scale. For example, a query with multiple joins could execute in a matter of seconds on the old tool, while the same query could take several minutes in the new SQL-on-Hadoop engine, particularly when multiple users executed queries concurrently.
To close this gap, eBay's optimized SQL-on-Hadoop engine offers speed coupled with high availability, security and reliability. The core component is a customized Spark SQL engine built on Apache Spark 2.3.1 with rich security features such as software-based security instead of physical firewalls, view-based access controls for data and the TLS 1.2 protocol. Significant optimizations and customizations were made to ensure the new SQL-on-Hadoop engine would provide a seamless bridge between the previous proprietary software and eBay's own in-house analytics platform.
Figure 1 represents the overall architecture. The gateway is the system's access point and is deployed on Tess. Business intelligence tools like Tableau, MicroStrategy or R, as well as any other analytics applications, can use JDBC/ODBC protocols to connect to the system and run SQL commands. The gateway is compatible with the Hive Thrift protocol and is responsible for client connection authentication and traffic distribution.
The customized SQL-on-Hadoop engine is a Spark Thrift server running in the YARN cluster. eBay domain organizations have dedicated YARN queues to execute their respective workloads, thus avoiding resource contention. When the Spark Thrift server is started, a specified number of executors is allocated and started in the queue. The Thrift server and executors are long-running services that serve all SQL requests coming to that queue. All table metadata is stored in a shared Hive metastore that resides on a separate “Generic Cluster,” and tables are accessible by the system’s executors.
The authentication and cluster/queue access permission check is done in the gateway. Currently it supports two authentication mechanisms: Keystone (eBay’s internal authentication service) and Kerberos. In addition, for database- or table-level access, the engine has SQL-based access controls that can be managed by individual table owners who can grant or revoke access to their databases using queries (sample below). Lastly, the underlying Hadoop Distributed File System (HDFS) cannot be directly accessed by individual users.
GRANT SELECT ON table1 TO USER user1;
GRANT SELECT ON DATABASE db1 TO USER user1;
GRANT SELECT ON table1 TO ROLE role1;
GRANT INSERT ON table1 TO USER user2;
Apache Spark does not support update/delete SQL commands by default. However, this feature was widely used by eBay on the vendor platform. The new SQL-on-Hadoop engine extends Spark SQL syntax with Delta Lake to support these operations. Besides basic update/delete, it also supports update/delete with a join (sample below).
UPDATE e
FROM events e, transaction t
SET e.eventDate = t.transactionDate, e.tid = t.id
WHERE e.id = t.id;
eBay users often require the ability to upload large CSV files to an existing database table, or to download large datasets from tables onto their local computers. Furthermore, integration with business intelligence tools like MicroStrategy and Tableau requires the ability to download large datasets.
The new engine enabled this by providing a powerful Download API for large datasets. The API gives users the option to save SQL results to HDFS in Parquet or CSV format, after which the user can download the raw data directly to the client side. Unlike the typical JDBC retrieve API, this API does not require back-and-forth Thrift remote procedure calls (RPCs). The engine’s new API supports downloading files larger than 200 GB and is four times faster than the standard JDBC API.
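The speedup comes largely from skipping per-row Thrift serialization: once the result set is materialized as files in HDFS, the client can fetch raw bytes, optionally as parallel byte-range requests. As a rough illustration (the actual API is internal to eBay, so the helper below is purely hypothetical), splitting a large result file into ranges for parallel fetches might look like:

```python
def byte_ranges(total_size: int, chunk_size: int):
    """Split a file of total_size bytes into (start, end) ranges,
    start-inclusive and end-exclusive, for parallel range fetches.
    Hypothetical helper; the real Download API is internal to eBay."""
    ranges = []
    start = 0
    while start < total_size:
        end = min(start + chunk_size, total_size)
        ranges.append((start, end))
        start = end
    return ranges
```

Each range can then be fetched by a separate worker and written into the local file at its offset, which is how bulk file transfer outperforms a row-at-a-time JDBC cursor.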
eBay users often need to create many temporary tables when developing personal datasets or testing new data pipelines. Using a “temp view” to create such temporary tables results in a large and complex SQL execution plan, which causes issues when users want to analyze or optimize the plan. To counter this, the new platform was upgraded to support the creation of “volatile” tables. Unlike a “temp view,” a volatile table is materialized, and it is dropped automatically when the session is closed. This keeps additional complexity out of users’ SQL execution plans while still letting them create temporary tables quickly and easily.
In addition to the above features, the SQL-on-Hadoop engine was upgraded with new Spark SQL syntax to increase the ease of SQL writing for users:
- Like any/all – A function to match various patterns or parts of text;
- Drop partition with expression – Support to drop a specific range or section of the partition;
- Compact table support – Used to consolidate small files in HDFS into large files and avoid impacting scan performance due to excessive small files; and
- Supporting column list specification in “insert into” statements – Syntax enabling integration with third-party tools like Adobe.
SQL execution performance was a critical component of this migration. Users expected execution speeds that matched the performance of the vendor system. To achieve that, multiple query acceleration features and techniques were incorporated.
Transparent Data Caching
Production datasets are stored in a shared Hadoop cluster, and most production datasets are huge. This cluster is shared by all domain teams and is always busy. Therefore, the new SQL-on-Hadoop engine cannot scan the shared cluster’s HDFS every time a user wants to access production datasets since the scan performance can be significantly impacted by the instability of the shared cluster.
In contrast, the cluster used for ad-hoc analysis is a dedicated Hadoop cluster with SSD storage, so it is more stable and faster than the shared cluster. A transparent data cache layer was introduced on the dedicated analytics cluster to cache frequently accessed datasets. An Airflow job periodically checks whether the underlying production datasets on the shared cluster have changed. If the job detects changes for a cached dataset, it uses a DISTCP command to copy the changed data into the cache HDFS.
The data cache layer is transparent to users. This guarantees that users can always retrieve the latest data while also increasing the scan speed by four times, thus making the new platform more stable.
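The refresh logic can be sketched roughly as follows. This is a simplified stand-in for the Airflow job described above, with modification timestamps passed in as plain dictionaries rather than read from the HDFS namenodes:

```python
def datasets_to_refresh(source_mtimes, cached_mtimes):
    """Given {dataset: last_modified} maps for the shared (source)
    cluster and the dedicated cache cluster, return the datasets whose
    cached copy is stale. A real job would read these timestamps from
    HDFS and then run DISTCP for each stale dataset. Illustrative
    sketch only, not eBay's actual job."""
    return [ds for ds, mtime in source_mtimes.items()
            if cached_mtimes.get(ds, -1) < mtime]
```

Because the check compares only metadata and DISTCP copies only the changed data, a periodic run keeps the cache fresh without rescanning whole datasets.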
SQL users require the ability to scan a small portion of large datasets – for example, analyzing a user’s transaction behavior or gathering a user’s page visit statistics. Scanning the whole dataset in such cases can be inefficient and cost valuable system resources.
Spark provides the option to create bucket/partition tables to address this issue, but it still lacks flexibility since the bucket/partition is fixed after the table is created. The new SQL-on-Hadoop engine was upgraded with an index feature to support these types of use cases. Indexes are independent from the data files so they can be applied and removed as needed.
Currently the new platform supports a Bloom filter index type. A Bloom filter is a space-efficient data structure used to test whether an element is a member of a set: False positive matches are possible, but false negatives are not. The new engine supports using SQL to create and drop Bloom filter indexes for Parquet-format tables, at both the file level and the row-group level.
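For illustration, a minimal Bloom filter with the guarantee described above (possible false positives, never false negatives) can be written as follows. This is a sketch, not the engine's actual implementation:

```python
import hashlib

class BloomFilter:
    """Minimal Bloom filter: a bit array plus k hash functions.
    might_contain() may wrongly say yes, but never wrongly says no."""

    def __init__(self, size_bits: int = 1024, num_hashes: int = 3):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bits // 8)

    def _positions(self, item):
        # Derive k positions by salting a cryptographic hash;
        # a production filter would use cheaper hashes.
        for i in range(self.num_hashes):
            h = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.size

    def add(self, item) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, item) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(item))
```

At planning time, one such filter per file (or per row group) lets the engine skip any file whose filter answers "no" for the predicate value, since a negative answer is always correct.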
Index data contains two parts: the index file and the index metadata file. To avoid too many small HDFS files, one index file for a set of data files is created, and the index metadata file describes the index file. Here is the format of index files and metadata files:
When a user’s SQL hits the index, the new engine will pass the index metadata to the Spark executor side for task execution, and the task will prune the files or row groups accordingly.
Adaptive Query Execution
Adaptive Query Execution (AQE) is a highly effective feature in Spark 3.0. It significantly improves SQL performance in many cases. (AQE introduction and implementation documentation can be found in this blog.) The new platform backported the AQE implementation and modified the code to make it compatible with the Spark 2.3 version on which our Hadoop-Spark system is built. In addition, improvements for AQE were also made for better handling of skew joins.
The original skew-join handling can only deal with basic sort-merge join cases: the join operator’s left and right children must be sort-and-shuffle operators, as shown in Figure 2 below:
In the new engine, however, SQL workloads run into skew joins that don’t match the above pattern. AQE was therefore extended to accommodate more scenarios:
1. Support joins where one side is a bucket table:
A new operator, PartitionRecombinationExec, was added on the bucket-table side; it duplicates the partitions that need to be read multiple times during skew-join handling.
2. Support the aggregation case:
Skew-join handling doesn’t guarantee that every intermediate operator’s result is correct. For example, in the execution plan above, when the left side is skewed, the HashAggregate result may be incorrect after skew handling is applied, because some partitions are read more than once. The final result after the SortMergeJoin is still correct, though, because the duplicated records are de-duplicated inside the SortMergeJoin operator.
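The core of skew handling, splitting an oversized partition into several sub-reads while leaving normal partitions alone, can be sketched as below. This is a simplified model of what AQE does, with the skew threshold expressed only as a multiple of the median partition size (Spark 3.0's actual rule also involves an absolute size threshold):

```python
import statistics

def split_skewed_partitions(sizes, factor: int = 5):
    """Given post-shuffle partition sizes in bytes, return read tasks
    as (partition_id, slice_index, slice_count) tuples. A partition
    larger than factor * median is split into several slices; the
    other side of the join must then be read once per slice, which is
    the duplicated read discussed above. Illustrative sketch only."""
    threshold = factor * statistics.median(sizes)
    tasks = []
    for pid, size in enumerate(sizes):
        if size <= threshold:
            tasks.append((pid, 0, 1))            # whole partition
        else:
            slices = -(-size // int(threshold))  # ceiling division
            for s in range(slices):
                tasks.append((pid, s, slices))   # slice s of `slices`
    return tasks
```

With sizes like [10, 10, 10, 100] and factor 5, the first three partitions stay whole while the fourth is split into two slices, so five tasks run instead of one straggler dominating the stage.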
Most of eBay’s data tables have a bucket layout and are more suitable for “sort-merge joins” since they eliminate the need for additional shuffle-and-sort operations. But what happens if tables have different bucket sizes or the join key is different from the bucket key? The new SQL-on-Hadoop engine can handle this scenario with the “MergeSort” or “Re-bucketing” optimization features.
Consider an example where the user wants to join Table A with Table B. If the bucket size of Table A is 100 and the bucket size of Table B is 500, both tables would need to be shuffled before they could be joined. The “MergeSort” feature will identify that the ratio of bucket sizes of Tables A and B is 1:5, and merge every five buckets in Table B into one, thus bringing its overall bucket size to 100 – matching it with the bucket size of Table A. This removes the need for shuffling all data and executes the join operation much faster. Similarly, the re-bucketing feature will take the table with the smaller bucket size (Table A) and further divide each bucket into five buckets, thus increasing its bucket size to 500 and matching it with that of Table B before executing the join operation.
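The “MergeSort” bucket mapping follows from modular arithmetic: if bucket = hash(key) % n, then any key found in bucket b of the 500-bucket table must land in bucket b % 100 of the 100-bucket table, because 500 is a multiple of 100. A small sketch of that mapping (illustrative, not the engine's code):

```python
def merge_bucket_mapping(small_n: int, large_n: int):
    """For hash-bucketed tables where bucket = hash(key) % n and
    large_n is a multiple of small_n, return, for each bucket of the
    smaller-bucketed table, the buckets of the larger-bucketed table
    that must be merge-sorted into it. Illustrative sketch only."""
    assert large_n % small_n == 0, "bucket counts must be divisible"
    mapping = {b: [] for b in range(small_n)}
    for b in range(large_n):
        # hash = large_n*k + b  =>  hash % small_n == b % small_n
        mapping[b % small_n].append(b)
    return mapping
```

For bucket counts 100 and 500, each of Table A's buckets absorbs exactly five of Table B's buckets, so the join proceeds bucket-by-bucket with no shuffle.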
Parquet Read Optimizations
Most of eBay’s data is stored in parquet format. The new engine provides many optimization opportunities for reading parquet files like:
- Reducing Parquet read RPC calls: The community version of Spark needs to call the Hadoop namenode multiple times when reading a Parquet file, including reading the footer, getting the file status and reading the file contents. On the new platform, the whole reading process is optimized and the number of namenode RPC calls is reduced to one-third of the original.
- Introducing a multi-thread file scan: In Spark, when the scanned table is a bucket table, the task number is usually the same as the bucket number. Some tables are very large, but raising the bucket number enough to match would create too many small files in HDFS. For example, Table A is a partitioned and bucketed table, partitioned by a date column with more than 7,000 partitions covering 20 years of data. If the bucket number were set to 10,000, the table would have more than 70,000,000 files in HDFS. The solution is instead to keep the bucket number small, which means one task must scan multiple large files. When those files are located on the shared HDFS, data reading becomes the bottleneck of SQL execution. eBay therefore developed the multi-thread file scan feature: when one task needs to scan multiple files, it can be configured to use multiple threads for the scan. In some cases this increases table scan speed by three to four times.
- Pushing down more filters to Parquet: The new SQL-on-Hadoop engine’s Spark pushes more filters down to Parquet to reduce the data pulled from HDFS.
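The multi-thread file scan described above boils down to reading a task's files with a small thread pool instead of a sequential loop. A simplified sketch, with read_fn standing in for the actual Parquet reader:

```python
from concurrent.futures import ThreadPoolExecutor

def scan_files(files, read_fn, num_threads: int = 4):
    """Read one task's files concurrently with a thread pool; read_fn
    is a placeholder for the Parquet reader. pool.map preserves input
    order, so downstream operators see a deterministic sequence.
    Illustrative sketch, not the engine's implementation."""
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        return list(pool.map(read_fn, files))
```

Threads work well here because the reads are I/O-bound waits on remote HDFS blocks, so overlapping them hides network latency even under the Python GIL (in the actual engine this runs inside JVM executors).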
Dynamic Partition Pruning (DPP) and Runtime Filter
Dynamic Partition Pruning (DPP) is a new feature in Spark 3.0. It is implemented by adding a dynamic-partition-pruning filter if there is a partitioned table and a filter on the dimension table. (A detailed introduction and implementation description can be found in this article.) This feature improves the performance of join queries of partitioned tables using partition columns in a join condition, and was backported to the new SQL-on-Hadoop engine’s Spark version.
DPP and AQE cannot coexist in the community version, meaning that when AQE is enabled, DPP cannot work; but both features were required for the new SQL-on-Hadoop engine. Therefore, the DPP code was refactored to work when AQE is enabled.
The new SQL-on-Hadoop engine also implements runtime filters to improve query performance. The implementation is similar to DPP’s: when a big table is joined with a smaller table, results and statistics are collected from the smaller table and used to filter the scan of the big table before the join executes. This greatly reduces the number of join records in some cases. You can find a sample illustration in Figure 3 below.
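In miniature, the runtime-filter idea looks like the sketch below: gather the small side's distinct join keys first, then apply them as a filter while scanning the big side. The real engine pushes the collected statistics into the big table's scan rather than holding an in-memory set, so this is only illustrative:

```python
def runtime_filtered_join(big_rows, small_rows, big_key, small_key):
    """Join two lists of dict rows, but first prune the big side using
    the distinct keys of the small side, so far fewer rows reach the
    join itself. Illustrative sketch of the runtime-filter idea."""
    keys = {row[small_key] for row in small_rows}       # build filter
    filtered = [row for row in big_rows if row[big_key] in keys]
    return [(b, s) for b in filtered for s in small_rows
            if b[big_key] == s[small_key]]
```

When the small side is selective, most of the big table's rows are discarded during the scan, which is where the win over a plain join comes from.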
In addition to the above features and strategies, many other improvements to query performance have been made through scheduler changes, lock optimizations in the driver, materialized views and range partitions.
With the optimizations and customizations outlined in this article, the new engine has been onboarded to production and serves all of eBay’s interactive query analysis traffic. It has over 1,200 distinct users per day with more than 260,000 queries running on the new platform, and 80% of the SQLs are answered in 27 seconds or less as shown in Figure 4 below.
The new SQL-on-Hadoop engine’s strong performance has been a key factor in the smooth rollout of Hadoop across eBay. As we continue leveraging data to power eBay’s tech-led reimagination, building our own in-house solution puts us in the driver’s seat for ongoing enhancements and innovations. Stay tuned for additional blogs in this series highlighting how we built our own analytics ecosystem.