Dask is a Python library which lets you distribute computation over a cluster. dask-ec2 is a closely related module which lets you use the AWS EC2 framework to create the cluster for you. Just a quick note: Dask is a good fit if your problem is embarrassingly parallel (a minimal sketch of this pattern follows the list below). Examples I’ve come across regularly include:
- Fitting multiple datasets (e.g. separate patients)
- Parameter-grid search
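To make “embarrassingly parallel” concrete, here’s a minimal sketch of the pattern (fit_one and the random arrays are hypothetical stand-ins for a real per-patient fit, not part of the workflow below): each task is independent, so we wrap the function in dask.delayed and let Dask run the fits side by side.

import numpy as np
from dask import compute, delayed

@delayed
def fit_one(data):
    # hypothetical stand-in for fitting a model to one dataset
    return data.mean()

datasets = [np.random.randn(100) for _ in range(8)]  # e.g. one array per patient
results = compute(*[fit_one(d) for d in datasets])  # the fits run in parallel here
print(results)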
I’ve found that dask-ec2 isn’t being maintained at the moment, so I’ve made a repo with some of the changes I’ve needed at https://github.com/lionfish0/dask-ec2. The changes I’ve incorporated:
1. Allowing the use of spot-instances (see https://github.com/dask/dask-ec2/pull/66)
2. Fixed a bug to allow the distributed computers to use 16.04 (see https://github.com/dask/dask-ec2/issues/98)
3. The version of Anaconda being downloaded was out of date (see https://github.com/dask/dask-ec2/issues/38 and https://github.com/dask/dask-ec2/compare/master...lionfish0:master#diff-a7ee77124863ef31e39bc6f1673632c8)
How to install
Get AWS setup
From https://boto3.readthedocs.io/en/latest/guide/quickstart.html (Boto is the Amazon Web Services (AWS) SDK for Python)
sudo apt-get install awscli
pip install boto3
Visit AWS -> IAM -> Add user -> Security Credentials -> Create Access Key. Run aws configure and enter the access key ID, secret key and region. Note: I use ‘eu-west-1’ for the region, and leave the output format blank (it defaults to JSON).
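The prompts look something like this (the key values here are AWS’s documentation placeholders, not real credentials):

$ aws configure
AWS Access Key ID [None]: AKIAIOSFODNN7EXAMPLE
AWS Secret Access Key [None]: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
Default region name [None]: eu-west-1
Default output format [None]: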
Try this python code and see if it works.
import boto3

s3 = boto3.resource('s3')
for b in s3.buckets.all():
    print(b.name)
From http://distributed.readthedocs.io/en/latest/ec2.html, it says to install dask-ec2 with
pip install dask-ec2 (don’t do this!!!). Instead, get it from my repo, with the above changes incorporated:
pip install git+https://github.com/lionfish0/dask-ec2.git
Sort out keys
Visit AWS -> EC2 -> Key pairs -> Create key pair. I called mine “research”. Save the keyfile in ~/.ssh and chmod it to 600.
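Assuming the key downloaded to ~/Downloads, that’s something like:

local$ mv ~/Downloads/research.pem ~/.ssh/
local$ chmod 600 ~/.ssh/research.pem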
Select AMI (instance image we want to use)
Get the AMI we want to use (e.g. Ubuntu 16.04). Check https://cloud-images.ubuntu.com/locator/ec2/ and search for e.g. “16.04 LTS eu-west-1 ebs”.
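If you’d rather look the AMI up programmatically, here’s a sketch using boto3 (it assumes eu-west-1 and filters on Canonical’s AWS account ID, 099720109477, which publishes the official Ubuntu images):

import boto3

ec2 = boto3.client('ec2', region_name='eu-west-1')
images = ec2.describe_images(
    Owners=['099720109477'],  # Canonical
    Filters=[{'Name': 'name',
              'Values': ['ubuntu/images/hvm-ssd/ubuntu-xenial-16.04-amd64-server-*']}])
# pick the most recently created image
latest = max(images['Images'], key=lambda im: im['CreationDate'])
print(latest['ImageId'], latest['Name'])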
To start up your cluster on EC2
We can start up the cluster with
dask-ec2 up, but it wants some parameters, including the keyname and keypair. I found I had to also specify the region-name, the ami and tags, as the first two have wrong defaults and the tool seems to fail if tags isn’t set either. I also found that Ubuntu 16.04 gave an “SSL wrong version number” error, which is hopefully fixed if you use my version of the dask-ec2 repo (see https://github.com/dask/dask-ec2/issues/38). The other options:
- count specifies the number of on-demand instances (it has to be at least 1 at the moment).
- spot-count is the number of spot instances (combine with spot-price, which I set to the price of the on-demand instances).
- volume-size is the size in GB of the instance hard disk.
- type is the EC2 instance type.
- nprocs is, I think, the number of calculations each computer will be given to work on at a time. As GPy does a good job at distributing over multiple cores, I just give each instance 2 problems at a time.
dask-ec2 up --keyname research --keypair .ssh/research.pem --region-name eu-west-1 --ami ami-c8b51fb1 --tags research:dp --count 1 --spot-count 5 --spot-price 0.796 --volume-size 10 --type c4.4xlarge --nprocs 2
Eventually, after a long time, this will finish with:
Dask.Distributed Installation succeeded

Addresses
---------
Web Interface:    http://184.108.40.206:8787/status
TCP Interface:    220.127.116.11:8786

To connect from the cluster
---------------------------

dask-ec2 ssh  # ssh into head node
ipython  # start ipython shell

from dask.distributed import Client, progress
c = Client('127.0.0.1:8786')  # Connect to scheduler running on the head node

To connect locally
------------------

Note: this requires you to have identical environments on your local machine and cluster.

ipython  # start ipython shell

from dask.distributed import Client, progress
e = Client('18.104.22.168:8786')  # Connect to scheduler running on the head node

To destroy
----------

dask-ec2 destroy
Installing Jupyter notebook on the head node
DEBUG: Uploading file /tmp/tmp1GOH7d to /tmp/.__tmp_copy
DEBUG: Running command sudo -S bash -c 'cp -rf /tmp/.__tmp_copy /srv/pillar/jupyter.sls' on '22.214.171.124'
DEBUG: Running command sudo -S bash -c 'rm -rf /tmp/.__tmp_copy' on '126.96.36.199'
+---------+----------------------+-----------------+
| Node ID | # Successful actions | # Failed action |
+=========+======================+=================+
| node-0  | 17                   | 0               |
+---------+----------------------+-----------------+
Jupyter notebook available at http://188.8.131.52:8888/
Login with password: jupyter
Install libraries on cluster
Importantly, the remote cluster’s environment has to match the local environment (the version of Linux, the modules, the Python version etc. all have to match). This is a bit awkward, and finding modules is a problem; I found these didn’t work out of the box. Critically, it failed with “distributed.utils - ERROR - No module named dask_searchcv.methods”. I found I had to install the module on each worker:
Either by hand:
local$ dask-ec2 ssh 1
dask1$ conda install dask-searchcv -c conda-forge -y
Or, better, write a Python function to do this for us. I run this every time I start up a new cluster, to install all the stuff I know I need.
def install_libraries_on_workers(url):
    """Install libraries if necessary on workers etc.
    e.g. if already on server...
    install_libraries_on_workers('127.0.0.1:8786')
    """
    import os  # needed for os.system below
    from dask.distributed import Client
    client = Client(url)

    runlist = ['pip install -U pip',
               'sudo apt install libgl1-mesa-glx -y',
               'conda update scipy -y',
               'pip install git+https://github.com/sods/paramz.git',
               'pip install git+https://github.com/SheffieldML/GPy.git',
               'pip install git+https://github.com/lionfish0/dp4gp.git',
               'conda install dask-searchcv -c conda-forge -y',
               'pip install git+https://github.com/lionfish0/dask_dp4gp.git',
               'pip install numpy',
               'conda remove argcomplete -y']  # , 'conda install python=3.6 -y']

    for item in runlist:
        print("Installing '%s' on workers..." % item)
        client.run(os.system, item)
        print("Installing '%s' on scheduler..." % item)
        client.run_on_scheduler(os.system, item)
        # os.system(item)  # if you need to install it locally too
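Once that’s run, a quick way to check the local and remote environments really do line up is dask.distributed’s Client.get_versions, which raises an error on mismatch when check=True:

from dask.distributed import Client

client = Client('127.0.0.1:8786')  # the scheduler address reported by dask-ec2 up
client.get_versions(check=True)  # compares scheduler, workers and this client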
Here’s a toy example to demonstrate how to use Dask with GPy
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import GPy
from dask import compute, delayed
from dask.distributed import Client

# adding the delayed decorator means this won't run immediately when called.
@delayed(pure=True)
def predict(X, Y, Xtest):
    m = GPy.models.GPRegression(X, Y)
    m.optimize()
    predmean, predvar = m.predict(Xtest)
    return predmean[0, 0]
    # return np.mean(Y)

values = [np.NaN] * 1000
for i in range(1000):
    X = np.arange(0, 100)[:, None]
    Y = np.sin(X) + np.random.randn(X.shape[0], 1) + X  # note: X.shape[0], not X.shape
    Xtest = X[-1:, :] + 1
    values[i] = predict(X, Y, Xtest)  # this doesn't run straight away!

client = Client(ip + ':8786')  # ip is the head node's address from dask-ec2 up

# here is when we actually run the stuff, on the cloud.
results = compute(*values, get=client.get)
print(results)
On two 16-core computers on AWS, I found this sped up by 59% (130s down to 53s).
More examples etc. are available at http://dask.pydata.org/en/latest/use-cases.html