
autora.theorist.bsr.regressor

BSRRegressor

Bases: BaseEstimator, RegressorMixin

Bayesian Symbolic Regression (BSR)

An MCMC-sampling-based Bayesian approach to symbolic regression -- a machine learning method that bridges X and y by automatically building up mathematical expressions of basic functions. The fitted model is linear in the learned expression trees: y ≈ β₀ + β₁·g₁(X) + ... + β_k·g_k(X). The performance and speed of BSR depend on its pre-defined parameters.

This class is intended to be compatible with the Scikit-Learn Estimator API.

Examples:

>>> import numpy as np
>>> num_samples = 1000
>>> X = np.linspace(start=0, stop=1, num=num_samples).reshape(-1, 1)
>>> y = np.sqrt(X)
>>> estimator = BSRRegressor()
>>> estimator = estimator.fit(X, y)
>>> estimator.predict(np.array([[1.5]]))

Attributes:

    Name         Type                          Description
    roots_       Optional[List[List[Node]]]    the root(s) of the best-fit symbolic regression (SR) tree(s)
    betas_       Optional[List[ndarray]]       the beta parameters of the best-fit model
    train_errs_  Optional[List[List[float]]]   the training losses associated with the best-fit model
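
A minimal sketch of inspecting these attributes after fitting (continuing the example above; get_expression is the tree-printing helper this class itself uses internally):

    # Sketch: inspect the model fitted in the example above.
    for tree in estimator.roots_[-1]:    # component trees from the final iteration
        print(tree.get_expression())     # symbolic form of each tree
    print(estimator.betas_[-1])          # linear weights, including the intercept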

Source code in temp_dir/bsr/src/autora/theorist/bsr/regressor.py
class BSRRegressor(BaseEstimator, RegressorMixin):
    """
    Bayesian Symbolic Regression (BSR)

    An MCMC-sampling-based Bayesian approach to symbolic regression -- a machine learning method
    that bridges `X` and `y` by automatically building up mathematical expressions of basic
    functions. The performance and speed of `BSR` depend on its pre-defined parameters.

    This class is intended to be compatible with the
    [Scikit-Learn Estimator API](https://scikit-learn.org/stable/developers/develop.html).

    Examples:

        >>> import numpy as np
        >>> num_samples = 1000
        >>> X = np.linspace(start=0, stop=1, num=num_samples).reshape(-1, 1)
        >>> y = np.sqrt(X)
        >>> estimator = BSRRegressor()
        >>> estimator = estimator.fit(X, y)
        >>> estimator.predict(np.array([[1.5]]))

    Attributes:
        roots_: the root(s) of the best-fit symbolic regression (SR) tree(s)
        betas_: the beta parameters of the best-fit model
        train_errs_: the training losses associated with the best-fit model
    """

    def __init__(
        self,
        tree_num: int = 3,
        itr_num: int = 5000,
        alpha1: float = 0.4,
        alpha2: float = 0.4,
        beta: float = -1,
        show_log: bool = False,
        val: int = 100,
        last_idx: int = -1,
        prior_name: str = "Uniform",
    ):
        """
        Arguments:
            tree_num: pre-specified number of SR trees to fit in the model
            itr_num: number of iteration steps to run for the model-fitting process
            alpha1, alpha2, beta: the hyper-parameters of the priors
            show_log: whether to output certain logging info
            val: number of validation steps to run for each iteration step
            last_idx: index of the best-fit model to use, counted from the end
                (-1 means the latest one)
            prior_name: name of the prior placed over operators
                (defaults to "Uniform")
        """
        self.tree_num = tree_num
        self.itr_num = itr_num
        self.alpha1 = alpha1
        self.alpha2 = alpha2
        self.beta = beta
        self.show_log = show_log
        self.val = val
        self.last_idx = last_idx
        self.prior_name = prior_name

        # attributes that are not set until `fit`
        self.roots_: Optional[List[List[Node]]] = None
        self.betas_: Optional[List[np.ndarray]] = None
        self.train_errs_: Optional[List[List[float]]] = None

        self.X_: Optional[Union[np.ndarray, pd.DataFrame]] = None
        self.y_: Optional[Union[np.ndarray, pd.DataFrame]] = None

    def predict(self, X: Union[np.ndarray, pd.DataFrame]) -> np.ndarray:
        """
        Applies the fitted model to a set of independent variables `X`,
        to give predictions for the dependent variable `y`.

        Arguments:
            X: independent variables in an n-dimensional array
        Returns:
            y: predicted dependent variable values
        """
        if isinstance(X, np.ndarray):
            X = pd.DataFrame(X)

        check_is_fitted(self, attributes=["roots_"])

        k = self.tree_num
        n_test = X.shape[0]
        tree_outs = np.zeros((n_test, k))

        assert self.roots_ and self.betas_
        for i in np.arange(k):
            # `last_idx` counts from the end: the default -1 selects the latest
            # model, matching the documented semantics of this parameter
            tree_out = self.roots_[self.last_idx][i].evaluate(X)
            tree_out.shape = tree_out.shape[0]
            tree_outs[:, i] = tree_out

        ones = np.ones((n_test, 1))
        tree_outs = np.concatenate((ones, tree_outs), axis=1)
        _beta = self.betas_[self.last_idx]
        output = np.matmul(tree_outs, _beta)

        return output

    def fit(
        self, X: Union[np.ndarray, pd.DataFrame], y: Union[np.ndarray, pd.DataFrame]
    ):
        """
        Runs the optimization for a given set of `X`s and `y`s.

        Arguments:
            X: independent variables in an n-dimensional array
            y: dependent variables in an n-dimensional array
        Returns:
            self (BSR): the fitted estimator
        """
        # train_data must be a dataframe
        if isinstance(X, np.ndarray):
            X = pd.DataFrame(X)
        train_errs: List[List[float]] = []
        roots: List[List[Node]] = []
        betas: List[np.ndarray] = []
        itr_num = self.itr_num
        k = self.tree_num
        beta = self.beta

        if self.show_log:
            _logger.info("Starting training")
        while len(train_errs) < itr_num:
            n_feature = X.shape[1]
            n_train = X.shape[0]

            ops_name_lst, ops_weight_lst, ops_priors = get_prior_dict(
                prior_name=self.prior_name
            )

            # List of tree samples
            root_lists: List[List[Node]] = [[] for _ in range(k)]

            sigma_a_list = []  # List of sigma_a, for each component tree
            sigma_b_list = []  # List of sigma_b, for each component tree

            sigma_y = invgamma.rvs(1)  # for output y

            # Initialization
            for count in np.arange(k):
                # create a new root node
                root = Node(0)
                sigma_a = invgamma.rvs(1)
                sigma_b = invgamma.rvs(1)

                # grow a tree from the root node
                if self.show_log:
                    _logger.info("Grow a tree from the root node")

                grow(
                    root,
                    ops_name_lst,
                    ops_weight_lst,
                    ops_priors,
                    n_feature,
                    sigma_a=sigma_a,
                    sigma_b=sigma_b,
                )

                # put the root into list
                root_lists[count].append(root)
                sigma_a_list.append(sigma_a)
                sigma_b_list.append(sigma_b)

            # calculate beta
            if self.show_log:
                _logger.info("Calculate beta")
            # added a constant in the regression by fwl
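            # The k tree outputs plus an intercept column form the design matrix O;
            # beta is the lightly ridge-regularized least-squares solution
            # beta = (O^T O + eps * I)^{-1} O^T y, computed on a rescaled O below.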
            tree_outputs = np.zeros((n_train, k))

            for count in np.arange(k):
                temp = root_lists[count][-1].evaluate(X)
                temp.shape = temp.shape[0]
                tree_outputs[:, count] = temp

            constant = np.ones((n_train, 1))  # added a constant
            tree_outputs = np.concatenate((constant, tree_outputs), axis=1)
            scale = np.max(np.abs(tree_outputs))
            tree_outputs = tree_outputs / scale
            epsilon = (
                np.eye(tree_outputs.shape[1]) * 1e-6
            )  # add to the matrix to prevent a singular matrix
            yy = np.array(y)
            yy.shape = (yy.shape[0], 1)
            _beta = np.linalg.inv(
                np.matmul(tree_outputs.transpose(), tree_outputs) + epsilon
            )
            _beta = np.matmul(_beta, np.matmul(tree_outputs.transpose(), yy))
            output = np.matmul(tree_outputs, _beta)
            # rescale the beta, above we scale tree_outputs for calculation by fwl
            _beta /= scale

            total = 0
            accepted = 0
            errs = []
            total_list = []

            tic = time.time()

            if self.show_log:
                _logger.info("While total < ", self.val)
            while total < self.val:
                switch_label = False
                for count in range(k):
                    curr_roots = []  # list of current components
                    for i in np.arange(k):
                        curr_roots.append(root_lists[i][-1])
                    # pick the root to be changed
                    sigma_a = sigma_a_list[count]
                    sigma_b = sigma_b_list[count]

                    # the returned root is a new copy
                    if self.show_log:
                        _logger.info("new_prop...")
                    res, root, sigma_y, sigma_a, sigma_b = prop_new(
                        curr_roots,
                        count,
                        sigma_y,
                        beta,
                        sigma_a,
                        sigma_b,
                        X,
                        y,
                        ops_name_lst,
                        ops_weight_lst,
                        ops_priors,
                    )
                    if self.show_log:
                        _logger.info("res: %s", res)
                        _logger.info("%s", root)

                    total += 1
                    # update sigma_a and sigma_b
                    sigma_a_list[count] = sigma_a
                    sigma_b_list[count] = sigma_b

                    if res:
                        # flag = False
                        accepted += 1
                        # record newly accepted root
                        root_lists[count].append(copy.deepcopy(root))

                        tree_outputs = np.zeros((n_train, k))

                        for i in np.arange(k):
                            # evaluate every component tree, not only the one just changed
                            temp = root_lists[i][-1].evaluate(X)
                            temp.shape = temp.shape[0]
                            tree_outputs[:, i] = temp

                        constant = np.ones((n_train, 1))
                        tree_outputs = np.concatenate((constant, tree_outputs), axis=1)
                        scale = np.max(np.abs(tree_outputs))
                        tree_outputs = tree_outputs / scale
                        epsilon = (
                            np.eye(tree_outputs.shape[1]) * 1e-6
                        )  # add to prevent singular matrix
                        yy = np.array(y)
                        yy.shape = (yy.shape[0], 1)
                        _beta = np.linalg.inv(
                            np.matmul(tree_outputs.transpose(), tree_outputs) + epsilon
                        )
                        _beta = np.matmul(
                            _beta, np.matmul(tree_outputs.transpose(), yy)
                        )

                        output = np.matmul(tree_outputs, _beta)
                        # rescale the beta, above we scale tree_outputs for calculation
                        _beta /= scale

                        error = 0
                        for i in np.arange(n_train):
                            error += (output[i, 0] - y[i]) * (output[i, 0] - y[i])

                        rmse = np.sqrt(error / n_train)
                        errs.append(rmse)

                        total_list.append(total)
                        total = 0

                    if len(errs) > 100:
                        lapses = min(10, len(errs))
                        converge_ratio = 1 - np.min(errs[-lapses:]) / np.mean(
                            errs[-lapses:]
                        )
                        if converge_ratio < 0.05:
                            # converged
                            switch_label = True
                            break
                if switch_label:
                    break

            if self.show_log:
                for i in np.arange(0, len(y)):
                    _logger.info("%s %s", output[i, 0], y[i])

            toc = time.time()
            tictoc = toc - tic
            if self.show_log:
                _logger.info("Run time: {:.2f}s".format(tictoc))

                _logger.info("------")
                _logger.info(
                    "Mean RMSE of last 5 accepts: {}".format(np.mean(errs[-5:]))
                )

            train_errs.append(errs)
            roots.append(curr_roots)
            betas.append(_beta)

        self.roots_ = roots
        self.train_errs_ = train_errs
        self.betas_ = betas
        self.X_, self.y_ = X, y
        return self

    def _model(self, last_ind: int = 1) -> List[str]:
        """
        Return the models in the last-i-th iteration, default `last_ind = 1` refers to the
        last (final) iteration.
        """
        models = []
        assert self.roots_
        for i in range(self.tree_num):
            models.append(self.roots_[-last_ind][i].get_expression())
        return models

    def _complexity(self) -> int:
        """
        Return the complexity of the final models, i.e. the total number of nodes
        across all expression trees.
        """
        cp = 0
        assert self.roots_
        for i in range(self.tree_num):
            root_node = self.roots_[-1][i]
            num = len(get_all_nodes(root_node))
            cp = cp + num
        return cp

__init__(tree_num=3, itr_num=5000, alpha1=0.4, alpha2=0.4, beta=-1, show_log=False, val=100, last_idx=-1, prior_name='Uniform')

Parameters:

    Name        Type   Description                                                      Default
    tree_num    int    pre-specified number of SR trees to fit in the model             3
    itr_num     int    number of iteration steps to run for the model-fitting process   5000
    alpha1      float  hyper-parameter of the priors                                    0.4
    alpha2      float  hyper-parameter of the priors                                    0.4
    beta        float  hyper-parameter of the priors                                    -1
    show_log    bool   whether to output certain logging info                           False
    val         int    number of validation steps to run for each iteration step        100
    last_idx    int    index of the best-fit model to use, counted from the end         -1
                       (-1 means the latest one)
    prior_name  str    name of the prior placed over operators                          'Uniform'
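
A minimal usage sketch (the values below are illustrative, not recommendations):

    # Sketch: a lighter-weight configuration for quick experiments.
    estimator = BSRRegressor(
        tree_num=2,     # fewer component trees
        itr_num=100,    # far fewer iterations than the default 5000
        val=50,         # smaller proposal budget per iteration
        show_log=True,  # emit progress via the module logger
    )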

fit(X, y)

Runs the optimization for a given set of Xs and ys.

Parameters:

    Name  Type                       Description                                      Default
    X     Union[ndarray, DataFrame]  independent variables in an n-dimensional array  required
    y     Union[ndarray, DataFrame]  dependent variables in an n-dimensional array    required

Returns: self (BSR): the fitted estimator
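
A short sketch of fitting from a pandas DataFrame (the column name is illustrative):

    import numpy as np
    import pandas as pd

    # Sketch: `fit` accepts either an ndarray or a DataFrame for X.
    X = pd.DataFrame({"x0": np.linspace(0, 1, 200)})
    y = np.sin(X["x0"].to_numpy())
    estimator = BSRRegressor(itr_num=50)  # deliberately small run, for illustration only
    estimator = estimator.fit(X, y)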


predict(X)

Applies the fitted model to a set of independent variables X, to give predictions for the dependent variable y.

Parameters:

    Name  Type                       Description                                      Default
    X     Union[ndarray, DataFrame]  independent variables in an n-dimensional array  required

Returns: y: predicted dependent variable values
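
A minimal sketch (continuing from a fitted `estimator`; note that `predict` expects an ndarray or DataFrame, not a plain Python list):

    import numpy as np

    X_new = np.array([[0.25], [0.5], [0.75]])
    y_pred = estimator.predict(X_new)  # one prediction per row of X_new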
