from spacy.cli import download
import spacy
from spacy.pipeline import Sentencizer
from funcs.presidio_analyzer_custom import analyze_dict

spacy.prefer_gpu()


def spacy_model_installed(model_name):
    """Load the spaCy pipeline ``model_name``, downloading it first if needed.

    Args:
        model_name: Name of the spaCy pipeline package, e.g. "en_core_web_sm".

    Returns:
        The object returned by ``spacy.load(model_name)``.
    """
    try:
        nlp = spacy.load(model_name)
    except OSError:
        # The model could not be loaded: fetch it with spaCy's downloader and
        # retry once. A failure of the retry propagates to the caller.
        download(model_name)
        nlp = spacy.load(model_name)

    print("Successfully imported spaCy model")
    return nlp


model_name = "en_core_web_sm"
nlp = spacy_model_installed(model_name)

import re
import secrets
import base64
import time
import pandas as pd

from faker import Faker

from presidio_analyzer import AnalyzerEngine, BatchAnalyzerEngine, PatternRecognizer
from presidio_anonymizer import AnonymizerEngine, BatchAnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

from typing import List, Optional


# Function to Split Text and Create DataFrame using SpaCy
def expand_sentences_spacy(df, colname, custom_delimiters: Optional[List[str]] = None, nlp=nlp):
    """Split the text in ``df[colname]`` into sentences, one output row each.

    Args:
        df: Input DataFrame. Any existing 'index' column is dropped and the
            frame's index is then materialised as a new 'index' column, which
            is what ends up in 'document_index'.
        colname: Name of the column holding the text to split.
        custom_delimiters: Accepted for backward compatibility but not applied;
            sentence boundaries come solely from ``nlp``.
        nlp: Callable applied to each text; its result must expose ``.sents``,
            each item of which has a ``.text`` attribute. Defaults to the
            module-level pipeline.

    Returns:
        A DataFrame with the columns 'document_index' and ``colname``, one row
        per sentence. When no sentences are produced the frame is empty but
        still carries those two columns.
    """
    df = df.drop('index', axis=1, errors="ignore").reset_index(names='index')

    expanded_data = []
    # Walk the two needed columns in lockstep instead of building a Series
    # for every row via iterrows().
    for document_index, text in zip(df['index'], df[colname]):
        for sent in nlp(text).sents:
            expanded_data.append({'document_index': document_index, colname: sent.text})

    if not expanded_data:
        # Keep the output schema stable so callers can still select columns.
        return pd.DataFrame(columns=['document_index', colname])

    return pd.DataFrame(expanded_data)


# def expand_sentences_spacy(df, colname, custom_delimiters:List[str]=[], nlp=nlp):
#     #print("Custom delimiters:", custom_delimiters)
#     expanded_data = []
#     df = df.drop('index', axis = 1, errors="ignore").reset_index(names='index')
#     sentencizer = Sentencizer()
#     new_punct_chars = sentencizer.default_punct_chars
#     if custom_delimiters:
#         new_punct_chars.extend(custom_delimiters)
#     pattern = "(" + "|".join(re.escape(punct) for punct in new_punct_chars) + ")"
#     #print("Patterns:", pattern)
#     split_list = []
#     for idx, string in enumerate(df[colname]):
#         new_split = re.split(pattern, string)
#         for n, sentence in enumerate(new_split):
#             if sentence:
#                 # If there is a split delimiter in the 'sentence' after, add it to the previous sentence as it will be removed at a later step
#                 if n + 1 < len(new_split):
#                     if new_split[n + 1]:
#                         # If the next split is in the list of split characters, then add it to this current sentence
#                         if new_split[n + 1] in new_punct_chars:
#                             split_list.append({'document_index': idx, colname: sentence + new_split[n + 1]})
#                         else:
#                             split_list.append({'document_index': idx, colname: sentence})
#     return pd.DataFrame(split_list)


