-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
285 lines (233 loc) · 9.94 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
from models import *
def apply_clone_with_blending(image, clone, clone_pt, target_pt, radius):
    """Clone a circular patch of `image` centred on `clone_pt` into `clone`
    centred on `target_pt`, feathering the edge with a blurred circular mask.

    Args:
        image: source image as a NumPy array (H x W [x C]).
        clone: destination image, modified in place (NumPy array).
        clone_pt: dict with 'x'/'y' keys — centre of the source patch.
        target_pt: dict with 'x'/'y' keys — centre of the destination patch.
        radius: patch radius in pixels.
    """
    # Feathering mask: a filled circle softened with a Gaussian blur so the
    # pasted patch blends into its surroundings instead of showing a hard edge.
    mask = np.zeros((radius * 2, radius * 2), dtype=np.uint8)
    cv2.circle(mask, (radius, radius), radius, 255, -1)
    mask = cv2.GaussianBlur(mask, (31, 31), 0)
    for y in range(-radius, radius):
        for x in range(-radius, radius):
            if x ** 2 + y ** 2 <= radius ** 2:
                src_x = int(clone_pt['x']) + x
                src_y = int(clone_pt['y']) + y
                tgt_x = int(target_pt['x']) + x
                tgt_y = int(target_pt['y']) + y
                # Bug fix: the target coordinates index BOTH `clone` (write)
                # and `image` (read below), so they must be validated against
                # both arrays. The original checked them only against
                # `clone.shape` and raised IndexError when the shapes differ.
                if (0 <= src_x < image.shape[1] and 0 <= src_y < image.shape[0] and
                        0 <= tgt_x < clone.shape[1] and 0 <= tgt_y < clone.shape[0] and
                        0 <= tgt_x < image.shape[1] and 0 <= tgt_y < image.shape[0]):
                    # alpha in [0, 1]: 1 = fully cloned pixel, 0 = untouched.
                    alpha = mask[y + radius, x + radius] / 255.0
                    clone[tgt_y, tgt_x] = (1.0 - alpha) * image[tgt_y, tgt_x] + alpha * image[src_y, src_x]
def docx_get_ressource_in_request(request, id, namefile):
    """Return the HTML of the requested paragraph block and its neighbours.

    Reads the block index from the request's JSON body ("ressource"), opens
    the project's DOCX file and converts the block, the previous block and
    the next block to HTML.  Neighbour blocks that render as "<p><br></p>"
    are normalised to "<p></p>".
    """
    idblock = int(request.json["ressource"])
    document = docx.Document(os.path.join(app.config['UPLOAD_FOLDER'], str(id), namefile))

    def to_html(block_index):
        # Convert one paragraph block of the document to HTML.
        return ConverterAPI.ParaDocxToHtml(ConvAPI, document, block_index)

    def normalised(html):
        # Collapse a paragraph holding only a line break into an empty one.
        return "<p></p>" if html == "<p><br></p>" else html

    current = to_html(idblock)
    previous = normalised(to_html(idblock - 1))
    following = normalised(to_html(idblock + 1))
    return current, previous, following
def verify_owner(project_id):
    """Return True when the logged-in user is the owner of `project_id`.

    Bug fix: the original called `.Owner` on the result of `.first()`
    unconditionally and raised AttributeError for a non-existent project;
    a missing project now simply means "not the owner".
    """
    project = Project.query.filter_by(id=project_id).first()
    if project is None:
        return False
    return str(flask_login.current_user.id) == str(project.Owner)
def manage_csv_memory(project_id):
    """Ensure the translation-memory CSV for a project exists.

    Creates static/csv/memory<project_id>.csv when missing and writes a
    single header row holding the project's source and target languages.
    """
    csv_dir = os.path.join("static", "csv")
    csv_path = os.path.join(csv_dir, f"memory{project_id}.csv")
    os.makedirs(csv_dir, exist_ok=True)
    # Create the CSV file only when it does not exist yet.
    if not os.path.isfile(csv_path):
        # Bug fix: the csv module requires newline='' on the file object;
        # without it the text layer translates line endings and rows come
        # out doubled on Windows.
        with open(csv_path, "w", newline='') as csvfile:
            writer = csv.writer(csvfile, delimiter=';', quotechar='"',
                                quoting=csv.QUOTE_MINIMAL, lineterminator='\n')
            project = Project.query.filter_by(id=project_id).first()
            writer.writerow([project.Source_Lang, project.Target_Lang])
