import numpy as np, warnings, multiprocessing
from copy import deepcopy
from scipy.stats import mode
from joblib import Parallel, delayed
try:
from ._vwrapper import c_calc_v
except ImportError:
pass
#### Helper functions
def _check_2d_inp(X, reshape = False):
X = np.require(X, requirements=["ENSUREARRAY"])
if reshape:
if len(X.shape) == 1:
X = X.reshape((-1, 1))
return X
def _check_fit_input(X, C):
    """Validate and coerce the (features, costs) pair passed to a 'fit' method."""
    X = _check_2d_inp(X, reshape = True)
    C = _check_2d_inp(C, reshape = False)
    # One row of costs per observation, and more than two classes.
    assert X.shape[0] == C.shape[0]
    assert C.shape[1] > 2
    C = np.require(C, dtype=np.float64,
                   requirements=["ENSUREARRAY", "C_CONTIGUOUS"])
    return X, C
def _standardize_weights(w):
return w * (w.shape[0] / w.sum())
def _check_njobs(njobs):
if njobs < 1:
njobs = multiprocessing.cpu_count()
if njobs is None:
return 1
assert isinstance(njobs, int)
assert njobs >= 1
return njobs
[docs]
class WeightedAllPairs:
    """
    Weighted All-Pairs for Cost-Sensitive Classification

    Note
    ----
    This implementation also offers the option of weighting each observation
    in a pairwise comparison according to the absolute difference in costs
    between the two labels. Even though such a method might not enjoy theoretical
    bounds on its regret or error, in practice, it can produce better results
    than the weighting schema proposed in [1] and [2].

    Parameters
    ----------
    base_classifier : object
        Base binary classification algorithm. Must have:
            * A fit method of the form 'base_classifier.fit(X, y, sample_weights = w)'.
            * A predict method.
    weigh_by_cost_diff : bool
        Whether to weight each sub-problem according to the absolute difference in
        costs between labels (True), or according to the formula described in [1]
        (False) (See Note).
    njobs : int
        Number of parallel jobs to run. If it's a negative number, will take the maximum available
        number of CPU cores. Note that making predictions with multiple jobs will require a **lot** more
        memory. Can also be set after the object has already been initialized.

    Attributes
    ----------
    nclasses : int
        Number of classes on the data in which it was fit.
    classifiers : list of objects
        Classifier that compares each two classes. Classes i and j out of n classes, with i<j,
        are compared by the classifier at index i*(n-(i+1)/2)+j-i-1.
    weigh_by_cost_diff : bool
        Whether each sub-problem was weighted according to the absolute difference in
        costs between labels, or according to the formula described in [1].
    base_classifier : object
        Unfitted base classifier that was originally passed.

    References
    ----------
    .. [1] Beygelzimer, A., Dani, V., Hayes, T., Langford, J., & Zadrozny, B. (2005)
           Error limiting reductions between classification tasks.
    .. [2] Beygelzimer, A., Langford, J., & Zadrozny, B. (2008).
           Machine learning techniques-reductions between prediction quality metrics.
    """
    def __init__(self, base_classifier, weigh_by_cost_diff = True, njobs = -1):
        self.base_classifier = base_classifier
        self.weigh_by_cost_diff = weigh_by_cost_diff
        self.njobs = _check_njobs(njobs)

    def fit(self, X, C):
        """
        Fit one classifier comparing each pair of classes.

        Parameters
        ----------
        X : array (n_samples, n_features)
            The data on which to fit a cost-sensitive classifier.
        C : array (n_samples, n_classes)
            The cost of predicting each label for each observation (more means worse).
        """
        X, C = _check_fit_input(X, C)
        self.nclasses = C.shape[1]
        ncombs = int( self.nclasses * (self.nclasses - 1) / 2 )
        self.classifiers = [ deepcopy(self.base_classifier) for c in range(ncombs) ]
        self.classes_compared = [None for i in range(ncombs)]
        if self.weigh_by_cost_diff:
            V = C
        else:
            V = self._calculate_v(C)
        V = np.require(V, dtype=np.float64, requirements=["ENSUREARRAY", "F_CONTIGUOUS"])
        # 'sharedmem' lets each worker write its fitted classifier and
        # compared-class pair back into this object's lists in-place.
        Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")\
            ( delayed(self._fit)(i, j, V, X) for i in range(self.nclasses - 1) for j in range(i + 1, self.nclasses) )
        self.classes_compared = np.array(self.classes_compared)
        return self

    def _fit(self, i, j, V, X):
        # Binary sub-problem comparing classes i and j: label 1 means class i
        # looks better (lower V); observations with no cost difference are dropped.
        y = (V[:, i] < V[:, j]).astype('uint8')
        w = np.abs(V[:, i] - V[:, j])
        valid_cases = w > 0
        X_take = X[valid_cases, :]
        y_take = y[valid_cases]
        w_take = w[valid_cases]
        w_take = _standardize_weights(w_take)
        ix = self._get_comb_index(i, j)
        # Stored as (j, i) so that a prediction of 0/1 indexes directly into
        # the losing/winning class of the comparison.
        self.classes_compared[ix] = (j, i)
        self.classifiers[ix].fit(X_take, y_take, sample_weight=w_take)

    def decision_function(self, X, method='most-wins'):
        """
        Calculate a 'goodness' distribution over labels.

        Note
        ----
        Predictions can be calculated either by counting which class wins the most
        pairwise comparisons (as in [1] and [2]), or - for classifiers with a 'predict_proba'
        method - by taking into account also the margins of the prediction difference
        for one class over the other for each comparison.

        If passing method = 'most-wins', this 'decision_function' will output the proportion
        of comparisons that each class won. If passing method = 'goodness', it sums the
        outputs from 'predict_proba' from each pairwise comparison and divides it by the
        number of comparisons.

        Using method = 'goodness' requires the base classifier to have a 'predict_proba' method.

        Parameters
        ----------
        X : array (n_samples, n_features)
            Data for which to predict the cost of each label.
        method : str, either 'most-wins' or 'goodness'
            How to decide the best label (see Note).

        Returns
        -------
        pred : array (n_samples, n_classes)
            A goodness score (more is better) for each label and observation.

        References
        ----------
        .. [1] Beygelzimer, A., Dani, V., Hayes, T., Langford, J., & Zadrozny, B. (2005)
               Error limiting reductions between classification tasks.
        .. [2] Beygelzimer, A., Langford, J., & Zadrozny, B. (2008).
               Machine learning techniques-reductions between prediction quality metrics.
        """
        X = _check_2d_inp(X, reshape = True)
        if method == 'most-wins':
            return self._decision_function_winners(X)
        elif method == 'goodness':
            return self._decision_function_goodness(X)
        else:
            raise ValueError("method must be one of 'most-wins' or 'goodness'.")

    def predict(self, X, method = 'most-wins'):
        """
        Predict the less costly class for a given observation.

        Note
        ----
        Predictions can be calculated either by counting which class wins the most
        pairwise comparisons (as in [1] and [2]), or - for classifiers with a 'predict_proba'
        method - by taking into account also the margins of the prediction difference
        for one class over the other for each comparison.

        Using method = 'goodness' requires the base classifier to have a 'predict_proba' method.

        Parameters
        ----------
        X : array (n_samples, n_features)
            Data for which to predict minimum cost label.
        method : str, either 'most-wins' or 'goodness'
            How to decide the best label (see Note).

        Returns
        -------
        y_hat : array (n_samples,)
            Label with expected minimum cost for each observation.

        References
        ----------
        .. [1] Beygelzimer, A., Dani, V., Hayes, T., Langford, J., & Zadrozny, B. (2005)
               Error limiting reductions between classification tasks.
        .. [2] Beygelzimer, A., Langford, J., & Zadrozny, B. (2008).
               Machine learning techniques-reductions between prediction quality metrics.
        """
        X = _check_2d_inp(X, reshape = True)
        if method == 'most-wins':
            return self._predict_winners(X)
        elif method == 'goodness':
            goodness = self._decision_function_goodness(X)
            # A single observation gets a scalar label rather than an array.
            if (len(goodness.shape) == 1) or (goodness.shape[0] == 1):
                return np.argmax(goodness)
            else:
                return np.argmax(goodness, axis=1)
        else:
            raise ValueError("method must be one of 'most-wins' or 'goodness'.")

    def _predict_winners(self, X):
        winners = np.empty((X.shape[0], len(self.classifiers)), dtype = "int64")
        Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")(delayed(self._predict_winners_single)(c, winners, X) for c in range(len(self.classifiers)))
        # Per-row majority vote over all pairwise comparisons.
        winners = mode(winners, axis=1)[0].reshape(-1).astype("int64")
        if winners.shape[0] == 1:
            return winners[0]
        else:
            return winners

    def _predict_winners_single(self, c, winners, X):
        winners[:, c] = self.classes_compared[np.repeat(c, X.shape[0]), self.classifiers[c].predict(X).reshape(-1)]

    def _decision_function_goodness(self, X):
        if 'predict_proba' not in dir(self.classifiers[0]):
            raise Exception("'goodness' method requires a classifier with 'predict_proba' method.")
        if self.njobs > 1:
            # One slice per comparison so that workers never write to the same cells.
            goodness = np.zeros((len(self.classifiers), X.shape[0], self.nclasses))
            Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")(delayed(self._decision_function_goodness_single)(c, goodness, X) for c in range(len(self.classifiers)))
            return goodness.mean(axis = 0)
        else:
            goodness = np.zeros((X.shape[0], self.nclasses))
            for c in range(len(self.classifiers)):
                comp = self.classifiers[c].predict_proba(X)
                goodness[:, int(self.classes_compared[c, 0])] += comp[:, 0]
                goodness[:, int(self.classes_compared[c, 1])] += comp[:, 1]
            return goodness / len(self.classifiers)

    def _decision_function_goodness_single(self, c, goodness, X):
        comp = self.classifiers[c].predict_proba(X)
        goodness[c, :, int(self.classes_compared[c, 0])] += comp[:, 0]
        goodness[c, :, int(self.classes_compared[c, 1])] += comp[:, 1]

    def _decision_function_winners(self, X):
        if self.njobs > 1:
            # One slice per comparison so that workers never write to the same cells.
            winners = np.zeros((len(self.classifiers), X.shape[0], self.nclasses))
            Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")(delayed(self._decision_function_winners_single)(c, winners, X) for c in range(len(self.classifiers)))
            return winners.mean(axis = 0)
        else:
            winners = np.zeros((X.shape[0], self.nclasses))
            for c in range(len(self.classifiers)):
                round_comp = self.classes_compared[np.repeat(c, X.shape[0]), self.classifiers[c].predict(X).reshape(-1).astype("int64")]
                winners[np.arange(X.shape[0]), round_comp] += 1
            return winners / len(self.classifiers)

    def _decision_function_winners_single(self, c, winners, X):
        round_comp = self.classes_compared[np.repeat(c, X.shape[0]), self.classifiers[c].predict(X).reshape(-1).astype("int64")]
        winners[c][np.arange(X.shape[0]), round_comp] += 1

    def _calculate_v(self, C):
        # Prefer the compiled extension when present; fall back to the
        # pure-Python row-wise computation otherwise.
        try:
            return c_calc_v(np.require(C, dtype=np.float64, requirements=["ENSUREARRAY", "C_CONTIGUOUS"]), self.njobs)
        except NameError:
            V = np.empty((C.shape[0], C.shape[1]), dtype = "float64")
            Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")(delayed(WeightedAllPairs._calculate_v_single)(None, row, V, C) for row in range(C.shape[0]))
            return V

    def _calculate_v_single(self, row, V, C):
        # Weighting formula from [1]: area under the step function of sorted
        # costs, mapped back to the original class order.
        cost = C[row].copy()
        out_order = np.argsort(cost)
        cost = cost[out_order] - cost.min()
        n = cost.shape[0]
        v = np.zeros(n)
        rectangle_width = np.diff(cost)
        rectangle_height = 1 / ( np.arange(n - 1) + 1 )
        v[1: ] = rectangle_width * rectangle_height
        V[row] = np.cumsum(v)[ np.argsort(out_order) ]

    def _get_comb_index(self, i, j):
        # Index of the classifier comparing classes i<j in the flattened
        # upper-triangular ordering.
        return int( i * (self.nclasses - (i + 1) / 2) + j - i - 1 )
