# Source code for xenonpy.inverse.iqspr.estimator

#  Copyright (c) 2022. yoshida-lab. All rights reserved.
#  Use of this source code is governed by a BSD-style
#  license that can be found in the LICENSE file.

from copy import deepcopy
from types import MethodType
from typing import Union

import numpy as np
import pandas as pd
from scipy.stats import norm
from sklearn.base import BaseEstimator
from sklearn.linear_model import BayesianRidge

from xenonpy.descriptor.base import BaseDescriptor, BaseFeaturizer
from xenonpy.inverse.base import BaseLogLikelihood


class GaussianLogLikelihood(BaseLogLikelihood):
    """Gaussian log-likelihood of properties predicted from molecular descriptors."""

    def __init__(self, descriptor: Union[BaseFeaturizer, BaseDescriptor], *, targets=None,
                 **estimators: BaseEstimator):
        """
        Gaussian loglikelihood.

        Parameters
        ----------
        descriptor: BaseFeaturizer or BaseDescriptor
            Descriptor calculator.
        targets: dict
            Upper and lower bounds for each property to calculate the Gaussian
            CDF probability. Maps property name -> ``(low, up)``.
            Defaults to an empty mapping.
        estimators: BaseEstimator
            Gaussian estimators follow the scikit-learn style.
            These estimators must provide a method named ``predict`` which
            accesses descriptors as input and returns ``(mean, std)`` in order.
            By default, BayesianRidge_ will be used.

            .. _BayesianRidge: https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.BayesianRidge.html#sklearn-linear-model-bayesianridge

        Raises
        ------
        TypeError
            If ``descriptor`` is not a ``BaseFeaturizer``/``BaseDescriptor``.
        """
        # `None` sentinel instead of a mutable `{}` default argument.
        if targets is None:
            targets = {}

        # Deep-copy user-supplied estimators so later external mutation
        # cannot change the internal model set.
        self._mdl = deepcopy(estimators) if estimators else {}

        if not isinstance(descriptor, (BaseFeaturizer, BaseDescriptor)):
            raise TypeError('<descriptor> must be a subclass of <BaseFeaturizer> or <BaseDescriptor>')
        self.descriptor = descriptor
        # Failed descriptor calculations become NaN rows; downstream code drops them.
        self.descriptor.on_errors = 'nan'
        self.targets = deepcopy(targets)

    def __getitem__(self, item):
        """Return the estimator registered under ``item``."""
        return self._mdl[item]

    def __setitem__(self, key, value):
        """Register a scikit-learn style regressor under ``key`` (stored as a deep copy)."""
        if not (hasattr(value, 'predict') and isinstance(value.predict, MethodType)):
            raise TypeError('estimator must be a regressor in scikit-learn style')
        self._mdl[key] = deepcopy(value)
[docs] def update_targets(self, *, reset=False, **targets): """ Update/set the target area. Parameters ---------- reset: bool If ``true``, reset target area. targets: tuple[float, float] Target area. Should be a tuple which have down and up boundary. e.g: ``target1=(10, 20)`` equal to ``target1 should in range [10, 20]``. """ if reset: self.targets = {} for k, v in targets.items(): if not isinstance(v, tuple) or len(v) != 2 or v[1] <= v[0]: raise ValueError('must be a tuple with (low, up) boundary') self.targets[k] = v
[docs] def remove_estimator(self, *properties: str): """ Remove estimators from estimator set. Parameters ---------- properties : str The name of properties will be removed from estimator set. """ if not properties: self._mdl = {} self.targets = {} else: for p in properties: del self._mdl[p] del self.targets[p]
[docs] def predict(self, smiles, **kwargs): fps = self.descriptor.transform(smiles, return_type='df') fps_ = fps.dropna() tmp = {} for k, v in self._mdl.items(): if isinstance(v, BayesianRidge): tmp[k + ': mean'], tmp[k + ': std'] = v.predict(fps_, return_std=True) else: tmp[k + ': mean'], tmp[k + ': std'] = v.predict(fps_, **kwargs) tmp = pd.DataFrame(data=tmp, index=fps_.index) return pd.DataFrame(data=tmp, index=fps.index)
# todo: implement scale function
[docs] def fit(self, smiles, y=None, *, X_scaler=None, y_scaler=None, **kwargs): """ Default - automatically remove NaN data rows Parameters ---------- smiles: list[str] SMILES for training. y: pandas.DataFrame Target properties for training. X_scaler: Scaler (optional, not implement) Scaler for transform X. y_scaler: Scaler (optional, not implement) Scaler for transform y. kwargs: dict Parameters pass to BayesianRidge initialization. """ if self._mdl: raise RuntimeError('estimators have been set.' 'If you want to re-train these estimators,' 'please use `remove_estimator()` method first.') if not isinstance(y, (pd.DataFrame, pd.Series)): raise TypeError('please package all properties into a pd.DataFrame or pd.Series') # remove NaN from X desc = self.descriptor.transform(smiles, return_type='df').reset_index(drop=True) y = y.reset_index(drop=True) desc.dropna(inplace=True) y = pd.DataFrame(y.loc[desc.index]) for c in y: y_ = y[c] # get target property. # remove NaN from y_ y_.dropna(inplace=True) desc_ = desc.loc[y_.index] desc_ = desc_.values mdl = BayesianRidge(compute_score=True, **kwargs) mdl.fit(desc_, y_) self._mdl[c] = mdl
# log_likelihood returns a dataframe of log-likelihood values of each property & sample
[docs] def log_likelihood(self, smis, *, log_0=-1000.0, **targets): def _avoid_overflow(ll_): # log(exp(log(UP) - log(C)) - exp(log(LOW) - log(C))) + log(C) # where C = max(log(UP), max(LOW)) ll_c = np.max(ll_) ll_ = np.log(np.exp(ll_[1] - ll_c) - np.exp(ll_[0] - ll_c)) + ll_c return ll_ # self.update_targets(reset=False, **targets): for k, v in targets.items(): if not isinstance(v, tuple) or len(v) != 2 or v[1] <= v[0]: raise ValueError('must be a tuple with (low, up) boundary') self.targets[k] = v if not self.targets: raise RuntimeError('<targets> is empty') if isinstance(smis, (pd.Series, pd.DataFrame)): ll = pd.DataFrame(np.full((len(smis), len(self._mdl)), log_0), index=smis.index, columns=self._mdl.keys()) else: ll = pd.DataFrame(np.full((len(smis), len(self._mdl)), log_0), columns=self._mdl.keys()) # 1. apply prediction on given sims # 2. reset returns' index to [0, 1, ..., len(smis) - 1], this should be consistent with ll's index # 3. drop all rows which have NaN value(s) pred = self.predict(smis).reset_index(drop=True).dropna(axis='index', how='any') # because pred only contains available data # 'pred.index.values' should eq to the previous implementation idx = pred.index.values # calculate likelihood for k, (low, up) in self.targets.items(): # k: target; v: (low, up) # predict mean, std for all smiles mean, std = pred[k + ': mean'], pred[k + ': std'] # calculate low likelihood low_ll = norm.logcdf(low, loc=np.asarray(mean), scale=np.asarray(std)) # calculate up likelihood up_ll = norm.logcdf(up, loc=np.asarray(mean), scale=np.asarray(std)) # zip low and up likelihood to a 1-dim array then save it. # like: [(tar_low_smi1, tar_up_smi1), (tar_low_smi2, tar_up_smi2), ..., (tar_low_smiN, tar_up_smiN)] lls = zip(low_ll, up_ll) ll[k].iloc[idx] = np.array([*map(_avoid_overflow, list(lls))]) return ll