import paddle
import numpy as np
import os, sys
import warnings
from tqdm import tqdm
from interpretdl.common.file_utils import download_and_decompress
from .abc_interpreter import Interpreter
class TrainingDynamics():
    """
    Training Dynamics Interpreter focuses on the behavior of each training sample by
    running a normal SGD training process.

    By recording the training dynamics, the interpreter can diagnose a dataset with
    hand-designed features or with a learned solution.

    After interpretation at the level of data, datasets with underlying label noise
    can be handled better, and thus a better performance can be achieved on them.

    More training-dynamics-based methods, including [Forgetting Events]
    and [Dataset Mapping], will be available in this interpreter.
    """

    def __init__(self, paddle_model: callable, device: str = 'gpu:0'):
        """
        Args:
            paddle_model (callable): A model with :py:func:`forward` and possibly :py:func:`backward` functions.
            device (str): The device used for running ``paddle_model``, options: ``"cpu"``, ``"gpu:0"``, ``"gpu:1"``
                etc.
        """
        self.device = device
        self.paddle_model = paddle_model
        # Only the first three characters are checked, so "gpu:1" etc. are accepted.
        assert self.device[:3] in ['cpu', 'gpu']

        # Fall back to CPU when a GPU device is requested on a CPU-only Paddle build.
        if not paddle.is_compiled_with_cuda() and self.device[:3] == 'gpu':
            print("Paddle is not installed with GPU support. Change to CPU version now.")
            self.device = 'cpu'

        # globally set device.
        paddle.set_device(self.device)

    def generator(self,
                  train_loader: callable,
                  optimizer: paddle.optimizer,
                  epochs: int):
        """Run the training process and record the per-sample training dynamics.

        Args:
            train_loader (callable): A training data generator. Calling it must yield
                ``(indices, x_train, y_train)`` batches, and it must expose a ``batch_size`` attribute.
            optimizer (paddle.optimizer): The paddle optimizer.
            epochs (int): The number of epochs to train the model.

        Returns:
            training_dynamics (dict): A pointwise training dynamics (history). Each sample index maps to a
            list that receives one entry every time the sample is seen: the softmax output of the model for
            that sample, or an all-NaN vector of the same length when the sample belongs to a batch whose
            size differs from ``train_loader.batch_size``.
        """
        self.paddle_model.train()

        # Created once and reused for every batch instead of being rebuilt per step.
        softmax = paddle.nn.Softmax()
        training_dynamics = {}

        for i in range(epochs):
            correct = 0
            total = 0
            for step_id, (indices, x_train, y_train) in enumerate(train_loader()):
                if not isinstance(x_train[0], np.ndarray):
                    x_train = x_train.numpy()
                x_train = paddle.to_tensor(x_train)
                y_train = paddle.to_tensor(np.array(y_train).reshape((-1, 1)))

                logits = self.paddle_model(x_train)
                predicted = paddle.argmax(logits, axis=1).numpy()
                bsz = len(predicted)

                loss = paddle.nn.functional.softmax_with_cross_entropy(logits, y_train)
                avg_loss = paddle.mean(loss)

                # Flatten the labels back to (bsz,) so they compare element-wise with ``predicted``.
                y_train = y_train.reshape((bsz, )).numpy()
                acc = (predicted == y_train).astype(int)

                avg_loss.backward()
                optimizer.step()
                optimizer.clear_grad()

                correct += np.sum(acc)
                total += bsz

                # '\r' rewrites the same console line, giving a running progress display.
                sys.stdout.write('\r')
                sys.stdout.write('| Epoch [%3d/%3d] Iter[%3d]\t\tLoss: %.4f Acc@1: %.3f%%' %
                                 (i + 1, epochs, step_id + 1, avg_loss.numpy().item(), 100. * correct / total))
                sys.stdout.flush()

                # record training dynamics information
                with paddle.no_grad():
                    training_dynamics_per_epoch = softmax(logits).detach().cpu().numpy()
                    # Batches whose size differs from the loader's batch size are recorded as NaN.
                    is_full_batch = len(indices) == train_loader.batch_size
                    for j, index in enumerate(indices):
                        if is_full_batch:
                            record = training_dynamics_per_epoch[j]
                        else:
                            record = np.full([training_dynamics_per_epoch.shape[1], ], np.nan)
                        training_dynamics.setdefault(index.item(), []).append(record)

        return training_dynamics

    def save(self, logits, assigned_targets, label_flip=None, save_path=None):
        """Transform the training dynamics and save them as a compressed ``.npz`` archive.

        The file ``training_dynamics.npz`` is written into ``save_path`` with the keys
        ``label_flip``, ``labels`` and ``td``.

        Args:
            logits: Forwarded to ``self.transform`` (presumably the dictionary returned by
                :py:func:`generator` -- to be confirmed against ``transform``).
            assigned_targets: Forwarded to ``self.transform``.
            label_flip (optional): Stored unchanged under the key ``label_flip``. Default: None.
            save_path (str, optional): The directory in which the archive is written; it is created if
                missing. If None, ``'assets'`` is used. Default: None.

        Returns:
            TrainingDynamics: ``self``.
        """
        if save_path is None:
            save_path = 'assets'
        # exist_ok avoids the check-then-create race of a separate os.path.exists() test.
        os.makedirs(save_path, exist_ok=True)

        # NOTE(review): ``transform`` is not defined in the visible part of this class; it is
        # expected to set ``self.labels`` and ``self.training_dynamics`` -- confirm it exists.
        self.transform(logits=logits, assigned_targets=assigned_targets)
        np.savez_compressed(os.path.join(save_path, 'training_dynamics.npz'),
                            label_flip=label_flip,
                            labels=self.labels,
                            td=self.training_dynamics)
        return self
