Merge pull request #37 from ssciwr/pseudonymization-classstructure
Restructure pseudonymization into class
fexfl authored Oct 29, 2024
2 parents 4a2883b + 8382b34 commit e04186d
Showing 2 changed files with 167 additions and 92 deletions.
258 changes: 166 additions & 92 deletions mailcom/parse.py
@@ -10,86 +10,175 @@
lang = "es"
# lang = "fr"
# path where the input files can be found
path_input = Path("./test/data/")
path_input = Path("./mailcom/test/data/")
# path where the output files should be written to
# this is generated if not present yet
path_output = Path("../data/out/")
path_output = Path("./data/out/")
output_filename = "dict"
# the ner tool - currently only "transformers"
tool = "transformers"
# please do not modify below this section unless you know what you are doing


-def get_sentences(doc):
-    # spacy
-    text = []
-    for sent in doc.sents:
-        text.append(str(sent))
-    return text
-
-
-def process_doc(doc, ner_tool="transformers", text=None):
-    # remove any named entities
-    if ner_tool == "transformers":
-        if doc:
-            # found named entities
-            entlist = []
-            newtext = text
-            for entity in doc:
-                ent_string = entity["entity"]  # noqa
-                # here we could check that string is "I-PER"
-                ent_conf = entity["score"]  # noqa
-                ent_position = entity["start"], entity["end"]
-                # Here we have to be careful - tokenization with
-                # transformers is quite different from spacy/stanza/flair
-                # here we get character ids
-                entlist.append(ent_position)
-                # now replace respective characters
-                newtext = (
-                    newtext[: ent_position[0]]
-                    + "x" * (ent_position[1] - ent_position[0])
-                    + newtext[ent_position[1] :]  # noqa
-                )
-            newlist = [newtext]
-        else:
-            # no named entities found
-            newlist = [text]
-    return newlist
-
-
-def init_spacy(lang):
-    # the md models seem most accurate
-    if lang == "es":
-        # model = "es_core_news_sm"
-        model = "es_core_news_md"
-        # model = "es_core_news_lg"
-        # model = "es_dep_news_trf"
-    elif lang == "fr":
-        # model = "fr_core_news_sm"
-        model = "fr_core_news_md"
-        # model = "fr_core_news_lg"
-        # model = "fr_dep_news_trf"
-    else:
-        print("model not found, aborting")
-        exit()
-    # initialize nlp pipeline
-    try:
-        # disable not needed components
-        nlp = sp.load(
-            model, exclude=["morphologizer", "attribute_ruler", "lemmatizer", "ner"]
-        )
-    except OSError:
-        raise OSError("Could not find {} in standard directory.".format(model))
-    nlp = sp.load(model)
-    return nlp
-
-
-def init_transformers():
-    # ner_recognizer = pipeline("token-classification")
-    ner_recognizer = pipeline(
-        "token-classification", model="xlm-roberta-large-finetuned-conll03-english"
-    )
-    return ner_recognizer
+class Pseudonymize:
+    def __init__(self):
+        self.spacy_default_model_dict = {
+            "es": "es_core_news_md",
+            "fr": "fr_core_news_md",
+        }
+
+        self.pseudo_first_names = {
+            "es": [
+                "José",
+                "Angel",
+                "Alex",
+                "Ariel",
+                "Cruz",
+                "Fran",
+                "Arlo",
+                "Adri",
+                "Marce",
+                "Mati",
+            ],
+            "fr": [
+                "Claude",
+                "Dominique",
+                "Claude",  # note: duplicate of the first entry
+                "Camille",
+                "Charlie",
+                "Florence",
+                "Francis",
+                "Maxime",
+                "Remy",
+                "Cécile",
+            ],
+        }
+
+        # records the already replaced names in an email
+        self.used_first_names = {}
+
+    def init_spacy(self, language: str, model="default"):
+        if model == "default":
+            model = self.spacy_default_model_dict[language]
+        try:
+            # disable components that are not needed
+            self.nlp_spacy = sp.load(
+                model, exclude=["morphologizer", "attribute_ruler", "lemmatizer", "ner"]
+            )
+        except OSError:
+            raise OSError("Could not find {} in standard directory.".format(model))
+        self.nlp_spacy = sp.load(model)  # note: this full reload discards the exclusions above
+
+    def init_transformers(
+        self,
+        model="xlm-roberta-large-finetuned-conll03-english",
+        model_revision_number="default",
+    ):
+        # TODO: pin the model revision number
+        # ner_recognizer = pipeline("token-classification")
+        self.ner_recognizer = pipeline(
+            "token-classification", model=model, aggregation_strategy="simple"
+        )
+
+    def reset(self):
+        # reset used names for processing a new email
+        self.used_first_names.clear()
+
+    def get_sentences(self, input_text):
+        doc = self.nlp_spacy(input_text)
+        text_as_sents = []
+        for sent in doc.sents:
+            text_as_sents.append(str(sent))
+        return text_as_sents
+
+    def get_ner(self, sentence):
+        ner = self.ner_recognizer(sentence)
+        return ner
+
+    def pseudonymize_per(self, new_sentence, nelist):
+        unique_ne_list = list(dict.fromkeys(nelist))
+        for ne in unique_ne_list:
+            # choose the pseudonym
+            nm_list = self.used_first_names
+            pseudo_list = self.pseudo_first_names
+            pseudonym = ""
+            name_variations = [
+                ne,
+                ne.lower(),
+                ne.title(),
+            ]
+            # if this name has been replaced before, choose the same pseudonym
+            for nm_var in name_variations:
+                pseudonym = nm_list.get(nm_var, "")
+                if pseudonym != "":
+                    break
+            # if none is found, choose a new pseudonym
+            if pseudonym == "":
+                try:
+                    # note: the pseudonym language is hardcoded to "fr" here
+                    pseudonym = pseudo_list["fr"][
+                        len(nm_list)
+                    ]  # may reach the end of the list
+                except IndexError:
+                    pseudonym = pseudo_list["fr"][0]
+                nm_list[ne] = pseudonym
+            # replace all occurrences with the pseudonym
+            new_sentence = new_sentence.replace(ne, pseudonym)
+        return new_sentence
+
+    def pseudonymize_ne(self, ner, sentence):
+        # remove any named entities
+        entlist = []
+        nelist = []
+        new_sentence = sentence
+        for i in range(len(ner)):
+            entity = ner[i]
+            ent_string = entity["entity_group"]  # noqa
+            # here we could check that string is "PER"
+            ent_conf = entity["score"]  # noqa
+            ent_position = entity["start"], entity["end"]
+            # Here we have to be careful - tokenization with
+            # transformers is quite different from spacy/stanza/flair
+            # here we get character ids
+            entlist.append(ent_position)
+            # now replace respective characters
+            # replace PER
+            if ent_string == "PER":
+                # add the name of this entity to list
+                nelist.append(entity["word"])
+            else:
+                # Locations and Organizations
+                new_sentence = (
+                    new_sentence[: (ent_position[0])]
+                    + "x" * (ent_position[1] - ent_position[0])
+                    + new_sentence[(ent_position[1]) :]  # noqa
+                )
+        # replace all unique PER now
+        new_sentence = self.pseudonymize_per(new_sentence, nelist)
+
+        newlist = [new_sentence]
+        return newlist
+
+    def pseudonymize_numbers(self, sentence):
+        sent_as_list = list(sentence)
+        sent_as_list = [char if not char.isdigit() else "x" for char in sent_as_list]
+        return "".join(sent_as_list)
+
+    def concatenate(self, sentences):
+        return " ".join(sentences)
+
+    def pseudonymize(self, text: str):
+        self.reset()
+        sentences = self.get_sentences(text)
+        pseudonymized_sentences = []
+        for sent in sentences:
+            ner = self.get_ner(sent)
+            ps_sent = " ".join(self.pseudonymize_ne(ner, sent)) if ner else sent
+            ps_sent = self.pseudonymize_numbers(ps_sent)
+            pseudonymized_sentences.append(ps_sent)
+        return self.concatenate(pseudonymized_sentences)
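To illustrate the name-reuse logic in pseudonymize_per: the sketch below calls the method directly with hand-picked entity lists (no NER involved), so it only needs the constructor. The sentences and names are invented for illustration:

    p = Pseudonymize()
    # "Jean" is new: it gets the first free French pseudonym, "Claude"
    print(p.pseudonymize_per("Merci Jean", ["Jean"]))  # -> "Merci Claude"
    # "jean" matches "Jean" via the name_variations lookup and reuses "Claude";
    # "Marie" is new and gets the next pseudonym, "Dominique"
    print(p.pseudonymize_per("Salut jean et Marie", ["jean", "Marie"]))
    # -> "Salut Claude et Dominique"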


def check_dir(path: str) -> bool:
@@ -120,33 +209,18 @@ def make_dir(path: str):
    io = InoutHandler(path_input)
    io.list_of_files()
    # html_files = list_of_files(path_input, "html")
+    pseudonymizer = Pseudonymize()
+    pseudonymizer.init_spacy("fr")
+    pseudonymizer.init_transformers()
    for file in io.email_list:
        print("Parsing input file {}".format(file))
        text = io.get_text(file)
        text = io.get_html_text(text)
        xml = io.data_to_xml(text)
        io.write_file(xml, path_output / output_filename)
        # print(text)
        # print(io.email_content["date"])
        # print(io.email_content["attachment"])
        # print(io.email_content["attachement type"])
        # skip this text if email could not be parsed
        if not text:
            continue
-        ### nlp = init_spacy(sprache)
-        # doc_spacy = nlp_spacy(text)  ### missing - old version
-        # text = get_sentences(doc_spacy)
-        # start with first line
-        # here you can limit the number of sentences to parse
-        # newlist = []
-        # max_i = len(text)  ### remove
-        ### init transformers
-        # for i in range(0, max_i):
-        # if tool == "transformers":  ### there is only one
-        # nlps = nlp_transformers(text[i])  ### missing, or rather process_doc
-        # doc = nlps
-        # newlist.append(process_doc(doc, ner_tool=tool, text=text[i]))
-        # newlist[i] = " ".join(newlist[i])
-        # join the new and old lines for comparison
-        # printout = "New: " + " ".join(newlist) + "\n"
-        # printout = printout + "Old: " + " ".join(text[0:max_i])
-        # write_file(printout, path_output + "/" + file)
-        continue
+        # Test functionality of Pseudonymize class
+        output_text = pseudonymizer.pseudonymize(text)
+        print("New text:", output_text)
+        print("Old text:", text)
1 change: 1 addition & 0 deletions pyproject.toml
@@ -25,6 +25,7 @@ dependencies = [
"eml_parser",
"bs4",
"dicttoxml",
"torch",
]

[project.optional-dependencies]