def get_project_data_for_get_method(project_id):
    """Collect everything the GET view needs for a project page.

    Returns a tuple: (type, extension, last block, upload-folder listing,
    keep-style flag, autocomplete flag, translator settings, project row,
    paragraph count of the shortest-named DOCX in the project folder).
    """
    project = Project.query.filter_by(id=project_id).first()
    user = User.query.filter_by(id=flask_login.current_user.id).first()

    # Project upload directory.
    project_folder = os.path.join(app.config['UPLOAD_FOLDER'], str(project_id))

    # Count the paragraphs of the DOCX file with the shortest name
    # (presumably the original upload rather than a derived copy — TODO
    # confirm against the upload code); 0 when no DOCX file exists.
    docx_files = [f for f in os.listdir(project_folder) if f.endswith('.docx')]
    total_sections = 0
    if docx_files:
        doc_path = os.path.join(project_folder, min(docx_files, key=len))
        total_sections = len(Document(doc_path).paragraphs)

    return (
        project.Type,
        project.Extension,
        project.Last_Block,
        os.listdir(project_folder),
        user.KeepStyle,
        user.Autocomplete,
        user.TranslatorSettings,
        project,
        total_sections,
    )
def get_project_data_for_post_method(project_id):
    """Fetch the project and user fields needed by the POST handler.

    Returns a tuple: (type, extension, source language, target language,
    translator provider, translator settings, formality, API key).
    """
    project = Project.query.filter_by(id=project_id).first()
    user = User.query.filter_by(id=flask_login.current_user.id).first()
    project_fields = (project.Type, project.Extension,
                      project.Source_Lang, project.Target_Lang)
    user_fields = (user.TranslatorProvider, user.TranslatorSettings,
                   user.Formality, user.ApiKey)
    return project_fields + user_fields
def get_context_paragraphs(i, parasin, direction="before"):
    """Gather up to 5 non-empty neighbouring paragraphs of paragraph `i`.

    Args:
        i: index of the current paragraph in `parasin`.
        parasin: sequence of paragraph objects exposing a `.text` attribute.
        direction: "before" for the 5 paragraphs preceding `i`, anything
            else for the 5 following it.

    Returns:
        The stripped, non-empty paragraph texts joined by single spaces.
    """
    # Index window: 5 paragraphs before `i`, or 5 after it, clamped to the
    # bounds of the sequence.
    if direction == "before":
        bounds = (max(0, i - 5), i)
    else:
        bounds = (i + 1, min(len(parasin), i + 6))
    texts = [parasin[j].text.strip() for j in range(*bounds)]
    return " ".join(t for t in texts if t)
def Glos():  # glossary formatting function
    """Build the glossary as tab-separated "source<TAB>target" lines.

    Returns one line per glossary entry, ordered by source then target,
    joined with newlines and without a trailing newline; "" when the
    glossary is empty.
    """
    # Fixes in this rewrite: `.all()` always returns a list (never None),
    # so the original `is None` branch was dead code; the manual index
    # counter plus repeated string concatenation is replaced by direct
    # iteration and a single join (linear instead of quadratic).
    rows = (Glossary.query.order_by(Glossary.Source, Glossary.Target)
            .with_entities(Glossary.Source, Glossary.Target).all())
    return "\n".join(f"{str(source)}\t{str(target)}" for source, target in rows)
