"""
Complementary module for the ``ADMM`` convolutional
References
----------
.. [1] 'Multichannel sparse recovery of complex-valued signals using Huber’s criterion',Esa Ollila
Avalaible at: https://arxiv.org/pdf/1504.04184.pdf
.. [2] 'Robust Principal Component Analysis?', Candes & al.
Avalaible at: http://www.columbia.edu/~jw2966/papers/CLMW11-JACM.pdf
.. [3] 'Distributed Optimization and Statistical Learning
via the Alternating Direction Method of Multipliers p.23', Stephen Boyd
Avalaible at: https://web.stanford.edu/~boyd/papers/pdf/admm_distr_stats.pdf
"""
import numpy as np
from scipy import linalg
from pywt import threshold
from sklearn.base import BaseEstimator
[docs]def huber_complex(x, delta):
r"""Huber function applied to a complex array as proposed in [1]_
:math:`\rho_{H, \delta}(x)=\left\{\begin{array}{ll}|x|^{2}, & \text { for }|x| \leq \delta
\\2 \delta|x|-\delta^{2}, & \text { for }|x|>\delta\end{array}\right.`
Parameters
----------
x : complex
array to apply the Huber function
delta : float
threshold parameter
Returns
-------
out : complex
array with the Huber function applied
"""
return ((np.abs(x)<=delta)*np.abs(x)**2 + (np.abs(x)>delta)*(2*delta*np.abs(x)-delta**2))
[docs]def sign_complex_array(x):
r""" Determine the sign of a complex array, based on [1]_
:math:`\operatorname{sign}(e)=\left\{\begin{array}{ll}
e /|e|, & \text { for } e \neq 0 \\
0, & \text { for } e=0
\end{array}\right.`
Parameters
----------
x : complex
array to determine the sign
Returns
-------
out : int
array with the sign of the input array
"""
return np.where(x!=0,x/np.abs(x),0)
[docs]def loss_derivative_function_complex(x, delta):
r""" Derivative of the Huber function, based on [1]_
:math:`\psi_{H, \delta}(x)=\left\{\begin{array}{ll}
x, & \text { for }|x| \leq \delta \\
\delta \operatorname{sign}(x), & \text { for }|x|>\delta
\end{array}\right.`
Parameters
----------
x : complex
array to apply the Huber function
delta : float
threshold parameter
Returns
-------
out : complex
array with the derivative of the Huber function applied
"""
return np.where(np.abs(x)>delta,delta*sign_complex_array(x),x)
[docs]def gradient_huber(S_tilde, X_m_tilde, D_m_tilde, D_m_H_tilde, Z_m_tilde, rho, delta):
r""" Function to compute the gradient of the Huber function of the following
minimization problem:
:math:`\underset{\mathbf{\hat{c}}}{\operatorname{argmin}}
\sum_N\mathcal{H}_\delta(\mathbf{\hat{D}} \cdot \mathbf{\hat{c}}-\mathbf{\hat{y}})+
\frac{\rho}{2}\Big|\Big| {\mathbf{\hat{c}}-\mathbf{\hat{z}}}\Big|\Big|_2^2`
The gradient is computed as follows:
:math:`\nabla_\mathbf{\hat{c}}f=\{\mathbf{\tilde{DH}_m}\}_k\odot\Psi_\delta
\left(\sum_{k}\mathbf{\tilde{D}_m}_k\odot\mathbf{\tilde{X}_m}_k
-\mathbf{\tilde{S}}\right)+\rho({\mathbf{\tilde{X}_m}-\mathbf{\tilde{Z}_m}})`
Parameters
----------
S_tilde : complex
fft of the original signal (Nx,Ny)
X_m_tilde : complex
fft of the coefficients maps :math:`\mathbf{\hat{C}_k}` (Nx,Ny,K)
D_m_tilde : complex
fft of the dictionary :math:`\mathbf{\hat{D}_k}` (Nx,Ny,K)
D_m_H_tilde : complex
fft hermitian of the dictionary :math:`\mathbf{\hat{D}_k}^H` (Nx,Ny,K)
Z_m_tilde : complex
fft (auxiliary variable- dual variable) (Nx,Ny,K)
rho : float
regularization parameter
delta : float
threshold parameter
Returns
-------
out: complex
gradient of the Huber function (NX,NY,K)
"""
r_n_tilde = np.sum(D_m_tilde*X_m_tilde, axis=2,keepdims=True) + S_tilde
xi = loss_derivative_function_complex(r_n_tilde, delta)
nabla_w = 0.5*(xi*D_m_H_tilde)
nabla_q = rho * (X_m_tilde+Z_m_tilde)
return nabla_w+nabla_q
[docs]def proxH(x,seuil,rho):
""" proximal operator of the Huber function
Parameters
----------
x : ndarray
input signal
seuil : float
threshold
rho : float
regularization parameter
Returns
-------
ndarray
result of the proximal operator
"""
return np.where(np.abs(x)<=seuil*(rho+1),x/(rho+1),x-seuil*rho*sign_complex_array(x))
[docs]def thresholding_nuclear(M,i,lambdaS,rho):
r"""Singular value thresholding function
It is the resultant of the proximal operator associated to the
nuclear norm.
:math:`\underset{\mathbf{{L}}}{\operatorname{argmin}}
\lambda||\mathbf{L}||_* +
\frac{\rho_L}{2}\Big|\Big|\mathbf{X}-\mathbf{L}\Big|\Big|_2^2 =
\mathrm{prox}_{||.||_*,\lambda/\rho_L}(\mathbf{X})
\\
\ \mathrm{avec}\ \ \mathrm{prox}_{||.||_*,\lambda/\rho_L}(x)=
\mathcal{T}_{\lambda/\rho_L}\left(x\right)`
Based on [2]_
Parameters
----------
M : float
tensor of dimension Nx,Ny,K
i : int
i th layer to threshold (0<i<K)
lambdaS : float
Parsimony parameter (if existing >0)
rho : float
Penalty parameter >0
Returns
-------
L : float
Result of the minimization for a layer i
"""
[u_temp,Sig_temp,v_temp] = linalg.svd((M)[:,:,i],check_finite=False)
Sig_temp = diag_thresh(u_temp.shape[0],u_temp.shape[0],Sig_temp)
L = (u_temp@threshold(Sig_temp,(lambdaS/rho),'soft')@v_temp)
return L
[docs]def update_rhoLS_adp(er_prim,er_dual,rhoLS):
r"""Adaptation function of the penalty parameters in the
the `ADMM` from the errors made on the primal and dual.
Based on [3]_
Parameters
----------
er_prim : float
error of the primal
er_dual : float
error of the dual
rhoLS : float
value of the parameter to be updated
Returns
-------
rhoLS : float
updated penalty parameter
k : float
expansion/contraction factor used
"""
if (er_prim>10*er_dual)&(rhoLS<1e5):
k=2
rhoLS = k*rhoLS
elif (er_dual>10*er_prim)&(rhoLS<1e5):
k=0.5
rhoLS = k*rhoLS
else:
k=1
return rhoLS,k
[docs]def Sherman_MorrisonF(DF_H, b, c, rho):
r"""Solve a diagonal block linear system with a scaled identity
term using the Sherman-Morrison equation
The solution is obtained by independently solving a set of linear
systems of the form (see wohlberg-2015-efficient)
:math:`(a\cdot a^H +\rho I)x = b`
In this equation inner products and matrix products are taken along
the 3rd dimension of the corresponding multi-dimensional arrays; the
solutions are independent over the 1st and 2nd (and 4th, if
non-singleton) dimensions.
Parameters
----------
DF_H :complex
Conjugate of Multi-dimensional array containing :math:`a^H`
b :complex
Multi-dimensional array containing b
c :complex
Multi-dimensional array containing pre-computed quantities :math:`a^H/(a^H\cdot a +\\rho)`
rho :float
Scalar rho
Returns
-------
x :complex
Multi-dimensional array containing linear system solution
Notes
-----
Adapted from matlab code : Brendt Wohlberg <brendt@lanl.gov> Modified: 2014-12-18
"""
cb = np.sum(c * b, 2, keepdims=True)
# cb=np.repeat(cb[:,:,np.newaxis],K,axis=2)
cba = cb * DF_H
x = (b - cba) / rho
return x
[docs]def diag_thresh(m, n, s):
r"""Return a diagonal matrix of dimension mxn from a vector s
Use after `np.linalg.svd` to get a matrix from S
Parameters
----------
m :int
vertical dimension of the desired matrix
n :int
horizontal dimension of the desired matrix
s :float
vector to diagonalize
Returns
-------
out :float
Diagonal matrix (from S)
"""
q = np.abs(n - m)
np.diag(s, q)[:, q:]
if m < n:
return np.diag(s, q)[:, q:].T
else:
return np.diag(s, q)[:, q:]
[docs]def SVD_gpr(ref,rank):
""" Perform a SVD on the reference image and dump the first n rank singular values
Parameters
----------
ref : ndarray
reference image
rank : int
rank of the SVD
Returns
-------
A_remake : ndarray
Reconstructed reference image without the rank-n singular values
"""
U, D, VT = np.linalg.svd(ref, full_matrices=False)
D[:rank]=0
diag_D = diag_thresh(U.shape[0],VT.shape[0],D)
A_remake = (U @ diag_D @ VT)
return A_remake
[docs]def roll_fft(alpha, t, x):
r"""Correction of the position of the coefficients of the maps according to the
of the central position of the hyperbolas + Summation of the C_k
Parameters
----------
alpha :float
Complex tensor (M x N x K) of the coefficients maps
t :int
central position (pixel) of the used hyperbolas (ordinate).
x :int
central position (pixel) of the used hyperbolas (abscissa).
Returns
-------
out :float
Corrected and reduced matrix (M x N)
"""
Q = np.real(np.sum(alpha, axis=2))
Q = np.roll(Q, t, axis=0)
Q = np.roll(Q, -x, axis=1)
return Q
class ConvolutionalSparseCoding(BaseEstimator):
"""Base classe for convolutional sparse coding for image processing applications
Attributes
----------
dictionary : array_like of shape (n_pixelsx, n_pixelx_y, n_atoms)
Dictionary for sparse coding.
"""
def __init__(self, dictionary) -> None:
super().__init__()
self._set_dictionary(dictionary)
def _set_dictionary(self, dictionary):
"""Setting the internal dictionary.
Parameters
----------
dictionary : array_like of shape (n_pixelsx, n_pixelx_y, n_atoms)
Dictionary for sparse coding.
Raises
------
AttributeError
When the dimesnion of the dictionary is not 3.
"""
#check_array(dictionary)
if dictionary.ndim!=3:
raise AttributeError(f"Dimension of array {dictionary.ndim} != 3")
self.dictionary = dictionary