DASK

DASK is a library for Python which lets you distribute computation over a cluster. dask-ec2 is a closely related module which lets you use the AWS EC2 framework to create the cluster and so on. Just a quick note: DASK is a good fit if your problem is embarrassingly parallel. Examples I’ve come across regularly include (a minimal sketch of the pattern follows the list):

  • Cross-validation
  • Fitting multiple datasets (e.g. separate patients)
  • Parameter-grid search
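
The pattern in all three cases is much the same: wrap the slow function with dask.delayed, build up a list of lazy tasks and compute them in one go. A minimal sketch (fit_one and the random datasets here are just placeholders, not part of the setup below):

import numpy as np
from dask import compute, delayed

@delayed(pure=True)
def fit_one(X, Y):
    # stand-in for a real model fit (one CV fold, one patient's dataset, one grid point...)
    return np.mean(Y)

datasets = [(np.random.randn(100, 1), np.random.randn(100, 1)) for _ in range(20)]
tasks = [fit_one(X, Y) for X, Y in datasets]  # nothing runs yet
results = compute(*tasks)                     # runs the fits in parallel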

I’ve found that dask-ec2 isn’t being maintained at the moment, so I’ve made a repo with some of the changes I’ve needed here. The changes I’ve incorporated:

1. Allowing the use of spot-instances (see https://github.com/dask/dask-ec2/pull/66)
2. Fixed a bug to allow the distributed machines to use Ubuntu 16.04 (see https://github.com/dask/dask-ec2/issues/98)
3. The version of Anaconda being downloaded was out of date (see https://github.com/dask/dask-ec2/issues/38 and https://github.com/dask/dask-ec2/compare/master...lionfish0:master#diff-a7ee77124863ef31e39bc6f1673632c8)

How to install

Get AWS set up

From https://boto3.readthedocs.io/en/latest/guide/quickstart.html (Boto3 is the Amazon Web Services (AWS) SDK for Python):

sudo apt-get install awscli
pip install boto3

Visit AWS -> IAM -> Add user -> Security Credentials -> Create Access Key. Run aws configure and enter the access key ID, the secret key and the region. Note: I use ‘eu-west-1’ for the region and leave the output format as the default (JSON).

Test

Try this Python code and see if it works (it should list your S3 buckets).

import boto3
s3 = boto3.resource('s3')
for b in s3.buckets.all():
    print(b.name)
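
If you don’t have any S3 buckets yet the loop above will simply print nothing, so another quick sanity check (a boto3 STS call, not something the dask docs mention) is to ask AWS who you’re authenticated as:

import boto3

# prints your account ID, user ID and ARN if the credentials are set up correctly
print(boto3.client('sts').get_caller_identity())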

The documentation at http://distributed.readthedocs.io/en/latest/ec2.html says to install dask-ec2 with pip install dask-ec2. Don’t do this! Instead, install it from my repo, which has the above changes incorporated:

pip install git+https://github.com/lionfish0/dask-ec2.git

Sort out keys

Visit AWS -> EC2 -> Key pairs -> Create key pair. I called mine “research”. Save the key file in ~/.ssh and chmod it to 600.
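
If you prefer to do this from Python, something like the following boto3 sketch creates the key pair and saves it with the right permissions (the key name and path just match the ones above):

import os
import boto3

ec2 = boto3.client('ec2', region_name='eu-west-1')
resp = ec2.create_key_pair(KeyName='research')

keypath = os.path.expanduser('~/.ssh/research.pem')
with open(keypath, 'w') as f:
    f.write(resp['KeyMaterial'])  # the private key is only returned once
os.chmod(keypath, 0o600)          # ssh refuses keys with looser permissions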

Select AMI (instance image we want to use)

Get the AMI we want to use (e.g. Ubuntu 16.04). Check https://cloud-images.ubuntu.com/locator/ec2/ and search for e.g. “16.04 LTS eu-west-1 ebs”.
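
You can also look this up programmatically; a sketch with boto3 (the owner ID 099720109477 is Canonical’s account, and the name filter assumes the standard hvm-ssd Ubuntu 16.04 images):

import boto3

ec2 = boto3.client('ec2', region_name='eu-west-1')
images = ec2.describe_images(
    Owners=['099720109477'],  # Canonical
    Filters=[{'Name': 'name',
              'Values': ['ubuntu/images/hvm-ssd/ubuntu-xenial-16.04-amd64-server-*']}])

# pick the most recently created image
latest = max(images['Images'], key=lambda im: im['CreationDate'])
print(latest['ImageId'], latest['Name'])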

To start up your cluster on EC2

We can start up the cluster with dask-ec2, but it wants some parameters, including the keyname and keypair. I found I also had to specify the region-name, the ami and the tags, as the first two have wrong defaults and the tool seems to fail if tags isn’t set either. I also found that Ubuntu 16.04 gave an “SSL wrong version number” error, which is hopefully fixed if you use my version of the dask-ec2 repo (see https://github.com/dask/dask-ec2/issues/38). The remaining options:

  • count is the number of on-demand instances (it has to be at least 1 at the moment).
  • spot-count is the number of spot instances (combine this with spot-price, which I set to the on-demand price).
  • volume-size is the size in GB of each instance’s disk.
  • type is the EC2 instance type.
  • nprocs is, I think, the number of worker processes each instance runs, i.e. how many problems it is handed at a time. As GPy does a good job of distributing over multiple cores, I just give each instance 2 problems at a time.

dask-ec2 up --keyname research --keypair .ssh/research.pem --region-name eu-west-1 --ami ami-c8b51fb1 --tags research:dp --count 1 --spot-count 5 --spot-price 0.796 --volume-size 10 --type c4.4xlarge --nprocs 2
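
As an aside (this isn’t part of dask-ec2), you can check what the spot market is currently charging before settling on a --spot-price; a quick boto3 sketch:

import boto3

ec2 = boto3.client('ec2', region_name='eu-west-1')
prices = ec2.describe_spot_price_history(
    InstanceTypes=['c4.4xlarge'],
    ProductDescriptions=['Linux/UNIX'])

# print the most recent spot prices reported per availability zone
for p in prices['SpotPriceHistory'][:10]:
    print(p['AvailabilityZone'], p['SpotPrice'], p['Timestamp'])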

After quite a long time, this will eventually finish with:

Dask.Distributed Installation succeeded
Addresses
---------
Web Interface:    http://54.246.253.159:8787/status
TCP Interface:           54.246.253.159:8786
 
To connect from the cluster
---------------------------
dask-ec2 ssh  # ssh into head node
ipython  # start ipython shell

from dask.distributed import Client, progress
c = Client('127.0.0.1:8786')  # Connect to scheduler running on the head node
 
To connect locally
------------------
Note: this requires you to have identical environments on your local machine and cluster.
 
ipython  # start ipython shell
 
from dask.distributed import Client, progress
e = Client('54.246.253.159:8786')  # Connect to scheduler running on the head node
 
To destroy
----------
 
dask-ec2 destroy
Installing Jupyter notebook on the head node
DEBUG: Uploading file /tmp/tmp1GOH7d to /tmp/.__tmp_copy
DEBUG: Running command sudo -S bash -c 'cp -rf /tmp/.__tmp_copy /srv/pillar/jupyter.sls' on '54.246.253.159'
DEBUG: Running command sudo -S bash -c 'rm -rf /tmp/.__tmp_copy' on '54.246.253.159'
+---------+----------------------+-----------------+
| Node ID | # Successful actions | # Failed action |
+=========+======================+=================+
| node-0  | 17                   | 0               |
+---------+----------------------+-----------------+
Jupyter notebook available at http://54.246.253.159:8888/ 
Login with password: jupyter
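
Before going further it’s worth checking the scheduler can actually see its workers; a small sketch using the TCP address printed above (scheduler_info is part of the distributed Client API):

from dask.distributed import Client

client = Client('54.246.253.159:8786')   # the TCP Interface address from the output above
print(client)                            # summary of workers and cores
print(list(client.scheduler_info()['workers'].keys()))  # one entry per worker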

Install libraries on cluster

Importantly, the remote cluster’s environment has to match the local environment (the version of Linux, the modules, the Python version, etc. all have to match). This is a bit awkward. Getting modules in place is a problem; I found things didn’t work out of the box. Critically, it failed with “distributed.utils - ERROR - No module named dask_searchcv.methods“. I found I had to install the missing module on each worker:

Either by hand:

local$ dask-ec2 ssh 1
dask1$ conda install dask-searchcv -c conda-forge -y

Or, better, write a Python function to do this for us. I run the function below every time I start up a new cluster, to install all the libraries I know I need.

def install_libraries_on_workers(url):
    """Install libraries if necessary on workers etc.

    e.g. if already on server...
    install_libraries_on_workers('127.0.0.1:8786')
    """
    import os
    from dask.distributed import Client
    client = Client(url)

    runlist = ['pip install -U pip',
               'sudo apt install libgl1-mesa-glx -y',
               'conda update scipy -y',
               'pip install git+https://github.com/sods/paramz.git',
               'pip install git+https://github.com/SheffieldML/GPy.git',
               'pip install git+https://github.com/lionfish0/dp4gp.git',
               'conda install dask-searchcv -c conda-forge -y',
               'pip install git+https://github.com/lionfish0/dask_dp4gp.git',
               'pip install numpy',
               'conda remove argcomplete -y']  # , 'conda install python=3.6 -y']

    for item in runlist:
        print("Installing '%s' on workers..." % item)
        client.run(os.system, item)
        print("Installing '%s' on scheduler..." % item)
        client.run_on_scheduler(os.system, item)
        # os.system(item)  # if you need to install it locally too
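
For example, after bringing the cluster up, call it with the scheduler address from earlier; the check at the end (just an illustration) confirms that the previously missing module now imports on every worker:

from dask.distributed import Client

install_libraries_on_workers('54.246.253.159:8786')

# confirm dask_searchcv now imports on every worker
client = Client('54.246.253.159:8786')
print(client.run(lambda: __import__('dask_searchcv').__name__))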

Example

Here’s a toy example to demonstrate how to use DASK with GPy:

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import GPy
from dask import compute, delayed
from dask.distributed import Client

#adding the delayed line means this won't run immediately when called.
@delayed(pure=True)
def predict(X,Y,Xtest):
    m = GPy.models.GPRegression(X,Y)
    m.optimize()
    predmean, predvar = m.predict(Xtest)
    return predmean[0,0]
    #return np.mean(Y)

values = [np.NaN]*1000
for i in range(1000):
    X = np.arange(0,100)[:,None]
    Y = np.sin(X)+np.random.randn(X.shape[0],1)+X
    Xtest = X[-1:,:]+1
    values[i] = predict(X,Y,Xtest) #this doesn't run straight away!
    
ip = '54.246.253.159'  # the scheduler (head node) address from the dask-ec2 output
client = Client(ip+':8786')

#here is when we actually run the stuff, on the cloud.
results = compute(*values, get=client.get)

print(results)

On two 16-core computers on AWS, I found the runtime dropped by 59% (130 s down to 53 s).

More examples are available at http://dask.pydata.org/en/latest/use-cases.html