import warnings
import numpy as np
import numpy.typing as npt
from scalib import _scalib_ext
from scalib.config import get_config
import scalib.tools
import scalib.utils
[docs]class LDAClassifier:
r"""Models the leakage :math:`\mathbf{l}` with :math:`n_s` dimensions using
the linear discriminant analysis classifier (LDA) with integrated
dimensionality reduction.
.. deprecated:: 0.6.1
Use ``LdaAcc`` instead.
Based on the training data, linear discriminant analysis build a linear
dimentionality reduction to :math:`p` dimensions that maximizes class
separation.
Then, a multivariate gaussian template is fitted for each class (using the
same covariance matrix for all the classes) in the reduced dimensionality
space to predict leakage likelihood :footcite:p:`LDA`.
Let :math:`\mathbf{W}` be the dimensionality reduction matrix of size
(:math:`p`, :math:`n_s`). The likelihood is
.. math::
\mathsf{\hat{f}}(\mathbf{l} | x) =
\frac{1}{\sqrt{(2\pi)^{p} \cdot |\mathbf{\Sigma} |}} \cdot
\exp^{\frac{1}{2}
(\mathbf{W} \cdot \mathbf{l} - \mathbf{\mu}_x)
\mathbf{\Sigma}
( \mathbf{W} \cdot \mathbf{l}-\mathbf{\mu}_x)'}
where :math:`\mathbf{\mu}_x` is the mean of the leakage for class :math:`x` in
the projected space (:math:`\mu_x = \mathbb{E}(\mathbf{W}\mathbf{l}_x)`, where
:math:`\mathbf{l}_x` denotes the leakage traces of class :math:`x`) and
:math:`\mathbf{\Sigma}` its covariance (:math:`\mathbf{\Sigma} =
\mathbb{Cov}(\mathbf{W}\mathbf{l}_x - \mathbf{\mu}_x)`).
:class:`LDAClassifier` provides the probability of each class with :meth:`predict_proba`
thanks to Bayes' law such that
.. math::
\hat{\mathsf{pr}}(x|\mathbf{l}) = \frac{\hat{\mathsf{f}}(\mathbf{l}|x)}
{\sum_{x^*=0}^{n_c-1} \hat{\mathsf{f}}(\mathbf{l}|x^*)}.
Example
-------
>>> from scalib.modeling import LDAClassifier
>>> import numpy as np
>>> # 5000 traces of length 10, with value between 0 and 255
>>> traces = np.random.randint(0,256,(5000,10),dtype=np.int16)
>>> # classes between 0 and 15
>>> x = np.random.randint(0,16,5000,dtype=np.uint16)
>>> lda = LDAClassifier(16,3)
>>> lda.fit_u(traces, x)
>>> lda.solve()
>>> # predict classes for new traces
>>> nt = np.random.randint(0,256,(20,10),dtype=np.int16)
>>> predicted_proba = lda.predict_proba(nt)
Notes
-----
This should have similar behavior as scikit-learn's `LDA
<https://scikit-learn.org/stable/modules/lda_qda.html#lda-qda>`_, but it
has better performance and numerical properties (at the cost of
flexibility).
References
----------
.. footbibliography::
Parameters
----------
nc :
Number of possible classes (e.g., 256 for 8-bit target). ``nc`` must
be smaller than :math:`2^{16}`.
p :
Number of dimensions in the linear subspace.
"""
def __init__(self, nc: int, p: int):
self.solved = False
self.done = False
self.p = p
self._nc = nc
self._ns = None
self._init = False
if p >= nc:
raise ValueError("p must be at most nc")
warnings.warn(
"LdaClassifier is deprecated. Use LdaAcc instead.", DeprecationWarning
)
[docs] def fit_u(
self,
traces: npt.NDArray[np.int16],
x: npt.NDArray[np.uint16],
gemm_mode: int | None = None,
):
r"""Update statistical model estimates with fresh data.
Parameters
----------
traces :
Array that contains the traces. The array must
be of dimension ``(n,ns)`` and its type must be `int16`.
x :
Labels for each trace. Must be of shape ``(n)`` and
must be `uint16`.
gemm_mode:
Depreciated, kept for API compatibility.
"""
traces = scalib.utils.clean_traces(traces, self._ns)
x = scalib.utils.clean_labels(x[:, np.newaxis], nv=1, multi=True)
if self.done:
raise ValueError("Cannot fit_u after calling .solve(..., done=True).")
if not self._init:
self._init = True
self._ns = traces.shape[1]
self.acc = _scalib_ext.MultiLdaAcc(
self._ns, self._nc, np.arange(self._ns, dtype=np.uint32)[np.newaxis, :]
)
# TODO maybe there is something smarter to do here w.r.t. number of
# threads + investigate exact BLIS behavior.
with scalib.utils.interruptible():
self.acc.fit(traces, x, get_config())
self.solved = False
[docs] def solve(self, done: bool = False):
r"""Estimates the PDF parameters that is the projection matrix
:math:`\mathbf{W}`, the means :math:`\mathbf{\mu}_x` and the covariance
:math:`\mathbf{\Sigma}`.
Parameters
----------
done :
True if the object will not be futher updated (clears some internal
state, saving memory).
Notes
-----
Once this has been called, predictions can be performed.
"""
if not self._init:
raise ValueError("Cannot .solve since .fit_u was never called.")
if self.solved:
raise ValueError(
"Already called .solve() on this object, should not be called twice."
)
with scalib.utils.interruptible():
self.mlda = self.acc.multi_lda(self.p, get_config())
self.solved = True
self.done = done
if done:
del self.acc
[docs] def predict_proba(self, traces: npt.NDArray[np.int16]) -> npt.NDArray[np.float64]:
r"""Computes the probability for each of the classes for the traces.
Parameters
----------
traces :
Array that contains the traces. The array must be of dimension ``(n,ns)``.
Returns
-------
array_like, f64
Probabilities. Shape ``(n, nc)``.
"""
if not self.solved:
raise ValueError(
"Call LDA.solve() before LDA.predict_proba() to compute the model."
)
with scalib.utils.interruptible():
prs = self.mlda.predict_proba(traces, False, get_config())[0]
return prs
def _raw_scores(self, traces: npt.NDArray[np.int16]) -> npt.NDArray[np.float64]:
if not self.solved:
raise ValueError(
"Call LDA.solve() before LDA.predict_proba() to compute the model."
)
with scalib.utils.interruptible():
prs = self.mlda.predict_proba(traces, True, get_config())[0]
return prs
[docs] def get_sw(self):
r"""Return :math:`S_{W}` matrix (within-class scatter)."""
return self.acc.get_sw()[0]
[docs] def get_sb(self):
r"""Return :math:`S_{B}` matrix (between-class scatter)."""
return self.acc.get_sb()[0]
[docs] def get_mus(self):
r"""Return means matrix (classes means). Shape: ``(nc, ns)``."""
return self.acc.get_mus()[0]
[docs]class MultiLDA:
"""Perform LDA on `nv` distinct variables for the same leakage traces.
.. deprecated:: 0.6.1
Use ``LdaAcc`` instead.
While functionally similar to a simple for loop, this enables solving the
LDA problems in parallel in a simple fashion. This also enable easy
handling of Points Of Interest (POIs) in long traces.
Parameters
----------
ncs: array_like, int
Number of classes for each variable. Shape ``(nv,)``.
ps: array_like, int
Number of dimensions to keep after dimensionality reduction for each variable.
Shape ``(nv,)``.
pois: list of array_like, int
Indices of the POIs in the traces for each variable. That is, for
variable ``i``, and training trace ``t``, ``t[pois[i]]`` is the input
datapoints for the LDA.
gemm_mode:
See :func:`LDACLassifier.fit_u`.
Examples
--------
>>> from scalib.modeling import MultiLDA
>>> import numpy as np
>>> # 5000 traces with 50 points each
>>> traces = np.random.randint(0, 256, (5000,50),dtype=np.int16)
>>> # 5 variables (8-bit), and 5000 traces
>>> x = np.random.randint(0, 256, (5000, 5),dtype=np.uint16)
>>> # 10 POIs for each of the 5 variables
>>> pois = [list(range(7*i, 7*i+10)) for i in range(5)]
>>> # Keep 3 dimensions after dimensionality reduction
>>> lda = MultiLDA(5*[256], 5*[3], pois)
>>> lda.fit_u(traces, x)
>>> lda.solve()
>>> # Predict the class for 20 traces.
>>> nt = np.random.randint(0, 256, (20, 50), dtype=np.int16)
>>> predicted_proba = lda.predict_proba(nt)
"""
def __init__(self, ncs, ps, pois, gemm_mode: int | None = None):
self.pois = pois
self.ldas = [LDAClassifier(nc, p) for nc, p in zip(ncs, ps)]
warnings.warn("MultiLDA is deprecated. Use LdaAcc instead.", DeprecationWarning)
[docs] def fit_u(self, traces, x):
"""Update the LDA estimates with new training data.
Parameters
----------
traces : array_like, int16
Array that contains the traces. The array must
be of dimension ``(n,ns)`` and its type must be `int16`.
x : array_like, uint16
Labels for each trace. Must be of shape ``(n, nv)`` and
must be `uint16`.
"""
# Try to avoid the over-subscription of CPUs.
num_threads = get_config().threadpool.n_threads
with scalib.utils.interruptible():
with scalib.tools.ContextExecutor(max_workers=num_threads) as executor:
list(
executor.map(
lambda i: self.ldas[i].fit_u(
np.ascontiguousarray(traces[:, self.pois[i]]),
x[:, i],
),
range(len(self.ldas)),
)
)
[docs] def solve(self, done: bool = False):
"""See `LDAClassifier.solve`."""
# Put as much work as needed to fill rayon threadpool
with scalib.utils.interruptible():
with scalib.tools.ContextExecutor(
max_workers=get_config().threadpool.n_threads
) as executor:
list(executor.map(lambda lda: lda.solve(done), self.ldas))
[docs] def predict_proba(self, traces):
"""Predict probabilities for all variables.
See `LDAClassifier.predict_proba`.
Parameters
----------
traces : array_like, int16
Array that contains the traces. The array must
be of dimension ``(n,ns)``.
Returns
-------
list of array_like, f64
Probabilities. ``nv`` arrays of shape ``(n, nc)``.
"""
# Put as much work as needed to fill rayon threadpool
with scalib.utils.interruptible():
with scalib.tools.ContextExecutor(
max_workers=get_config().threadpool.n_threads
) as executor:
return list(
executor.map(
lambda i: self.ldas[i].predict_proba(traces[:, self.pois[i]]),
range(len(self.ldas)),
)
)
def _raw_scores(self, traces):
return [
self.ldas[i]._raw_scores(traces[:, self.pois[i]])
for i in range(len(self.ldas))
]
[docs] def get_sw(self):
return [lda.get_sw() for lda in self.ldas]
[docs] def get_sb(self):
return [lda.get_sb() for lda in self.ldas]
[docs] def get_mus(self):
return [lda.get_mus() for lda in self.ldas]
[docs]class LdaAcc:
r"""Models the leakage :math:`\mathbf{l}` with :math:`n_s` dimensions using
the linear discriminant analysis classifier (LDA) with integrated
dimensionality reduction.
Based on the training data, linear discriminant analysis build a linear
dimentionality reduction to :math:`p` dimensions that maximizes class
separation, for each target variable.
Then, for each variable, a multivariate gaussian template is fitted for
each class (using the same covariance matrix for all the classes) in the
reduced dimensionality space to predict leakage likelihood
:footcite:p:`LDA`.
Let :math:`\mathbf{W}` be the dimensionality reduction matrix of size
(:math:`p`, :math:`n_s`). The likelihood is
.. math::
\mathsf{\hat{f}}(\mathbf{l} | x) =
\frac{1}{\sqrt{(2\pi)^{p} \cdot |\mathbf{\Sigma} |}} \cdot
\exp^{\frac{1}{2}
(\mathbf{W} \cdot \mathbf{l} - \mathbf{\mu}_x)
\mathbf{\Sigma}
( \mathbf{W} \cdot \mathbf{l}-\mathbf{\mu}_x)'}
where :math:`\mathbf{\mu}_x` is the mean of the leakage for class :math:`x` in
the projected space (:math:`\mu_x = \mathbb{E}(\mathbf{W}\mathbf{l}_x)`, where
:math:`\mathbf{l}_x` denotes the leakage traces of class :math:`x`) and
:math:`\mathbf{\Sigma}` its covariance (:math:`\mathbf{\Sigma} =
\mathbb{Cov}(\mathbf{W}\mathbf{l}_x - \mathbf{\mu}_x)`).
This class is aimed to be used together with :class:`Lda`. In particular, :class:`LdaAcc`
only perform the fitting step, from which a fitted :class:`Lda` instance can be created
for an arbitrary :math:`p`. The solved :class:`Lda` instance provides the probability of
each class with :meth:`predict_proba` thanks to Bayes' law such that
.. math::
\hat{\mathsf{pr}}(x|\mathbf{l}) = \frac{\hat{\mathsf{f}}(\mathbf{l}|x)}
{\sum_{x^*=0}^{n_c-1} \hat{\mathsf{f}}(\mathbf{l}|x^*)}.
Example
-------
>>> from scalib.modeling import Lda, LdaAcc
>>> import numpy as np
>>> # 1000 traces of length 10, with value between 0 and 4091
>>> traces = np.random.randint(0, 4092, (1000, 10), dtype=np.int16)
>>> # classes between 0 and 15 (2 variables)
>>> x = np.random.randint(0, 16, (1000, 2), dtype=np.uint16)
>>> lda_acc = LdaAcc(nc=16, pois=[list(range(5)), list(range(4, 10))])
>>> lda_acc.fit_u(traces, x)
>>> lda = Lda(lda_acc, p=3) # Projection to 3 dimensions.
>>> # predict classes for new traces
>>> new_traces = np.random.randint(0, 256, (20,10), dtype=np.int16)
>>> predicted_proba = lda.predict_proba(new_traces)
Notes
-----
This should have similar behavior as scikit-learn's `LDA
<https://scikit-learn.org/stable/modules/lda_qda.html#lda-qda>`_, but it
has better performance and numerical properties (at the cost of
flexibility).
References
----------
.. footbibliography::
Parameters
----------
nc: int
Number of possible classes (e.g., 256 for 8-bit target). ``nc`` must
be smaller than :math:`2^{16}`.
pois: list of array_like, int
Indices of the POIs in the traces for each variable. That is, for
variable ``i``, and training trace ``t``, ``t[pois[i]]`` is the input
datapoints for the LDA.
"""
def __init__(self, nc: int, pois):
self._nc = nc
self._pois = pois
self._nv = len(self._pois)
self._init = False
[docs] def fit_u(self, traces, x):
"""Update the LDA estimates with new training data.
Parameters
----------
traces : array_like, int16
Array that contains the traces. The array must
be of dimension ``(n,ns)`` and its type must be `int16`.
x : array_like, uint16
Labels for each trace. Must be of shape ``(n, nv)`` and
must be `uint16`.
"""
if not self._init:
self._init = True
self._ns = traces.shape[1]
self._inner = _scalib_ext.MultiLdaAcc(self._ns, self._nc, self._pois)
traces = scalib.utils.clean_traces(traces, self._ns)
x = scalib.utils.clean_labels(x, self._nv)
with scalib.utils.interruptible():
self._inner.fit(traces, x, get_config())
[docs] def get_sw(self):
r"""Return :math:`S_{W}` matrix (within-class scatter)."""
return self._inner.get_sw()
[docs] def get_sb(self):
r"""Return :math:`S_{B}` matrix (between-class scatter)."""
return self._inner.get_sb()
[docs] def get_mus(self):
r"""Return means matrix (classes means) as a list of length ``nv``, where the ``i``-th element is the matrix of shape: ``(nc, len(pois[i]))`` associated to the ``i``-th variable."""
return self._inner.get_mus()
[docs]class Lda:
"""See :class:`LdaAcc`."""
def __init__(self, acc: LdaAcc, p: int):
r"""Estimates the PDF parameters that is the projection matrix
:math:`\mathbf{W}`, the means :math:`\mathbf{\mu}_x` and the covariance
:math:`\mathbf{\Sigma}`.
Parameters
----------
p :
Number of dimensions to keep after dimensionality reduction for each variable.
acc :
Fitted :class:`LdaAcc` instance.
"""
if not acc._init:
raise ValueError("Empty accumulator: .fit_u was never called.")
with scalib.utils.interruptible():
self._inner = acc._inner.multi_lda(p, get_config())
self._nv = acc._nv
self._ns = acc._ns
[docs] def predict_proba(self, traces):
r"""Computes the probability for each of the classes for the traces,
for all variables.
Parameters
----------
traces : array_like, int16
Array that contains the traces. The array must be of dimension ``(n,ns)``.
Returns
-------
list of array_like, f64
Probability distributions. Shape ``(nv, n, nc)``.
"""
traces = scalib.utils.clean_traces(traces, self._ns)
with scalib.utils.interruptible():
return self._inner.predict_proba(traces, False, get_config())
def _raw_scores(self, traces):
r"""Raw scores, i.e., predict_proba w/o softmax (for tests)."""
traces = scalib.utils.clean_traces(traces, self._ns)
with scalib.utils.interruptible():
return self._inner.predict_proba(traces, True, get_config())
[docs] def predict_log2_proba_class(self, traces, x):
r"""Computes the log2 probability for the given class for the traces,
for all variables.
Parameters
----------
traces : array_like, int16
Array that contains the traces. The array must be of dimension ``(n,ns)``.
x : array_like, uint16
Labels for each trace. Must be of shape ``(n, nv)`` and
must be `uint16`.
Returns
-------
list of array_like, f64
Log2 probabilities. Shape ``(nv, n)``.
"""
traces = scalib.utils.clean_traces(traces, self._ns)
x = scalib.utils.clean_labels(x, self._nv)
with scalib.utils.interruptible():
return self._inner.predict_log2_proba_class(traces, x, get_config())
[docs] def project(self, traces: npt.NDArray[np.int16]) -> npt.NDArray[np.float64]:
r"""Project the traces in the sub-linear space.
Parameters
----------
traces:
Array that contains the traces. The array must be of dimension ``(n,ns)``.
Returns
-------
array_like, f64
Projected traces. List of ``nv`` array of shape ``(n, self.p)``
"""
return self._inner.project(traces, get_config())
[docs] def select_vars(self, vars: list[int]) -> "Lda":
r"""Make a new :class:`Lda` with only a subset of the variables (in the
order given by the list).
Parameters
----------
traces :
List of selected variables.
Returns
-------
Lda
A new Lda.
"""
cls = type(self)
new = cls.__new__(cls)
new._inner = self._inner.select_vars(vars)
new._nv = len(vars)
new._ns = self._ns
return new