def update_added_txt_and_restart_lt(kill=True):
    """Rebuild the per-language `added.txt` spelling files from the Vocab
    table, then (re)start the LanguageTool HTTP server on port 8081.

    Args:
        kill: when True, terminate any running LanguageTool server first
              (Linux/macOS: pkill on the server process; Windows: taskkill
              on java.exe) before launching a new one.
    """
    # Fetch the words stored in the database
    vocab_entries = Vocab.query.all()
    # Group the vocabulary entries by language
    vocab_by_lang = {}
    for entry in vocab_entries:
        lang = entry.Lang
        if lang not in vocab_by_lang:
            vocab_by_lang[lang] = []
        # Entry format: word;base form;category gender number
        Gender = entry.Gender or "e"  # fallback when no gender is defined
        Plural = "sp" if entry.Plural else "s"
        vocab_line = f"{entry.Word};{entry.Word};{entry.Grammatical_Category} {Gender} {Plural}".strip()
        vocab_by_lang[lang].append(vocab_line)
    # Update the `added.txt` file for each language
    for lang, entries in vocab_by_lang.items():
        added_txt_path = os.path.join(ADDED_FILES_DIR, lang, "added.txt")
        backup_path = os.path.join(ADDED_FILES_DIR, lang, "added_backup.txt")
        # Back up the original `added.txt` once; the backup then acts as the
        # pristine base that `added.txt` is rebuilt from on every call
        if not os.path.exists(backup_path):
            os.rename(added_txt_path, backup_path)
        # Load the original content from the backup
        with open(backup_path, "r") as backup_file:
            original_content = backup_file.readlines()
        # Write the original content plus the database entries to `added.txt`
        with open(added_txt_path, "w") as f:
            f.writelines(original_content)  # original content first
            f.write("\n")
            f.write("\n".join(entries))  # then the vocabulary words
    try:
        # NOTE(review): `system` is not defined in this function — presumably
        # a module-level `platform.system()` value imported via models;
        # confirm it exists, otherwise the broad except below hides the
        # NameError and the server is never started.
        if system == "Linux" or system == "Darwin":
            if kill:
                run(["pkill", "-f", "languagetool-server"])
            Popen(["java", "-cp", os.path.join(LANGUAGETOOL_PATH, "languagetool-server.jar"), "org.languagetool.server.HTTPServer", "--port", "8081", "--allow-origin"])
        elif system == "Windows":
            if kill:
                run(["taskkill", "/F", "/IM", "java.exe"], check=True)
            # CREATE_NEW_CONSOLE detaches the server process on Windows
            Popen(["java", "-cp", os.path.join(LANGUAGETOOL_PATH, "languagetool-server.jar"), "org.languagetool.server.HTTPServer", "--port", "8081", "--allow-origin"], creationflags=subprocess.CREATE_NEW_CONSOLE)
    except Exception: pass  # best effort: restarting LanguageTool must not crash the caller
def start_celery_worker():
    """Start a Celery worker for `app.celery` unless one is already running.

    On Windows the solo pool is used and the worker gets its own console;
    on Linux/macOS the worker's output is piped.
    """
    system = platform.system()
    # Skip startup when a matching worker process already exists.
    for process in psutil.process_iter(['name', 'cmdline']):
        # Bug fix: psutil reports None for `name`/`cmdline` on processes it
        # cannot inspect (zombies, permission denied); the original crashed
        # with TypeError on such entries.
        name = process.info['name'] or ""
        cmdline = ' '.join(process.info['cmdline'] or [])
        if "celery" in name and "-A app.celery worker" in cmdline:
            print("Un worker Celery est déjà en cours d'exécution.")
            return  # a worker already exists
    # No active worker found: start a new one.
    if system in ["Linux", "Darwin"]:
        command = ["celery", "-A", "app.celery", "worker", "--loglevel=info"]
        Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    elif system == "Windows":
        # --pool=solo: the default prefork pool is unsupported on Windows.
        command = ["celery", "-A", "app.celery", "worker", "--loglevel=info", "--pool=solo"]
        Popen(command, creationflags=subprocess.CREATE_NEW_CONSOLE)