class LSTM(paddle.nn.Layer):
    """Recurrent detector: an LSTM followed by a two-way linear classifier and a softmax.

    Args:
        input_size (int): Feature size of each time step. Default: 1.
        hidden_size (int): Hidden size of the LSTM and input size of the classifier. Default: 64.
        num_layers (int): Number of stacked LSTM layers. Default: 2.
    """

    def __init__(self, input_size=1, hidden_size=64, num_layers=2):
        super().__init__()
        # maybe need initialisation
        self.classifier = paddle.nn.Linear(in_features=hidden_size, out_features=2)
        self.softmax = paddle.nn.Softmax()
        self.lstm = paddle.nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            time_major=False,
        )

    def forward(self, x):
        """Return the softmax over the two classifier outputs, computed from the last time step."""
        # Inputs that are not 3-D get a trailing axis inserted at position 2.
        needs_feature_axis = len(x.shape) != 3
        if needs_feature_axis:
            x = paddle.unsqueeze(x, axis=2)

        sequence_output, _states = self.lstm(x)
        last_step = sequence_output[:, -1, :]
        scores = self.classifier(last_step)
        return self.softmax(scores)
class BHDFInterpreter(Interpreter):
    """
    BHDFInterpreter takes the training dynamics as raw input and lets an LSTM model predict whether the sample is
    mislabeled or clean. We have provided a pre-trained LSTM, which can be directly used for identifying the mislabels.

    # TODO: to add the arxiv link.
    More details regarding this method can be found in the original paper: [link_to_be_announced]().

    For reproduction experiments, refer to [this repo](https://github.com/Christophe-Jia/mislabel-detection).
    """

    def __init__(self, detector: callable = None, device: str = 'gpu:0'):
        """
        Args:
            detector (callable, optional): A detector model for identifying the mislabeled samples. If None, an
                ``LSTM`` is created and its weights are loaded from ``assets/noise_detector_trained.pdparams``,
                downloading them first when that file does not exist. Defaults to None.
            device (str, optional): The device used for running ``detector``, options: ``"cpu"``, ``"gpu:0"``,
                ``"gpu:1"`` etc. Defaults to 'gpu:0'.
        """
        if detector is not None:
            self.detector = detector
        else:
            self.detector = LSTM()
            default_detector_path = "assets/noise_detector_trained.pdparams"
            # Fetch the pre-trained weights only when they are not already on disk.
            if not os.path.exists(default_detector_path):
                download_and_decompress(
                    url="https://github.com/PaddlePaddle/InterpretDL/files/9120427/noise_detector_trained.pdparams.zip",
                    path="assets/"
                )
            paddle.Model(self.detector).load(default_detector_path)

        Interpreter.__init__(self, self.detector, device)
        self._env_setup()
        self.detector.eval()

    def interpret(self,
                  training_dynamics=None,
                  training_dynamics_path="assets/training_dynamics.npz"):
        """Call this function to rank samples' correctness.

        Args:
            training_dynamics (dict, optional): Training dynamics is a dictionary, which has keys as follows:
                {
                `label_flip`: list: The position of label contamination where True indicates label noise;
                `labels`: numpy.ndarray: with shape of length of dataset * class number, generated by
                ``TrainingDynamics.generator``;
                `td`: numpy.ndarray: with shape of length of dataset * training epochs * class number, point-wise
                probability for each epoch, generated by ``TrainingDynamics.generator``.
                }
                Only the ``td`` entry is read here. When given, it takes precedence over
                ``training_dynamics_path``. Default: None.
            training_dynamics_path (str, optional): Path of an ``.npz`` archive containing a ``td`` array. It is
                used only when ``training_dynamics`` is None. Default: "assets/training_dynamics.npz".

        Returns:
            (numpy.ndarray, list): (order,predictions) where order is {ranking of label correctness
            in form of data indices list}, i.e. the indices sorted by ascending prediction, and predictions is
            {point-wise predictions as clean}, taken from column 1 of the detector output.

        Raises:
            ValueError: If both ``training_dynamics`` and ``training_dynamics_path`` are None.
        """
        # The path has a non-None default, so requiring "exactly one" of the two arguments would
        # reject interpret(training_dynamics=...). The in-memory dictionary simply wins instead.
        if training_dynamics is not None:
            td = training_dynamics['td']
        elif training_dynamics_path is not None:
            # The context manager closes the archive once the array has been read.
            with np.load(training_dynamics_path) as archive:
                td = archive['td']
        else:
            raise ValueError("Either training_dynamics or training_dynamics_path must be given.")

        # Only index 0 of the last axis is fed to the detector.
        features = paddle.to_tensor(np.asarray(td)[:, :, 0]).astype(paddle.float32)

        # TensorDataset needs a second tensor; zeros act as placeholder labels that are never read.
        dataset = paddle.io.TensorDataset([features, paddle.zeros((len(features), ))])
        loader = paddle.io.DataLoader(dataset, batch_size=128, shuffle=False)

        predictions = []
        with paddle.no_grad():
            for data in loader():
                x_data = data[0]
                predicts = self.detector(x_data).cpu().detach().numpy()
                predictions.extend(predicts[:, 1])

        order = np.argsort(predictions)
        return order, predictions