Source code for faster.estimation

import itertools
import numpy as np

class Estimation:
    """
    A class for estimating the parameters of the Fellegi–Sunter model based on observed patterns of discrete similarity levels across multiple variables.

    Variables compared with fuzzy matching take three similarity levels (0, 1, 2); variables compared with exact matching take two (0, 1).

    :param K_Fuzzy: Number of variables compared for fuzzy matching.
    :type K_Fuzzy: int
    :param K_Exact: Number of variables compared for exact matching.
    :type K_Exact: int
    :param Counts: Array containing the observed counts for each pattern of discrete similarity levels across the compared variables, in the same order as the rows of ``Gamma``.
    :type Counts: numpy.ndarray
    """

    def __init__(self, K_Fuzzy: int, K_Exact: int, Counts: np.ndarray):
        self.K_Fuzzy = K_Fuzzy
        self.K_Exact = K_Exact
        self.Counts = Counts

        self.Gamma = self._Gamma()
        """
        Holds the matrix of all possible patterns of discrete similarity levels across variables.

        :return: Matrix encoding all possible combinations of discrete similarity levels across variables.

            - Each row represents a combination of discrete similarity levels.
            - Each column represents a variable.
            - Each element represents the discrete similarity level for a specific variable in the given pattern.
        :rtype: numpy.ndarray
        """

        self.Lambda = None
        """
        Holds the estimated overall probability that any two observations are matching.

        :return: Unconditional match probability.
        :rtype: float
        """

        self.Pi = None
        """
        Holds the estimated probability of observing each discrete level of similarity for each variable, conditional on the latent match status.

        :return: Nested list containing the estimated probabilities of observing each discrete level of similarity for each variable, conditional on latent match status.

            - The first index denotes the latent match status, where 0 represents a non-match and 1 represents a match.
            - The second index denotes the variable.
            - The third index denotes the discrete level of similarity, with higher values reflecting greater similarity.
        :rtype: list[list[numpy.ndarray]]
        """

        self._Fit_flag = False
        # Cache for the ``Ksi`` property; filled on first access after fitting.
        self._Ksi = None

    def _levels_by_variable(self):
        """
        Returns the number of discrete similarity levels of each variable.

        :return: Array with one entry per variable: 3 for each of the ``K_Fuzzy`` fuzzy variables, followed by 2 for each of the ``K_Exact`` exact variables.
        :rtype: numpy.ndarray
        """
        return np.repeat([3, 2], [self.K_Fuzzy, self.K_Exact])

    def _Gamma(self):
        """
        Generates all possible combinations of discrete similarity levels across variables.

        :return: Matrix encoding all possible combinations of discrete similarity levels across variables.

            - Each row represents a combination of discrete levels of similarity.
            - Each column represents a variable.
            - Each element represents the discrete level of similarity for that variable in the given pattern.
        :rtype: numpy.ndarray
        """
        level_ranges = (range(n_levels) for n_levels in self._levels_by_variable())
        return np.array(list(itertools.product(*level_ranges)))

    def _match_probability(self):
        """
        Computes the conditional match probability for each pattern in ``Gamma`` given the current values of ``Lambda`` and ``Pi``.

        The computation is carried out in log-space with double precision so that products of many small probabilities do not underflow. A pattern that has zero probability under both latent classes has no defined posterior; it is assigned a match probability of 0 rather than NaN.

        :return: Array containing the conditional match probabilities for each combination of discrete similarity levels across variables.
        :rtype: numpy.ndarray
        """
        n_patterns = len(self.Gamma)
        log_lik = np.zeros((2, n_patterns), dtype=np.float64)

        # log(0) = -inf is a legitimate value here (impossible level), so silence the warning.
        with np.errstate(divide="ignore"):
            # Loop over latent states
            for m in range(2):
                # Loop over variables: sum of logs == log of the product of per-variable probabilities
                for k in range(self.K_Fuzzy + self.K_Exact):
                    log_lik[m, :] += np.log(self.Pi[m][k][self.Gamma[:, k]])

            log_match = np.log(self.Lambda) + log_lik[1, :]
            log_non_match = np.log1p(-self.Lambda) + log_lik[0, :]

        # Bayes' Rule in log-space: log P(pattern) = log(exp(log_match) + exp(log_non_match))
        log_total = np.logaddexp(log_match, log_non_match)

        # Patterns impossible under both classes would yield -inf - (-inf) = NaN; give them 0 instead.
        impossible = np.isneginf(log_total)
        result = np.zeros(n_patterns, dtype=np.float64)
        result[~impossible] = np.exp(log_match[~impossible] - log_total[~impossible])

        return result

    def _level_shares(self, weights, levels):
        """
        Computes, for every variable, the share of the total weight falling on each similarity level.

        :param weights: Weight of each pattern (row of ``Gamma``).
        :type weights: numpy.ndarray
        :param levels: Number of similarity levels of each variable.
        :type levels: numpy.ndarray
        :return: One array per variable holding the weighted share of each level.
        :rtype: list[numpy.ndarray]
        """
        total_weight = weights.sum()
        return [
            np.bincount(self.Gamma[:, k], weights=weights, minlength=int(n_levels)) / total_weight
            for k, n_levels in enumerate(levels)
        ]

    @staticmethod
    def _flatten(pi):
        """Concatenates every probability vector in a ``Pi``-like nested list into one flat array."""
        return np.concatenate([probs for by_class in pi for probs in by_class])

    def fit(self, Tolerance=1e-4, Max_Iter=5000):
        """
        Estimates the parameters of the Fellegi–Sunter model using the Expectation–Maximization (EM) algorithm.

        The starting values of ``Pi`` are drawn with ``numpy.random.dirichlet``, so results depend on NumPy's global random state. A message reporting whether convergence was achieved is printed once the algorithm stops.

        :param Tolerance: Convergence threshold: the algorithm stops when the largest change in ``Pi`` is smaller than this value. Defaults to 1e-4.
        :type Tolerance: float, optional
        :param Max_Iter: Maximum number of EM iterations to perform. Defaults to 5000.
        :type Max_Iter: int, optional

        :raises RuntimeError: If the model has already been fitted, it cannot be fitted again.
        :raises ValueError: If ``Counts`` is not a one-dimensional array with one entry per row of ``Gamma``, or if it does not sum to a positive number.
        """
        if self._Fit_flag:
            raise RuntimeError("If the model has already been fitted, it cannot be fitted again.")

        counts = np.asarray(self.Counts, dtype=float)
        if counts.ndim != 1 or counts.shape[0] != len(self.Gamma):
            raise ValueError(
                f"Counts must be a one-dimensional array with {len(self.Gamma)} entries, one per row of Gamma."
            )
        total_count = counts.sum()
        # Written as "not > 0" so that a NaN total is rejected as well.
        if not total_count > 0:
            raise ValueError("Counts must sum to a positive number.")

        # Parameter Initialization
        self.Lambda = 0.1
        levels = self._levels_by_variable()
        # Non-matches start with most mass on low similarity (descending), matches on high similarity (ascending).
        pi_0 = [-np.sort(-np.random.dirichlet(np.arange(1, n_levels * 50 + 1, 50))) for n_levels in levels]
        pi_1 = [np.sort(np.random.dirichlet(np.arange(1, n_levels * 50 + 1, 50))) for n_levels in levels]
        self.Pi = [pi_0, pi_1]

        # Loop until convergence or the maximum number of iterations is reached
        converged = False
        iteration = 1
        while not converged and iteration <= Max_Iter:
            # E-Step: Compute match probability for possible patterns given current parameters
            ksi = self._match_probability()

            # M-Step: Compute new parameter values consistent with E-step
            match_weights = counts * ksi
            non_match_weights = counts * (1 - ksi)

            self.Lambda = match_weights.sum() / total_count
            new_Pi = [
                self._level_shares(non_match_weights, levels),
                self._level_shares(match_weights, levels),
            ]

            # Convergence is achieved when the largest change in Pi is smaller than Tolerance
            largest_change = np.max(np.absolute(self._flatten(new_Pi) - self._flatten(self.Pi)))
            if largest_change < Tolerance:
                converged = True

            self.Pi = new_Pi
            iteration += 1

        self._Fit_flag = True

        if converged:
            print("Convergence successfully achieved.")
        else:
            print("Reached the maximum number of iterations without achieving convergence.")

    @property
    def Ksi(self):
        """
        Holds the conditional match probabilities for each combination of discrete levels of similarity across variables, given the estimated parameters of the Fellegi-Sunter model.

        The array is computed on first access and cached afterwards.

        :return: Array containing the conditional match probabilities for each pattern of discrete similarity levels across variables.
        :rtype: numpy.ndarray

        :raises RuntimeError: The model must be fitted first.
        """
        if not self._Fit_flag:
            raise RuntimeError("The model must be fitted first.")

        # getattr keeps instances created without __init__ (e.g. unpickled from older versions) working.
        if getattr(self, "_Ksi", None) is None:
            self._Ksi = self._match_probability()

        return self._Ksi