# Mike's Page

### Informatics, Development, Cycling, Data, Travel ...

## Air pollution coregionalised between the US embassy and Makerere campus

Next week I’ll be presenting at Manchester’s Advances in Data Science conference, on the air pollution project. I’ve written an extended abstract on the topic.

We use a custom-crafted kernel for coregionalising between multiple sensors, allowing us to make probabilistic predictions at the level of the high-quality reference sensor, across the whole city, using the low-quality noisy sensors. We estimate the coregionalisation parameters using training data we’ve collected – which should ideally include close or colocated measurements from pairings of sensors.

In future we hope to:

1. Include the uncertainty in the coregionalisation, e.g. by integrating over the distribution of the coregionalisation parameters, for example using CCD (a rough sketch of this idea follows the list).
2. Allow the coregionalisation to vary over time. This will require non-stationarity, and is probably best achieved using a more flexible, non-analytic solution, e.g. writing the model in Stan.
3. Update the model in real time. I think another advantage of using Stan or a similar framework would be the gradual inclusion of new Monte Carlo steps incorporating new data as we throw out old data; this allows the gradual change in the coregionalisation to be incorporated.
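On the first point, here is a minimal sketch of what integrating over the coregionalisation parameters could look like: compute the GP’s predictive mean and variance at each of a set of parameter samples or design points (e.g. CCD points), then moment-match the resulting mixture of Gaussians. The `predict` callable and `W_samples` below are hypothetical placeholders, not project code.

```python
import numpy as np

def integrate_predictions(predict, W_samples, weights=None):
    """Average GP predictions over coregionalisation-parameter samples.

    predict(W)  -> (mean, variance) arrays at the test points (hypothetical)
    W_samples   -> parameter values, e.g. CCD design points or MCMC samples
    weights     -> optional design-point weights (uniform if omitted)
    """
    S = len(W_samples)
    weights = np.ones(S) / S if weights is None else np.asarray(weights)
    means, variances = zip(*(predict(W) for W in W_samples))
    means, variances = np.array(means), np.array(variances)
    mean = weights @ means
    # law of total variance: expected variance + variance of the means
    var = weights @ variances + weights @ (means - mean) ** 2
    return mean, var
```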

## Update

### Building a flat coregionalisation kernel

We can’t just use the standard coregionalisation kernel, as we’re not simply Kronecker-product multiplying a coregionalisation matrix with a repeating covariance matrix. Instead we want to element-wise multiply a matrix that expresses the coregionalisation with another matrix that expresses the covariance due to proximity in space and time (see the figure above).

Here is the GPy kernel code to do this:

```python
import GPy
import numpy as np
from GPy.kern import Kern
from GPy.core.parameterization import Param

class FlatCoreg(Kern):
    """Coregionalisation-only kernel: K[i,j] = (W^T W)[sensor_i, sensor_j].

    The single active dimension holds the integer index of the sensor that
    made each observation; B = W^T W calibrates between the sensors.
    """

    def __init__(self, input_dim, active_dims=0, rank=1, output_dim=None, name='flatcoreg'):
        super(FlatCoreg, self).__init__(input_dim, active_dims, name)
        assert isinstance(active_dims, int), "Can only use one dimension"
        W = 0.5 * np.random.randn(rank, output_dim) / np.sqrt(rank)
        self.W = Param('W', W)
        self.link_parameters(self.W)  # the list of parameters we need to optimise

    def update_gradients_full(self, dL_dK, X, X2=None):
        # (this def line was missing from the original post; reconstructed here)
        if X2 is None:
            X2 = X.copy()
        dK_dW = np.zeros([self.W.shape[1], X.shape[0], X2.shape[0]])
        for i, x in enumerate(X):
            for j, x2 in enumerate(X2):
                wi = int(x[0])
                wj = int(x2[0])
                dK_dW[wi, i, j] = self.W[0, wj]
                dK_dW[wj, i, j] += self.W[0, wi]
        self.W.gradient = np.sum(dL_dK[None, :, :] * dK_dW, axis=(1, 2))[None, :]

    def K(self, X, X2=None):
        coregmat = np.array(self.W.T @ self.W)
        if X2 is None:
            X2 = X
        K_xx = np.zeros([X.shape[0], X2.shape[0]])
        for i, x in enumerate(X):
            for j, x2 in enumerate(X2):
                K_xx[i, j] = coregmat[int(x[0]), int(x2[0])]
        return K_xx

    def Kdiag(self, X):
        return np.diag(self.K(X))

k = (GPy.kern.RBF(1, active_dims=[0], name='time')
     * GPy.kern.RBF(1, active_dims=[1], name='space')
     * FlatCoreg(1, output_dim=3, active_dims=2, rank=1))
```


This allows us to make predictions over the whole space, calibrated to the level of the high-quality sensor, with the calibration handled automatically via the W vector.
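As a minimal sketch of how this kernel might be used (the data below is random, and the column layout – time, distance, sensor index – is my assumption to match the `active_dims` above):

```python
import numpy as np
import GPy

# Random stand-in data: columns are [time, distance, sensor index]
N = 100
X = np.c_[np.random.rand(N) * 10,       # time
          np.random.rand(N),            # position along one dimension
          np.random.randint(0, 3, N)]   # which of the three sensors
y = np.random.randn(N, 1)

k = (GPy.kern.RBF(1, active_dims=[0], name='time')
     * GPy.kern.RBF(1, active_dims=[1], name='space')
     * FlatCoreg(1, output_dim=3, active_dims=2, rank=1))
m = GPy.models.GPRegression(X, y, k)
m.optimize()

# Predict over space-time as seen by sensor 0 (the reference sensor)
Xtest = np.c_[np.linspace(0, 10, 50), np.full(50, 0.5), np.zeros(50)]
mean, var = m.predict(Xtest)
```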

I’ve started having the same problem as in this issue. I think something else has been updated which has caused the new error. As it says on the dask-ec2 readme, the dask-ec2 project is now deprecated – so I didn’t try fixing the new bug. I tried for a while using kubernetes (kops, terraform, etc.), but it’s quite a pain to set up (perhaps not well documented yet) and is serious overkill for what I want (and probably for what a lot of people want). So instead I’ve put together a lightweight replacement, daskec2lite.

It needs a little bit more work but is nearly finished. I’ll hopefully have some time later in the year to get it to a more ‘release’ state, but feel free to use it.
```
$ daskec2lite --help

usage: daskec2lite [-h] [--pathtokeyfile [PATHTOKEYFILE]] [--keyname [KEYNAME]]
                   [--username [USERNAME]] [--numinstances [NUM_INSTANCES]]
                   [--instancetype [INSTANCE_TYPE]] [--imageid [IMAGEID]]
                   [--spotprice [SPOTPRICE]] [--region [REGION_NAME]]
                   [--wpi [WORKERS_PER_INSTANCE]] [--sgid [SGID]] [--destroy]

Create an EC2 spot-price cluster, populate with a dask scheduler and workers.

Example: daskec2lite --pathtokeyfile '/home/mike/.ssh/research.pem'
  --keyname 'research' --username 'mike' --imageid ami-19a58760 --sgid sg-9146afe9
```

Packages can be installed on the cluster by logging into each node, e.g.:

```
dask1$ conda install dask-searchcv -c conda-forge -y
```

Or better is to write a python function to do this for us – I run this every time I start up a new cluster, to install all the stuff I know I need:

```python
import os
from dask.distributed import Client

def install_libraries_on_workers(url):
    """Install libraries if necessary on workers etc.

    e.g. if already on server...
    install_libraries_on_workers('127.0.0.1:8786')
    """
    client = Client(url)

    runlist = ['pip install -U pip',
               'sudo apt install libgl1-mesa-glx -y',
               'conda update scipy -y',
               'pip install git+https://github.com/sods/paramz.git',
               'pip install git+https://github.com/SheffieldML/GPy.git',
               'pip install git+https://github.com/lionfish0/dp4gp.git',
               'conda install dask-searchcv -c conda-forge -y',
               'pip install git+https://github.com/lionfish0/dask_dp4gp.git',
               'pip install numpy',
               'conda remove argcomplete -y']  # , 'conda install python=3.6 -y']

    for item in runlist:
        print("Installing '%s' on workers..." % item)
        client.run(os.system, item)
        print("Installing '%s' on scheduler..." % item)
        client.run_on_scheduler(os.system, item)
        # os.system(item)  # if you need to install it locally too
```

## Example

Here’s a toy example to demonstrate how to use dask with GPy:

```python
import numpy as np
import GPy
from dask import compute, delayed
from dask.distributed import Client

# adding the delayed decorator means this won't run immediately when called
@delayed(pure=True)
def predict(X, Y, Xtest):
    m = GPy.models.GPRegression(X, Y)
    m.optimize()
    predmean, predvar = m.predict(Xtest)
    return predmean[0, 0]
    # return np.mean(Y)

values = [np.NaN] * 1000
for i in range(1000):
    X = np.arange(0, 100)[:, None]
    Y = np.sin(X) + np.random.randn(X.shape[0], 1) + X
    Xtest = X[-1:, :] + 1
    values[i] = predict(X, Y, Xtest)  # this doesn't run straight away!

client = Client(ip + ':8786')  # ip = the scheduler's address

# here is when we actually run the stuff, on the cloud
results = compute(*values, get=client.get)
print(results)
```

On two 16-core computers on AWS, I found this sped things up by 59% (130s down to 53s). More examples are available at http://dask.pydata.org/en/latest/use-cases.html.

### Update

If you did this a while ago, dask and friends can get out of date on your local machine, and it’s a pain trying to keep it all in sync. One handy command:

```
conda install -c conda-forge distributed
```

E.g.

```
mike@atlas:~$ conda install -c conda-forge distributed
Fetching package metadata .............
Solving package specifications: .

Package plan for installation in environment /home/mike/anaconda3:

The following packages will be UPDATED:

    dask:        0.15.4-py36h31fc154_0 --> 0.16.1-py_0   conda-forge
    dask-core:   0.15.4-py36h7045e13_0 --> 0.16.1-py_0   conda-forge
    distributed: 1.19.1-py36h25f3894_0 --> 1.20.2-py36_0 conda-forge

The following packages will be SUPERSEDED by a higher-priority channel:

    conda-env:   2.6.0-h36134e3_1      --> 2.6.0-0       conda-forge

Proceed ([y]/n)? y

dask-core-0.16 100% |################################| Time: 0:00:01 269.93 kB/s
distributed-1. 100% |################################| Time: 0:00:01 597.96 kB/s
dask-0.16.1-py 100% |################################| Time: 0:00:00   1.16 MB/s
```
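Related: distributed can report the package versions seen by the client, the scheduler and every worker, which makes spotting such mismatches quicker. A minimal sketch (the scheduler address is illustrative):

```python
from dask.distributed import Client

client = Client('127.0.0.1:8786')  # illustrative scheduler address

# Compare package versions across client, scheduler and workers;
# passing check=True raises an error if they disagree.
versions = client.get_versions(check=False)
print(versions)
```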

Note for self: to debug in a Jupyter notebook, just type

```
debug
```

in the next cell after an error (u = up, c = continue). To set a breakpoint, use:

```python
import pdb; pdb.set_trace()
```
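A made-up example to illustrate: run this, land in the debugger at the breakpoint, then step around.

```python
import pdb

def divide(a, b):
    pdb.set_trace()  # execution pauses here; inspect a and b, then press c
    return a / b

divide(1, 0)  # raises ZeroDivisionError; type debug in the next cell to inspect
```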

## A linear model with GPy

Background: I’m working on a project aiming to extrapolate dialysis patient results over a 100-hour window (I’ll write future blog posts on this!). I’m working with James Fotheringham (Consultant Nephrologist), who brilliantly bridges the gap between clinic and research – allowing ivory-tower researchers (me) to apply our expertise to useful real-world applications.

Part of this project is the prediction of the patient’s weight. We’ll consider a simple model.

When a patient has haemodialysis, they will have fluid removed. If this doesn’t happen frequently or successfully enough the patient can experience *fluid overload*. Each dialysis session (in this simplified model) brings the patient’s weight back down to their dry-weight. Then this weight will increase (roughly linearly) as time goes by until their next dialysis session. I model their weight $$w(t,s)$$ with a slow-moving function of time-since-start-of-trial, $$f(t)$$, added to a linear function of time-since-last-dialysis-session, $$g(s)$$:

$$w(t,s) = f(t) + g(s)$$

For now we’ll ignore $$f(t)$$, and just consider $$g(s)$$. This is a simple linear function, $$g(s) = sw$$. The gradient $$w$$ describes how much the patient’s weight increases for each day since their last dialysis session.

We could estimate this from the patient’s previous data (e.g. by fitting a Gaussian process model to it). But if the patient is new, we might want to provide a prior distribution over this parameter. We could get this by considering what gradients other similar patients have (e.g. those with the same age, vintage, gender, weight, comorbidities, etc. might have similar gradients).
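As a minimal sketch of the first option, assuming we fit a GP with GPy’s linear kernel to one patient’s (days-since-dialysis, weight-gain) history – the data here is synthetic and the numbers are made up:

```python
import numpy as np
import GPy

# Synthetic patient history: days since last dialysis vs weight gained (kg)
s = np.random.rand(30, 1) * 3                    # days since last session
w_true = 0.8                                     # made-up true gradient
y = s * w_true + 0.1 * np.random.randn(30, 1)    # noisy weight gains

# A linear kernel encodes g(s) = s*w with a zero-mean Gaussian prior on w
m = GPy.models.GPRegression(s, y, GPy.kern.Linear(1))
m.optimize()

mean, var = m.predict(np.array([[1.0]]))  # expected gain after one day ~ w
print(mean, var)
```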

Side note: Colleagues have recommended that we combine all the patients into one large coregionalised model. This has several problems: excessive computational demands, privacy (having to share the data), interpretability (the gradient might be a useful feature), etc.

Other side note: I plan on fitting another model, of these gradients with respect to patient demographics etc., to make predictions for the priors of new patients.

So our model is $$g(s) = \phi(s)w$$, where $$\phi$$ is a feature mapping (here simply $$\phi(s) = s$$), and we have a prior $$w \sim N(\mu_p, \sigma_p^2)$$.

If we find the mean and covariance of $$g(s)$$:

$$E[g(s)] = \phi(s) E[w] = \phi(s) \mu_p$$
$$E[g(s)g(s')] = \phi(s) E[ww^\top] \phi(s')^\top = \phi(s) (\mu_p \mu_p^\top + \Sigma_p) \phi(s')^\top$$
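A quick Monte Carlo sanity check of these moments, with made-up values and $$\phi(s) = s$$:

```python
import numpy as np

# Illustrative prior over the gradient w, and two inputs s and s'
mu_p, var_p = 0.5, 0.2 ** 2
s, s2 = 2.0, 3.0

w = np.random.normal(mu_p, np.sqrt(var_p), 1_000_000)
print((s * w).mean(),          s * mu_p)                    # E[g(s)]
print((s * w * s2 * w).mean(), s * (mu_p**2 + var_p) * s2)  # E[g(s)g(s')]
```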

It seems a simple enough problem – but it does require that the prior mean is no longer zero (or flat with respect to the inputs). I’m not sure how to do that in GPy.

Update: There is no way of doing this in GPy by default. So I solved this by adding a single point, with a crafted noise, to produce or ‘induce’ the prior we want.

I’ve written this into a simple python function (sorry it’s not on pip, but it seems a little too specialised to pollute the python module namespace with). Download it from [github](https://github.com/lionfish0/experimentation/blob/master/linear_kernel_prior.py). The docstring should explain how to use it (with an example).
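To sketch the idea behind the trick (a minimal numpy illustration with made-up numbers, not the linked function itself): a zero-mean GP with a linear kernel corresponds to a zero-mean prior on $$w$$; conditioning on one pseudo-observation with a carefully chosen value and noise variance turns that into the $$N(\mu_p, \sigma_p^2)$$ prior we want.

```python
import numpy as np

# Desired prior on the gradient: w ~ N(mu_p, var_p) (made-up numbers)
mu_p, var_p = 0.5, 0.2 ** 2

# A zero-mean GP with linear kernel k(s, s') = var_w * s * s' is equivalent
# to a zero-mean prior w ~ N(0, var_w); var_w must exceed var_p.
var_w = 1.0

# One pseudo-observation (s0, y0) with noise variance r, chosen so that the
# posterior over w given this single point is exactly N(mu_p, var_p):
s0 = 1.0
r = var_w * var_p / (var_w - var_p)
y0 = mu_p * (var_w + r) / var_w

# Check by standard Gaussian conditioning on the pseudo-observation
post_var = var_w - (var_w * s0) ** 2 / (s0 ** 2 * var_w + r)
post_mean = var_w * s0 * y0 / (s0 ** 2 * var_w + r)
print(post_mean, post_var)  # -> 0.5 0.04, the desired prior
```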

## My talk at DSA2017 and the website (with realtime predictions of Kampala’s air quality)