class _BinTree:
# constructs a balanced binary tree
# keeps track of which nodes compare which classes
# node_comparisons -> [all nodes, nodes to the left]
# childs -> [child left, child right]
# terminal nodes are negative numbers
# non-terminal nodes refer to the index in 'node_comparisons' for next comparison
def __init__(self,n):
self.n_arr=np.arange(n)
self.node_comparisons=[[None,None,None] for i in range(n-1)]
self.node_counter=0
self.childs=[[None,None] for i in range(n-1)]
self.parents=[None for i in range(n-1)]
self.isterminal=set()
split_point=int(np.ceil(self.n_arr.shape[0]/2))
self.node_comparisons[0][0]=list(self.n_arr)
self.node_comparisons[0][1]=list(self.n_arr[:split_point])
self.node_comparisons[0][2]=list(self.n_arr[split_point:])
self.split_arr(self.n_arr[:split_point],0,True)
self.split_arr(self.n_arr[split_point:],0,False)
self.isterminal=list(self.isterminal)
self.is_at_bottom=[i for i in range(len(self.childs)) if (self.childs[i][0]<=0) and (self.childs[i][1]<=0)]
def split_arr(self,arr,parent_node,direction_left):
if arr.shape[0]==1:
if direction_left:
self.childs[parent_node][0]=-arr[0]
else:
self.childs[parent_node][1]=-arr[0]
self.isterminal.add(parent_node)
return None
self.node_counter+=1
curr_node=self.node_counter
if direction_left:
self.childs[parent_node][0]=curr_node
else:
self.childs[parent_node][1]=curr_node
self.parents[curr_node]=parent_node
split_point=int(np.ceil(arr.shape[0]/2))
self.node_comparisons[curr_node][0]=list(arr)
self.node_comparisons[curr_node][1]=list(arr[:split_point])
self.node_comparisons[curr_node][2]=list(arr[split_point:])
self.split_arr(arr[:split_point],curr_node,True)
self.split_arr(arr[split_point:],curr_node,False)
return None
[docs]
class FilterTree:
    """
    Filter-Tree for Cost-Sensitive Multi-Class classification

    Parameters
    ----------
    base_classifier : object
        Base binary classification algorithm. Must have:
            * A fit method of the form 'base_classifier.fit(X, y, sample_weights = w)'.
            * A predict method.
    njobs : int
        Number of parallel jobs to run. If it's a negative number, will take the maximum available
        number of CPU cores. Parallelization is only for predictions, not for training.

    Attributes
    ----------
    nclasses : int
        Number of classes on the data in which it was fit.
    classifiers : list of objects
        Classifier that compares each two classes belonging to a node.
    tree : object
        Binary tree with attributes childs and parents.
        Non-negative numbers for children indicate non-terminal nodes,
        while negative and zero indicates a class (terminal node).
        Root is the node zero.
    base_classifier : object
        Unfitted base classifier that was originally passed.

    References
    ----------
    .. [1] Beygelzimer, A., Langford, J., & Ravikumar, P. (2007).
           Multiclass classification with filter trees.
    """
    def __init__(self, base_classifier, njobs = -1):
        self.base_classifier = base_classifier
        self.njobs = _check_njobs(njobs)

    def fit(self, X, C):
        """
        Fit a filter tree classifier.

        Note
        ----
        Shifting the order of the classes within the cost array will produce different
        results, as it will build a different binary tree comparing different classes
        at each node.

        Parameters
        ----------
        X : array (n_samples, n_features)
            The data on which to fit a cost-sensitive classifier.
        C : array (n_samples, n_classes)
            The cost of predicting each label for each observation (more means worse).
        """
        X, C = _check_fit_input(X, C)
        C = np.require(C, dtype=np.float64, requirements=["ENSUREARRAY", "F_CONTIGUOUS"])
        nclasses = C.shape[1]
        self.tree = _BinTree(nclasses)
        self.classifiers = [deepcopy(self.base_classifier) for c in range(nclasses - 1)]
        # Fit the tree bottom-up: a node is only fit once each of its children
        # is either a terminal class or an already-fitted node.
        classifier_queue = self.tree.is_at_bottom
        next_round = list()
        already_fitted = set()
        # labels_take[r, c] holds the class that node c's subtree passes up for
        # row r; -1 marks rows that were filtered out below that node.
        labels_take = -np.ones((X.shape[0], len(self.classifiers)))
        while True:
            for c in classifier_queue:
                if c in already_fitted or (c is None):
                    continue
                child1, child2 = self.tree.childs[c]
                if (child1 > 0) and (child1 not in already_fitted):
                    continue
                if (child2 > 0) and (child2 not in already_fitted):
                    continue
                # Terminal children are stored negated; non-terminal children
                # contribute whichever class won at the child node.
                if child1 <= 0:
                    class1 = -np.repeat(child1, X.shape[0]).astype("int64")
                else:
                    class1 = labels_take[:, child1].astype("int64")
                if child2 <= 0:
                    class2 = -np.repeat(child2, X.shape[0]).astype("int64")
                else:
                    class2 = labels_take[:, child2].astype("int64")
                cost1 = C[np.arange(X.shape[0]), np.clip(class1, a_min=0, a_max=None)]
                cost2 = C[np.arange(X.shape[0]), np.clip(class2, a_min=0, a_max=None)]
                y = (cost1 < cost2).astype('uint8')
                w = np.abs(cost1 - cost2)
                # Keep only rows with a real cost difference, and which survived
                # both child nodes.
                valid_obs = w > 0
                if child1 > 0:
                    valid_obs = valid_obs & (labels_take[:, child1] >= 0)
                if child2 > 0:
                    valid_obs = valid_obs & (labels_take[:, child2] >= 0)
                X_take = X[valid_obs, :]
                y_take = y[valid_obs]
                w_take = w[valid_obs]
                w_take = _standardize_weights(w_take)
                self.classifiers[c].fit(X_take, y_take, sample_weight=w_take)
                labels_arr = np.c_[class1, class2].astype("int64")
                # NOTE(review): 'np.repeat(0, X_take.shape[0])' always reads row 0
                # of labels_arr, which is only correct when both children are
                # terminal (all rows identical); presumably the rows selected by
                # 'valid_obs' were intended here -- confirm against the reference
                # implementation before changing.
                labels_take[valid_obs, c] = labels_arr[np.repeat(0, X_take.shape[0]),
                                                       self.classifiers[c].predict(X_take).reshape(-1).astype('uint8')]
                already_fitted.add(c)
                next_round.append(self.tree.parents[c])
                if c == 0 or (len(classifier_queue) == 0):
                    break
            classifier_queue = list(set(next_round))
            next_round = list()
            if (len(classifier_queue) == 0):
                break
        return self

    def predict(self, X):
        """
        Predict the less costly class for a given observation.

        Note
        ----
        The implementation here happens in a Python loop rather than in some
        NumPy array operations, thus it will be slower than the other algorithms
        here, even though in theory it implies fewer comparisons.

        Parameters
        ----------
        X : array (n_samples, n_features)
            Data for which to predict minimum cost label.

        Returns
        -------
        y_hat : array (n_samples,)
            Label with expected minimum cost for each observation.
        """
        X = _check_2d_inp(X, reshape = True)
        shape_single = list(X.shape)
        shape_single[0] = 1
        pred = np.empty(X.shape[0], dtype = "int64")
        if X.shape[0] == 1:
            # Bug fix: the original called self._predict(X) here, which does not
            # match _predict's signature (row, pred, shape_single, X) and raised
            # a TypeError for single-row input.
            self._predict(0, pred, shape_single, X)
            return pred
        else:
            Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")(delayed(self._predict)(row, pred, shape_single, X) for row in range(X.shape[0]))
            return pred

    def _predict(self, row, pred, shape_single, X):
        # Walk the tree from the root; each classifier decides which child to
        # descend into, until a terminal (negated class) node is reached.
        curr_node = 0
        X_single = X[row].reshape(shape_single)
        while True:
            go_right = self.classifiers[curr_node].predict(X_single)
            if go_right:
                curr_node = self.tree.childs[curr_node][0]
            else:
                curr_node = self.tree.childs[curr_node][1]
            if curr_node <= 0:
                pred[row] = -curr_node
                return None
