PyKrylov: Accelerating Machine Learning Research at eBay

A recent eBay Tech Blog article [1] presented the Unified AI platform called Krylov. In this article, we show how Krylov users interact with the platform to build and manage powerful workflows in a pythonic and efficient way.

Researchers should find it smooth and easy to access the AI platform and run machine learning (ML) training code on it. Migrating ML code from a local environment to the platform should require no refactoring, and infrastructure configuration overhead should be minimal. Our mission while developing PyKrylov was to abstract the ML logic from the infrastructure and Krylov core components (Figure 1) as much as possible in order to give platform users the best experience.


Figure 1. Simple layered representation of Krylov components.

PyKrylov is the pythonic interface to Krylov that researchers and engineers across the company use to access eBay’s AI platform. It was built by researchers for researchers, and it has increased the productivity of those who want to use Krylov’s powerful compute resources. Onboarding new or existing code to the platform takes only a few additional lines of code around the ML logic, without changing the existing implementation. The overhead that comes with PyKrylov is minimal, as the Hello World! example below shows. The user can develop and launch the code in her local environment, but the hello_world function is executed on the Krylov platform rather than in the user’s local environment.

    def hello_world():
        print('Hello World!')
    if __name__ == '__main__':
        import pykrylov
        session = pykrylov.Session()
        task = pykrylov.Task(hello_world)
        run_id = session.submit(task)

PyKrylov is ML framework-agnostic and works with any machine learning library, including but not limited to PyTorch and TensorFlow. It is even possible to run tasks on Hadoop and Spark. The user can configure each task in different ways, such as specifying a custom Docker image, compute resources, or execution parameters. In the example below, the task is configured to run on a V100 GPU using a PyTorch Docker image. Users do the configuration in simple Python code and don’t have to deal with additional JSON or YAML files.

    task.run_on_gpu(model='v100')
    task.add_execution_parameter('docker', 'pytorch_image:latest')

Workflows

Model training is usually a multi-step workflow, represented as a collection of tasks and dependencies that form a directed acyclic graph (DAG). PyKrylov lets users create such workflows in a natural way. Figure 2 shows a simple sequential workflow where execution starts with the data_prep task and ends with the finish task.


Figure 2. A sequential workflow representing a simple ML pipeline.

In PyKrylov, the above workflow can be created with the OrderedDict class from Python’s collections module, mapping each task to the list of tasks that follow it. In the code fragment below, Session.submit() submits the whole workflow to the AI platform instead of a single task.

    from collections import OrderedDict
    import pykrylov

    def data_prep():
        print('do some data prepping')

    def train():
        print('train a cool model')

    def test():
        print('test your model')

    def finish():
        print('do some closing work')

    task_data = pykrylov.Task(data_prep)
    task_train = pykrylov.Task(train)
    task_test = pykrylov.Task(test)
    task_finish = pykrylov.Task(finish)

    seq_workflow = OrderedDict({
        task_data: [task_train],
        task_train: [task_test],
        task_test: [task_finish]
    })

    run_id = session.submit(seq_workflow)
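
To make the dictionary shape concrete, here is a minimal sketch that uses only the Python standard library. It is not PyKrylov code: plain strings stand in for Task objects, and execution_order is a hypothetical helper that relies on graphlib (Python 3.9+) to derive one valid run order from a mapping of each step to the steps that follow it, which is the shape the seq_workflow example appears to use.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key maps to the list of steps that run after it, mirroring the
# shape of seq_workflow above (strings stand in for Task objects).
workflow = {
    "data_prep": ["train"],
    "train": ["test"],
    "test": ["finish"],
}


def execution_order(successors):
    """Return one valid execution order for a successor-style DAG."""
    sorter = TopologicalSorter()
    for step, next_steps in successors.items():
        sorter.add(step)  # register the step even if nothing precedes it
        for nxt in next_steps:
            sorter.add(nxt, step)  # nxt may only run after step
    return list(sorter.static_order())


order = execution_order(workflow)
print(order)
```

If the mapping contained a cycle, static_order() would raise graphlib.CycleError, which is one reason a workflow needs to be a DAG.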

It is also possible to submit tasks implemented in a programming language other than Python. The ShellTask class in PyKrylov runs a bash script, which lets the user run code in any programming language.

Additionally, you can convert the sequential workflow into a parallel workflow using the parallelize() function that comes with PyKrylov. 

    parallel_wf = pykrylov.parallelize(
                      workflow=seq_workflow,
                      start=train,
                      end=test,
                      parameter='lr',
                      value_list=[0.1, 0.3, 0.5]
    )

The DAG representation of parallel_wf generated after the above code snippet is shown in Figure 3. The workflow starts with the data_prep task and after the completion of the task, three parallel flows are started with the train function. The finish function is executed only after all three instances of the test function are completed.


Figure 3. A simple parallel workflow for hyperparameter tuning.
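
The fan-out and fan-in in Figure 3 can be sketched in plain Python. The function below is a hypothetical stand-in, not PyKrylov's implementation: it works on step names instead of Task objects, assumes the steps between start and end form a single linear chain, and labels each copy with the parameter value purely for illustration.

```python
def parallelize_sketch(workflow, start, end, parameter, value_list):
    """Copy the start..end chain once per value (illustrative only)."""
    # Walk the linear chain of steps from start to end, inclusive.
    chain = [start]
    while chain[-1] != end:
        (next_step,) = workflow[chain[-1]]  # assumes exactly one successor
        chain.append(next_step)

    def branch_name(step, value):
        return f"{step}[{parameter}={value}]"

    result = {}
    for step, successors in workflow.items():
        if step not in chain:
            # Steps outside the chain fan out wherever they pointed at start.
            fanned = []
            for nxt in successors:
                if nxt == start:
                    fanned.extend(branch_name(nxt, v) for v in value_list)
                else:
                    fanned.append(nxt)
            result[step] = fanned
            continue
        for value in value_list:
            if step == end:
                # The last step of every branch rejoins the shared successors.
                result[branch_name(step, value)] = list(successors)
            else:
                result[branch_name(step, value)] = [
                    branch_name(nxt, value) for nxt in successors
                ]
    return result


seq = {"data_prep": ["train"], "train": ["test"], "test": ["finish"]}
parallel = parallelize_sketch(seq, "train", "test", "lr", [0.1, 0.3, 0.5])
for step, successors in parallel.items():
    print(step, "->", successors)
```

Here data_prep points at three train copies, each train copy points at its own test copy, and all three test copies point at the single finish step, which matches the description of Figure 3.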

It is also possible to define parallel workflows from scratch with OrderedDict definitions. However, PyKrylov users prefer the workflow modification functions for creating parallel workflows. Bigger workflows can be created by chaining parallelize() calls, one per hyperparameter (e.g. batch size and dimension). Another easy way to create hyperparameter tuning workflows is to use the grid_search(), random_search(), and parameter_grid() functions, which PyKrylov implements in a style similar to the scikit-learn package.

    parallel_wf = pykrylov.grid_search(
                      workflow=seq_workflow,
                      parameters_list=pykrylov.parameter_grid({
                          'lr': [0.1, 0.3, 0.5],
                          'dim': [100, 200, 300],
                      }),
                      start=train,
                      end=test
    )

The final workflow looks like the DAG depicted in Figure 4.


Figure 4. A complex parallel workflow for hyperparameter tuning.
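
The number of branches in a grid search follows from the Cartesian product of the parameter lists. The sketch below shows that expansion with itertools.product; parameter_grid_sketch is a hypothetical helper written for illustration, and it is only assumed to resemble what parameter_grid() produces.

```python
from itertools import product


def parameter_grid_sketch(grid):
    """Expand {name: [values]} into a list of {name: value} dicts."""
    names = list(grid)
    value_lists = [grid[name] for name in names]
    # product() yields every combination, with the last parameter varying fastest.
    return [dict(zip(names, combo)) for combo in product(*value_lists)]


combos = parameter_grid_sketch({"lr": [0.1, 0.3, 0.5], "dim": [100, 200, 300]})
print(len(combos))
for combo in combos:
    print(combo)
```

With three learning rates and three dimensions, the expansion yields nine combinations, so a grid search over this grid would need nine parallel branches between the start and end steps.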

Manage Workflow Status

In PyKrylov, tracking and managing workflow status after submission are straightforward. Session.job_show() shows the status of each task and the overall status of the run. Session.job_pause(), Session.job_stop(), and Session.job_resume() allow the users to pause, stop, or resume the runs. When a task is pending, Session.job_info() is very useful to peek at what is going on, e.g. if it is waiting for resources.

    session = pykrylov.Session()
    response = session.job_show(run_id)
    session.job_stop(run_id)
    session.job_resume(run_id)

Distributed Training

Distributed training leverages multiple machines to reduce training time. Krylov supports popular frameworks such as TensorFlow, PyTorch, Keras, and Horovod, which support distributed training natively. Krylov provides stable IPs for the pods in a distributed training job; if a pod goes down during training, Krylov brings it back with the same IP so that training can resume.

The pykrylov.distributed package allows users to launch distributed training workflows on Krylov with their distributed training code in the framework they like. The experience is similar to launching non-distributed training workflows, but PyKrylov automatically generates the configuration files needed for parallelism and the services that come with the stable pod IPs. The DistributedTask class enables users to run distributed training implemented in Python, and the DistShellTask class enables users to run distributed training implemented in other languages, as long as it can be started in a shell. Below are two sample code snippets: the first submits a distributed training run from the Python function mnist_train, and the second creates a distributed training run from the shell scripts run_chief.sh and worker.sh.

    from mnist import train as mnist_train

    parallelism = 2
    task = pykrylov.distributed.DistributedTask(mnist_train,
        name='mnist_task_name',
        parallelism=parallelism)
    # master_service_name: a string holding the service name, defined by the caller
    task.add_service(name=master_service_name, port=2020)
    task.run_on_gpu(quantity=1, model='m40')

    session = pykrylov.Session()
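    run_id = session.submit(task)

The shell-script version builds a chief task and a worker task and submits them together as a workflow:

    from collections import OrderedDict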
    chief_task = pykrylov.distributed.DistShellTask('run_chief.sh',
        name='chief_task_name',
        parallelism=1,
        service_name='chiefSVC',
        service_port=22)  # Another way to specify service

    chief_task.run_on_gpu(quantity=1, model='p100')

    worker_task = pykrylov.distributed.DistShellTask('worker.sh',
        name='worker_task_name',
        parallelism=3,
        service_name='workerSVC',
        service_port=22)
    worker_task.run_on_gpu(quantity=1, model='p100')

    session = pykrylov.Session()
    workflow = OrderedDict({chief_task:[], worker_task:[]})
    run_id = session.submit(workflow)

Experiment Management System (EMS)

The search for the best model involves multiple iterations of hyperparameter tuning, running multiple experiments in parallel, and comparing the results obtained in each of them. Before the Experiment Management System (EMS), Krylov users had to do manual bookkeeping of the hyperparameters, workflow information, and other metadata related to the training. EMS provides the ability to track experiments, manage logs, and manage generated models (regardless of whether a model will be picked for production), and to visualize training status and logs on the Krylov dashboard. Moreover, users can record and visualize computed metrics such as loss and precision values with timestamps.

pykrylov.ems provides a simple pythonic way to create and update experiments, and associate metadata, logs, models, metrics or other files users generate as assets with the experiments.

    config = {
        'lr': 0.0001,
        'model': 'CNN',
        'dataset': 'mnist',
        'epochs': 100
    }

    def train(config, model, optimizer, mnist_dataset):
        exp_id = pykrylov.ems.create_experiment('my_project',
            'my_experiment', configurations=config)
        step = 0
        for epoch in range(config['epochs']):
            for batch in mnist_dataset.train:
                optimizer.zero_grad()  # clear gradients left from the previous batch
                output = model(batch.data)
                loss = calc_loss(output, batch.target)
                # record the training loss for this step
                pykrylov.ems.write_metric(exp_id, name='loss', value=loss, dimension={'step': step})
                loss.backward()
                optimizer.step()
                step += 1

        precision = calc_precision(mnist_dataset.dev)
        pykrylov.ems.write_metric(exp_id, name='dev_precision', value=precision)
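
To illustrate the kind of bookkeeping that a metric call with a dimension implies, here is a small in-memory sketch. ExperimentRecord and its methods are hypothetical names invented for this example; they are not part of pykrylov.ems and say nothing about how EMS stores data. The metric values are made up.

```python
from dataclasses import dataclass, field


@dataclass
class ExperimentRecord:
    """In-memory stand-in for one tracked experiment (not pykrylov.ems)."""
    project: str
    name: str
    configurations: dict
    metrics: list = field(default_factory=list)

    def write_metric(self, name, value, dimension=None):
        # Keep each observation with its optional dimension, e.g. {'step': 3}.
        self.metrics.append(
            {"name": name, "value": value, "dimension": dimension or {}}
        )

    def series(self, name, key="step"):
        """Return (dimension value, metric value) pairs for one metric, sorted."""
        points = [
            (m["dimension"][key], m["value"])
            for m in self.metrics
            if m["name"] == name and key in m["dimension"]
        ]
        return sorted(points)


exp = ExperimentRecord("my_project", "my_experiment", {"lr": 0.0001, "epochs": 100})
for step, loss in enumerate([0.9, 0.6, 0.4]):  # made-up loss values
    exp.write_metric("loss", loss, dimension={"step": step})
exp.write_metric("dev_precision", 0.87)  # made-up value, no dimension

print(exp.series("loss"))
```

Keeping the configuration and the per-step metrics in one record is what makes runs comparable afterwards, which is the manual work that EMS is described as taking over.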

Model Management System (MMS)

Trained models need to be accessible for inference in a production environment. Versioning and tagging the models, as well as recording the metadata associated with each model (e.g. accuracy, training dataset, hyperparameters), is necessary. Without a Model Management System (MMS), this task can become daunting for data scientists, as it requires manual and complicated solutions. Krylov MMS was developed as a centralized solution for storing models; it supports model versioning and metadata bookkeeping and integrates seamlessly with training and inference. With the pykrylov.mms module in PyKrylov, data scientists can push models to MMS during training with ease. The pykrylov.mms module can also be used locally to upload models to and download models from MMS, and it makes models discoverable to users.

    revision = pykrylov.mms.create_model('my_project',
        'my_model_name', model_file_list, 'my_tag')
    print(pykrylov.mms.show_model('my_project', 'my_model_name', 'my_tag', revision=revision))
    pykrylov.mms.download_revision('my_project', 'my_model_name', 'my_tag', 'save_to_dir', latest=True)

Conclusions

We have presented PyKrylov and shown how it accelerates machine learning research at eBay. It simplifies submitting ML tasks and reduces configuration overhead. The user can onboard her code to the platform in a few lines of Python code. In our journey to democratize machine learning, this is only half of the story. The next step for us is to provide researchers with the necessary tools for specific domains such as NLP and CV. We will provide more details in another blog article.


 

[1] eBay’s Transformation to a Modern AI Platform