Multi-Model Prediction

elm’s predict_many predicts for each estimator in a trained ensemble for one or more samples. predict_many takes some of the same data source keyword arguments that fit_ea and fit_ensemble use. See also Data Sources for a Pipeline - it discusses using a single sample by giving the keyword arguments X or giving a sampler and args_list (list of unpackable args to the sampler callable). The same logic applies for predict_many.

predict_many has a feature to_cube argument that is useful in prediction for spatial data. to_cube=True (True by default) means to convert the 1-D numpy array of predictions from the estimator of a Pipeline instance to a 2-D raster with the coordinates of the input data before the input data were flattened for training. This makes it easy to make xarray-pcolormesh plots of predictions in spatial coordinates that are derived from models trained on spatial satellite and weather data.

Example - SGDClassifier

The following example shows fitting a stochastic gradient descent classifier in ensemble with partial_fit, varying the alpha and penalty parameters to sklearn.linear_model.SGDClassifier and finally predicting from the best models of the ensemble over several input samples.

Import from elm and sklearn

This is a common set of import statements when working with elm

from collections import OrderedDict
from elm.pipeline import Pipeline, steps
from earthio import *
from sklearn.datasets import make_blobs
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import accuracy_score
import numpy as np
import xarray as xr

Define model selection

We can define a callable with a signature like model_selection below to control which models are passed from generation to generation in an ensemble. This function just uses best_idxes (Pareto sorted model fitness from the accuracy_score):

def model_selection(models, best_idxes=None, **kwargs):
    top_n = kwargs['top_n']
    return [models[idx] for idx in best_idxes[:top_n]]

See also model_selection in Controlling Ensemble Initialization.

Define initial ensemble

To vary the parameters of the initial ensemble of Pipeline instances, provide an ensemble_init_func . pipe.new_with_params is used here to create a variety of Pipeline objects that have different SGDClassifier alpha and penalty parameters.

def ensemble_init_func(pipe, **kwargs):
    models = []
    for penalty in ('l1', 'l2'):
        for alpha in (0.0001, 0.001, 0.01):
            new_pipe = pipe.new_with_params(sgd__penalty=penalty, sgd__alpha=alpha)
    return models

See also ensemble_init_func in Controlling Ensemble Initialization.

Control partial_fit and ensemble

The following dict are keywords to pass to fit_ensemble, including setting the number of generations ngen, using partial_fit twice per fitting of each model, and retaining finally the 2 best models (saved_ensemble_size). Note also that partial_fit requires giving the keyword argument classes, a sequence of all known classes, so this is passed via method_kwargs:

ensemble_kwargs = {
    'model_selection': model_selection,
    'model_selection_kwargs': {
        'top_n': 2,
    'ensemble_init_func': ensemble_init_func,
    'ngen': 3,
    'partial_fit_batches': 2,
    'saved_ensemble_size': 2,
    'method_kwargs': {'classes': np.arange(5)},
    'models_share_sample': True,

See also ensemble_kwargs in Controlling Ensemble Initialization.

Define a sampler

The following lines of code use the synthetic data helper make_blobs from sklearn.datasets to create an ElmStore with 5 bands (each band is a DataArray )

rand_X_y = lambda n_samples: make_blobs(centers=[[1,2,3,4,5], [2,3,6,8,9], [3,4,5,10,12]], n_samples=n_samples)
def sampler_train(width, height, **kwargs):
    X, y = rand_X_y(width * height)
    bands = ['band_{}'.format(idx + 1) for idx in range(X.shape[1])]
    es_data = OrderedDict()
    for idx, band in enumerate(bands):
        arr = xr.DataArray(X[:, idx].reshape(height, width),
                   coords=[('y', np.arange(height)),
                           ('x', np.arange(width))],
                   dims=('y', 'x'))
        es_data[band] = arr
    # No geo_transform in attrs of arr, so add_canvas = False
    es = ElmStore(es_data, add_canvas=False)
    sample_weight = None
    return es, y, sample_weight

Testing out sampler_train:

In [42]: X, y, _ = sampler_train(10, 12)

In [43]: X, y
 Dimensions:  (x: 10, y: 12)
   * y        (y) int64 0 1 2 3 4 5 6 7 8 9 10 11
   * x        (x) int64 0 1 2 3 4 5 6 7 8 9
 Data variables:
     band_1   (y, x) float64 0.5343 -1.21 1.241 2.191 3.364 2.115 3.579 3.086 ...
     band_2   (y, x) float64 3.657 3.575 1.164 4.786 4.354 3.74 1.924 3.674 ...
     band_3   (y, x) float64 4.909 2.258 2.761 4.313 5.379 4.145 6.515 5.137 ...
     band_4   (y, x) float64 9.872 5.329 4.786 10.41 10.96 6.878 7.356 10.11 ...
     band_5   (y, x) float64 7.343 5.88 3.924 11.82 11.53 10.16 10.78 11.74 ...
     _dummy_canvas: True
     band_order: ['band_1', 'band_2', 'band_3', 'band_4', 'band_5'],
 array([1, 0, 0, 2, 2, 1, 1, 2, 2, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 2, 2, 2, 0,
        0, 0, 2, 1, 0, 2, 0, 2, 2, 1, 2, 1, 2, 0, 2, 2, 0, 0, 2, 1, 1, 2, 2,
        0, 1, 2, 0, 1, 0, 1, 2, 0, 0, 0, 1, 1, 1, 2, 1, 1, 2, 2, 2, 0, 1, 1,
        2, 0, 2, 2, 1, 0, 1, 2, 1, 0, 0, 1, 1, 1, 2, 1, 0, 2, 1, 0, 1, 2, 0,
        0, 2, 1, 1, 0, 1, 2, 2, 1, 0, 2, 0, 1, 0, 1, 1, 2, 0, 0, 2, 1, 1, 1,
        2, 2, 1, 0, 2]))

Pipeline with scoring

The example below sets up accuracy_score for scoring a Pipeline that will flatten the sample and run SGDClassifier. The scoring_kwargs include greater_is_better (passed to sklearn.model_selection.make_scorer and score_weights determining whether sort models from minimum to maximum fitness (-1) or maximum to minimum (1). Here we are maximimizing the accuracy_score:

pipe = Pipeline([steps.Flatten(),
                 ('sgd', SGDClassifier())],
                 scoring_kwargs=dict(greater_is_better=True, score_weights=[1]))

Read more documentation here on all the options available in elm.pipeline.steps .

Call fit_ensemble

Calling fit_ensemble with an args_list of length 3, we are fitting all models in the ensemble to the same sample in one generation, then proceeding to fitting all models against the next sample in the next generation. In this case we have 3 generations (ngen above) and 3 samples (len(args_list) below) and models_share_sample=True. Each generation will have be a different sample and all models in a generation will be fitted to that sample.

data_source = dict(sampler=sampler_train, args_list=[(100, 120)] * 3)
fitted = pipe.fit_ensemble(**data_source, **ensemble_kwargs)

Call predict_many

We currently have 2 models in the ensemble (see saved_ensemble_size above that set the top N models to keep) and an args_list that will generate 3 samples: predict_many will predict 6 sample - model combinations.

preds = pipe.predict_many(**data_source)

Checking the number of predictions returned:

In [7]: len(preds)
Out[7]: 6

Each item in preds is an ElmStore with a DataArray called predict. In this case that DataArray is a 2-D raster because we used the default keyword argument to_raster=True when predict_many was called. The next snippet shows using the plot attribute of the predict DataArray:

See also documentation on plotting with xarray

p = preds[0]

Predicting from an Ensemble Subset

By default predict_many will look for an attribute on the Pipeline instance called .ensemble, which is expected to be a list of (tag, pipeline) tuples, and predict from each trained Pipeline instance in .ensemble. Alternatively you can pass a list of (tag, pipeline) tuples as ensemble keyword argument. The example below predicts only from the best model in the ensemble (the final ensemble is sorted by model score if scoring was given to Pipeline initialization). There are 3 predictions because there are 3 samples.

In [16]: subset = pipe.ensemble[:1]
In [17]: preds = pipe.predict_many(ensemble=subset, **data_source)
In [18]: len(preds)
Out[18]: 3

Predictions Too Large For Memory

In the examples above, predict_many has returned a list of ElmStore objects. If the number of samples and/or models is large then keeping them all predictions in memory in a list is infeasible. In these cases, pass a serialize argument (callable) to predict_many to serialize prediction ElmStore outputs as they are generated. serialize should have a signature exactly like the example below:

import os
from sklearn.externals import joblib
def serialize(y, X, tag, elm_predict_path):
    dirr = os.path.join(elm_predict_path, tag)
    if not os.path.exists(dirr):
        os.mkdir(dirr) # assuming ELM_PREDICT_PATH in environment
    base = "_".join('{:.02f}'.format(_) for _ in sorted(X.canvas.bounds))
    joblib.dump(y, os.path.join(dirr, base + '.xr'))
    return X.canvas
preds = pipe.predict_many(ensemble=pipe.ensemble[:1], serialize=serialize,**data_source)

In predicting over 3 samples and one model, we have created 3 joblib dump prediction files and returned three Canvas objects

In [27]: preds
(Canvas(geo_transform=(-180, 0.1, 0, 90, 0, -0.1), buf_xsize=10, buf_ysize=10, dims=('y', 'x'), ravel_order='C', zbounds=None, tbounds=None, zsize=None, tsize=None, bounds=BoundingBox(left=-180.0, bottom=90.0, right=-179.1, top=89.1)),
 Canvas(geo_transform=(-180, 0.1, 0, 90, 0, -0.1), buf_xsize=10, buf_ysize=10, dims=('y', 'x'), ravel_order='C', zbounds=None, tbounds=None, zsize=None, tsize=None, bounds=BoundingBox(left=-180.0, bottom=90.0, right=-179.1, top=89.1)),
 Canvas(geo_transform=(-180, 0.1, 0, 90, 0, -0.1), buf_xsize=10, buf_ysize=10, dims=('y', 'x'), ravel_order='C', zbounds=None, tbounds=None, zsize=None, tsize=None, bounds=BoundingBox(left=-180.0, bottom=90.0, right=-179.1, top=89.1)))
(Canvas(geo_transform=(-180, 0.1, 0, 90, 0, -0.1), buf_xsize=10, buf_ysize=10, dims=('y', 'x'), ravel_order='C', zbounds=None, tbounds=None, zsize=None, tsize=None, bounds=BoundingBox(left=-180.0, bottom=90.0, right=-179.1, top=89.1)),
 Canvas(geo_transform=(-180, 0.1, 0, 90, 0, -0.1), buf_xsize=10, buf_ysize=10, dims=('y', 'x'), ravel_order='C', zbounds=None, tbounds=None, zsize=None, tsize=None, bounds=BoundingBox(left=-180.0, bottom=90.0, right=-179.1, top=89.1)),
 Canvas(geo_transform=(-180, 0.1, 0, 90, 0, -0.1), buf_xsize=10, buf_ysize=10, dims=('y', 'x'), ravel_order='C', zbounds=None, tbounds=None, zsize=None, tsize=None, bounds=BoundingBox(left=-180.0, bottom=90.0, right=-179.1, top=89.1)))

Here are some notes on the arguments passed to serialize if given:

  • y is an ElmStore either 1-D or 2-D (see to_raster keyword to predict_many)
  • X is the X ElmStore that was used for prediction (the Pipeline will preserve attrs in X useful for serializing y as in the example above which used the .canvas attribute of X)
  • tag is a unique tag of sample and Pipeline instance
  • elm_predict_path is the root dir for serialization output - ELM_PREDICT_PATH from environment variables.

Parallel Prediction

To run predict_many (or fit_ensemble or fit_ea) in parallel using a dask-distributed client or dask ThreadPool client, use elm.config.client_context as shown here (continuing with the namespace defined by the snippets above)

First make sure you are running a dask-scheduler and dask-worker . Read more here on dask-distributed.

with client_context(dask_executor='DISTRIBUTED', dask_scheduler='') as client:
    ensemble_kwargs['client'] = client
    fitted = pipe.fit_ensemble(**data_source, **ensemble_kwargs)
    preds = pipe.predict_many(client=client, **data_source)

In the example above, client_context could have been called with no arguments if DASK_EXECUTOR and DASK_SCHEDULER environment variables.

With parallel predict_many , each ensemble member / sample combination is a separate task - there is no parallelism within transformations of the Pipeline .