[docs]
class CostProportionateClassifier:
    """
    Cost-Proportionate Rejection Sampling

    Turns a binary classifier with no native sample weighting method into a
    binary classifier that supports sample weights.

    Parameters
    ----------
    base_classifier : object
        Binary classifier used for predicting in each sample. Must have:
            * A fit method of the form 'base_classifier.fit(X, y)'.
            * A predict method.
    n_samples : int
        Number of samples taken. One classifier is fit per sample.
    extra_rej_const : float
        Extra rejection constant used for sampling (see 'fit' method).
    njobs : int
        Number of parallel jobs to run. If it's a negative number, will take the maximum available
        number of CPU cores.
    random_state : None, int, RandomState, or Generator
        Seed or object to use for random number generation. If passing an integer,
        will be used as seed, otherwise, if passing a numpy ``Generator`` or
        ``RandomState``, will use it directly.

    Attributes
    ----------
    n_samples : int
        Number of samples taken. One classifier is fit per sample.
    classifiers : list of objects
        Classifier that was fit to each sample.
    base_classifier : object
        Unfitted base classifier that was originally passed.
    extra_rej_const : float
        Extra rejection constant used for sampling (see 'fit' method).

    References
    ----------
    .. [1] Beygelzimer, A., Langford, J., & Zadrozny, B. (2008).
           Machine learning techniques-reductions between prediction quality metrics.
    """
    def __init__(self, base_classifier, n_samples=10, extra_rej_const=1e-1,
                 njobs = -1, random_state = None):
        self.base_classifier = base_classifier
        self.n_samples = n_samples
        self.extra_rej_const = extra_rej_const
        self.njobs = _check_njobs(njobs)
        if isinstance(random_state, float):
            random_state = int(random_state)
        if isinstance(random_state, int):
            self.random_state = np.random.default_rng(random_state)
        elif random_state is None:
            self.random_state = np.random.default_rng()
        else:
            if not isinstance(random_state, np.random.Generator) \
               and not isinstance(random_state, np.random.RandomState) \
               and (random_state != np.random):
                raise ValueError("Received invalid 'random_state'.")
            self.random_state = random_state

    def fit(self, X, y, sample_weight=None):
        """
        Fit a binary classifier with sample weights to data.

        Note
        ----
        Examples at each sample are accepted with probability = weight/Z,
        where Z = max(weight) + extra_rej_const.
        Larger values for extra_rej_const ensure that no example gets selected in
        every single sample, but results in smaller sample sizes as more examples are rejected.

        Parameters
        ----------
        X : array (n_samples, n_features)
            Data on which to fit the model.
        y : array (n_samples,) or (n_samples, 1)
            Class of each observation.
        sample_weight : array (n_samples,) or (n_samples, 1)
            Weights indicating how important is each observation in the loss function.
        """
        assert self.extra_rej_const >= 0
        if sample_weight is None:
            sample_weight = np.ones(y.shape[0])
        else:
            if isinstance(sample_weight, list):
                sample_weight = np.array(sample_weight)
            if len(sample_weight.shape):
                sample_weight = sample_weight.reshape(-1)
        assert sample_weight.shape[0] == X.shape[0]
        assert sample_weight.min() > 0
        Z = sample_weight.max() + self.extra_rej_const
        sample_weight = sample_weight / Z  # sample weight is now acceptance prob
        self.classifiers = [deepcopy(self.base_classifier) for c in range(self.n_samples)]
        # Pre-draw all acceptance thresholds so each worker only reads its row.
        take_all = self.random_state.random(size = (self.n_samples, X.shape[0]))
        Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")\
            (delayed(self._fit)(c, take_all, X, y, sample_weight) \
             for c in range(self.n_samples))
        return self

    def _fit(self, c, take_all, X, y, sample_weight):
        # Rejection sampling: keep rows whose threshold falls below the
        # acceptance probability.
        take = take_all[c] <= sample_weight
        self.classifiers[c].fit(X[take, :], y[take])

    def decision_function(self, X, aggregation = 'raw'):
        """
        Calculate how preferred is positive class according to classifiers.

        Note
        ----
        If passing aggregation = 'raw', it will output the proportion of the classifiers
        that voted for the positive class.
        If passing aggregation = 'weighted', it will output the average predicted probability
        for the positive class for each classifier.

        Calculating it with aggregation = 'weighted' requires the base classifier to have a
        'predict_proba' method.

        Parameters
        ----------
        X : array (n_samples, n_features)
            Observations for which to determine class likelihood.
        aggregation : str, either 'raw' or 'weighted'
            How to compute the 'goodness' of the positive class (see Note).

        Returns
        -------
        pred : array (n_samples,)
            Score for the positive class (see Note).
        """
        if aggregation == 'weighted':
            if 'predict_proba' not in dir(self.classifiers[0]):
                raise Exception("'aggregation='weighted'' is only available for classifiers with 'predict_proba' method.")
        # One column per fitted classifier; rows follow the observations.
        preds = np.empty((X.shape[0], self.n_samples), dtype = "float64")
        if aggregation == "raw":
            # Bug fix: attribute is 'n_samples' (the original referenced a
            # nonexistent 'self.nsamples').
            Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")(delayed(self._decision_function_raw)(c, preds, X) for c in range(self.n_samples))
        elif aggregation == "weighted":
            Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")(delayed(self._decision_function_weighted)(c, preds, X) for c in range(self.n_samples))
        else:
            raise ValueError("'aggregation' must be one of 'raw' or 'weighted'.")
        return preds.mean(axis = 1).reshape(-1)

    def _decision_function_raw(self, c, preds, X):
        # Bug fix: fill column c -- 'preds' is (n_obs, n_samples), so the
        # original 'preds[c, :]' wrote along the wrong axis.
        preds[:, c] = self.classifiers[c].predict(X).reshape(-1)

    def _decision_function_weighted(self, c, preds, X):
        # Bug fix: fill column c -- 'preds' is (n_obs, n_samples), so the
        # original 'preds[c, :]' wrote along the wrong axis.
        preds[:, c] = self.classifiers[c].predict_proba(X)[:, 1].reshape(-1)

    def predict(self, X, aggregation = 'raw'):
        """
        Predict the class of an observation.

        Note
        ----
        If passing aggregation = 'raw', it will output the class that most classifiers outputted,
        breaking ties by predicting the positive class.
        If passing aggregation = 'weighted', it will weight each vote from a classifier according
        to the probabilities predicted.

        Predicting with aggregation = 'weighted' requires the base classifier to have a
        'predict_proba' method.

        Parameters
        ----------
        X : array (n_samples, n_features)
            Observations for which to predict their class.
        aggregation : str, either 'raw' or 'weighted'
            How to compute the 'goodness' of the positive class (see Note).

        Returns
        -------
        pred : array (n_samples,)
            Predicted class for each observation.
        """
        return ( self.decision_function(X, aggregation) >= .5 ).astype("int64")