def anon_consistent_names(df):
    """Replace every detected person name in ``df`` with one consistent fake name.

    Each distinct PERSON span found by the Presidio batch analyzer is mapped
    to a single generated first name, and that mapping is applied to the whole
    frame, so the same real name always becomes the same fake name.

    Args:
        df: DataFrame to scan; every column is analysed.

    Returns:
        A new DataFrame with the detected names substituted. ``df`` itself is
        not modified.
    """
    df_dict = df.to_dict(orient="list")

    analyzer = AnalyzerEngine()
    batch_analyzer = BatchAnalyzerEngine(analyzer_engine=analyzer)
    analyzer_results = list(batch_analyzer.analyze_dict(df_dict, language="en"))

    # Read the findings from the result objects directly rather than parsing
    # their string representation.
    # NOTE(review): assumes each column result pairs ``value`` (the cells) with
    # ``recognizer_results`` (one list of findings per cell) - confirm against
    # the installed presidio version.
    unique_names = set()
    for column_result in analyzer_results:
        for cell, findings in zip(column_result.value, column_result.recognizer_results):
            cell_text = str(cell)
            for finding in findings:
                if finding.entity_type != 'PERSON':
                    continue
                name = cell_text[finding.start:finding.end]
                if name.strip():
                    unique_names.add(name)

    if not unique_names:
        return df.copy()

    fake = Faker("en_UK")

    # Longest names first, so that a full name is presumably substituted before
    # a shorter name contained in it. Names are escaped because they may hold
    # regex metacharacters such as '.'.
    ordered_names = sorted(unique_names, key=lambda name: (-len(name), name))
    name_map = {
        r'\b' + re.escape(name) + r'\b': fake.first_name()
        for name in ordered_names
    }

    return df.replace(name_map, regex=True)


def detect_file_type(filename):
    """Detect the file type based on its extension.

    Args:
        filename: File name or path; the match is case-sensitive.

    Returns:
        'csv' for names ending in .csv, .csv.gz or .zip, 'xlsx' for .xlsx and
        'parquet' for .parquet.

    Raises:
        ValueError: If the name ends in none of the supported extensions.
    """
    if filename.endswith(('.csv', '.csv.gz', '.zip')):
        return 'csv'
    elif filename.endswith('.xlsx'):
        return 'xlsx'
    elif filename.endswith('.parquet'):
        return 'parquet'
    else:
        raise ValueError("Unsupported file type.")


def read_file(filename):
    """Read the file based on its detected type.

    Args:
        filename: Path of the file to load.

    Returns:
        The DataFrame produced by the pandas reader matching the extension.

    Raises:
        ValueError: Propagated from ``detect_file_type`` for unsupported names.
    """
    file_type = detect_file_type(filename)

    if file_type == 'csv':
        return pd.read_csv(filename, low_memory=False)
    elif file_type == 'xlsx':
        return pd.read_excel(filename)
    elif file_type == 'parquet':
        return pd.read_parquet(filename)


# Strategy names accepted by anonymise_script.
_ANON_STRATEGIES = ("replace", "redact", "hash", "mask", "encrypt", "fake_first_name")


def _build_operator_config(anon_strat, key_string, fake_first_name):
    """Return the Presidio operator mapping for the strategy ``anon_strat``."""
    if anon_strat == "replace":
        return {"DEFAULT": OperatorConfig("replace")}
    if anon_strat == "redact":
        return {"DEFAULT": OperatorConfig("redact")}
    if anon_strat == "hash":
        return {"DEFAULT": OperatorConfig("hash")}
    if anon_strat == "mask":
        return {"DEFAULT": OperatorConfig("mask", {"masking_char": "*", "chars_to_mask": 100, "from_end": True})}
    if anon_strat == "encrypt":
        # Only PERSON entities get an explicit operator for this strategy.
        return {"PERSON": OperatorConfig("encrypt", {"key": key_string})}
    if anon_strat == "fake_first_name":
        # Only PERSON entities get an explicit operator for this strategy.
        return {"PERSON": OperatorConfig("custom", {"lambda": fake_first_name})}
    raise ValueError(f"Unknown anonymisation strategy: {anon_strat!r}")


