Source code for interpretdl.interpreter.abc_interpreter
import abc
import sys
import re
import numpy as np
import warnings
from ..common.python_utils import versiontuple2tuple
# Ensure compatibility with Python 2/3
ABC = abc.ABC if sys.version_info >= (3, 4) else abc.ABCMeta(str('ABC'), (), {})
[docs]class Interpreter(ABC):
"""
Interpreter is the base abstract class for all Interpreters.
The implementation of any Interpreter should at least
**(1)** prepare :py:attr:`predict_fn` that outputs probability predictions, gradients or other desired
intermediate results of the model, and
**(2)** implement the core function :py:meth:`interpret` of the interpretation algorithm.
In general, we find this implementation is practical, makes the code more readable and can highlight the core
function of the interpretation algorithm.
This kind of implementation works for all post-poc interpretation algorithms. While some algorithms may have
different features and other fashions of implementations may be more suitable for them, our style of implementation
can still work for most of them. So we follow this design for all Interpreters in this library.
Three sub-abstract Interpreters that implement :py:meth:`_build_predict_fn` are currently provided in this file:
:class:`InputGradientInterpreter`, :class:`InputOutputInterpreter`, :class:`IntermediateLayerInterpreter`. For each
of them, the implemented :py:attr:`predict_fn` can be used by several different algorithms. Therefore, the further
implementations can focus on the core algorithm. More sub-abstract Interpreters will be provided if necessary.
.. warning:: ``use_cuda`` would be deprecated soon. Use ``device`` directly.
"""
def __init__(self, model: callable, device: str, **kwargs):
"""
Args:
model (callable): A model with :py:func:`forward` and possibly :py:func:`backward` functions.
device (str): The device used for running ``model``, options: ``"cpu"``, ``"gpu:0"``, ``"gpu:1"``
etc.
"""
self.device = device
self.model = model
self.predict_fn = None
if 'use_cuda' in kwargs and kwargs['use_cuda'] in [True, False]:
warnings.warn('``use_cuda`` would be deprecated soon. Use ``device`` directly.', stacklevel=2)
self.device = 'gpu:0' if kwargs['use_cuda'] and device[:3] == 'gpu' else 'cpu'
assert self.device[:3] in ['cpu', 'gpu']
[docs] def interpret(self, **kwargs):
"""Main function of the interpreter."""
raise NotImplementedError
[docs] def _build_predict_fn(self, **kwargs):
""" Build :py:attr:`predict_fn` for interpreters. This will be called by :py:meth:`interpret`. """
raise NotImplementedError
[docs] def _env_setup(self):
"""Prepare the environment setup. This is not always necessary because the setup can be done within the
function of :py:func:`_build_predict_fn`.
"""
#######################################################################
# This is a simple implementation for disabling gradient computation. #
#######################################################################
import paddle
assert versiontuple2tuple(paddle.__version__) >= (2, 2, 1)
if not paddle.is_compiled_with_cuda() and self.device[:3] == 'gpu':
print("Paddle is not installed with GPU support. Change to CPU version now.")
self.device = 'cpu'
# globally set device.
paddle.set_device(self.device)
# does not need gradients at all.
self.model.eval()
[docs]class InputGradientInterpreter(Interpreter):
"""This is one of the sub-abstract Interpreters.
:class:`InputGradientInterpreter` are used by input gradient based Interpreters. Interpreters that are derived from
:class:`InputGradientInterpreter` include :class:`GradShapCVInterpreter`, :class:`IntGradCVInterpreter`,
:class:`SmoothGradInterpreter`.
This Interpreter implements :py:func:`_build_predict_fn` that returns input gradient given an input.
"""
def __init__(self, model: callable, device: str, **kwargs):
"""
Args:
model (callable): A model with :py:func:`forward` and possibly :py:func:`backward` functions.
device (str): The device used for running ``model``, options: ``"cpu"``, ``"gpu:0"``, ``"gpu:1"``
etc.
"""
Interpreter.__init__(self, model, device, **kwargs)
assert hasattr(model, 'forward'), \
"model has to be " \
"an instance of paddle.nn.Layer or a compatible one."
[docs] def _build_predict_fn(self, rebuild: bool = False, gradient_of: str = 'probability'):
"""Build ``predict_fn`` for input gradients based algorithms.
The model is supposed to be a classification model.
Args:
rebuild (bool, optional): forces to rebuild. Defaults to ``False``.
gradient_of (str, optional): computes the gradient of
[``"loss"``, ``"logit"`` or ``"probability"``] *w.r.t.* input data. Defaults to ``"probability"``.
Other options can get similar results while the absolute scale might be different.
"""
if self.predict_fn is not None:
assert callable(self.predict_fn), "predict_fn is predefined before, but is not callable." \
"Check it again."
if self.predict_fn is None or rebuild:
assert gradient_of in ['loss', 'logit', 'probability']
self._env_setup()
def predict_fn(inputs, labels=None):
"""predict_fn for input gradients based interpreters,
for image classification models only.
Args:
inputs ([type]): scaled inputs.
labels ([type]): can be None.
Returns:
[type]: gradients, labels
"""
import paddle
# assert len(data.shape) == 4 # [bs, h, w, 3]
assert labels is None or \
(isinstance(labels, (list, np.ndarray)) and len(labels) == inputs.shape[0])
if isinstance(inputs, tuple):
tensor_inputs = []
for inp in inputs:
tmp = paddle.to_tensor(inp)
tmp.stop_gradient = False
tensor_inputs.append(tmp)
tensor_inputs = tuple(tensor_inputs)
else:
tensor_inputs = paddle.to_tensor(inputs)
tensor_inputs.stop_gradient = False
tensor_inputs = (tensor_inputs, )
# get logits and probas, [bs, num_c]
logits = self.model(*tensor_inputs)
num_samples, num_classes = logits.shape[0], logits.shape[1]
probas = paddle.nn.functional.softmax(logits, axis=-1)
# get predictions.
pred = paddle.argmax(logits, axis=1)
if labels is None:
labels = pred.numpy()
# get gradients
if gradient_of == 'loss':
# cross-entropy loss
loss = paddle.nn.functional.cross_entropy(logits, paddle.to_tensor(labels), reduction='sum')
else:
# logits or probas
labels = np.array(labels).reshape((num_samples, ))
labels_onehot = paddle.nn.functional.one_hot(paddle.to_tensor(labels), num_classes=num_classes)
if gradient_of == 'logit':
loss = paddle.sum(logits * labels_onehot, axis=1)
else:
loss = paddle.sum(probas * labels_onehot, axis=1)
loss.backward()
gradients = tensor_inputs[0].grad
if isinstance(gradients, paddle.Tensor):
gradients = gradients.numpy()
return gradients, labels, probas
self.predict_fn = predict_fn
[docs]class InputOutputInterpreter(Interpreter):
"""This is one of the sub-abstract Interpreters.
:class:`InputOutputInterpreter` are used by input-output correlation based Interpreters. Interpreters that are derived
from :class:`InputOutputInterpreter` include :class:`OcclusionInterpreter`, :class:`LIMECVInterpreter`,
:class:`SmoothGradInterpreter`.
This Interpreter implements :py:func:`_build_predict_fn` that returns the model's prediction given an input.
"""
def __init__(self, model: callable, device: str, **kwargs):
"""
Args:
model (callable): A model with :py:func:`forward` and possibly :py:func:`backward` functions.
device (str): The device used for running ``model``, options: ``"cpu"``, ``"gpu:0"``, ``"gpu:1"``
etc.
"""
Interpreter.__init__(self, model, device, **kwargs)
assert hasattr(model, 'forward'), \
"model has to be " \
"an instance of paddle.nn.Layer or a compatible one."
[docs] def _build_predict_fn(self, rebuild: bool = False, output: str = 'probability'):
"""Build :py:attr:`predict_fn` for Input-Output based algorithms.
The model is supposed to be a classification model.
Args:
rebuild (bool, optional): forces to rebuild. Defaults to ``False``.
output (str, optional): computes the logit or probability. Defaults: ``"probability"``. Other options can
get similar results while the absolute scale might be different.
"""
if self.predict_fn is not None:
assert callable(self.predict_fn), "predict_fn is predefined before, but is not callable." \
"Check it again."
if self.predict_fn is None or rebuild:
assert output in ['logit', 'probability']
self._env_setup()
def predict_fn(inputs, label):
"""predict_fn for input gradients based interpreters,
for image classification models only.
Args:
inputs ([type]): [description]
label ([type]): can be None.
Returns:
[type]: [description]
"""
import paddle
# assert len(inputs.shape) == 4 # [bs, h, w, 3]
with paddle.no_grad():
inputs = tuple(paddle.to_tensor(inp) for inp in inputs) if isinstance(inputs, tuple) \
else (paddle.to_tensor(inputs), )
logits = self.model(*inputs) # get logits, [bs, num_c]
probas = paddle.nn.functional.softmax(logits, axis=-1) # get probabilities.
pred = paddle.argmax(probas, axis=-1) # get predictions.
if label is None:
label = pred.numpy() # label is an integer.
if output == 'logit':
return logits.numpy(), label, probas
else:
return probas.numpy(), label, probas
self.predict_fn = predict_fn
[docs]class IntermediateLayerInterpreter(Interpreter):
"""This is one of the sub-abstract Interpreters.
:class:`IntermediateLayerInterpreter` exhibits features from intermediate layers to produce explanations.
This interpreter extracts intermediate layers' features, but no gradients involved.
Interpreters that are derived from :class:`IntermediateLayerInterpreter` include
:class:`RolloutInterpreter`, :class:`ScoreCAMInterpreter`.
This Interpreter implements :py:func:`_build_predict_fn` that returns the model's intermediate outputs given an
input.
"""
def __init__(self, model: callable, device: str, **kwargs):
"""
Args:
model (callable): A model with :py:func:`forward` and possibly :py:func:`backward` functions.
device (str): The device used for running ``model``, options: ``"cpu"``, ``"gpu:0"``, ``"gpu:1"``
etc.
"""
Interpreter.__init__(self, model, device, **kwargs)
assert hasattr(model, 'forward'), \
"model has to be " \
"an instance of paddle.nn.Layer or a compatible one."
[docs] def _build_predict_fn(self, rebuild: bool = False, target_layer: str = None, target_layer_pattern: str = None):
"""Build :py:attr:`predict_fn` for IntermediateLayer based algorithms.
The model is supposed to be a classification model.
``target_layer`` and ``target_layer_pattern`` cannot be set at the same time. See the arguments below.
Args:
rebuild (bool, optional): forces to rebuild. Defaults to ``False``.
target_layer (str, optional): the name of the desired layer whose features will output. This is used when
there is only one layer to output. Conflict with ``target_layer_pattern``. Defaults to ``None``.
target_layer_pattern (str, optional): the pattern name of the layers whose features will output. This is
used when there are several layers to output and they share a common pattern name. Conflict with
``target_layer``. Defaults to ``None``.
"""
if self.predict_fn is not None:
assert callable(self.predict_fn), "predict_fn is predefined before, but is not callable." \
"Check it again."
if self.predict_fn is None or rebuild:
assert not (target_layer is None and target_layer_pattern is None), 'one of them must be given.'
assert target_layer is None or target_layer_pattern is None, 'they cannot be given at the same time.'
self._env_setup()
def predict_fn(data):
import paddle
import re
def target_layer_pattern_match(layer_name):
return re.match(target_layer_pattern, layer_name)
def target_layer_match(layer_name):
return layer_name == target_layer
match_func = target_layer_match if target_layer is not None else target_layer_pattern_match
target_feature_maps = []
def hook(layer, input, output):
target_feature_maps.append(output.numpy())
hooks = []
for name, v in self.model.named_sublayers():
if match_func(name):
h = v.register_forward_post_hook(hook)
hooks.append(h)
assert len(hooks) > 0, f"No target layers are found in the given model, \
the list of layer names are \n \
{[n for n, v in self.model.named_sublayers()]}"
with paddle.no_grad():
data = paddle.to_tensor(data)
logits = self.model(data)
# hooks has to be removed.
for h in hooks:
h.remove()
probas = paddle.nn.functional.softmax(logits, axis=1)
predict_label = paddle.argmax(probas, axis=1) # get predictions.
return target_feature_maps, probas.numpy(), predict_label.numpy()
self.predict_fn = predict_fn
[docs]class TransformerInterpreter(Interpreter):
"""This is one of the sub-abstract Interpreters.
:class:`TransformerNLPInterpreter` are used by Interpreters for Transformer based model. Interpreters that are derived from
:class:`TransformerNLPInterpreter` include :class:`BTNLPInterpreter`, :class:`GANLPInterpreter`.
This Interpreter implements :py:func:`_build_predict_fn` that returns servral variables and gradients in each layer.
"""
def __init__(self, model: callable, device: str, **kwargs):
"""
Args:
model (callable): A model with :py:func:`forward` and possibly :py:func:`backward` functions.
device (str): The device used for running ``model``, options: ``"cpu"``, ``"gpu:0"``, ``"gpu:1"``
etc.
"""
Interpreter.__init__(self, model, device, **kwargs)
assert hasattr(model, 'forward'), \
"model has to be " \
"an instance of paddle.nn.Layer or a compatible one."
[docs] def _build_predict_fn(
self,
rebuild: bool = False,
embedding_name: str or None = None,
attn_map_name: str or None = None,
attn_v_name: str or None = None,
attn_proj_name: str or None = None,
gradient_of: str or None = None):
"""Build ``predict_fn`` for transformer based algorithms.
The model is supposed to be a classification model.
Args:
rebuild (bool, optional): forces to rebuild. Defaults to ``False``.
embedding_name (str, optional): the layer name for embedding, if in need.
attn_map_name (str, optional): the layer name for attention weights, if in need.
attn_v_name (str, optional): the layer name for attention value.
attn_proj_name (str, optional): the layer name for attention projection, if in need.
nlp (bool, default to False): whether the input data is for language test.
"""
if self.predict_fn is not None:
assert callable(self.predict_fn), "predict_fn is predefined before, but is not callable." \
"Check it again."
if self.predict_fn is None or rebuild:
self._env_setup()
def predict_fn(inputs, label=None, scale: float or None=None):
"""predict_fn for input gradients based interpreters,
for image classification models only.
Args:
inputs ([type]): scaled input data.
label ([type]): can be None.
scale (float, optional): noise scale for intergrated gradient and smooth gradient
Returns:
[type]:
"""
import paddle
if isinstance(inputs, tuple):
_inputs = []
for inp in inputs:
inp = paddle.to_tensor(inp)
inp.stop_gradient = False
_inputs.append(inp)
inputs = tuple(_inputs)
else:
inputs = paddle.to_tensor(inputs)
inputs.stop_gradient = False
inputs = tuple(paddle.to_tensor(inp) for inp in inputs) if isinstance(inputs, tuple) \
else (paddle.to_tensor(inputs), )
# when alpha is not None
def hook(layer, input, output):
if scale is not None:
output = scale * output
return output
# to obtain the attention weights
block_attns = []
def block_attn_hook(layer, input, output):
block_attns.append(output)
# to obtain the input of each attention block
block_inputs = []
def block_input_hook(layer, input):
block_inputs.append(input)
# to obtain the value and projection weights
block_values = []
def block_value_hook(layer, input, output):
block_values.append(output)
# apply hooks in the forward pass
block_projs = []
hooks = []
for n, v in self.model.named_sublayers():
if attn_map_name is not None and re.match(attn_map_name, n):
h = v.register_forward_post_hook(block_attn_hook)
hooks.append(h)
elif scale is not None and embedding_name is not None and re.match(embedding_name, n):
h = v.register_forward_post_hook(hook)
hooks.append(h)
elif attn_proj_name is not None and re.match(attn_proj_name, n):
block_projs.append(v.weight)
elif attn_v_name is not None and re.match(attn_v_name, n):
h = v.register_forward_pre_hook(block_input_hook)
hooks.append(h)
h = v.register_forward_post_hook(block_value_hook)
hooks.append(h)
logits = self.model(*inputs)
for h in hooks:
h.remove()
proba = paddle.nn.functional.softmax(logits, axis=1)
preds = paddle.argmax(proba, axis=1)
if label is None:
label = preds.numpy()
label_onehot = paddle.nn.functional.one_hot(paddle.to_tensor(label), num_classes=logits.shape[1])
block_attns_grads = []
if gradient_of == 'probability' or gradient_of is None:
target = paddle.sum(proba * label_onehot, axis=1)
target.backward()
elif gradient_of == 'logit':
target = paddle.sum(logits * label_onehot, axis=1)
target.backward()
else:
raise ValueError("`gradient_of` should be one of [logits, probability].")
for i, attn in enumerate(block_attns):
grad = attn.grad.numpy()
block_attns_grads.append(grad)
block_attns[i] = attn.numpy()
target.clear_gradient()
for i, inp in enumerate(block_inputs):
block_inputs[i] = inp[0].numpy()
for i, value in enumerate(block_values):
# check whether q,k,v are concatenated.
d_inp = block_inputs[i].shape[-1]
d_value = value.shape[-1]
if d_inp == d_value:
block_values[i] = value[0].numpy()
elif d_inp * 3 == d_value:
b, s, _ = value.shape
value = value.reshape((b, s, 3, -1)) # 3 == [q,k,v], b == 1.
block_values[i] = value[0, :, 2].numpy()
else:
raise ValueError("Report this issue to InterpretDL.")
for i, proj in enumerate(block_projs):
block_projs[i] = proj.numpy()
return block_attns, block_attns_grads, block_inputs, block_values, block_projs, proba.numpy(), label
self.predict_fn = predict_fn
class IntermediateGradientInterpreter(Interpreter):
"""This is one of the sub-abstract Interpreters.
:class:`IntermediateGradientInterpreter` exhibits both features and gradients from intermediate layers to produce
explanations. Interpreters that are derived from :class:`IntermediateGradientInterpreter` include
:class:``, :class:``.
This Interpreter implements :py:func:`_build_predict_fn` that returns the model's intermediate outputs given an
input.
"""
def __init__(self, model: callable, device: str = 'gpu:0') -> None:
"""
Args:
model (callable): A model with :py:func:`forward` and possibly :py:func:`backward` functions.
device (str): The device used for running ``model``, options: ``"cpu"``, ``"gpu:0"``, ``"gpu:1"``
etc.
"""
Interpreter.__init__(self, model, device)
def _build_predict_fn(self, rebuild=False, layer_name='word_embeddings', gradient_of='probability'):
if self.predict_fn is not None:
assert callable(self.predict_fn), \
"predict_fn is predefined before, but is not callable. Check it again."
return
if self.predict_fn is None or rebuild:
assert gradient_of in ['loss', 'logit', 'probability']
self._env_setup()
def predict_fn(inputs, label=None, scale=None, noise_amount=None):
import paddle
inputs = tuple(paddle.to_tensor(inp) for inp in inputs) if isinstance(inputs, tuple) \
else (paddle.to_tensor(inputs), )
target_feature_map = []
def hook(layer, input, output):
if scale is not None:
output = scale * output
if noise_amount is not None:
bias = paddle.normal(std=noise_amount * output.mean(), shape=output.shape)
output = output + bias
target_feature_map.append(output)
return output
hooks = []
for name, v in self.model.named_sublayers():
if layer_name in name:
h = v.register_forward_post_hook(hook)
hooks.append(h)
logits = self.model(*inputs) # get logits, [bs, num_c]
for h in hooks:
h.remove()
probas = paddle.nn.functional.softmax(logits, axis=1) # get probabilities.
preds = paddle.argmax(probas, axis=1) # get predictions.
if label is None:
label = preds.numpy() # label is an integer.
if gradient_of == 'loss':
# loss
loss = paddle.nn.functional.cross_entropy(logits, paddle.to_tensor(label), reduction='sum')
else:
# logits or probas
label_onehot = paddle.nn.functional.one_hot(paddle.to_tensor(label), num_classes=probas.shape[-1])
if gradient_of == 'logit':
loss = paddle.sum(logits * label_onehot, axis=1)
else:
loss = paddle.sum(probas * label_onehot, axis=1)
loss.backward()
gradients = target_feature_map[0].grad
loss.clear_gradient()
if isinstance(gradients, paddle.Tensor):
gradients = gradients.numpy()
return gradients, label, target_feature_map[0].numpy(), probas.numpy()
self.predict_fn = predict_fn