[docs]
class WeightedOneVsRest:
    """
    Weighted One-Vs-Rest Cost-Sensitive Classification

    Note
    ----
    This will convert the problem into one sub-problem per class.

    If passing weight_simple_diff=True, the observations for each subproblem
    will be weighted according to the difference between the cost of the label being
    predicted and the minimum cost of any other label.

    If passing weight_simple_diff=False, they will be weighted according to the formula
    described in [1], originally meant for the All-Pairs variant.

    The predictions are taken to be the maximum value of the decision functions of
    each One-Vs-Rest classifier. If the classifier has no method 'decision_function' or
    'predict_proba', it will output the class that whatever classifier considered correct,
    breaking ties by choosing the smallest index.

    Parameters
    ----------
    base_classifier : object
        Base binary classification algorithm. Must have:
            * A fit method of the form 'base_classifier.fit(X, y, sample_weight = w)'.
            * A predict method.
    weight_simple_diff : bool
        Whether to weight each sub-problem according to the absolute difference in
        costs between labels, or according to the formula described in [1] (See Note).
    njobs : int
        Number of parallel jobs to run. If it's a negative number, will take the maximum available
        number of CPU cores.

    Attributes
    ----------
    nclasses : int
        Number of classes on the data in which it was fit.
    classifiers : list of objects
        Classifier that predicts each class.
    weight_simple_diff : bool
        Whether each sub-problem was weighted according to the absolute difference in
        costs between labels, or according to the formula described in [1].
    base_classifier : object
        Unfitted base classifier that was originally passed.

    References
    ----------
    .. [1] Beygelzimer, A., Dani, V., Hayes, T., Langford, J., & Zadrozny, B. (2005, August).
           Error limiting reductions between classification tasks.
    """
    def __init__(self, base_classifier, weight_simple_diff = False, njobs = -1):
        self.base_classifier = base_classifier
        self.weight_simple_diff = weight_simple_diff
        self.njobs = _check_njobs(njobs)

    def fit(self, X, C):
        """
        Fit one weighted classifier per class.

        Parameters
        ----------
        X : array (n_samples, n_features)
            The data on which to fit a cost-sensitive classifier.
        C : array (n_samples, n_classes)
            The cost of predicting each label for each observation (more means worse).
        """
        X, C = _check_fit_input(X, C)
        C = np.require(C, dtype=np.float64, requirements=["ENSUREARRAY", "F_CONTIGUOUS"])
        self.nclasses = C.shape[1]
        self.classifiers = [deepcopy(self.base_classifier) for i in range(self.nclasses)]
        if not self.weight_simple_diff:
            # Reuse the All-Pairs weighting formula from [1].
            C = WeightedAllPairs._calculate_v(self, C)
        Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")(delayed(self._fit)(c, X, C) for c in range(self.nclasses))
        return self

    def _fit(self, c, X, C):
        # Binary sub-problem: class c vs. the cheapest of all other classes.
        cols_rest = [i for i in range(self.nclasses)]
        del cols_rest[c]
        cost_choice = C[:, c]
        cost_others = C[:, cols_rest].min(axis = 1)
        w = np.abs(cost_choice - cost_others)
        y = ( cost_choice < cost_others ).astype('uint8')
        valid_cases = w > 0
        X_take = X[valid_cases, :]
        y_take = y[valid_cases]
        w_take = w[valid_cases]
        w_take = _standardize_weights(w_take)
        self.classifiers[c].fit(X_take, y_take, sample_weight = w_take)

    def decision_function(self, X):
        """
        Calculate a 'goodness' distribution over labels.

        Note
        ----
        When the base classifier has a 'decision_function' method, a softmax is
        applied to its scores, so each row of the output sums up to 1.
        Otherwise, raw 'predict_proba' (or 'predict') outputs are returned as-is.

        Parameters
        ----------
        X : array (n_samples, n_features)
            Data for which to predict the cost of each label.

        Returns
        -------
        pred : array (n_samples, n_classes)
            A goodness score (more is better) for each label and observation.
        """
        X = _check_2d_inp(X)
        preds = np.empty((X.shape[0], self.nclasses))
        available_methods = dir(self.classifiers[0])
        if "decision_function" in available_methods:
            Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")(delayed(self._decision_function_decision_function)(c, preds, X) for c in range(self.nclasses))
            apply_softmax = True
        elif "predict_proba" in available_methods:
            Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")(delayed(self._decision_function_predict_proba)(c, preds, X) for c in range(self.nclasses))
            apply_softmax = False
        elif "predict" in available_methods:
            Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")(delayed(self.decision_function_predict)(c, preds, X) for c in range(self.nclasses))
            apply_softmax = False
        else:
            # Bug fix: error message said 'Predict' instead of 'predict'.
            raise ValueError("'base_classifier' must have at least one of 'decision_function', 'predict_proba', 'predict'.")
        if apply_softmax:
            # Numerically stable softmax: subtract the per-row max first.
            preds = np.exp(preds - preds.max(axis=1).reshape((-1, 1)))
            preds = preds / preds.sum(axis=1).reshape((-1, 1))
        return preds

    def _decision_function_decision_function(self, c, preds, X):
        preds[:, c] = self.classifiers[c].decision_function(X).reshape(-1)

    def _decision_function_predict_proba(self, c, preds, X):
        preds[:, c] = self.classifiers[c].predict_proba(X)[:, 1].reshape(-1)

    def decision_function_predict(self, c, preds, X):
        preds[:, c] = self.classifiers[c].predict(X).reshape(-1)

    def predict(self, X):
        """
        Predict the less costly class for a given observation.

        Parameters
        ----------
        X : array (n_samples, n_features)
            Data for which to predict minimum cost label.

        Returns
        -------
        y_hat : array (n_samples,)
            Label with expected minimum cost for each observation.
        """
        X = _check_2d_inp(X)
        return np.argmax(self.decision_function(X), axis=1)
