Update 3

Kubernetes is too complicated. Instead I’ve written a replacement for dask-ec2, which I’ve called daskec2lite. See the new post.

Update 2

dask-ec2 has been deprecated. The recommendation is now to use Kubernetes. I’ve not experimented with this yet.

Update 1

pip was upgraded to 9.0.2 a few days ago, which has caused problems. Basically, the error message people get when using dask-ec2 is of the form "'pip.installed' is not available".

I fixed this in dask-ec2 by making two changes:

  • In salt.py, line 167, I pinned the older version of pip by adding the version number to the string: pip==9.0.1.
  • In formulas/salt/dask/distributed/init.sls, line 48, I removed a reference to python36-pip (which I think I originally added!) and just used python3-pip.

This version of dask-ec2 is on GitHub.

DASK

DASK is a Python library which lets you distribute computation over a cluster. dask-ec2 is a closely related module which uses AWS EC2 to create the cluster. Just a quick note: DASK is good if your problem is embarrassingly parallel. Examples I’ve come across regularly include:

  • Cross-validation
  • Fitting multiple datasets (e.g. separate patients)
  • Parameter-grid search
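
As a rough sketch of what "embarrassingly parallel" looks like in code, here is a toy parameter-grid search. The score function and the grid values are made up for illustration. Each grid point is scored independently of the others, which is what lets DASK farm the calls out; with no cluster connected, dask.compute just runs the tasks on the local machine.

```python
import itertools


def grid_points(grid):
    """Expand {'name': [values, ...]} into a list of parameter dicts."""
    names = sorted(grid)
    return [dict(zip(names, combo))
            for combo in itertools.product(*(grid[n] for n in names))]


def score(params):
    """Toy objective (hypothetical): smaller is better, minimum at a=2, b=-1."""
    return (params["a"] - 2) ** 2 + (params["b"] + 1) ** 2


def best_serial(points):
    """Plain loop: one grid point after another."""
    return min(points, key=score)


def best_with_dask(points):
    """Same search, but each score() call becomes an independent dask task."""
    from dask import compute, delayed  # imported here so the rest runs without dask
    scores = compute(*[delayed(score)(p) for p in points])
    best_index = min(range(len(points)), key=lambda i: scores[i])
    return points[best_index]


points = grid_points({"a": [0, 1, 2, 3], "b": [-2, -1, 0]})
print(len(points), best_serial(points))
```

best_with_dask(points) should give the same answer as best_serial(points); the only difference is where the score calls run.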

I’ve found that dask-ec2 isn’t being maintained at the moment, so I’ve made a repo with some of the changes I’ve needed here. The changes I’ve incorporated:

1. Allowing the use of spot-instances (see https://github.com/dask/dask-ec2/pull/66)
2. Fixed a bug to allow the distributed computers to use Ubuntu 16.04 (see https://github.com/dask/dask-ec2/issues/98)
3. Updated the version of Anaconda being downloaded, which was out of date (see https://github.com/dask/dask-ec2/issues/38 and https://github.com/dask/dask-ec2/compare/master...lionfish0:master#diff-a7ee77124863ef31e39bc6f1673632c8)

How to install

Get AWS setup

Following the quickstart at https://boto3.readthedocs.io/en/latest/guide/quickstart.html (Boto is the Amazon Web Services (AWS) SDK for Python):

sudo apt-get install awscli
pip install boto3

Visit AWS -> IAM -> Add user -> Security Credentials -> Create Access Key. Then run aws configure and enter the access key ID, the secret access key and the region. Notes: I use ‘eu-west-1’ for the region, and I leave the output format blank (it defaults to JSON).
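
aws configure stores what you typed in two plain-text files, ~/.aws/credentials and ~/.aws/config. Here is a small sketch for checking that they contain what boto3 will look for. The helper name check_aws_profile is mine, not part of boto3 or the AWS CLI.

```python
import configparser
import os


def check_aws_profile(aws_dir=os.path.expanduser("~/.aws"), profile="default"):
    """Return (has_keys, region) for a profile written by `aws configure`."""
    creds = configparser.ConfigParser()
    creds.read(os.path.join(aws_dir, "credentials"))  # a missing file just gives no sections
    has_keys = (creds.has_section(profile)
                and creds.has_option(profile, "aws_access_key_id")
                and creds.has_option(profile, "aws_secret_access_key"))

    config = configparser.ConfigParser()
    config.read(os.path.join(aws_dir, "config"))
    # In the config file the default profile is [default]; others are [profile NAME].
    section = profile if profile == "default" else "profile " + profile
    region = config.get(section, "region", fallback=None)
    return has_keys, region


if __name__ == "__main__":
    print(check_aws_profile())
```

If this prints (True, 'eu-west-1') (or whichever region you chose), the boto3 test should be able to find your credentials.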

Test

Try this Python code and see if it works (it should print the names of your S3 buckets).

import boto3
s3 = boto3.resource('s3')
for b in s3.buckets.all():
    print(b.name)

http://distributed.readthedocs.io/en/latest/ec2.html says to install dask-ec2 with pip install dask-ec2. Don’t do this! Instead, install it from my repo, which has the above changes incorporated:

pip install git+https://github.com/lionfish0/dask-ec2.git

Sort out keys

Visit AWS -> EC2 -> Key pairs -> Create key pair. I called mine “research”. Save the key file in .ssh and set its permissions with chmod 600.
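
ssh refuses private keys that other users can read, so it can save time to check the permissions before starting a cluster. A minimal sketch (the function name is hypothetical, and the path assumes the key was saved as ~/.ssh/research.pem):

```python
import os
import stat


def key_file_problems(path):
    """Return a list of reasons ssh is likely to reject this private key file."""
    path = os.path.expanduser(path)
    if not os.path.isfile(path):
        return ["%s does not exist" % path]
    mode = stat.S_IMODE(os.stat(path).st_mode)
    problems = []
    if mode & (stat.S_IRWXG | stat.S_IRWXO):
        problems.append("group/other can access the key (mode %o); run chmod 600" % mode)
    if not mode & stat.S_IRUSR:
        problems.append("owner cannot read the key (mode %o)" % mode)
    return problems


print(key_file_problems("~/.ssh/research.pem"))
```

An empty list means the file exists and is only accessible by its owner.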

Select AMI (instance image we want to use)

Get the AMI we want to use (e.g. Ubuntu 16.04). Check https://cloud-images.ubuntu.com/locator/ec2/ and search for e.g. 16.04 LTS eu-west-1 ebs.

Edit: It needs to be an hvm, ebs image. So I searched for: “eu-west-1 16.04 ebs hvm”.
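
The same search can probably be done from Python with boto3 instead of the web page. This is only a sketch: the owner ID is Canonical's AWS account, but the image name pattern is my assumption about how the 16.04 images are named, and older images may no longer be listed. find_ubuntu_ami needs working AWS credentials; newest_image is a plain helper and raises ValueError if no images come back.

```python
def newest_image(images):
    """Pick the most recently created image from a describe_images result."""
    # CreationDate is an ISO 8601 string, so string order is date order.
    return max(images, key=lambda image: image["CreationDate"])


def find_ubuntu_ami(region="eu-west-1"):
    """Look up the newest Ubuntu 16.04 hvm/ebs AMI published by Canonical."""
    import boto3  # imported here so newest_image can be used without boto3
    ec2 = boto3.client("ec2", region_name=region)
    response = ec2.describe_images(
        Owners=["099720109477"],  # Canonical's AWS account ID
        Filters=[
            # Name pattern is an assumption; adjust it if nothing is returned.
            {"Name": "name",
             "Values": ["ubuntu/images/hvm-ssd/ubuntu-xenial-16.04-amd64-server-*"]},
            {"Name": "virtualization-type", "Values": ["hvm"]},
            {"Name": "root-device-type", "Values": ["ebs"]},
        ],
    )
    return newest_image(response["Images"])["ImageId"]
```

The returned ID is what goes into the --ami parameter when starting the cluster.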

To start up your cluster on EC2

We can start up the cluster with dask-ec2 up, but it wants some parameters, including keyname and keypair. I found I also had to specify region-name, ami and tags: the first two have the wrong defaults, and the tool seems to fail if tags isn’t set. I also found that using Ubuntu 16.04 gave an SSL “wrong version number” error, which is hopefully fixed if you use my version of the dask-ec2 repo (see https://github.com/dask/dask-ec2/issues/38 ).

The other parameters:

  • count is the number of on-demand instances (it has to be at least 1 at the moment).
  • spot-count is the number of spot instances. Combine it with spot-price, which I set to the price of the on-demand instances.
  • volume-size is the size in GB of the instance hard disk.
  • type is the EC2 instance type.
  • nprocs is, I think, the number of calculations each computer is given to work on at once. As GPy does a good job of distributing over multiple cores, I just give each instance 2 problems at a time.

dask-ec2 up --keyname research --keypair .ssh/research.pem --region-name eu-west-1 --ami ami-c8b51fb1 --tags research:dp --count 1 --spot-count 5 --spot-price 0.796 --volume-size 10 --type c4.4xlarge --nprocs 2
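
If you start clusters often, it can be handy to build that command from Python rather than retyping it. A sketch using only the flags from the command above; the function name and its defaults are my own choices, and nothing is launched unless you pass the list to subprocess yourself.

```python
import shlex


def build_up_command(keyname, keypair, region, ami, tags,
                     count=1, spot_count=0, spot_price=None,
                     volume_size=10, instance_type="c4.4xlarge", nprocs=2):
    """Assemble the `dask-ec2 up` argument list used in this post."""
    args = ["dask-ec2", "up",
            "--keyname", keyname,
            "--keypair", keypair,
            "--region-name", region,
            "--ami", ami,
            "--tags", tags,
            "--count", str(count)]
    if spot_count:
        # The spot options come from the spot-instance change in the fork.
        args += ["--spot-count", str(spot_count), "--spot-price", str(spot_price)]
    args += ["--volume-size", str(volume_size),
             "--type", instance_type,
             "--nprocs", str(nprocs)]
    return args


command = build_up_command("research", ".ssh/research.pem", "eu-west-1",
                           "ami-c8b51fb1", "research:dp",
                           count=1, spot_count=5, spot_price=0.796)
print(" ".join(shlex.quote(a) for a in command))
# To launch for real: import subprocess; subprocess.run(command, check=True)
```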

Eventually, after a long time, this will finish with:

Dask.Distributed Installation succeeded
Addresses
---------
Web Interface:    http://54.246.253.159:8787/status
TCP Interface:           54.246.253.159:8786
 
To connect from the cluster
---------------------------
dask-ec2 ssh  # ssh into head node
ipython  # start ipython shell

from dask.distributed import Client, progress
c = Client('127.0.0.1:8786')  # Connect to scheduler running on the head node
 
To connect locally
------------------
Note: this requires you to have identical environments on your local machine and cluster.
 
ipython  # start ipython shell
 
from dask.distributed import Client, progress
e = Client('54.246.253.159:8786')  # Connect to scheduler running on the head node
 
To destroy
----------
 
dask-ec2 destroy
Installing Jupyter notebook on the head node
DEBUG: Uploading file /tmp/tmp1GOH7d to /tmp/.__tmp_copy
DEBUG: Running command sudo -S bash -c 'cp -rf /tmp/.__tmp_copy /srv/pillar/jupyter.sls' on '54.246.253.159'
DEBUG: Running command sudo -S bash -c 'rm -rf /tmp/.__tmp_copy' on '54.246.253.159'
+---------+----------------------+-----------------+
| Node ID | # Successful actions | # Failed action |
+=========+======================+=================+
| node-0  | 17                   | 0               |
+---------+----------------------+-----------------+
Jupyter notebook available at http://54.246.253.159:8888/ 
Login with password: jupyter
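
Before sending any work, it can be worth confirming that the workers have actually registered with the scheduler. A minimal sketch that polls scheduler_info() on a connected Client; the function name, the timeout and the expected count are placeholders to adapt to your own cluster.

```python
import time


def wait_for_worker_count(client, expected, timeout=300, poll=5):
    """Poll the scheduler until `expected` workers have registered.

    `client` is a dask.distributed.Client (anything with scheduler_info()).
    Returns the number of workers seen when the wait ended.
    """
    deadline = time.time() + timeout
    while True:
        n_workers = len(client.scheduler_info()["workers"])
        if n_workers >= expected or time.time() >= deadline:
            return n_workers
        time.sleep(poll)


# Usage, with the scheduler address printed by dask-ec2 up:
# from dask.distributed import Client
# client = Client('54.246.253.159:8786')
# print(wait_for_worker_count(client, expected=4))  # 4 is just an example
```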

Install libraries on cluster

Importantly, the remote cluster’s environment has to match the local environment (the version of Linux, the modules, the Python version, etc. all have to match). This is a bit awkward. Missing modules are a problem: I found things didn’t work out of the box. Critically, it failed with “distributed.utils - ERROR - No module named dask_searchcv.methods”. I found I had to install the module on each worker:

Either by hand:

local$ dask-ec2 ssh 1
dask1$ conda install dask-searchcv -c conda-forge -y

Or, better, write a Python function to do this for us. I run this every time I start up a new cluster, to install all the stuff I know I need.

def install_libraries_on_workers(url):
    """Install libraries if necessary on workers etc.
    
    e.g. if already on server...
    install_libraries_on_workers('127.0.0.1:8786')
    """
    import os
    from dask.distributed import Client
    client = Client(url)

    #shell commands to run, in order, on every worker and on the scheduler
    runlist = ['pip install -U pip',
               'sudo apt install libgl1-mesa-glx -y',
               'conda update scipy -y',
               'pip install git+https://github.com/sods/paramz.git',
               'pip install git+https://github.com/SheffieldML/GPy.git',
               'pip install git+https://github.com/lionfish0/dp4gp.git',
               'conda install dask-searchcv -c conda-forge -y',
               'pip install git+https://github.com/lionfish0/dask_dp4gp.git',
               'pip install numpy',
               'conda remove argcomplete -y',
               #'conda install python=3.6 -y',
              ]

    for item in runlist:
        print("Installing '%s' on workers..." % item)
        client.run(os.system,item)
        print("Installing '%s' on scheduler..." % item)
        client.run_on_scheduler(os.system,item)    
        #os.system(item) #if you need to install it locally too
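
After installing, one way to check that the workers really match the local machine is to run the same small function locally and on every worker and compare the answers. A sketch, with helper names and a package list chosen for illustration; client.run returns a dictionary keyed by worker address, which is what find_mismatches expects.

```python
def environment_fingerprint(packages=("dask", "distributed", "numpy")):
    """Describe the interpreter this function runs in (locally or on a worker)."""
    import importlib
    import platform
    versions = {}
    for name in packages:
        try:
            module = importlib.import_module(name)
            versions[name] = getattr(module, "__version__", "unknown")
        except ImportError:
            versions[name] = None  # package missing on this machine
    return {"python": platform.python_version(), "packages": versions}


def find_mismatches(local, remote):
    """Return the workers whose fingerprint differs from the local one.

    `remote` maps worker address -> fingerprint, as returned by client.run().
    """
    return {address: fingerprint
            for address, fingerprint in remote.items()
            if fingerprint != local}


# Usage on a running cluster:
# from dask.distributed import Client
# client = Client('54.246.253.159:8786')
# print(find_mismatches(environment_fingerprint(),
#                       client.run(environment_fingerprint)))
```

An empty dictionary means every worker reported the same Python and package versions as the local machine, at least for the packages listed.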

Example

Here’s a toy example to demonstrate how to use DASK with GPy:

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import GPy
from dask import compute, delayed
from dask.distributed import Client

#adding the delayed line means this won't run immediately when called.
@delayed(pure=True)
def predict(X,Y,Xtest):
    m = GPy.models.GPRegression(X,Y)
    m.optimize()
    predmean, predvar = m.predict(Xtest)
    return predmean[0,0]
    #return np.mean(Y)

values = [np.nan]*1000 #placeholders, replaced by delayed tasks in the loop below
for i in range(1000):
    X = np.arange(0,100)[:,None]
    Y = np.sin(X)+np.random.randn(X.shape[0],1)+X
    Xtest = X[-1:,:]+1
    values[i] = predict(X,Y,Xtest) #this doesn't run straight away!
    
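ip = '54.246.253.159' #public address of the head node, as printed by dask-ec2 up
client = Client(ip+':8786')

#here is when we actually run the stuff, on the cloud.
#(newer dask releases removed get= in favour of scheduler=)
results = compute(*values, get=client.get)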

print(results)

On two 16-core computers on AWS, I found this cut the run time by 59% (130s down to 53s), i.e. it ran about 2.5 times faster.
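
For reference, this is how the two timings relate to each other, since a percentage reduction in run time and a speed-up factor are easy to mix up:

```python
serial_seconds = 130.0   # run time without the cluster
cluster_seconds = 53.0   # run time on the two AWS machines

reduction = 1 - cluster_seconds / serial_seconds  # fraction of run time saved
speedup = serial_seconds / cluster_seconds        # how many times faster

print("run time reduced by %.0f%%" % (100 * reduction))
print("speed-up factor: %.2fx" % speedup)
```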

More examples are available at http://dask.pydata.org/en/latest/use-cases.html

Update

If you did this a while ago, dask and related packages can get out of date on your local machine. It’s a pain trying to keep it all in sync. One handy command:

conda install -c conda-forge distributed

E.g.

mike@atlas:~$ conda install -c conda-forge distributed
Fetching package metadata .............
Solving package specifications: .

Package plan for installation in environment /home/mike/anaconda3:

The following packages will be UPDATED:

dask: 0.15.4-py36h31fc154_0 --> 0.16.1-py_0 conda-forge
dask-core: 0.15.4-py36h7045e13_0 --> 0.16.1-py_0 conda-forge
distributed: 1.19.1-py36h25f3894_0 --> 1.20.2-py36_0 conda-forge

The following packages will be SUPERSEDED by a higher-priority channel:

conda-env: 2.6.0-h36134e3_1 --> 2.6.0-0 conda-forge

Proceed ([y]/n)? y

dask-core-0.16 100% |################################| Time: 0:00:01 269.93 kB/s
distributed-1. 100% |################################| Time: 0:00:01 597.96 kB/s
dask-0.16.1-py 100% |################################| Time: 0:00:00 1.16 MB/s
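
To see what is installed locally before or after an update like this, without reading through the conda output, something along these lines should work on Python 3.8 or later (importlib.metadata is in the standard library from 3.8; the function name is just for illustration):

```python
from importlib import metadata


def installed_versions(packages=("dask", "distributed")):
    """Return {package: version or None} for the current Python environment."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None  # not installed in this environment
    return versions


print(installed_versions())
```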