The Expertmaker Accelerator is a well-proven data processing framework that provides fast data access, parallel execution, and automatic organization of source code, input data, and results. It can be used for daily data analysis tasks as well as operating as a live recommendation system with hundreds of thousands of large data files and many CPUs. The Accelerator is now released as open source by eBay.

The Accelerator runs on computers ranging from laptops to rack servers, handles datasets of billions of rows with ease, and keeps order in hundreds of thousands of input files, computations, and results.

Data throughput is typically in the range of millions of rows per second. On a fast computer, tens or hundreds of millions of rows per second is a reality. For example, adding all values in a column may run at more than a billion rows per second, all in the Python programming language.

The Accelerator was developed by the Swedish AI company Expertmaker, first deployed in 2012, and since then it has been the core tool in a number of studies and live production recommendation systems. In 2016, Expertmaker was acquired by eBay. eBay is now making the Expertmaker Accelerator available to the open source community under the Apache License, Version 2.

Design Goals

The Accelerator is designed bottom up for high performance with the following main design goals:

  • It should be easy to process data in parallel on multiple CPUs. After all, computers have shipped with multiple CPU cores for many years.
  • Data throughput should be as fast as possible. Even a small laptop should handle millions of rows of data with ease.
  • Old results should be reused, if available, and not recomputed. Similarly, sharing results between multiple users should be effortless.
  • A data science project may have lots (hundreds of thousands) of input files and lots of source code and intermediate results. The Accelerator should remove the need for manual administration and bookkeeping of data files, computations, results, and how they are related.

The Accelerator is intended to work on log files. Log files are a powerful way of storing data and include transaction logs, event logs, database dumps, and more.

Main Functionality

The atomic operation of the Accelerator is building jobs. Building a job is the process of executing a program with input data and parameters and storing the result, i.e. the output, together with all information required for the computation, in a directory on disk. The resulting so-called job directory contains both the computed result and all information that was required to compute it.

As we will see later, jobs can be anything from simple or complex calculations to containers of large datasets. Jobs can also link to each other, so that a new job can depend on one or more older jobs.

Key Features

Two key features stand out, result reuse and data streaming.

Result reuse

Before building a new job, the Accelerator checks whether the job has been built before. If it already exists, it will not be built again. Instead, the Accelerator will return a link to the existing job. This saves execution time and helps sharing results between users. More importantly, it provides visibility and ensures determinism.
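The reuse idea can be sketched in a few lines of plain Python. This is only an illustration of the principle, not the Accelerator's actual storage layout or hashing scheme: a job is keyed by a hash of its source and parameters, and if a matching job directory already exists, its stored result is returned instead of recomputing.

```python
import hashlib
import json
import os
import pickle
import tempfile

# Hypothetical workdir; the real Accelerator uses configured workdirs.
WORKDIR = tempfile.mkdtemp()

def build(method_source, params, compute):
    # Key the job by a hash of its source code and input parameters.
    key = hashlib.sha1(
        (method_source + json.dumps(params, sort_keys=True)).encode()
    ).hexdigest()[:12]
    jobdir = os.path.join(WORKDIR, key)
    result_file = os.path.join(jobdir, 'result.pickle')
    if os.path.exists(result_file):
        # Built before with identical source and parameters: reuse.
        with open(result_file, 'rb') as fh:
            return jobdir, pickle.load(fh)
    os.makedirs(jobdir)
    result = compute(**params)
    with open(result_file, 'wb') as fh:
        # Store the result in the job directory for later reuse.
        pickle.dump(result, fh)
    return jobdir, result

jid1, r1 = build('add', {'a': 1, 'b': 2}, lambda a, b: a + b)
jid2, r2 = build('add', {'a': 1, 'b': 2}, lambda a, b: a + b)
print(r1, r2, jid1 == jid2)  # 3 3 True  (second call reused the first job)
```

Changing either the source string or the parameters would produce a new key and force a rebuild, which mirrors the behavior described above.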

On top of that, there is a mechanism that stores job information as sessions in a database, which helps administer jobs and their relations to each other.

Data streaming

Streaming contiguous chunks of data from disk to CPU is much more efficient than performing a random query in a database. Streaming is the optimal way to achieve high bandwidth from disk to CPU. It is cache oblivious and may make good use of the operating system’s RAM-based disk buffers. The Accelerator is built on the idea that in many data science and machine learning projects, all or a large part of the data is involved in the computations.

Technical Overview

Let’s take a high-level look at the Accelerator. For a more detailed description and full technical details, please see the Accelerator User’s Reference Manual.

High-level view

The Accelerator is a client-server based application, and figure 1 shows what it looks like from a high level.


Figure 1. High level view of the Accelerator framework

On the left side there is a runner client. To the right, there are two servers, called daemon and urd, where urd is optional. The runner program runs scripts, called build scripts, that execute jobs on the daemon server. This server will load and store information and results for all jobs executed using the workdirs file system-based database. In parallel, information about all jobs covered by a build script will be stored by the urd server into the job logs file system database. urd is responsible for job bookkeeping. This includes storing and retrieving sessions, or lists, of related previously executed jobs.

Jobs: executing code

Jobs are created by the execution of small programs called methods. Methods are written in Python 2 or Python 3, and sometimes in the C programming language. In a method, some reserved function names are used to execute code sequentially or in parallel and to pass parameters and results. A method that is under execution or has completed execution is called a job.

Basic job running: “Hello, World”

A simple “hello world” program is used to illustrate job building. We create a program (method) with the following contents:

def synthesis():
  return "hello world"

This program does not take any input parameters. It just returns a string and exits. In order to execute it, we create a build script that calls the method, like this:

def main(urd):
  jid = urd.build('hello_world')

When execution of the method is completed, a single link, called a jobid, is the only thing that is returned to the user. (In the example above, it is stored in the jid variable.) The jobid points to a directory where the result from the execution is stored, together with all information that was needed to run the job plus some profiling information. See figure 2.

If we try to run the job again it will not execute, because the Accelerator remembers that a job exactly like this has been run in the past. Instead of running the job again, it immediately returns the jobid pointing to the previous run. From a user’s perspective, this means that there is no difference between job running and job result re-use, except for the execution time. In order to make the job execute again, we have to change either the source code or the job’s input parameters.


Figure 2. The method hello world is used to build the job test-0

Linking jobs

Assume that the hello_world job that we just built was computationally expensive, and that it returned a result that we’d like to use as input to further processing. To keep things simple, we demonstrate the principle by creating a method named print_result that just reads and prints the result from the previous job to stdout. Here it is:

import blob

jobids = ('hello_world_job',)

def synthesis(): 
  x = blob.load(jobid=jobids.hello_world_job)
  print(x)

This method expects the hello_world_job input parameter to be provided at execution time, and we will see next how to do this. The method then reads the result from the provided jobid and assigns it to the variable x, which is then printed to stdout. In this example, the method is not very useful, because it does not store anything in the job directory, so there is no result to be recalled later. However, to build this job, we extend the build script like this:

def main(urd):
  jid = urd.build('hello_world') 
  urd.build('print_result', jobids=dict(hello_world_job=jid))

When we run the build script, only the print_result job will be built, since the hello_world job was built previously.

Figure 3 illustrates the situation. Note the direction of the arrow. The second job, test-1, has test-0 as an input parameter, but test-0 does not know of any jobs run in the future.


Figure 3. Job test-0 is used as input to the print result job

Job execution flow and result passing

So far we’ve seen how to create, connect, and run simple jobs. Now we turn our focus to the methods. There are three functions in a method that are called from the Accelerator when a method is being executed, and they are prepare(), analysis(), and synthesis(). All three may exist in the same method, and at least one is required. When the method executes, they are called sequentially.

Figure 4 shows the execution order from top to bottom. Colored branches represent data passed between functions. prepare() is executed first, and its return value is available to both the analysis() and synthesis() functions. The number of parallel analysis() processes is given by the configurable slices parameter, and their outputs are available to the synthesis() function, which is executed last.

Return values from any of the three functions may be stored in the job’s directory, making them available to other jobs.


Figure 4. Execution flow and result propagation in a method
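The execution flow can be sketched in plain Python. The framework itself forks one analysis() process per slice; here the parallel calls are simulated with a sequential loop, and the variable names are illustrative:

```python
SLICES = 3  # corresponds to the configurable "slices" parameter

def prepare():
    # Runs once, first; its return value is passed to the other functions.
    return {'offset': 100}

def analysis(sliceno, prepare_res):
    # Runs once per slice, normally as parallel forked processes.
    return prepare_res['offset'] + sliceno

def synthesis(prepare_res, analysis_res):
    # Runs once, last; receives all per-slice results.
    return sum(analysis_res)

prepare_res = prepare()
analysis_res = [analysis(s, prepare_res) for s in range(SLICES)]
result = synthesis(prepare_res, analysis_res)
print(result)  # (100+0) + (100+1) + (100+2) = 303
```

The return value of synthesis() is what the job stores in its directory, ready to be read by later jobs.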

Job parameters

We’ve already seen how jobids from completed jobs can be used as input to new jobs. The jobid parameter is one of three kinds of input parameters that a job can take. The others are a flexible option dictionary parameter and a set of pointers to datasets.
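The three parameter kinds are declared at module level in a method. The sketch below shows the shape of such declarations; the names and values are purely illustrative, and in a real method the framework replaces them with the actual inputs at build time:

```python
# Hypothetical method interface declarations (module level).
options = dict(length=4, msg='hello')  # free-form option dictionary
datasets = ('source',)                 # pointer(s) to input datasets
jobids = ('first_job',)                # link(s) to previously built jobs
```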

Datasets: storing data

The dataset is the Accelerator’s default storage type for small or large quantities of data, designed for parallel processing and high performance. Datasets are built on top of jobs, so datasets are created by methods and stored in job directories, just like any job result. A single job may contain any number of datasets, making it possible, for example, to write jobs that split an input dataset into several new datasets.

Internally, data in a dataset is stored in a row-column format. All columns are stored, entropy coded, and accessed independently to avoid the overhead of reading data that is not necessary for each actual processing task. Data is also sliced into a fixed number of slices to allow efficient parallel access. Datasets may be hashed, so that slicing is based on the hash value of a given column. Hashing will group all dataset rows with the same value in the hashed column into one slice.

In many practical applications, hashing separates data so that parallel processes may execute independently, minimizing the need for complicated merging operations. This is explained further in the Parallel dataset access and hashing section.

Importing data

Let’s have a look at the common operation of importing a file, i.e. creating a dataset of the file’s contents. There is a standard method bundled with the Accelerator designed for this. The csvimport method has been used on many different file types and can parse a plethora of comma separated value (CSV) file formats and store the data as a dataset. The created dataset is stored in the resulting job, and the name of the dataset will, by default, be the jobid plus the string default, but a custom string may be used instead. See figure 5.


Figure 5. Importing file0.txt

Linking datasets: chaining

Just like jobs can be linked, datasets can link to each other, too. Since datasets are built on top of jobs, this is straightforward. For example, assume that we’ve just imported file0.txt into imp-0 and that there is more data stored in file1.txt. We can import the latter file and supply a link to the previous dataset, see figure 6. Since the datasets are linked, the imp-1 (or imp-1/default) dataset reference can now be used to access all data imported from both files.

Linking datasets containing related content is called chaining, and this is particularly convenient when dealing with data that grows over time, such as log data. Using chaining, we can extend datasets with more rows just by linking, which is a very lightweight operation.


Figure 6. Chaining the import of file1.txt to the previous import of file0.txt.
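Chaining can be sketched in plain Python as a linked list of datasets. The class and field names here are illustrative only, not the Accelerator's API; the point is that extending a chain is just assigning a link, and iterating a chain walks the links back and yields the oldest data first:

```python
class Dataset:
    def __init__(self, rows, previous=None):
        self.rows = rows          # this dataset's own rows
        self.previous = previous  # link to the earlier dataset, or None

def iterate_chain(ds):
    chain = []
    while ds:                     # follow links back to the first dataset
        chain.append(ds)
        ds = ds.previous
    for d in reversed(chain):     # yield rows oldest-first
        yield from d.rows

imp0 = Dataset(['a', 'b'])                 # imported from file0.txt
imp1 = Dataset(['c', 'd'], previous=imp0)  # file1.txt, chained to imp0
chained = list(iterate_chain(imp1))
print(chained)  # ['a', 'b', 'c', 'd']
```

Note that creating imp1 did not touch imp0's data at all, which is why chaining stays cheap no matter how large the earlier datasets are.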

Adding new columns to a dataset

In the previous section we saw how easy it is to add more lines of data to a dataset using chaining. Chaining is implemented by simply assigning a link. Now we'll see that it is equally simple to add new columns to an existing dataset. Adding columns is a common operation and the Accelerator handles it efficiently using links.

The idea is very simple. Assume that we have a "source" dataset to which we want to add a new column. We create a new dataset containing only the new column, and while creating it we instruct the Accelerator to link all the source dataset's columns to the new dataset. See figure 7. Note the difference between adding columns and chaining. In chaining, whole datasets are linked to each other. When adding columns, we link columns from other datasets into the new dataset. Accessing the new one-column dataset will transparently access all the data in the source dataset too, making it indistinguishable from a single dataset.


Figure 7. Adding a new column to an existing dataset

 

Multiple datasets in a job

Typically, a method creates a single dataset in the job directory, but there is no limit on how many datasets can be created and stored in a single job directory. This leads to some interesting applications.

One application where it is convenient to create multiple datasets is when splitting data into subsets based on some condition. For example, assume that we want to separate a dataset into two disjoint datasets based on a column storing a Boolean value.


Figure 8. job-1 separates the dataset job-0/default into two new datasets, named job-1/train and job-1/test

Figure 8 shows how job-1 has created two datasets, job-1/train and job-1/test, based on the input dataset job-0/default. A third job, job-2, is then accessing the job-1/train dataset.

Let us take a short detour and consider an example of when such dataset splitting makes sense, and how it relates to the design methodology that the Accelerator is based upon. Assume that we have a (perhaps large) dataset that we want to split into, say, a training set and a test set. When splitting, we “physically” separate the data into two sets, while still keeping all the data in the same place. This is good for transparency reasons, and any method following the split may still iterate over both subsets to read the complete data. Furthermore, it is very likely that we’ll read the training and test datasets many times, so splitting them initially will probably save execution time in the end, even if one of the sets is small.
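The Boolean split itself is a one-pass operation. A plain-Python sketch (the actual method would write two named datasets, e.g. train and test, to its job directory):

```python
rows = [
    {'x': 1, 'is_train': True},
    {'x': 2, 'is_train': False},
    {'x': 3, 'is_train': True},
]

# Separate the rows into two disjoint subsets based on a Boolean column.
train = [r for r in rows if r['is_train']]
test = [r for r in rows if not r['is_train']]
print(len(train), len(test))  # 2 1
```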

Parallel dataset access and hashing

As mentioned earlier, data in datasets is stored using multiple files, allowing for fast parallel reads. The slices parameter determines how many slices the dataset should be partitioned into. This parameter also sets the number of parallel analysis() processes, so that each analysis() process operates on a unique slice of the dataset.

Datasets can be partitioned, or sliced, in different ways. One obvious way is to use round robin, where each consecutive data row is written to the next slice, modulo the number of slices. This leads to datasets with an approximately equal number of rows per slice. Another alternative is to slice based on the hash value of a particular column’s values. Using this method, all rows with the same value in the hash column end up in the same slice. This is efficient for some parallel processing tasks.
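Both slicing strategies can be sketched in plain Python. The hash function and slice count here are illustrative, not the Accelerator's internals, but the guarantee is the same: with hash slicing, all rows sharing a value in the hash column land in the same slice.

```python
from zlib import crc32

SLICES = 3

def roundrobin_slice(rows):
    # Each consecutive row goes to the next slice, modulo SLICES,
    # giving approximately equal row counts per slice.
    out = [[] for _ in range(SLICES)]
    for n, row in enumerate(rows):
        out[n % SLICES].append(row)
    return out

def hash_slice(rows, hashcol):
    # The slice is chosen from the hash of one column's value, so all
    # rows with the same value in hashcol end up in the same slice.
    out = [[] for _ in range(SLICES)]
    for row in rows:
        out[crc32(str(row[hashcol]).encode()) % SLICES].append(row)
    return out

rows = [
    {'user': 'alice', 'movie': 'Heat'},
    {'user': 'bob', 'movie': 'Alien'},
    {'user': 'alice', 'movie': 'Big'},
    {'user': 'carol', 'movie': 'Up'},
]
rr = roundrobin_slice(rows)
hs = hash_slice(rows, 'user')
# In hs, both of alice's rows are guaranteed to share a slice.
```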

It turns out that hashing a dataset is a relatively fast operation, and in many applications this allows for efficient parallel processing without the need of merging the (independent) results. If we make an analogy to map and reduce, using the Accelerator we replace the post-reduce by a pre-hashing. This is particularly efficient if we run the computations many times, since we only do the hashing once.

More on datasets: types and attributes

There are a number of useful types available for dataset columns. They include floating point and integer numbers, Booleans, timestamps, several string types, and json types. Several of these types are designed to make importing data from text files straightforward, without parse errors, overflows, etc.

Furthermore, the dataset has a number of attributes associated with it, such as shape, number of rows, column names and types, and more. An attribute is accessed like this:

datasets = ('source',)

def synthesis():
  print(datasets.source.shape)

Iterators: working with data

Data in a dataset is typically accessed using an iterator that reads and streams one dataset slice to a CPU core. In this section, we’ll have a look at iterators for reading data, how to take advantage of slicing to have parallel processing, and how to efficiently create datasets.

Iterator basics

The first iterator example will be a sequential, i.e. non-parallel, solution to show the basic iterator concepts. A corresponding parallel solution will be presented thereafter.

Assume that we have a dataset with a column named movie containing movie titles, and we want to know the ten most frequent movies. Consider the following example of a complete method:

from collections import Counter

datasets = ('source',)

def synthesis(): 
  c = Counter(datasets.source.iterate(None, 'movie')) 
  return c.most_common(10)

This will compute and store the ten most common movie titles and their corresponding counts in the source dataset. The code will run on a single CPU core, because of the synthesis() function, which is called only once. The iterate method therefore has to read through all slices, one at a time, in a serial fashion, and this is reflected by the first argument to the iterator being None. We rely on the Counter class to do the counting and sorting.

Parallel execution

The Accelerator is designed for parallel processing, which is mainly provided by the combination of sliced datasets and parallel analysis() calls. The following modification implements a parallel version of the movie example:

def analysis(sliceno):
  return Counter(datasets.source.iterate(sliceno, 'movie'))

def synthesis(analysis_res):
  c = analysis_res.merge_auto()
  return c.most_common(10)

Here, iterate is run inside the analysis() function. This function is forked once for each slice, and the argument sliceno will contain an integer between zero and the number of slices minus one. The returned value from the analysis() functions will be available as input to the synthesis() function in the analysis_res Python iterable. It is possible to merge the results explicitly, but analysis_res comes with a rather magic method, merge_auto(), which merges the results from all slices into one based on the data type. It can for example merge Counters, sets, and composed types like dicts of Counters, and so on.
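For the Counter case, what merge_auto() does can be written out explicitly in plain Python; the per-slice values below are made up for illustration:

```python
from collections import Counter

# Hypothetical per-slice results, as returned by the analysis() calls.
per_slice = [
    Counter({'Heat': 2, 'Alien': 1}),
    Counter({'Heat': 1, 'Up': 2}),
]

# Counters support addition, so merging is a single sum().
merged = sum(per_slice, Counter())
print(merged.most_common(2))  # [('Heat', 3), ('Up', 2)]
```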

Iterating over several columns

Since each column is stored independently in a dataset, there is no overhead in reading more than one of the dataset’s columns in parallel. Iterating over several columns is straightforward by feeding a list of column names to iterate, like in this example:

from collections import defaultdict

datasets = ('source',)

def analysis(sliceno):
  user2movieset = defaultdict(set)
  for user, movie in datasets.source.iterate(sliceno, ('user', 'movie')):
    user2movieset[user].add(movie)
  return user2movieset

This example creates a lookup dictionary from users to sets of movies, and stores it to disk for future use. A special case is iterating over all columns, which is done by specifying an empty list of columns or by using the value None.

...
def analysis(sliceno):
  for rowdata in datasets.source.iterate(sliceno, None): 
    print(rowdata)
    break

This example will print the first row for each slice of a dataset and then exit.

Iterating over dataset chains

The iterate function iterates over a single dataset. There is a corresponding function, iterate_chain, that is used for iterating over chains of datasets. This function takes a number of arguments, such as

  • length, i.e. the number of datasets to iterate over. By default, it will iterate over all datasets in the chain.
  • callbacks, functions that can be called before and/or after each dataset in a chain. Very useful for aggregating data between datasets.
  • stop_id, which stops iterating at a certain dataset id. This jobid could be from another job’s parameters, so we can, for example, iterate exactly over all new datasets not covered by a previous job. This makes it simple to do delta updates, i.e. read the previous result, append anything that is new, and store.
  • range, which allows for iterating over a range of data based on a column’s values.

The range option is based on the max/min values that are automatically stored for each column in a dataset at creation time. Assuming that a chain is sorted, one can for example set

range={'timestamp': ('2018-01-01', '2018-01-31')}

in order to get rows within the specified range only. range is quite costly, since it requires each row in the dataset chain to be checked against the range criterion. Therefore, there is a sloppy version that iterates over the complete datasets in the chain that contain at least one row with a value within the range. This is particularly efficient if the datasets are sorted on the column that is used for range checking.
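The sloppy check can be sketched in plain Python using the per-column min/max that is stored at dataset creation time. The data layout below is purely illustrative:

```python
def sloppy_range_datasets(chain, col, lo, hi):
    # Skip a whole dataset when its stored min/max for col shows that
    # no row in it can possibly fall inside [lo, hi].
    for ds in chain:
        colmin, colmax = ds['minmax'][col]
        if colmax < lo or colmin > hi:
            continue
        yield ds

chain = [
    {'name': 'imp-0', 'minmax': {'ts': ('2017-12-01', '2017-12-31')}},
    {'name': 'imp-1', 'minmax': {'ts': ('2018-01-01', '2018-01-31')}},
]
names = [ds['name'] for ds in
         sloppy_range_datasets(chain, 'ts', '2018-01-01', '2018-01-31')]
print(names)  # ['imp-1']
```

If the chain is sorted on the range column, entire prefixes and suffixes of the chain are skipped this way, which is why the sloppy version is so much cheaper than the per-row check.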

Iterating over hashed datasets

Depending on how the parallel processing is implemented in a method, some methods will only work if the input datasets are hashed on a certain column. To make sure this is the case, there is an optional hashlabel parameter to the iterators that will cause a failure if the supplied column name does not correspond to the dataset’s hashlabel.

Dataset translators and filters

The iterator may perform data translation and filtering on-the-fly using the translators and filters options. Specifying translators and filters as options makes it easier to write methods that are less application specific and more generic.

Here is an example of how a dictionary can be fed into the iterator to translate a column’s values:

mapper = {2: 'HUMANLIKE', 4: 'LABRADOR', 5: 'STARFISH',}

for animal in datasets.source.iterate(sliceno, 'NUM_LEGS', translator={'NUM_LEGS': mapper,}):
  ...

This will iterate over the NUM_LEGS column and map numbers to strings according to the mapper dict. The translator is not limited to dictionaries; it could also be a callable, i.e. a function.

Filters are used to discard rows from the iterator on-the-fly. They work similarly to translators.
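The effect of combining a translator and a filter can be sketched in plain Python on the animal data from the example above; the iterator applies the same two steps on the fly while streaming:

```python
mapper = {2: 'HUMANLIKE', 4: 'LABRADOR', 5: 'STARFISH'}
rows = [2, 4, 5, 4]  # values from a NUM_LEGS-like column

# Translator step: map each value as it streams past.
translated = (mapper[n] for n in rows)
# Filter step: discard unwanted rows on the fly.
filtered = [v for v in translated if v != 'STARFISH']
print(filtered)  # ['HUMANLIKE', 'LABRADOR', 'LABRADOR']
```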

Urd: keeping track of things

We’ve already seen how the Accelerator keeps track of all jobs that have been built, keeping them ready to be reused when possible. While this saves time and ties linked computations together, there is another layer on top that pushes visibility and job reuse even further. This is what the Urd¹ server is about.

Urd stores lists of jobs together with their dependencies in a log-file based database. Everything that takes place in a build script may be recorded to Urd. In order to do that, we need a name for the list to store the information in, and we also need a key, in most situations a date, to be able to look it up again.

We’ll use a simple example to explain the concepts. Consider the following build script that is used to import log files into a chain of datasets:

def main(urd):
  now = '2018-03-01'

  urd.begin('import', now)

  previous_job = urd.latest('import').joblist['csvimport'].jobid

  urd.build('csvimport',
    options=dict(filename='log_' + now + '.txt.gz', ...), 
    datasets=dict(previous=previous_job))
  ...

  urd.finish('import')

Everything that goes on between the begin and finish calls is recorded to the import list in Urd, and can be recalled by querying Urd with that particular date. Or, as in the case shown, we can ask Urd to fetch the latest entry in the list. In this example, we use this to create a chain of datasets. We look up the latest entry in the import list, fetch the csvimport jobid, and feed it as previous dataset to the current csvimport.

We can then use this import list for data processing:

def main(urd):
  urd.begin('processing')

  latest_import = urd.latest('import')

  timestamp = latest_import.timestamp
  jid_imp = latest_import.joblist['csvimport'].jobid

  urd.build('process', datasets=dict(source=jid_imp), ... )

  urd.finish('processing', timestamp)

The example above writes to the processing Urd list. It fetches the latest entry in the import list, from which it extracts the jobid to the csvimport job and the timestamp. This timestamp is then used as key when this session is written to the processing list.

Now we have everything bookkept nicely. We can query the processing list to find the latest (or any specific) processing session, and from there we can see which import session it corresponds to. From this entry, in turn, we can immediately find the csvimport and thus the filename of the file being imported. All connections between jobs and inputs are tracked with full visibility! The Accelerator itself makes sure no jobs are rebuilt unless data or code has changed. Taken together, this provides a powerful way of having the Accelerator organize all jobs and related files (including source code) for its users, removing the need for error prone manual administration.

Example urd scenario

Consider the example where we are doing data processing in, say, ten different languages. For each language, we have tens of thousands of files. By importing the files in dataset chains, and storing each import session to Urd with one list per language, we can easily look up any file in any language, and know exactly which data we are processing.

Performance Figures

The startup time for a new job is a fraction of a second. Below is an example of processing times for some different job types.

Preparing the data: import, type, and hash

The example data file is 1.1TB (gz-compressed 280GB), and has 6.3 × 10⁹ rows and 14 columns. The Accelerator is running on a large machine with 72 cores and a fast disk.

operation              MB/s   Mrows/s
A. csvimport            182       1.0
B. typing               560       3.3
C. hashing (average)    230       1.3

The above values are measured when operating on the full data, six billion lines times 14 columns. The import job (A.) is importing a gz-compressed file. Interestingly, the import runs almost 30% faster than a plain (GNU-) zcat file.gz > /dev/null. On FreeBSD, zcat is faster. The typing job (B.) is typing to: 5 json-lists, 5 numbers, 2 dates, and 2 unicode columns. Each line is on average 172 bytes. The job is reading more than a half gigabyte per second, and simultaneously storing approximately the same amount, so disk bandwidth is above one gigabyte per second. Since hashing speed depends on the column that is hashed, the shown value (row C.) is an average of four hashing jobs, working on different columns.

Working with the data

We compute the operation ∑ (a × b × c), by having one method reading three columns, multiplying their values, and writing the result to a new column that is appended to the dataset. A second job then adds all values in this new column together.

operation                                Mrows/s
D. read three columns, write one column       77
E. sum one column of floats                 1000

As we can see, multiplying three float64 columns together and writing the result back to disk is really fast: 77 million rows per second. Summing the values is even faster, at above one billion values per second. Summing all 6.3 billion values takes about six seconds, all in Python.

Conclusion

The Accelerator is a well-proven tool for fast data processing. On a single computer, processing speeds of millions of rows per second are possible, and simple tasks may execute at more than a billion rows per second. In addition to fast processing, the Accelerator minimizes manual administration of source and data files, computations, and related results. It has been used successfully in several projects and is now available as open source.

Reference

¹ Urd is one of the three Norns in Norse mythology. She represents the past.