Custom Planners And Executors
By switching out the executor_collection and/or the planner, we can specify a different way of running the cycle.
Easier Seeding With A Smarter Planner
In this example, we use the Controller, which allows much more control over execution order. It considers the last available result and picks the matching next step. This means that seeding is relatively simple.
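The idea of choosing the next step from the last available result can be sketched in a few lines of plain Python. This is an illustration only, not the Controller's actual implementation; the names NEXT_STEP and toy_last_result_planner, and the use of a plain list of result kinds as the history, are assumptions made for this sketch.

```python
# Hypothetical stand-in for a "last result" planner; not autora's implementation.
NEXT_STEP = {
    None: "experimentalist",           # empty history: start by proposing conditions
    "CONDITION": "experiment_runner",  # conditions are ready to be run
    "OBSERVATION": "theorist",         # observations are ready to be modelled
    "MODEL": "experimentalist",        # a new model prompts new conditions
}


def toy_last_result_planner(history):
    """Return the name of the next step, given a list of result kinds."""
    last_kind = history[-1] if history else None
    return NEXT_STEP[last_kind]


print(toy_last_result_planner([]))               # experimentalist
print(toy_last_result_planner(["OBSERVATION"]))  # theorist
```

With a rule like this, seeding amounts to putting results into the history before the first step: the planner needs no special case for it.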
# Uncomment the following line when running on Google Colab
# !pip install autora
import numpy as np
from autora.experimentalist.pipeline import make_pipeline
from autora.variable import VariableCollection, Variable
from sklearn.linear_model import LinearRegression
from autora.workflow import Controller
from itertools import takewhile
def ground_truth(x):
    return x + 1
variables = VariableCollection(
    independent_variables=[Variable(name="x1", allowed_values=range(11))],
    dependent_variables=[Variable(name="y", value_range=(-20, 20))],
)
example_experimentalist = make_pipeline(
    [variables.independent_variables[0].allowed_values])
def get_example_synthetic_experiment_runner():
    rng = np.random.default_rng(seed=180)
    def runner(x):
        return ground_truth(x) + rng.normal(0, 0.1, x.shape)
    return runner
example_synthetic_experiment_runner = get_example_synthetic_experiment_runner()
example_theorist = LinearRegression()
def monitor(state):
    print(f"MONITOR: Generated new {state.history[-1].kind}")
cycle_with_last_result_planner = Controller(
    monitor=monitor,
    variables=variables,
    experimentalist=example_experimentalist,
    experiment_runner=example_synthetic_experiment_runner,
    theorist=example_theorist,
)
When we run this cycle starting with no data, we generate an experimental condition first:
_ = list(takewhile(lambda c: len(c.state.models) < 2, cycle_with_last_result_planner))
MONITOR: Generated new CONDITION
MONITOR: Generated new OBSERVATION
MONITOR: Generated new MODEL
MONITOR: Generated new CONDITION
MONITOR: Generated new OBSERVATION
MONITOR: Generated new MODEL
However, if we seed the same cycle with observations, then its first Executor will be the theorist:
controller_with_seed_observation = Controller(
    monitor=monitor,
    variables=variables,
    theorist=example_theorist,
    experimentalist=example_experimentalist,
    experiment_runner=example_synthetic_experiment_runner,
)
seed_observation = example_synthetic_experiment_runner(np.linspace(0,5,10))
controller_with_seed_observation.seed(observations=[seed_observation])
_ = next(controller_with_seed_observation)
MONITOR: Generated new MODEL
Arbitrary Execution Order (Toy Example)
In some cases, we need to change the order of execution of different steps completely. This might be useful in cases when different experimentalists or theorists are needed at different times in the cycle, e.g. for initial seeding, or if the order of execution is the subject of the experiment.
In this example, we use a planner which suggests a different random operation at each step, demonstrating arbitrary execution order. We do this by passing the planner as the planner argument when constructing the controller.
from autora.workflow.planner import random_operation_planner
def monitor(state):
    print(f"MONITOR: Generated new {state.history[-1].kind}")
controller_with_random_planner = Controller(
    planner=random_operation_planner,
    monitor=monitor,
    variables=variables,
    theorist=example_theorist,
    experimentalist=example_experimentalist,
    experiment_runner=example_synthetic_experiment_runner,
)
The random_operation_planner depends on the Python random number generator, so we seed it first:
from random import seed
seed(42)
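To see why seeding matters, here is a small sketch of a planner that picks a step at random. It is a stand-in written for this example, not the source of random_operation_planner; the names STEPS, toy_random_planner and plan_sequence are assumptions. It uses a private random.Random instance rather than the module-level generator, so running it does not disturb the seed set above.

```python
import random

# Hypothetical step names, mirroring the three standard steps of the cycle.
STEPS = ("experimentalist", "experiment_runner", "theorist")


def toy_random_planner(rng):
    """Pick the next step uniformly at random, ignoring the state entirely."""
    return rng.choice(STEPS)


def plan_sequence(seed, n=8):
    """Return the first n steps proposed by a freshly seeded generator."""
    rng = random.Random(seed)
    return [toy_random_planner(rng) for _ in range(n)]


# The same seed always yields the same sequence of proposed steps.
print(plan_sequence(42) == plan_sequence(42))  # True
print(plan_sequence(42))
```

Without a fixed seed, each run of the notebook would propose a different order of steps, and the step-by-step commentary below would no longer match the output.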
We also want to watch the logging messages from the cycle:
import logging
import sys
logging.basicConfig(format='%(levelname)s: %(message)s', stream=sys.stdout,
                    level=logging.INFO)
Now we can evaluate the cycle and watch its behaviour:
def step(controller_):
    try:
        _ = next(controller_)
    except Exception as e:
        print(f"FAILED: with {e=}")
On the first step, the theorist is selected as the random Executor, and it fails because it needs observations to fit a model against:
step(controller_with_random_planner) # i = 0
INFO: getting step_name='theorist'
INFO: running next_function=<function from_theorist_estimator.<locals>._executor_theorist at 0x14f594dc0>
FAILED: with e=AssertionError('observations=[] needs at least one entry for model fitting')
On the second step, a new condition is generated:
step(controller_with_random_planner) # i = 1
INFO: getting step_name='experimentalist'
INFO: running next_function=<function from_experimentalist_pipeline.<locals>._executor_experimentalist at 0x14f594dc0>
MONITOR: Generated new CONDITION
... which is repeated on the third step as well:
step(controller_with_random_planner) # i = 2
INFO: getting step_name='experimentalist'
INFO: running next_function=<function from_experimentalist_pipeline.<locals>._executor_experimentalist at 0x14f595750>
MONITOR: Generated new CONDITION
On the fourth step, we generate another error when trying to run the theorist:
step(controller_with_random_planner) # i = 3
INFO: getting step_name='theorist'
INFO: running next_function=<function from_theorist_estimator.<locals>._executor_theorist at 0x14f5955a0>
FAILED: with e=AssertionError('observations=[] needs at least one entry for model fitting')
On the fifth step, we generate a first real observation, so that the next time we try to run a theorist we are successful:
step(controller_with_random_planner) # i = 4
INFO: getting step_name='experiment_runner'
INFO: running next_function=<function from_experiment_runner_callable.<locals>._executor_experiment_runner at 0x107119630>
MONITOR: Generated new OBSERVATION
We now iterate until the first model appears. On the ninth step the theorist is selected again; this time there are observations it can use, so it succeeds:
_ = list(takewhile(lambda c: len(c.state.models) < 1, controller_with_random_planner))
INFO: getting step_name='experimentalist'
INFO: running next_function=<function from_experimentalist_pipeline.<locals>._executor_experimentalist at 0x14f596200>
MONITOR: Generated new CONDITION
INFO: getting step_name='experimentalist'
INFO: running next_function=<function from_experimentalist_pipeline.<locals>._executor_experimentalist at 0x1071195a0>
MONITOR: Generated new CONDITION
INFO: getting step_name='experimentalist'
INFO: running next_function=<function from_experimentalist_pipeline.<locals>._executor_experimentalist at 0x14f5964d0>
MONITOR: Generated new CONDITION
INFO: getting step_name='theorist'
INFO: running next_function=<function from_theorist_estimator.<locals>._executor_theorist at 0x14f596830>
MONITOR: Generated new MODEL
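The behaviour seen in this walkthrough, where a randomly chosen step may fail and the cycle simply carries on, can be imitated with a toy loop that needs nothing beyond the standard library. Everything in it is hypothetical (the dictionary-based state, the step bodies, the error messages and the name run_toy_cycle); only the shape of the logic mirrors the example.

```python
import random


def run_toy_cycle(seed, max_steps=100):
    """Run randomly ordered toy steps until the first model is produced."""
    rng = random.Random(seed)
    state = {"conditions": [], "observations": [], "models": []}
    log = []
    for _ in range(max_steps):
        step_name = rng.choice(["experimentalist", "experiment_runner", "theorist"])
        try:
            if step_name == "experimentalist":
                state["conditions"].append([0.0, 1.0, 2.0])
            elif step_name == "experiment_runner":
                if not state["conditions"]:
                    raise ValueError("no conditions to run")
                # Toy ground truth y = x + 1, without noise.
                state["observations"].append([x + 1 for x in state["conditions"][-1]])
            else:
                if not state["observations"]:
                    raise ValueError("no observations to fit")
                state["models"].append("toy model")
            log.append((step_name, "ok"))
        except ValueError as error:
            # A failed step leaves the state untouched; the loop just continues.
            log.append((step_name, f"failed: {error}"))
        if state["models"]:
            break
    return state, log


state, log = run_toy_cycle(seed=42)
for step_name, outcome in log:
    print(f"{step_name}: {outcome}")
```

Because a failed step does not modify the state, a random planner of this kind eventually reaches a valid ordering, at the cost of some wasted steps.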
Arbitrary Executors And Planners
In some cases, we need to go beyond reordering the three standard steps (experimentalist, experiment_runner and theorist) and build more complex cycles with different Executors for different states.
For instance, at the start of a cycle the main "active" experimentalist might not be runnable because it needs models as input; in this example, it can only be run once there are at least two models. One way to handle this is to run a "seed" experimentalist until the main experimentalist can be used.
In these cases, we need full control over (and have full responsibility for) the planners and executors.
The model we'll try to discover is:
def ground_truth(x, m=3.5, c=1):
    return m * x + c
rng = np.random.default_rng(seed=180)
def experiment_runner(x):
    return ground_truth(x) + rng.normal(0, 0.1)
variables = VariableCollection(
    independent_variables=[Variable(name="x1", value_range=(-10, 10))],
    dependent_variables=[Variable(name="y", value_range=(-100, 100))],
)
We now define a planner which chooses the seed experimentalist while fewer than two models are available, and the main experimentalist once there are at least two.
from autora.workflow.planner import last_result_kind_planner
from autora.state.history import History
def seeding_planner(state):
    # We're going to reuse the "last_result_kind_planner" planner, and modify its output.
    next_function = last_result_kind_planner(state)
    if next_function == "experimentalist":
        if len(state.models) >= 2:
            return "main_experimentalist"
        else:
            return "seed_experimentalist"
    else:
        return next_function
Now we can see what would happen with a particular state. If there are no results, then we get the seed experimentalist:
seeding_planner(History())
'seed_experimentalist'
... and we also get the seed experimentalist if the last result was a model and there are fewer than two models:
seeding_planner(History(models=['a single model']))
'seed_experimentalist'
whereas if we have at least two models to work on, we get the main experimentalist:
seeding_planner(History(models=['a model', 'another model']))
'main_experimentalist'
If we had a condition last, we choose the experiment runner next:
seeding_planner(History(conditions=['a condition']))
'experiment_runner'
If we had an observation last, we choose the theorist next:
seeding_planner(History(observations=['an observation']))
'theorist'
Now we need to define an executor collection to handle the actual execution steps.
from autora.experimentalist.pipeline import make_pipeline, Pipeline
from autora.experimentalist.sampler.random_sampler import random_sample
from functools import partial
We can run the seed pipeline with no data:
experimentalist_which_needs_no_data = make_pipeline([
    np.linspace(*variables.independent_variables[0].value_range, 1_000),
    partial(random_sample, n=10)]
)
np.array(experimentalist_which_needs_no_data())
array([ 6.71671672, -0.73073073, -5.05505506, 6.13613614, 0.03003003, 4.59459459, 2.79279279, 5.43543544, -1.65165165, 8.0980981 ])
... whereas we need some model for this sampler:
from autora.experimentalist.sampler.model_disagreement import model_disagreement_sampler
experimentalist_which_needs_a_model = Pipeline([
    ('pool', np.linspace(*variables.independent_variables[0].value_range, 1_000)),
    ('sampler', partial(model_disagreement_sampler, num_samples=5)),])
experimentalist_which_needs_a_model()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[26], line 5
      1 from autora.experimentalist.sampler.model_disagreement import model_disagreement_sampler
      2 experimentalist_which_needs_a_model = Pipeline([
      3     ('pool', np.linspace(*variables.independent_variables[0].value_range, 1_000)),
      4     ('sampler', partial(model_disagreement_sampler, num_samples=5)),])
----> 5 experimentalist_which_needs_a_model()
File ~/Developer/autora-workflow/venv/lib/python3.10/site-packages/autora/experimentalist/pipeline.py:171, in Pipeline.__call__(self, ex, **params)
    169 assert isinstance(pipe, Pipe)
    170 all_params_for_pipe = merged_params.get(name, dict())
--> 171 results.append(pipe(results[-1], **all_params_for_pipe))
    173 return results[-1]
TypeError: model_disagreement_sampler() missing 1 required positional argument: 'models'
We'll have to provide the models during the cycle run.
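The pattern behind this error can be reproduced without autora. In the sketch below, toy_sampler is a hypothetical sampler written for this example (it is not model_disagreement_sampler, and its ranking rule is an assumption): binding num_samples with partial still leaves models unbound, so the call fails until the models are supplied at call time.

```python
from functools import partial


def toy_sampler(pool, models, num_samples=1):
    """Hypothetical sampler: return the points where the first two models differ most."""
    first, second = models[:2]
    ranked = sorted(pool, key=lambda x: abs(first(x) - second(x)), reverse=True)
    return ranked[:num_samples]


sampler = partial(toy_sampler, num_samples=2)  # "models" is still missing here
pool = [-2, -1, 0, 1, 2]

try:
    sampler(pool)
except TypeError as error:
    print(f"FAILED: {error}")

# Supplying the models later, as a keyword argument, makes the same call work.
models = [lambda x: 2 * x, lambda x: 3 * x + 1]
print(sampler(pool, models=models))  # [2, 1]
```

This is the same division of labour as in the pipeline: fixed settings are bound when the pipeline is defined, while anything that depends on the current state is passed in when the step runs.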
We need a reasonable theorist for this situation. For this problem, a linear regressor will suffice.
t = LinearRegression()
Let's test the theorist for the ideal case – lots of data:
X = np.linspace(*variables.independent_variables[0].value_range, 1_000).reshape(-1, 1)
tfitted = t.fit(X, experiment_runner(X))
f"m = {tfitted.coef_[0][0]:.2f}, c = {tfitted.intercept_[0]:.2f}"
'm = 3.50, c = 1.04'
This seems to work fine.
Now we can define the executor component. We'll use a factory method to generate the collection:
from autora.workflow.executor import (ChainedFunctionMapping, from_experimentalist_pipeline,
                                      from_experiment_runner_callable, from_theorist_estimator)
executor_collection = ChainedFunctionMapping(
    seed_experimentalist=
        [from_experimentalist_pipeline, experimentalist_which_needs_no_data],
    main_experimentalist=
        [from_experimentalist_pipeline, experimentalist_which_needs_a_model],
    experiment_runner=[from_experiment_runner_callable, experiment_runner],
    theorist=[from_theorist_estimator, LinearRegression()],
)
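One way to picture what such a collection does: each entry is a short chain such as [factory, argument], and looking up a name applies the factory to the argument to produce the executor. The class below is a simplified sketch under that assumption, not the source of ChainedFunctionMapping; ToyChainedMapping, evaluate_chain and make_executor are names invented for this example, and the state is just a list.

```python
from collections.abc import Mapping


def evaluate_chain(head, *tail):
    """Apply head to the result of evaluating the rest of the chain."""
    if not tail:
        return head
    return head(evaluate_chain(*tail))


class ToyChainedMapping(Mapping):
    """Hypothetical read-only mapping which evaluates a chain on lookup."""

    def __init__(self, **chains):
        self._chains = dict(chains)

    def __getitem__(self, key):
        return evaluate_chain(*self._chains[key])

    def __iter__(self):
        return iter(self._chains)

    def __len__(self):
        return len(self._chains)


def make_executor(function):
    """Hypothetical factory: wrap a function so that it appends its result to the state."""
    def executor(state):
        return state + [function(state)]
    return executor


executors = ToyChainedMapping(
    count=[make_executor, len],  # the new entry is the current length of the state
    total=[make_executor, sum],  # the new entry is the sum of the state
)

state = executors["count"]([])           # [0]
state = executors["total"](state + [5])  # [0, 5, 5]
print(state)
```

The benefit of storing chains rather than ready-made executors is that the same factory can be reused with different arguments, as with the two experimentalists above.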
We need some special parameters to handle the main experimentalist, so we specify those:
params = {"main_experimentalist": {"sampler": {"models": "%models%"}}}
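The string "%models%" appears to act as a placeholder which is filled in from the state when the step runs. As a rough sketch of that idea (the function resolve_placeholders and the dictionary of state values are assumptions made for this example, not autora's API):

```python
def resolve_placeholders(params, state_values):
    """Recursively replace "%name%" strings with state_values["name"]."""
    if isinstance(params, dict):
        return {
            key: resolve_placeholders(value, state_values)
            for key, value in params.items()
        }
    is_placeholder = (
        isinstance(params, str)
        and len(params) > 2
        and params.startswith("%")
        and params.endswith("%")
    )
    if is_placeholder:
        return state_values[params[1:-1]]
    return params


params = {"main_experimentalist": {"sampler": {"models": "%models%"}}}
state_values = {"models": ["first model", "second model"]}  # hypothetical state contents

resolved = resolve_placeholders(params, state_values)
print(resolved["main_experimentalist"]["sampler"]["models"])
# ['first model', 'second model']
```

The nesting of the dictionary follows the structure of the cycle: the outer key names the executor, the next key names the pipeline step, and the innermost keys are the keyword arguments passed to that step.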
We now instantiate the controller:
from autora.workflow.base import BaseController
from autora.state.history import History
c = BaseController(
    state=History(variables=variables, params=params),
    planner=seeding_planner,
    executor_collection=executor_collection
)
c
<autora.workflow.base.BaseController at 0x14f8ed570>
class PrintHandler(logging.Handler):
    def emit(self, record):
        print(self.format(record))
On the first step, we generate a condition sampled randomly across the whole domain (as we expected):
next(c).state.history[-1]
INFO: getting step_name='seed_experimentalist'
INFO: running next_function=<function from_experimentalist_pipeline.<locals>._executor_experimentalist at 0x14f85feb0>
Result(data=array([ 9.4994995 , -8.17817818, -1.19119119, 8.6986987 , 7.45745746, -6.93693694, 8.05805806, -1.45145145, -5.97597598, 1.57157157]), kind=ResultKind.CONDITION)
After three more steps, we generate a new condition, which again is sampled across the whole domain. Here we iterate the controller until we've got two sets of conditions:
_ = list(takewhile(lambda c: len(c.state.conditions) < 2, c))
c.state.history[-1]
INFO: getting step_name='experiment_runner'
INFO: running next_function=<function from_experiment_runner_callable.<locals>._executor_experiment_runner at 0x14f85fe20>
INFO: getting step_name='theorist'
INFO: running next_function=<function from_theorist_estimator.<locals>._executor_theorist at 0x107118b80>
INFO: getting step_name='seed_experimentalist'
INFO: running next_function=<function from_experimentalist_pipeline.<locals>._executor_experimentalist at 0x14f85fe20>
Result(data=array([ 1.57157157, -3.93393393, -0.47047047, -4.47447447, 8.43843844, 6.17617618, -3.49349349, -8.998999 , 4.93493493, 2.25225225]), kind=ResultKind.CONDITION)
Once we have two models:
_ = list(takewhile(lambda c: len(c.state.models) < 2, c))
c.state.models
INFO: getting step_name='experiment_runner'
INFO: running next_function=<function from_experiment_runner_callable.<locals>._executor_experiment_runner at 0x14f85f9a0>
INFO: getting step_name='theorist'
INFO: running next_function=<function from_theorist_estimator.<locals>._executor_theorist at 0x107118b80>
[LinearRegression(), LinearRegression()]
... when we run the next step, we'll get the main experimentalist. This samples five points from the extreme parts of the problem domain where the disagreement between the two models is the greatest:
next(c).state.history[-1]
INFO: getting step_name='main_experimentalist'
INFO: running next_function=<function from_experimentalist_pipeline.<locals>._executor_experimentalist at 0x14f85f9a0>
WARNING: new_conditions=array([-10. , -9.97997998, -9.95995996, -9.93993994, -9.91991992]) is an ndarray, so variable confusion is a possibility
Result(data=array([-10. , -9.97997998, -9.95995996, -9.93993994, -9.91991992]), kind=ResultKind.CONDITION)
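Why the extremes? A short worked argument, assuming (this is an assumption about the sampler, not something stated above) that the disagreement between two models is measured by the squared difference of their predictions. For two fitted lines with slopes m_1, m_2 and intercepts c_1, c_2:

```latex
d(x) = \bigl(\hat{y}_1(x) - \hat{y}_2(x)\bigr)^2
     = \bigl((m_1 - m_2)\,x + (c_1 - c_2)\bigr)^2
```

Whenever m_1 differs from m_2, d(x) is an upward-opening parabola in x, so on the interval [-10, 10] its maximum is attained at one of the endpoints. The highest-ranked points of an evenly spaced pool therefore lie at the ends of the domain; in the output above, they are the five pool points nearest x = -10.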