def start_celery_beat():
    """Start a Celery beat scheduler for `app.celery` unless one is running.

    On Windows the beat process gets its own console; on Linux/macOS its
    output is piped.
    """
    system = platform.system()
    # Skip startup when a matching beat process already exists.
    for process in psutil.process_iter(['name', 'cmdline']):
        # Bug fix: psutil reports None for `name`/`cmdline` on processes it
        # cannot inspect (zombies, permission denied); the original crashed
        # with TypeError on such entries.
        name = process.info['name'] or ""
        cmdline = ' '.join(process.info['cmdline'] or [])
        if "celery" in name and "-A app.celery beat" in cmdline:
            print("Une instance de celery beat est déjà en cours d'exécution.")
            return  # beat is already running
    # No beat process found: start a new one.
    if system in ["Linux", "Darwin"]:
        command = ["celery", "-A", "app.celery", "beat", "--loglevel=info"]
        Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    elif system == "Windows":
        command = ["celery", "-A", "app.celery", "beat", "--loglevel=info"]
        Popen(command, creationflags=subprocess.CREATE_NEW_CONSOLE)
def restart_celery_workers():
    """Terminate every running Celery worker for `app.celery`, then start one."""
    for process in psutil.process_iter(['name', 'cmdline']):
        # Bug fix: psutil reports None for `name`/`cmdline` on processes it
        # cannot inspect; the original crashed with TypeError on such entries.
        name = process.info['name'] or ""
        cmdline = ' '.join(process.info['cmdline'] or [])
        if "celery" in name and "-A app.celery worker" in cmdline:
            try:
                process.terminate()
                process.wait()
            except psutil.NoSuchProcess:
                # Process exited on its own between listing and terminate.
                pass
    start_celery_worker()
def translate_paragraph(index, para_text, proxies_queue, max_retries=float('inf'), prev_paragraph: str = "", next_paragraph: str = "", formatedGlossary = ""):
    """Translate one paragraph EN->FR via DeepL, rotating through proxies.

    Args:
        index: paragraph index, returned unchanged for reassembly by the caller.
        para_text: the English text to translate.
        proxies_queue: queue of proxy addresses; a proxy is borrowed per
            attempt and returned in the `finally` block.
        max_retries: maximum number of attempts (default: unbounded).
        prev_paragraph / next_paragraph: surrounding context passed to DeepL.
        formatedGlossary: tab-separated "EN<TAB>FR" glossary lines.

    Returns:
        (index, translated_text_or_original_text, last_proxy_used).
    """
    translation = None
    numberoftries = 0
    proxy = None
    glossary = None
    while translation is None and numberoftries < max_retries:
        numberoftries += 1
        time.sleep(0.5)
        # Borrow a proxy; give up when the pool is exhausted.
        try:
            proxy = proxies_queue.get_nowait()
        # Bug fix: the original bare `except:` also swallowed
        # KeyboardInterrupt/SystemExit; queue.Empty is what we expect here.
        except Exception:
            print(f"No proxies available for paragraph {index}")
            break
        translator = PersonalDeepl(request=Request(proxy))
        # The glossary is invariant across retries: parse it once on the
        # first attempt instead of re-reading the CSV on every retry.
        if glossary is None:
            glossary_df = pd.read_csv(StringIO(formatedGlossary), sep="\t", header=None, names=['EN', 'FR'])
            glossary = BaseTranslator.FormatedGlossary(dataframe=glossary_df, source_language='en', target_language='fr')
        try:
            translation = translator.translate(
                text=para_text,
                destination_language="fr",
                source_language="en",
                formality='informal',
                glossary=glossary,
                prev_paragraph=prev_paragraph,
                next_paragraph=next_paragraph
            ).result
        except requests.exceptions.Timeout:
            pass  # timed out — retry with another proxy
        except Exception:
            pass  # best effort: any provider error just triggers another retry
        finally:
            # Return the proxy to the pool for other workers.
            proxies_queue.put(proxy)
    if translation is None:
        print(f"Failed to translate paragraph {index} after {max_retries} attempts.")
    if translation == '':
        # Empty translation: fall back to the untranslated text.
        return index, para_text, proxy
    return index, translation, proxy
def test_proxy(proxy):
    """Return True when `proxy` (host:port) can reach Google over HTTPS.

    A 429 (rate-limited) response still counts as a working proxy; any
    request error or timeout counts as a failure.
    """
    proxy_map = {"https": f"http://{proxy}"}
    try:
        status = requests.get("https://google.com/", proxies=proxy_map, timeout=5).status_code
    except requests.RequestException:
        return False
    return status == 200 or status == 429