def anonymise_script(df, chosen_col, anon_strat):
    """Anonymise one text column of ``df`` with Presidio.

    Args:
        df: DataFrame containing ``chosen_col``.
        chosen_col: Name of the single column to anonymise. Its values are
            converted to ``str`` before analysis.
        anon_strat: One of "replace", "redact", "hash", "mask", "encrypt" or
            "fake_first_name".

    Returns:
        A tuple ``(scrubbed_series, out_message)``. ``scrubbed_series`` is a
        Series named ``chosen_col`` on a fresh default index, and
        ``out_message`` is a status string that includes the generated key
        when ``anon_strat`` is "encrypt".

    Raises:
        ValueError: If ``anon_strat`` is not a supported strategy. This is
            checked before any analysis work is started.
    """
    if anon_strat not in _ANON_STRATEGIES:
        raise ValueError(
            f"Unknown anonymisation strategy: {anon_strat!r}. "
            f"Expected one of: {', '.join(_ANON_STRATEGIES)}"
        )

    # Column -> list of strings, the input shape handed to the batch analyzer.
    df_dict = {chosen_col: df[chosen_col].astype(str).tolist()}

    analyzer = AnalyzerEngine()

    # Add titles to analyzer list
    titles_recognizer = PatternRecognizer(
        supported_entity="TITLE",
        deny_list=["Mr", "Mrs", "Miss", "Ms", "mr", "mrs", "miss", "ms"],
    )
    analyzer.registry.add_recognizer(titles_recognizer)

    batch_analyzer = BatchAnalyzerEngine(analyzer_engine=analyzer)

    anonymizer = AnonymizerEngine()
    batch_anonymizer = BatchAnonymizerEngine(anonymizer_engine=anonymizer)

    print("Identifying personal data")
    analyse_tic = time.perf_counter()
    analyzer_results = list(analyze_dict(batch_analyzer, df_dict, language="en"))
    analyse_toc = time.perf_counter()
    analyse_time_out = f"Analysing the text took {analyse_toc - analyse_tic:0.1f} seconds."
    print(analyse_time_out)

    # Generate 16 random bytes, then base64-encode them to get the string that
    # is handed to the encrypt operator and reported back to the user.
    # NOTE(review): the operator receives the base64 text, not the raw bytes -
    # confirm this is the intended key material.
    key = secrets.token_bytes(16)
    key_string = base64.b64encode(key).decode('utf-8')

    # Create faker function (note that it has to receive a value)
    fake = Faker("en_UK")

    def fake_first_name(x):
        return fake.first_name()

    # DATE_TIME is deliberately not given a "keep" operator here.
    combined_config = _build_operator_config(anon_strat, key_string, fake_first_name)

    print("Anonymising personal data")
    anonymizer_results = batch_anonymizer.anonymize_dict(analyzer_results, operators=combined_config)

    scrubbed_df = pd.DataFrame(data={chosen_col: anonymizer_results[chosen_col]})
    scrubbed_series = scrubbed_df[chosen_col]

    # Create reporting message
    out_message = "Successfully anonymised"
    if anon_strat == "encrypt":
        out_message = out_message + ". Your decryption key is " + key_string + "."

    return scrubbed_series, out_message


def do_anonymise(in_file, anon_strat, chosen_cols):
    """Anonymise selected columns of one or more CSV files and export the result.

    Args:
        in_file: Iterable of objects with a ``.name`` attribute holding the
            path of a CSV file. The files are concatenated row-wise.
        anon_strat: Strategy name passed through to ``anonymise_script``.
        chosen_cols: Column name, or list of column names, to anonymise.

    Returns:
        A tuple ``(out_message, anon_export_file_name)``: the status message
        and the path of the CSV that was written. The path is derived from the
        last input file's name.

    Raises:
        ValueError: If no input files or no columns are supplied, or if
            ``anon_strat`` is rejected by ``anonymise_script``.
    """
    files = list(in_file) if in_file else []
    if not files:
        raise ValueError("No input files were provided to anonymise.")

    if isinstance(chosen_cols, str):
        chosen_cols = [chosen_cols]
    if not chosen_cols:
        raise ValueError("No columns were selected to anonymise.")

    # Load file(s). ignore_index gives the combined frame a unique 0..n-1
    # index, so rows from different files cannot collide.
    anon_df = pd.concat(
        [pd.read_csv(match_file.name, delimiter=",", low_memory=False) for match_file in files],
        ignore_index=True,
    )

    # Anonymise the selected columns one at a time (anonymise_script handles a
    # single column) and write each back in place, which keeps the original
    # column order and leaves the other columns untouched.
    messages = []
    for col in chosen_cols:
        scrubbed_series, col_message = anonymise_script(anon_df, col, anon_strat)
        # Assign by position: the returned series carries its own fresh index.
        anon_df[col] = scrubbed_series.to_numpy()
        if anon_strat == "encrypt" and len(chosen_cols) > 1:
            # Each call generates its own key, so say which column it is for.
            col_message = f"Column '{col}': {col_message}"
        messages.append(col_message)

    # Collapse identical per-column messages into one.
    out_message = " ".join(dict.fromkeys(messages))

    # Export file. Strip only a trailing ".csv" so that other parts of the
    # path are left alone.
    out_file_part = re.sub(r'\.csv$', '', files[-1].name)
    anon_export_file_name = out_file_part + "_anon_" + anon_strat + ".csv"

    anon_df.to_csv(anon_export_file_name, index=False)

    return out_message, anon_export_file_name