import itertools
import numpy as np
[docs]
class Estimation():
"""
A class for estimating the parameters of the Fellegi–Sunter model based on observed patterns of discrete similarity levels across multiple variables.
:param K_Fuzzy: Number of variables compared for fuzzy matching.
:type K_Fuzzy: int
:param K_Exact: Number of variables compared for exact matching.
:type K_Exact: int
:param Counts: Array containing the observed counts for each pattern of discrete similarity levels across the compared variables.
:type Counts: numpy.ndarray
"""
def __init__(self, K_Fuzzy: int, K_Exact: int, Counts: np.array):
self.K_Fuzzy = K_Fuzzy
self.K_Exact = K_Exact
self.Counts = Counts
self.Gamma = self._Gamma()
"""
Holds the matrix of observed patterns of discrete similarity levels across variables.
:return: Matrix encoding all observed combinations of discrete similarity levels across variables.
- Each row represents a combination of discrete similarity levels.
- Each column represents a variable.
- Each element represents the discrete similarity level for a specific variable in the given pattern.
:rtype: numpy.ndarray
"""
self.Lambda = None
"""
Holds the estimated overall probability that any two observations are matching.
:return: Unconditional match probability.
:rtype: float
"""
self.Pi = None
"""
Holds the estimated probability of observing each discrete level of similarity for each variable, conditional on the latent match status.
:return: Three-dimensional tensor containing the estimated probabilities of observing each discrete level of similarity for each variable, conditional on latent match status.
- The first index denotes the latent match status, where 0 represents a non-match and 1 represents a match.
- The second index denotes the variable.
- The third index denotes the discrete level of similarity, with higher values reflecting greater similarity.
:rtype: list[list[numpy.ndarray]]
"""
self._Fit_flag = False
def _Gamma(self):
"""
Generates all possible combinations of discrete similarity levels across variables in a format suitable for use with the ``Gamma`` tensor.
:return: Matrix encoding all observed combinations of discrete similarity levels across variables.
- Each row represents a combination of discrete levels of similarity.
- Each column represents a variable.
- Each element represents the discrete level of similarity for that variable in the given pattern.
:rtype: numpy.ndarray
"""
return np.array(list(itertools.product(*(range(i) for i in np.repeat([3,2], [self.K_Fuzzy, self.K_Exact])))))
def _match_probability(self):
"""
Computes the conditional match probability for each pattern in ``Gamma`` given the current parameter values.
:return: Array containing the conditional match probabilities for each combination of discrete similarity levels across variables.
:rtype: numpy.ndarray
"""
cond_prob = np.zeros((2, len(self.Gamma)), dtype = np.float32)
# Loop over latent states
for m in range(2):
# Loop over variables
for k in range(self.K_Fuzzy + self.K_Exact):
# Using log-transformation to multiply probabilities of discrete levels of similarity for all variables (conditional on latent variable)
cond_prob[m,:] += np.log(self.Pi[m][k][self.Gamma[:,k]])
cond_prob[m,:] = np.exp(cond_prob[m,:])
# Compute conditional match probability using Bayes' Rule
result = (self.Lambda * cond_prob[1,:]) / (self.Lambda * cond_prob[1,:] + (1 - self.Lambda) * cond_prob[0,:])
return result
[docs]
def fit(self, Tolerance = 1e-4, Max_Iter = 5000):
"""
Estimates the parameters of the Fellegi–Sunter model using the Expectation–Maximization (EM) algorithm.
:param Tolerance: Convergence threshold: the algorithm stops when the largest change in ``Pi`` is smaller than this value. Defaults to 1e-4.
:type Tolerance: float, optional
:param Max_Iter: Maximum number of EM iterations to perform. Defaults to 5000.
:type Max_Iter: int, optional
:raises Exception: If the model has already been fitted, it cannot be fitted again.
"""
if self._Fit_flag:
raise Exception("If the model has already been fitted, it cannot be fitted again.")
# Parameter Initialization
self.Lambda = 0.1
L_by_Variable = np.repeat([3,2], [self.K_Fuzzy, self.K_Exact])
pi_0 = [-np.sort(-np.random.dirichlet(np.arange(1, i * 50 + 1, 50))) for i in L_by_Variable]
pi_1 = [np.sort(np.random.dirichlet(np.arange(1, i * 50 + 1, 50))) for i in L_by_Variable]
self.Pi = [pi_0, pi_1]
# Loop until convergence or the maximum number of iterations is reached
convergence = False
iter = 1
while not convergence and iter <= Max_Iter:
# E-Step: Compute match probability for possible patterns given current parameters
ksi = self._match_probability()
# M-Step: Compute new parameter values consistent with E-step
self.Lambda = np.dot(ksi, self.Counts) / sum(self.Counts)
pi_1_denom = np.dot(ksi, self.Counts)
pi_1 = [np.fromiter((np.dot((self.Gamma[:,k] == l) * self.Counts, ksi) for l in range(L)), dtype = float) / pi_1_denom for k, L in enumerate(L_by_Variable)]
pi_0_denom = np.dot(1 - ksi, self.Counts)
pi_0 = [np.fromiter((np.dot((self.Gamma[:,k] == l) * self.Counts, 1 - ksi) for l in range(L)), dtype = float) / pi_0_denom for k, L in enumerate(L_by_Variable)]
new_Pi = [pi_0, pi_1]
# Convergence is achieved when the largest change in Pi is smaller than Tolerance
if np.max(np.absolute(np.concatenate([np.concatenate(x) for x in new_Pi]) - np.concatenate([np.concatenate(x) for x in self.Pi]))) < Tolerance:
convergence = True
self.Pi = new_Pi
iter += 1
self._Fit_flag = True
if convergence:
print("Convergence successfully achieved.")
else:
print("Reached the maximum number of iterations without achieving convergence.")
@property
def Ksi(self):
"""
Holds the conditional match probabilities for each combination of discrete levels of similarity across variables, given the estimated parameters of the Fellegi-Sunter model.
:return: Array containing the conditional match probabilities for each pattern of discrete similarity levels across variables.
:rtype: numpy.ndarray
:raises Exception: The model must be fitted first.
"""
if not self._Fit_flag:
raise Exception("The model must be fitted first.")
try:
return self._Ksi
except:
self._Ksi = self._match_probability()
return self._Ksi