Fit Ensemble
Ensemble fitting may be helpful in cases where the representative sample is large and/or model parameter or fitting uncertainty should be considered.
Ensemble fitting may:
- Use one or more samples,
- Use one or more models (Pipeline instances), and/or
- Use one or more generations of fitting, with model selection logic on each generation.
It is helpful to first read the section Data Sources for a Pipeline showing how to use either a single X matrix or a series of samples from a sampler callable.
Define a Sampler
The example below uses a sampler function and args_list (list of unpackable args to sampler) to fit to many samples. The full script can be found here. First the script does some imports and sets up a sampler function that uses band_specs (see also ElmStore) to select a subset of bands in HDF4 files.
import glob
import os
from sklearn.cluster import MiniBatchKMeans
from sklearn.decomposition import IncrementalPCA
import numpy as np
from elm.config.dask_settings import client_context
from elm.model_selection.kmeans import kmeans_model_averaging, kmeans_aic
from elm.pipeline import steps, Pipeline
from elm.readers import *
from elm.sample_util.band_selection import select_from_file
from elm.sample_util.metadata_selection import example_meta_is_day
ELM_EXAMPLE_DATA_PATH = os.environ['ELM_EXAMPLE_DATA_PATH']
band_specs = list(map(lambda x: BandSpec(**x),
        [{'search_key': 'long_name', 'search_value': "Band 1 ", 'name': 'band_1'},
         {'search_key': 'long_name', 'search_value': "Band 2 ", 'name': 'band_2'},
         {'search_key': 'long_name', 'search_value': "Band 3 ", 'name': 'band_3'},
         {'search_key': 'long_name', 'search_value': "Band 4 ", 'name': 'band_4'},
         {'search_key': 'long_name', 'search_value': "Band 5 ", 'name': 'band_5'},
         {'search_key': 'long_name', 'search_value': "Band 6 ", 'name': 'band_6'},
         {'search_key': 'long_name', 'search_value': "Band 7 ", 'name': 'band_7'}]))
# Just get daytime files
HDF4_FILES = [f for f in glob.glob(os.path.join(ELM_EXAMPLE_DATA_PATH, 'hdf4', '*hdf'))
              if example_meta_is_day(load_hdf4_meta(f))]
data_source = {
    'sampler': select_from_file,
    'band_specs': band_specs,
    'args_list': HDF4_FILES,
}
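To make the relationship between sampler and args_list more concrete, here is a minimal sketch in plain Python. The names toy_sampler and iter_samples are hypothetical and not part of elm. The calling convention is an assumption: each entry of args_list is treated as the positional arguments for one call, and the remaining data_source keys are passed as keyword arguments. elm's actual behavior may differ.

```python
def toy_sampler(filename, band_specs=None):
    """Hypothetical sampler: build a fake 'sample' describing one file."""
    bands = [spec["name"] for spec in (band_specs or [])]
    return {"source": filename, "bands": bands}


def iter_samples(sampler, args_list, **sampler_kwargs):
    """Yield one sample per entry of args_list (assumed calling convention)."""
    for args in args_list:
        # Assumption: a bare value counts as a single positional argument.
        if not isinstance(args, tuple):
            args = (args,)
        yield sampler(*args, **sampler_kwargs)


data_source = {
    "sampler": toy_sampler,
    "band_specs": [{"name": "band_1"}, {"name": "band_2"}],
    "args_list": ["a.hdf", "b.hdf", "c.hdf"],
}

# Split the dict into the callable, its argument list, and shared keyword args.
options = dict(data_source)
sampler = options.pop("sampler")
args_list = options.pop("args_list")
samples = list(iter_samples(sampler, args_list, **options))
print(len(samples), samples[0]["source"], samples[0]["bands"])
```

Under these assumptions, each file name in args_list yields exactly one sample, and band_specs is shared across all calls.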
Define Pipeline Steps
Next, a Pipeline is configured that flattens the separate band rasters to a single 2-D DataArray (see also transform-flatten), applies standard scaling from scikit-learn, then transforms with IncrementalPCA using 2 partial_fit batches before K-Means. The Pipeline constructor also takes a scoring callable and optional scoring_kwargs.
pipeline_steps = [steps.Flatten(),
                  ('scaler', steps.StandardScaler()),
                  ('pca', steps.Transform(IncrementalPCA(n_components=4), partial_fit_batches=2)),
                  ('kmeans', MiniBatchKMeans(n_clusters=4, compute_labels=True)),]
pipe = Pipeline(pipeline_steps, scoring=kmeans_aic, scoring_kwargs=dict(score_weights=[-1]))
See the signature for kmeans_aic here to write a similar scoring function. Otherwise, scoring defaults to calling the estimator’s .score callable, or raises an exception if .score is not defined.
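For orientation, the scaling, PCA, and K-Means steps can be approximated with plain scikit-learn and NumPy on synthetic data, as sketched below. This is not elm's implementation: the random array stands in for flattened band rasters, and aic_like_score is a hypothetical AIC-style heuristic (inertia plus twice the number of centroid coordinates) that does not follow the real kmeans_aic signature.

```python
import numpy as np
from sklearn.cluster import MiniBatchKMeans
from sklearn.decomposition import IncrementalPCA
from sklearn.preprocessing import StandardScaler

rng = np.random.RandomState(0)
X = rng.normal(size=(200, 7))  # synthetic stand-in for 7 flattened bands

# Standard scaling, comparable to the 'scaler' step.
X_scaled = StandardScaler().fit_transform(X)

# IncrementalPCA fit in 2 partial_fit batches, comparable to the 'pca' step.
pca = IncrementalPCA(n_components=4)
for batch in np.array_split(X_scaled, 2):
    pca.partial_fit(batch)
X_reduced = pca.transform(X_scaled)

# K-Means on the reduced features, comparable to the 'kmeans' step.
kmeans = MiniBatchKMeans(n_clusters=4, compute_labels=True, n_init=3, random_state=0)
kmeans.fit(X_reduced)


def aic_like_score(model):
    """Hypothetical AIC-style score: inertia plus 2 * (free centroid parameters)."""
    n_clusters, n_features = model.cluster_centers_.shape
    return model.inertia_ + 2 * n_clusters * n_features


score = aic_like_score(kmeans)
print(X_reduced.shape, score)
```

A lower value of this score indicates a better trade-off between fit and model size, which is the kind of quantity one would minimize when ranking ensemble members.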
Configure Ensemble
Now we can call fit_ensemble after choosing some controls on the size of the ensemble, the number of generations, and the logic for selecting models after each generation.
Here’s an example:
ensemble_kwargs = {
    'model_selection': kmeans_model_averaging,
    'model_selection_kwargs': {
        'drop_n': 2,
        'evolve_n': 2,
    },
    'init_ensemble_size': 4,
    'ngen': 3,
    'partial_fit_batches': 2,
    'saved_ensemble_size': 4,
    'models_share_sample': True,
}
In the example above:
- ngen sets the number of generations to 3.
- There are 4 initial ensemble members (init_ensemble_size).
- After each generation, kmeans_model_averaging (see API docs) is called on the ensemble, with model_selection_kwargs passed to it.
- There are 2 partial_fit batches for MiniBatchKMeans on every Pipeline instance (partial_fit within the IncrementalPCA was configured in the initialization of steps.Transform above).
- models_share_sample is set to True, so in each generation every ensemble member is fit to the same sample; on the next generation, every model is fit to the next sample, determined by sampler and args_list in this case. If models_share_sample were False, then in each generation every ensemble member would be copied and fit to every sample, repeating the process on each generation.
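The effect of models_share_sample on which model sees which sample can be sketched in plain Python. The function fit_schedule below is hypothetical and only encodes the description above. It ignores model selection between generations (so the ensemble size stays fixed), and it assumes samples are reused cyclically if there are more generations than samples.

```python
def fit_schedule(n_models, samples, ngen, models_share_sample):
    """Return (generation, model_index, sample) tuples in fitting order."""
    schedule = []
    for gen in range(ngen):
        if models_share_sample:
            # One sample per generation, shared by every ensemble member.
            # Assumption: samples cycle if ngen exceeds len(samples).
            sample = samples[gen % len(samples)]
            for m in range(n_models):
                schedule.append((gen, m, sample))
        else:
            # Every ensemble member is fit to every sample in each generation.
            for m in range(n_models):
                for sample in samples:
                    schedule.append((gen, m, sample))
    return schedule


samples = ["s0", "s1", "s2"]
shared = fit_schedule(4, samples, 3, True)
separate = fit_schedule(4, samples, 3, False)
print(len(shared), len(separate))  # 12 36
```

With 4 models, 3 samples, and 3 generations, sharing samples gives 12 fits, while fitting every model to every sample gives 36, which illustrates why the setting matters for run time.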
Fitting with Dask-Distributed
In the snippets above, we have a data_source dict with sampler, band_specs and args_list keys and values. We can pass this dict, along with the ensemble_kwargs configuration, to fit_ensemble, and we can also pass it to predict_many. The data source for predict_many does not necessarily have to be the same one given to fit_ensemble (or fit_ea).
Note: If you want to use dask-distributed as a client, first make sure you are running a dask-scheduler and dask-worker. Read more here on dask-distributed and follow the instructions in environment variables.
with client_context() as client:
    ensemble_kwargs['client'] = client
    pipe.fit_ensemble(**data_source, **ensemble_kwargs)
    pred = pipe.predict_many(client=client, **data_source)
Fitting with dask parallelizes over the ensemble members (Pipeline instances) and over the calls to partial_fit. Currently, transformers in the Pipeline are not parallelized with dask.
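The idea of parallelizing over ensemble members can be illustrated without a dask cluster. The sketch below uses the standard library's concurrent.futures as a stand-in for dask-distributed; it is not how elm schedules work. The fit_member function is hypothetical, and only the per-member parallelism is shown, not parallelism over partial_fit calls.

```python
from concurrent.futures import ThreadPoolExecutor


def fit_member(member_id, batches):
    """Hypothetical stand-in for fitting one ensemble member in batches."""
    running_total = 0.0
    for batch in batches:  # sequential partial_fit-style passes for this member
        running_total += sum(batch)
    return member_id, running_total


batches = [[1.0, 2.0], [3.0, 4.0]]
member_ids = range(4)

# Each ensemble member is an independent task, so members can run concurrently.
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(fit_member, m, batches) for m in member_ids]
    results = dict(f.result() for f in futures)

print(results)
```

Because the members do not share state during fitting, the same pattern maps naturally onto a distributed scheduler, with each member's fit submitted as a separate task.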
Controlling Ensemble Initialization
To initialize the ensemble with Pipeline instances that do not all share the same parameters, as they do above, we could replace init_ensemble_size with ensemble_init_func:
n_clusters_choices = tuple(range(4, 9))
def ensemble_init_func(pipe, **kwargs):
    models = []
    for c in n_clusters_choices:
        new_pipe = pipe.new_with_params(kmeans__n_clusters=c)
        models.append(new_pipe)
    return models
ensemble_kwargs = {
    'model_selection': kmeans_model_averaging,
    'model_selection_kwargs': {
        'drop_n': 2,
        'evolve_n': 2,
    },
    'ensemble_init_func': ensemble_init_func,
    'ngen': 3,
    'partial_fit_batches': 2,
    'saved_ensemble_size': 4,
    'models_share_sample': True,
}
with client_context() as client:
    ensemble_kwargs['client'] = client
    pipe.fit_ensemble(**data_source, **ensemble_kwargs)
    pred = pipe.predict_many(client=client, **data_source)
In the example above, Pipeline.new_with_params(kmeans__n_clusters=c) uses the scikit-learn syntax for modifying parameters of named steps in a pipeline. In the initialization of Pipeline in the example above, the MiniBatchKMeans step was named kmeans, so kmeans__n_clusters=c sets the n_clusters parameter of the K-Means step. The ensemble in this case consists of one Pipeline for each of the n_clusters choices in (4, 5, 6, 7, 8).
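The double-underscore convention comes from scikit-learn itself, so it can be tried with scikit-learn's own Pipeline. The sketch below is only an analogy to ensemble_init_func: it uses sklearn.pipeline.Pipeline with clone and set_params rather than elm's Pipeline.new_with_params, whose internals are not shown here.

```python
from sklearn.base import clone
from sklearn.cluster import MiniBatchKMeans
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

base = Pipeline([
    ("scaler", StandardScaler()),
    ("kmeans", MiniBatchKMeans(n_clusters=4)),
])

ensemble = []
for c in range(4, 9):
    # clone() gives an unfitted copy; set_params routes kmeans__n_clusters
    # to the n_clusters parameter of the step named "kmeans".
    ensemble.append(clone(base).set_params(kmeans__n_clusters=c))

print([p.named_steps["kmeans"].n_clusters for p in ensemble])  # [4, 5, 6, 7, 8]
```

Cloning before setting parameters leaves the base pipeline untouched, so each ensemble member carries its own n_clusters value.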