# Source code for xenonpy.datatools.splitter

#  Copyright (c) 2021. yoshida-lab. All rights reserved.
#  Use of this source code is governed by a BSD-style
#  license that can be found in the LICENSE file.

from typing import Union, Tuple, Iterable, List

import numpy as np
import pandas as pd
from pandas import DataFrame, Series
from sklearn import utils
from sklearn.base import BaseEstimator
from sklearn.model_selection import train_test_split, KFold

__all__ = ['Splitter']


class Splitter(BaseEstimator):
    """
    Data splitter for train and test.

    Indices ``0 .. size - 1`` are partitioned once into a training part and
    (unless ``test_size`` is 0) a test part; when ``k_fold`` is given, the
    training part is further divided into cross-validation folds.
    The partition is drawn in :meth:`~Splitter.roll`, which is called once
    by the constructor and may be called again to re-draw it.
    """

    def __init__(self,
                 size: int,
                 *,
                 test_size: Union[float, int] = 0.2,
                 k_fold: Union[int, Iterable, None] = None,
                 random_state: Union[int, None] = None,
                 shuffle: bool = True):
        """
        Parameters
        ----------
        size
            Total sample size. All data must have same length of their first dim,
        test_size
            If float, should be between ``0.0`` and ``1.0`` and represent the proportion
            of the dataset to include in the test split. If int, represents the absolute
            number of test samples. Can be ``0`` only if ``k_fold`` is not ``None``.
            In this case, :meth:`~Splitter.cv` will yield a tuple only contains
            ``training`` and ``validation`` on each step. By default, the value is set to 0.2.
        k_fold
            Number of k-folds.
            If ``int``, Must be at least 2.
            If ``Iterable``, it should provide label for each element which will be used for group cv.
            In this case, the input of :meth:`~Splitter.cv` must be a :class:`pandas.DataFrame` object.
            Default value is None to specify no cv.
        random_state
            If int, random_state is the seed used by the random number generator;
            Default is None.
        shuffle
            Whether or not to shuffle the data before splitting.

        Raises
        ------
        RuntimeError
            If ``k_fold`` is ``None`` and ``test_size`` is 0.
        """
        if k_fold is None and test_size == 0:
            raise RuntimeError('<test_size> can be zero only if <cv> is not none')

        self._k_fold = k_fold
        self._shuffle = shuffle
        self._test_size = test_size
        # Positional indices of all samples; every split is expressed in terms of these.
        self._sample_size = np.arange(size)
        self._test: Union[np.ndarray, None] = None
        self._train: Union[np.ndarray, None] = None
        self._cv_indices: List[Tuple[np.ndarray, np.ndarray]] = []
        self._random_state = random_state

        self.roll(random_state)

    @property
    def size(self):
        return self._sample_size.size

    @property
    def shuffle(self):
        return self._shuffle

    @property
    def test_size(self):
        return self._test_size

    @property
    def k_fold(self):
        return self._k_fold

    @property
    def random_state(self):
        return self._random_state

    def roll(self, random_state: Union[int, None] = None):
        """
        (Re-)draw the train/test partition and the cross-validation folds.

        Parameters
        ----------
        random_state
            Seed forwarded to the underlying samplers. Default is None.
        """
        # Drop folds from any previous roll; otherwise they would pile up and
        # :meth:`cv` would yield stale, duplicated folds.
        self._cv_indices = []

        # Build the group labels once so they can serve both the test split and
        # the group cv below (also avoids consuming a one-shot iterable twice).
        group_labels: Union[pd.Series, None] = None
        if isinstance(self._k_fold, Iterable):
            group_labels = pd.Series(self._k_fold).reset_index(drop=True)

        if self._test_size == 0:
            # No test part: everything is training data.
            if self._shuffle:
                self._train = utils.shuffle(self._sample_size, random_state=random_state)
            else:
                self._train = self._sample_size
        elif group_labels is not None:
            # Group-wise test split: whole groups go to the test part.
            unique_labels = group_labels.unique()
            if isinstance(self._test_size, float):
                n_test_groups = round(unique_labels.size * self._test_size)
            else:
                # Convert the absolute sample count into a fraction of groups.
                n_test_groups = round(unique_labels.size * (self._test_size / self.size))
            test_labels: pd.Series = pd.Series(unique_labels).sample(n_test_groups, random_state=random_state)
            in_test = group_labels.isin(test_labels)
            self._train = group_labels[~in_test].index.values
            self._test = group_labels[in_test].index.values
        else:
            self._train, self._test = train_test_split(self._sample_size,
                                                       test_size=self._test_size,
                                                       random_state=random_state,
                                                       shuffle=self._shuffle)

        if isinstance(self._k_fold, int):
            # Recent scikit-learn versions reject a ``random_state`` when
            # ``shuffle`` is False, so only forward the seed when shuffling.
            cv = KFold(n_splits=self._k_fold,
                       shuffle=self._shuffle,
                       random_state=random_state if self._shuffle else None)
            for train, val in cv.split(self._train):
                # KFold returns positions into ``self._train``; map them back to sample indices.
                self._cv_indices.append((self._train[train], self._train[val]))
        elif group_labels is not None:
            # Leave-one-group-out over the groups present in the training part.
            train_groups: pd.Series = group_labels.iloc[self._train]
            # ``unique`` keeps first-appearance order, so the fold order is deterministic.
            for g in train_groups.unique():
                val = train_groups[train_groups == g].index.values
                train = train_groups[train_groups != g].index.values
                self._cv_indices.append((train, val))

    def _check_input(self, array):
        """
        Validate one input container and return it, converting list/tuple to ``numpy.ndarray``.

        Raises
        ------
        TypeError
            If ``array`` is not a list, tuple, ndarray, DataFrame, or Series.
        ValueError
            If the length of dim 0 differs from :attr:`size`.
        """
        if isinstance(array, (list, tuple)):
            array = np.asarray(array)
        if not isinstance(array, (np.ndarray, pd.DataFrame, pd.Series)):
            raise TypeError(
                f'<arrays> must be list, numpy.ndarray, pandas.DataFrame, or pandas.Series but got {array.__class__}.'
            )
        if array.shape[0] != self.size:
            raise ValueError(
                f'parameters <arrays> must have size {self.size} for dim 0 but got {array.shape[0]}'
            )
        return array

    @staticmethod
    def _split(array, *idx):
        """Return ``array`` sliced by each positional index set in ``idx``, in order."""
        if isinstance(array, np.ndarray):
            return [array[i] for i in idx]
        if isinstance(array, (DataFrame, Series)):
            # Indices are positional, hence ``iloc``.
            return [array.iloc[i] for i in idx]

    def cv(self, *arrays, less_for_train=False):
        """
        Split data with cross-validation.

        Parameters
        ----------
        *arrays: DataFrame, Series, ndarray, list
            Data for split. Must be a Sequence of indexables with same length / shape[0].
            If no arrays are given, return the split indices.
        less_for_train: bool
            If true, use less data set for train.
            E.g. ``[1, 2, 3, 4, 5, 6, 7, 8, 9, 0]`` with 5 cv will be split into
            ``[1, 2]`` and ``[3, 4, 5, 6, 7, 8, 9, 0]``. Usually, ``[1, 2]`` (less one)
            will be used for val. With ``less_for_train=True``, ``[1, 2]`` will be
            used for train. Default is ``False``.

        Yields
        -------
        tuple
            list containing split of inputs with cv. if no inputs are given,
            only return the indices of split. if ``test_size`` is 0, test data/index
            will not return.

        Raises
        ------
        RuntimeError
            If ``k_fold`` was not set.
        """
        if self._k_fold is None:
            raise RuntimeError('parameter <cv> must be set')

        # Validate once up front; the inputs do not change between folds.
        checked = [self._check_input(array) for array in arrays]

        for train, val in self._cv_indices:
            if less_for_train:
                train, val = val, train

            if self._test is not None:
                parts = (train, val, self._test)
            else:
                parts = (train, val)

            if not checked:
                yield parts
            else:
                ret = []
                for array in checked:
                    ret.extend(self._split(array, *parts))
                yield tuple(ret)

    def split(self, *arrays: Union[np.ndarray, pd.DataFrame, pd.Series]):
        """
        Split data.

        Parameters
        ----------
        *arrays
            Dataset for split.
            Size of dim 0 must be equal to :meth:`~Splitter.size`.
            If no arrays are given, return the split indices.

        Returns
        -------
        tuple
            List containing split of inputs. if no inputs are given, only return
            the indices of splits.

        Raises
        ------
        RuntimeError
            If no test part exists (``test_size`` is 0).
        """
        if self._test is None:
            raise RuntimeError('split action is illegal because `test_size` is none')

        if len(arrays) == 0:
            return self._train, self._test

        ret = []
        for array in arrays:
            array = self._check_input(array)
            ret.extend(self._split(array, self._train, self._test))
        return tuple(ret)