[docs]
class RegressionOneVsRest:
    """
    Regression One-Vs-Rest

    Fits one regressor trying to predict the cost of each class.
    Predictions are the class with the minimum predicted cost across regressors.

    Parameters
    ----------
    base_regressor : object
        Regressor to be used for the sub-problems. Must have:
            * A fit method of the form 'base_classifier.fit(X, y)'.
            * A predict method.
    njobs : int
        Number of parallel jobs to run. If it's a negative number, will take the maximum available
        number of CPU cores.

    Attributes
    ----------
    nclasses : int
        Number of classes on the data in which it was fit.
    regressors : list of objects
        Regressor that predicts the cost of each class.
    base_regressor : object
        Unfitted base regressor that was originally passed.

    References
    ----------
    .. [1] Beygelzimer, A., Langford, J., & Zadrozny, B. (2008).
           Machine learning techniques-reductions between prediction quality metrics.
    """
    def __init__(self, base_regressor, njobs = -1):
        self.base_regressor = base_regressor
        self.njobs = _check_njobs(njobs)

    def fit(self, X, C):
        """
        Fit one regressor per class.

        Parameters
        ----------
        X : array (n_samples, n_features)
            The data on which to fit a cost-sensitive classifier.
        C : array (n_samples, n_classes)
            The cost of predicting each label for each observation (more means worse).
        """
        X, C = _check_fit_input(X, C)
        C = np.require(C, dtype=np.float64, requirements=["ENSUREARRAY", "F_CONTIGUOUS"])
        self.nclasses = C.shape[1]
        self.regressors = [deepcopy(self.base_regressor) for i in range(self.nclasses)]
        Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")(delayed(self._fit)(c, X, C) for c in range(self.nclasses))
        return self

    def _fit(self, c, X, C):
        # Each regressor learns the cost of its own class.
        self.regressors[c].fit(X, C[:, c])

    def decision_function(self, X, apply_softmax = True):
        """
        Get cost estimates for each observation.

        Note
        ----
        If called with apply_softmax = False, this will output the predicted
        COST rather than the 'goodness' - meaning, more is worse.

        If called with apply_softmax = True, it will output one minus the softmax
        on the costs, so that a higher number means a better (less costly) choice.
        Note that each row of this output sums up to ``n_classes - 1``, not to 1.

        Parameters
        ----------
        X : array (n_samples, n_features)
            Data for which to predict the cost of each label.
        apply_softmax : bool
            Whether to apply a softmax transform to the costs (see Note).

        Returns
        -------
        pred : array (n_samples, n_classes)
            Either predicted cost or a distribution of 'goodness' over the choices,
            according to the apply_softmax argument.
        """
        X = _check_2d_inp(X, reshape = True)
        preds = np.empty((X.shape[0], self.nclasses), dtype = "float64")
        Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")(delayed(self._decision_function)(c, preds, X) for c in range(self.nclasses))
        if not apply_softmax:
            return preds
        else:
            # Numerically stable softmax over costs, then inverted so that
            # cheaper classes score higher.
            preds = np.exp(preds - preds.max(axis=1).reshape((-1, 1)))
            preds = preds / preds.sum(axis=1).reshape((-1, 1))
            return 1 - preds

    def _decision_function(self, c, preds, X):
        preds[:, c] = self.regressors[c].predict(X).reshape(-1)

    def predict(self, X):
        """
        Predict the less costly class for a given observation.

        Parameters
        ----------
        X : array (n_samples, n_features)
            Data for which to predict minimum cost labels.

        Returns
        -------
        y_hat : array (n_samples,)
            Label with expected minimum cost for each observation.
        """
        X = _check_2d_inp(X)
        # Raw costs (no softmax): the cheapest class wins.
        return np.argmin(self.decision_function(X, False), axis=1)