"""Data cleaning and identity resolution helper functions.

This module contains identity resolution and data cleaning helper
functions developed by ERDC.  Currently, it contains only a single
function, ``delete_superfluous_records()``.

Notes
-----
Author: John Sabel
Washington State Education Research and Data Center
Creation date: 2022-04-11
Version: 0.1
"""
import pandas as pd
import numpy as np


def delete_superfluous_records(data_set_in: dict,
                               static_columns: list,
                               dedup_columns: list,
                               Debug: str = "N"):
    """Delete superfluous records from a data set.

    Splits the input data set into two data frames.  The first contains
    the essential, higher-information records based on the character
    columns named by ``dedup_columns``.  A record is "superfluous" when
    another record in the same ``static_columns`` group carries a string
    that extends it as a prefix — e.g. ``"Jonathon"`` supersedes
    ``"Jon"``.  Exact duplicates across ``static_columns +
    dedup_columns`` are also removed.

    Parameters
    ----------
    data_set_in : dict
        Data set containing possible superfluous records.  May also be a
        DataFrame (anything the ``pd.DataFrame`` constructor accepts).
    static_columns : list
        List of static (invariant) columns; together they act as a
        natural key.
    dedup_columns : list
        Character columns that may contain superfluous information.
        They are processed left to right.
    Debug : str, optional
        When ``"Y"``, per-column progress is printed.  Default ``"N"``.

    Returns
    -------
    essential : DataFrame
        The non-superfluous records, indexed by original row position.
    superfluous : DataFrame
        The removed records, with a ``WhyDeleted`` column naming the
        dedup column responsible, or ``'DISTINCT'`` for exact
        duplicates.

    Raises
    ------
    ValueError
        If any name in ``static_columns`` or ``dedup_columns`` is not a
        column of ``data_set_in``.

    Examples
    --------
    >>> data_in = {'CohortID': [1, 1, 1],
    ...            'FirstName': ['Jon', 'Jonathon', 'Al']}
    >>> keep, dropped = delete_superfluous_records(
    ...     data_in, ['CohortID'], ['FirstName'])
    """
    essential = pd.DataFrame(data_set_in)
    # Remember each record's original position so both result frames can
    # be returned in input order and indexed by it.
    essential['RcrdID'] = essential.index
    only_index = essential[['RcrdID']]
    essential_columns = essential.columns.values.tolist()

    # Validate the column lists up front instead of failing obscurely in
    # the groupby/sort below.
    missing_static = set(static_columns) - set(essential_columns)
    if missing_static:
        raise ValueError(
            f"static_columns not in data_set_in: {sorted(missing_static)}")
    missing_dedup = set(dedup_columns) - set(essential_columns)
    if missing_dedup:
        raise ValueError(
            f"dedup_columns not in data_set_in: {sorted(missing_dedup)}")

    # Separate out the exact duplicates across all static_columns +
    # dedup_columns.
    superfluous = essential[
        essential.duplicated(subset=static_columns + dedup_columns)].copy()
    superfluous['WhyDeleted'] = 'DISTINCT'
    essential = essential.drop_duplicates(
        subset=static_columns + dedup_columns)

    # Move through dedup_columns from left to right, identifying and
    # then removing superfluous records with each iteration.
    for dedup_column in dedup_columns:
        if Debug == "Y":
            print("*************** dedup_column ******************",
                  dedup_column)

        sort_columns_except_dedup_column = (static_columns.copy()
                                            + dedup_columns.copy())
        sort_columns_except_dedup_column.remove(dedup_column)

        # Place the current dedup_column at the end of the sort key so
        # that, within each group, prefix-related values sort adjacently.
        sort_columns = sort_columns_except_dedup_column + [dedup_column]
        essential = essential.sort_values(by=sort_columns)

        # Flag the first row of every static set.  Merging back on all
        # common columns tags exactly those rows; all others get NaN.
        static_columns_firsts = essential.groupby(
            by=sort_columns_except_dedup_column).first()
        static_columns_firsts = static_columns_firsts.reset_index()
        static_columns_firsts['FirstRowInStaticSet'] = True
        essential = pd.merge(essential, static_columns_firsts, how='left')

        # Row number in sorted order, so the frame can be re-sorted
        # descending later.
        essential["row_num"] = np.arange(len(essential))

        # Lag the dedup column by one row and merge it back in.
        # (No .squeeze() here: for a single-row frame it would collapse
        # the Series to a scalar and .shift() would then fail.)
        active_dedup_column = essential[dedup_column].shift(periods=1)
        essential = essential.merge(
            active_dedup_column.rename("LaggedDedupColumn"),
            left_index=True, right_index=True)

        dedup_column_series = essential[dedup_column].values.astype(str)
        lagged_dedup_column_series = (
            essential["LaggedDedupColumn"].values.astype(str))

        # np.char.find returns 0 exactly when the current value starts
        # with the lagged value, i.e. the current row extends the
        # previous one.  (Replaces the deprecated
        # numpy.core.defchararray.find, removed in NumPy 2.0.)
        essential = essential.assign(
            FirstOccurance=np.char.find(dedup_column_series,
                                        lagged_dedup_column_series))

        # Count the length of each run of strictly-growing prefix chains.
        essential['SupersetCount'] = 0
        SupersetCount = 0  # initialize so the first read is always defined
        for i, row in essential.iterrows():
            # Reset the run when the current value does not extend the
            # previous one, equals it, or starts a new static set (guards
            # against bridging between static sets; the flag is NaN on
            # non-first rows, so the `== True` comparison is required).
            if (row['FirstOccurance'] != 0
                    or row['LaggedDedupColumn'] == row[dedup_column]
                    or row['FirstRowInStaticSet'] == True):  # noqa: E712
                SupersetCount = 0
            else:
                SupersetCount = SupersetCount + 1
            essential.at[i, 'SupersetCount'] = SupersetCount

        # Re-sort each static set descending by row number so the end of
        # every prefix chain comes first; a row whose count drops below
        # the previous row's is dominated by it and can be deleted.
        essential.sort_values(
            by=sort_columns_except_dedup_column + ['row_num'],
            ascending=([True] * len(sort_columns_except_dedup_column)
                       + [False]),
            inplace=True)

        # Lag SupersetCount then merge it back in.
        lagged_superset_count = essential['SupersetCount'].shift(periods=1)
        essential = essential.merge(
            lagged_superset_count.rename("SupersetCountLagged"),
            left_index=True, right_index=True)

        essential['Retain'] = np.where(
            essential['SupersetCount'] < essential['SupersetCountLagged'],
            False, True)

        mask = essential['Retain'] == True  # noqa: E712

        # Current rows to be deleted; record the dedup column as the
        # "reason" why each row was deleted.  .copy() avoids the
        # SettingWithCopyWarning on the assignment below.
        _superfluous = essential[~mask]
        _superfluous = _superfluous[essential_columns].copy()
        _superfluous["WhyDeleted"] = dedup_column

        # Union the current deletions into the frame of all deletions.
        if not _superfluous.empty:
            superfluous = pd.concat([superfluous, _superfluous])

        # Remove the deleted rows — and all work columns — from the
        # master frame before the next iteration.
        essential = essential[mask]
        essential = essential[essential_columns]

    # Restore original input order, then index both frames by the
    # original record position.
    essential = (only_index.reset_index().merge(essential, how="inner")
                 .set_index('index'))
    essential.index.name = None
    superfluous = (only_index.reset_index().merge(superfluous, how="inner")
                   .set_index('index'))
    superfluous.index.name = None
    essential.set_index('RcrdID', inplace=True)
    superfluous.set_index('RcrdID', inplace=True)

    return (essential, superfluous)