import os
import io
import gc
import time
import json
from typing import List, Optional
import wave
import uuid
import shutil
import torch
import random
import logging
import webbrowser
import threading
import soundfile as sf
import pytesseract
import pycountry
from langdetect.lang_detect_exception import LangDetectException
from langdetect import detect
import cv2
import whisper
import numpy as np
from datetime import datetime
from os.path import splitext, exists
from collections import OrderedDict
from pynput import keyboard
from pdf2image import convert_from_path
from PIL import Image, ImageEnhance
from pydub import AudioSegment
from pydub.playback import play
import sounddevice as sd
from scipy.io.wavfile import write
import gradio as gr
import tempfile
# PyMuPDF
import fitz
# PyPDF2
from PyPDF2 import PdfReader, PdfWriter
from PyPDF2.generic import NameObject, TextStringObject
# Langchain and related imports
from langchain_core.prompts import (
    ChatPromptTemplate,
    HumanMessagePromptTemplate,
    MessagesPlaceholder,
)
from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document
from langchain_core.messages import SystemMessage
from langchain_community.chat_models import ChatOllama
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from langchain_experimental.text_splitter import SemanticChunker
from langchain_experimental.llms.ollama_functions import OllamaFunctions
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.indexes import SQLRecordManager, index
from langchain_elasticsearch import ElasticsearchStore
from langchain.chains import LLMChain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains.retrieval import create_retrieval_chain
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.memory import ConversationBufferMemory
from TTS.tts.configs.xtts_config import XttsConfig
from TTS.tts.models.xtts import Xtts
import tkinter as tk
from tkinter import filedialog
import subprocess
import platform
import sys
import re
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# Constants
POPPLER_PATH = r'.\installer_files\poppler-24.07.0\Library\bin'
tessdata_dir = os.path.join("installer_files", "tessdata")
tessdata_dir_config = f'--tessdata-dir "{tessdata_dir}"'
if platform.system() == 'Windows':
    program_files = os.environ.get('PROGRAMFILES', 'C:\\Program Files')
    PYTESSERACT_CMD = os.path.join(program_files, 'Tesseract-OCR', 'tesseract.exe')
    pytesseract.pytesseract.tesseract_cmd = PYTESSERACT_CMD
PARAM_FILE = "params.json"
LOG_FILE = "process.log"
is_recording = False # To track if we are currently recording
recording = None # Global variable to hold the recording data
stream = None # To handle the audio stream
filename = "output_combined.wav" # File to save the recording
metadata_llm = ChatOllama(model="llama3.1:8b", temperature=0.9)
naming_llm = ChatOllama(model="llama3.1:8b", temperature=0.5, num_predict=30)
embed_model = HuggingFaceEmbeddings(model_name="sentence-transformers/paraphrase-MiniLM-L6-v2", model_kwargs={'device': device})
session_params = {
"filter_key": "",
"filter_value": ""
}
system_prompt = """You are a helpful assistant with the name Jarvis created by Eren Kalinsazlioglu at Enpoi co. that has access to users' documents. Your primary goal is to be as helpful and precise as possible in answering the users' questions. If the user asks a specific or personalized question that you do not have knowledge of, you can retrieve the relevant documents like this:
retriever_tool: [here describe what kind of document the user wants to retrieve, this will be used for the similarity search so write some queries that are likely to be in the document]
only use the retriever when you need the document. Do not include any filter text or explanation, only the retriever calling.
Try to answer general questions without using the retriever. If you need to provide information about a specific document."""
metadata_template = """
You are tasked with extracting detailed metadata information from the content of a document. Follow these detailed guidelines to ensure the metadata is comprehensive and accurately reflects the document's content.
**Guidelines**:
1. **Document Type**:
- Identify the type of document (e.g., research paper, article, report).
- Examples: "Research Paper", "Article", "Report", "Forschungsbericht", "Artikel", "Bericht"
2. **Mentions**:
- Extract the main names like persons and companies mentioned in the document.
- Examples: "John Doe", "Acme Corporation", "United Nations", "Johann Schmidt", "Siemens AG", "Vereinte Nationen"
3. **Keywords**:
- Identify relevant keywords central to the document's topic.
- Examples: "Machine Learning", "Climate Change", "Economic Policy", "Maschinelles Lernen", "Klimawandel", "Wirtschaftspolitik"
4. **About**:
- Provide a brief description of the document's purpose and main arguments/findings.
- Examples: "This research paper explores the impact of AI on healthcare, focusing on predictive analytics and patient outcomes.", "Dieses Forschungspapier untersucht die Auswirkungen von KI auf das Gesundheitswesen, mit einem Fokus auf prädiktive Analysen und Patientenergebnisse."
5. **Questions**:
- List questions the document can answer.
- Examples: "What are the benefits of renewable energy?", "How does blockchain technology work?", "Welche Vorteile bietet erneuerbare Energie?", "Wie funktioniert die Blockchain-Technologie?"
6. **Entities**:
- Identify the main entities (people, places, organizations) mentioned.
- Examples: "Albert Einstein", "New York City", "World Health Organization", "Albert Einstein", "New York City", "Weltgesundheitsorganisation"
7. **Summaries**:
- Provide summaries of different sections or key points.
- Examples: "Introduction: Overview of AI in healthcare", "Methodology: Data collection and analysis techniques", "Conclusion: Implications of findings for future research", "Einleitung: Überblick über KI im Gesundheitswesen", "Methodik: Datenerfassungs- und Analysetechniken", "Fazit: Auswirkungen der Ergebnisse auf zukünftige Forschung"
8. **Authors**:
- List the document's authors.
- Examples: "Jane Smith", "John Doe", "Alice Johnson", "Hans Müller", "Peter Schmid", "Anna Meier"
9. **Source**:
- Specify the source or location where the document can be found.
- Examples: "https://example.com/research-paper", "Library of Congress", "Journal of Medical Research", "https://beispiel.de/forschungspapier", "Bibliothek des Kongresses", "Zeitschrift für medizinische Forschung"
10. **Language**:
- Indicate the language(s) the document is written in.
- Examples: "English", "German", "Spanish", "Englisch", "Deutsch", "Spanisch"
11. **Audience**:
- Describe the intended audience for the document.
- Examples: "Healthcare professionals", "University students", "Policy makers", "Gesundheitsfachkräfte", "Universitätsstudenten", "Politische Entscheidungsträger"
**Context**:
{context}
**Task**:
Extract and provide the following metadata from the document's content based on the above guidelines. Ensure that extracted information is in the original language of the document.
**Output Format**:
Return the metadata in the following structured format, with no filler text or extra explanation; give only the extracted metadata:
```json
{{
"document_type": "Type of document",
"mentions": ["Main names mentioned"],
"keywords": ["Relevant keywords"],
"about": "Brief description",
"questions": ["Questions the document can answer"],
"entities": ["Main entities mentioned"],
"summaries": ["Summaries of key sections"],
"authors": ["List of authors"],
"source": "Source or location",
"language": "Document language",
"audience": "Intended audience"
}}
```
"""
naming_template = """
You are tasked with generating appropriate and consistent names for documents based on their content. Follow these detailed guidelines to ensure the names are informative, unique, and easy to manage:
1. **Think about your files**:
- Identify the group of files your naming convention will cover.
- Check for established file naming conventions in your discipline or group.
2. **Identify metadata**:
- Include important information to easily locate a specific file.
- Consider including a combination of the following:
- Experiment conditions
- Type of data
- Researcher name/initials
- Lab name/location
- Project or experiment name or acronym
- Experiment number or sample ID (use leading zeros for clarity)
3. **Abbreviate or encode metadata**:
- Standardize categories and/or replace them with 2- or 3-letter codes.
- Document any codes used.
4. **Think about how you will search for your files**:
- Decide what metadata should appear at the beginning.
- Use default ordering: alphabetically, numerically, or chronologically.
5. **Deliberately separate metadata elements**:
- Avoid spaces or special characters in file names.
- Use dashes (-), underscores (_), or capitalize the first letter of each word.
**Example Naming Convention**:
- Format: [Type]_[Project]_[SampleID].[ext]
- Example: FinancialReport_ProjectX_001.pdf
**Context**:
{context}
**Extracted Metadata**:
The extracted metadata contains important information such as keywords, entities, mentions, summaries, and other details that are useful for naming the document. This metadata helps in creating a name that is both descriptive and unique.
{metadata}
**Task**:
Generate a new, unique name for this document based on its content and the provided metadata. The new name should be formal, detailed, and distinctive to avoid confusion with other documents. Ensure the name is concise yet informative, highlighting significant details like names, firms, companies, etc. that capture the essence and purpose of the document. Be specific.
**Output Format**:
Provide only the new name in the following format, with no filler text or extra explanation: [Type]_[Name]_[YearRange]
**Question**: {question}
"""
# Set up logging
logging.basicConfig(filename=LOG_FILE, level=logging.INFO, format='%(asctime)s:%(levelname)s:%(message)s')
class DocPOIDirectoryLoader(BaseLoader):
def __init__(self, directory_path: str, metadata_path: Optional[str] = None) -> None:
self.directory_path = directory_path
self.metadata_path = metadata_path or directory_path
def load(self) -> List[Document]:
documents = []
for filename in os.listdir(self.directory_path):
file_path = os.path.join(self.directory_path, filename)
metadata_file = os.path.join(self.metadata_path, f"{os.path.splitext(filename)[0]}.json")
if os.path.exists(metadata_file):
with open(metadata_file, 'r', encoding='utf-8') as f:
metadata = json.load(f)
else:
metadata = {}
# Ensure document_id is included in metadata
if 'document_id' not in metadata:
metadata['document_id'] = os.path.splitext(filename)[0]
if filename.endswith('.pdf'):
documents.extend(self.load_pdf(file_path, metadata))
elif filename.endswith('.txt'):
documents.extend(self.load_text(file_path, metadata))
return documents
def load_pdf(self, file_path: str, metadata: dict) -> List[Document]:
with fitz.open(file_path) as pdf_document:
full_text = ''.join([pdf_document.load_page(page_number).get_text() for page_number in range(len(pdf_document))])
embedding = embed_model
text_splitter = SemanticChunker(embedding, breakpoint_threshold_type="percentile")
chunks = text_splitter.create_documents([full_text])
        # Note: chunks are enumerated here, so page_number records the chunk
        # index within the document rather than the true PDF page.
        return [
            Document(
                page_content=chunk.page_content,
                metadata=OrderedDict(metadata, page_number=chunk_number + 1, source=file_path)
            ) for chunk_number, chunk in enumerate(chunks)
        ]
def load_text(self, file_path: str, metadata: dict) -> List[Document]:
with open(file_path, 'r', encoding='utf-8') as f:
text_content = f.read()
embedding = embed_model
text_splitter = SemanticChunker(embedding, breakpoint_threshold_type="percentile")
chunks = text_splitter.create_documents([text_content])
        return [
            Document(
                page_content=chunk.page_content,
                metadata=OrderedDict(metadata, page_number=chunk_number + 1, source=file_path)
            ) for chunk_number, chunk in enumerate(chunks)  # chunk index, as above
        ]
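# Usage sketch (hypothetical path): load every PDF/TXT in a folder together
# with its sidecar JSON metadata, ready for indexing.
#   loader = DocPOIDirectoryLoader(directory_path="documents")
#   docs = loader.load()
#   print(len(docs), docs[0].metadata.get("document_id"))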
class DocPOI(BaseLoader):
"""A custom document loader that reads and processes PDF or TXT files."""
def __init__(self, file_path: str, metadata_path: str = None) -> None:
"""
Initialize the loader with a file path and an optional metadata path.
Args:
file_path: Path to the PDF or TXT file.
metadata_path: Path to the metadata file (optional, defaults to None).
"""
self.file_path = file_path
# Set metadata path based on file path if not provided
if not metadata_path:
assumed_metadata_path = splitext(file_path)[0] + '.json'
if exists(assumed_metadata_path):
metadata_path = assumed_metadata_path
else:
print("No metadata file found, proceeding without external metadata.")
self.metadata_path = metadata_path
def load(self) -> list:
"""
Load and process the file, returning a list of Document objects.
"""
# Load metadata from a JSON file if provided
        if self.metadata_path and exists(self.metadata_path):
            with open(self.metadata_path, 'r', encoding='utf-8') as f:
                metadata = json.load(f)
else:
metadata = {'source': self.file_path, 'processed_date': datetime.now().isoformat()}
# Ensure document_id is included in metadata
if 'document_id' not in metadata:
metadata['document_id'] = os.path.splitext(os.path.basename(self.file_path))[0]
ordered_metadata = OrderedDict(metadata)
# Set up the text chunker
embedding = embed_model
text_splitter = SemanticChunker(embedding, breakpoint_threshold_type="percentile")
# Read and process the file
if self.file_path.endswith('.pdf'):
with fitz.open(self.file_path) as pdf:
full_text = ''.join([page.get_text() for page in pdf])
elif self.file_path.endswith('.txt'):
with open(self.file_path, 'r', encoding='utf-8') as file:
full_text = file.read()
else:
raise ValueError("Unsupported file type. Please provide a PDF or TXT file.")
# Use the SemanticChunker to split the text
documents = text_splitter.create_documents([full_text])
        # Generate Document objects (page_number is the chunk index, as in the loaders above)
        return [
            Document(
                page_content=chunk.page_content,
                metadata=OrderedDict(ordered_metadata, page_number=chunk_number + 1)
            ) for chunk_number, chunk in enumerate(documents)
        ]
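# Usage sketch (hypothetical file): load a single file; the loader picks up
# "report.json" automatically if it sits next to "report.pdf".
#   docs = DocPOI(file_path="report.pdf").load()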
class TTSStreamer:
def __init__(self, model_path, config_path, vocab_path, speaker_wav="thunder"):
self.model_path = model_path
self.config_path = config_path
self.vocab_path = vocab_path
self.speaker_wav = f"audio_samples\\{speaker_wav}.wav"
self.model = self.load_model()
self.stop_flag = threading.Event() # To control stopping
self.playback_thread = None
self.text_chunks = [] # Store text chunks
    def load_model(self):
        config = XttsConfig()
        config.load_json(self.config_path)
        model = Xtts.init_from_config(config)
        model.load_checkpoint(config, checkpoint_dir=self.model_path, eval=True, vocab_path=self.vocab_path)
        if torch.cuda.is_available():
            model.cuda()  # Move to GPU only when one is available
        return model
def unload_model(self):
del self.model
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
gc.collect()
print("Model unloaded and GPU memory cleared successfully.")
def estimate_times(self, text_chunk, avg_gen_time_per_char, avg_audio_time_per_char):
gen_time = len(text_chunk) * avg_gen_time_per_char
audio_duration = len(text_chunk) * avg_audio_time_per_char
return gen_time, audio_duration
def split_text_into_sentences(self, text):
# Split text into sentences using regular expressions
sentences = re.split(r'(?<=[.!?]) +', text.strip())
final_sentences = []
current_chunk = ""
for sentence in sentences:
if len(current_chunk) + len(sentence) + 1 <= 200: # +1 for the space or punctuation
if current_chunk:
current_chunk += " " + sentence
else:
current_chunk = sentence
else:
if current_chunk:
final_sentences.append(current_chunk)
current_chunk = sentence
if current_chunk:
final_sentences.append(current_chunk)
return final_sentences
def generate_audio_chunk(self, chunk, chunk_index, audio_buffer, playback_event, avg_gen_time_per_char, avg_audio_time_per_char, total_gen_time, language, speed):
if self.stop_flag.is_set():
return
est_gen_time, est_audio_duration = self.estimate_times(chunk, avg_gen_time_per_char, avg_audio_time_per_char)
print(f"Chunk {chunk_index + 1} estimated generation time: {est_gen_time:.2f} seconds, estimated audio duration: {est_audio_duration:.2f} seconds")
print(f"Generating audio for chunk {chunk_index + 1}...")
start_gen_time = time.time()
outputs = self.model.synthesize(
text=chunk,
config=self.model.config,
speaker_wav=self.speaker_wav,
gpt_cond_len=10,
language=language,
speed=speed
)
end_gen_time = time.time()
generation_time = end_gen_time - start_gen_time
total_gen_time[0] += generation_time
print(f"Chunk {chunk_index + 1} generated in {generation_time:.2f} seconds (estimated: {est_gen_time:.2f} seconds)")
wav_data = outputs['wav']
temp_output_file = f'temp_output_{chunk_index}.wav'
sf.write(temp_output_file, wav_data, 22050)
line_audio = AudioSegment.from_wav(temp_output_file)
actual_audio_duration = len(line_audio) / 1000.0
print(f"Chunk {chunk_index + 1} actual audio duration: {actual_audio_duration:.2f} seconds (estimated: {est_audio_duration:.2f} seconds)")
audio_buffer[chunk_index] = line_audio
print(f"Chunk {chunk_index + 1} audio saved and buffered")
playback_event.set()
def stream_audio_with_buffering(self, text, language="en", speed=1.2, speaker=None, fireup_delay=1.0, avg_gen_time_per_char=0.08058659382140704, avg_audio_time_per_char=0.1064346054068992):
self.stop_flag.clear() # Clear the stop flag at the start
        if speaker:
            self.speaker_wav = os.path.join("audio_samples", f"{speaker}.wav")
print("Starting the audio streaming process...")
start_time = time.time()
self.text_chunks = self.split_text_into_sentences(text) # Store text chunks
audio_buffer = [None] * len(self.text_chunks)
playback_events = [threading.Event() for _ in self.text_chunks]
total_gen_time = [0]
def start_playback_after_delay():
print(f"Waiting {fireup_delay:.2f} seconds before starting playback...")
time.sleep(fireup_delay)
print("Fireup delay is over, starting playback...")
for chunk_index in range(len(self.text_chunks)):
                if self.stop_flag.is_set():
                    break
                # Poll with a timeout so stop_streaming() can interrupt a pending wait
                while not playback_events[chunk_index].wait(timeout=0.1):
                    if self.stop_flag.is_set():
                        break
                if self.stop_flag.is_set():
                    break
                if audio_buffer[chunk_index] is not None:
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_file:
temp_output_file = temp_file.name
self.play_audio_segment(audio_buffer[chunk_index], temp_output_file)
if os.path.exists(temp_output_file):
os.remove(temp_output_file)
self.playback_thread = threading.Thread(target=start_playback_after_delay)
self.playback_thread.start()
for chunk_index, chunk in enumerate(self.text_chunks):
if self.stop_flag.is_set():
break
print(f"Processing chunk {chunk_index + 1}/{len(self.text_chunks)}: '{chunk}'")
self.generate_audio_chunk(chunk, chunk_index, audio_buffer, playback_events[chunk_index], avg_gen_time_per_char, avg_audio_time_per_char, total_gen_time, language, speed)
self.playback_thread.join()
print("Audio streaming process completed.")
print(f"Total generation time: {total_gen_time[0]:.2f} seconds")
def stop_streaming(self):
"""Stops the audio streaming process."""
self.stop_flag.set()
if self.playback_thread and self.playback_thread.is_alive():
self.playback_thread.join()
# Remove all temporary files
for chunk_index in range(len(self.text_chunks)):
temp_output_file = f'temp_output_{chunk_index}.wav'
if os.path.exists(temp_output_file):
os.remove(temp_output_file)
def play_audio_segment(self, audio_segment, temp_output_file):
audio_segment.export(temp_output_file, format="wav")
play(AudioSegment.from_wav(temp_output_file))
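# Usage sketch: the paths below are placeholders for a local XTTS checkpoint;
# speaker names map to files in audio_samples/.
#   tts = TTSStreamer(model_path="models/xtts",
#                     config_path="models/xtts/config.json",
#                     vocab_path="models/xtts/vocab.json",
#                     speaker_wav="thunder")
#   tts.stream_audio_with_buffering("Hello from Jarvis.", language="en", speed=1.2)
#   tts.stop_streaming()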
class DocumentAssistant:
def __init__(self, model_name, temperature=0.9):
self.llm = ChatOllama(
model=model_name,
temperature=temperature,
)
self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
# Initialize the default agent prompt
self.agent_prompt = ChatPromptTemplate.from_messages(
[
SystemMessage(content=system_prompt),
MessagesPlaceholder(variable_name="chat_history"),
HumanMessagePromptTemplate.from_template("{input}")
]
)
# Initialize the LLM chain with the agent prompt
self.chain = LLMChain(
llm=self.llm,
prompt=self.agent_prompt,
verbose=True,
memory=self.memory
)
def reset_memory(self):
"""Resets the memory of the assistant."""
self.memory.clear()
def create_filter_criteria(self, filter_criteria):
if filter_criteria:
filter_key = filter_criteria.get("key")
filter_value = filter_criteria.get("value")
if filter_key and filter_value:
return {"term": {f"metadata.{filter_key}.keyword": filter_value}}
return None
def document_retriever(self, user_query, top_k, score_threshold, filter_criteria):
"""
Retrieves relevant documents based on the user's query and returns them with their scores and metadata.
"""
# Create filter criteria
filter_query = self.create_filter_criteria(filter_criteria)
# Perform the similarity search with scores
results = vectorstore.similarity_search_with_score(
query=user_query,
k=top_k,
filter=filter_query
)
# Filter documents based on the score threshold
filtered_documents = [
(doc, score) for doc, score in results if score >= score_threshold
]
# Extract document names, content, metadata, and score for returning
retrieved_documents = []
for doc, score in filtered_documents:
document_info = {
"document_name": doc.metadata.get('given_document_name', 'Unnamed'),
"document_content": doc.page_content,
"metadata": doc.metadata, # Include all metadata
"score": score
}
print(f"Retrieved document with score {score}: {document_info['document_name']}")
retrieved_documents.append(document_info)
return retrieved_documents
def formulate_final_prompt(self, user_query, context):
"""
Formulates the final input for the LLM considering the retrieved documents.
"""
combined_input = f"Here is the context from retrieved documents. Please use this information to answer the user's question.\n\nContext:\n{context}\n\nQuestion: {user_query}"
return combined_input
    def query_llm(self, user_query, top_k, score_threshold, filter_criteria):
        # Pass the raw criteria through; document_retriever builds the Elasticsearch filter itself
# First, use the LLM chain to determine whether document retrieval is necessary
response = self.chain.invoke({"input": user_query})
print(f"Initial response: {response['text']}")
if "retriever_tool:" in response['text'].lower():
retrieval_instruction = response['text'].split("retriever_tool:")[1].strip()
combined_query = f"{user_query} {retrieval_instruction}"
retrieved_documents = self.document_retriever(combined_query, top_k, score_threshold, filter_criteria)
context = "\n\n".join(
[f"Document Name: {doc['document_name']}\nMetadata: {doc['metadata']}\nContent:\n{doc['document_content']}"
for doc in retrieved_documents]
)
combined_input = self.formulate_final_prompt(user_query, context)
# Use the LLM chain again to answer the user's query based on the retrieved documents
final_response = self.chain.invoke({"input": combined_input})
print(f"Final response: {final_response['text']}")
parsed_response = final_response['text']
else:
parsed_response = response['text']
retrieved_documents = []
return parsed_response, retrieved_documents
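# Usage sketch (assumes Ollama serving llama3.1:8b and an initialized
# vectorstore): a filter of {"key": "language", "value": "English"} becomes the
# Elasticsearch clause {"term": {"metadata.language.keyword": "English"}}.
#   assistant = DocumentAssistant(model_name="llama3.1:8b")
#   answer, sources = assistant.query_llm(
#       "What does the ProjectX report cover?", top_k=5,
#       score_threshold=0.7, filter_criteria=None)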
def convert_image_to_pdf(image_path):
    """Convert an image file to a PDF and return the new PDF path."""
    try:
        image = Image.open(image_path)
        # Swap the extension without lower-casing the rest of the path
        pdf_path = os.path.splitext(image_path)[0] + '.pdf'
        rgb_image = image.convert('RGB')
        rgb_image.save(pdf_path, 'PDF', resolution=100.0)
        os.remove(image_path)
        print(f"Converted {image_path} to {pdf_path}")
        return pdf_path
    except Exception as e:
        logging.error(f"Error converting image to PDF: {e}")
        return None
def adaptive_image_processing(image):
    # Convert to grayscale (pdf2image and PIL yield RGB images)
    gray = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2GRAY)
# Apply adaptive histogram equalization
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
clahe_image = clahe.apply(gray)
# Apply a slight Gaussian blur to reduce noise
blurred_image = cv2.GaussianBlur(clahe_image, (5, 5), 0)
# Enhance contrast
pil_image = Image.fromarray(blurred_image)
enhancer = ImageEnhance.Contrast(pil_image)
enhanced_image = enhancer.enhance(1.5)
return enhanced_image
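# Usage sketch (hypothetical scan): clean up one page image before OCR.
#   page = Image.open("scan_page1.png")
#   cleaned = adaptive_image_processing(page)
#   text = pytesseract.image_to_string(cleaned)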
def ocr_pdf(input_pdf_path):
try:
images = convert_from_path(input_pdf_path, poppler_path=POPPLER_PATH)
pdf_writer = PdfWriter()
# OCR the first page and detect its language
first_page_text = pytesseract.image_to_string(images[0])
        try:
            detected_lang = detect(first_page_text)
            detected_lang_iso3 = pycountry.languages.get(alpha_2=detected_lang).alpha_3
            print(f"Detected language: {detected_lang_iso3}")
        except (LangDetectException, AttributeError):
            # AttributeError covers codes pycountry cannot resolve (e.g. 'zh-cn')
            logging.warning("Language detection failed, defaulting to English.")
            detected_lang_iso3 = 'eng'  # Default to English
# OCR the entire PDF with the detected language
for image in images:
processed_image = adaptive_image_processing(image)
if processed_image is None:
print("Error: processed_image is None")
continue
pdf_bytes = pytesseract.image_to_pdf_or_hocr(processed_image, extension='pdf', lang=detected_lang_iso3, config=tessdata_dir_config)
if pdf_bytes is None:
print("Error: pdf_bytes is None")
continue
pdf_stream = io.BytesIO(pdf_bytes)
pdf = PdfReader(pdf_stream)
pdf_writer.add_page(pdf.pages[0])
output_pdf_path = input_pdf_path # Keep the file path consistent
with open(output_pdf_path, "wb") as f_out:
pdf_writer.write(f_out)
print(f"OCR processed and replaced {output_pdf_path}")
return output_pdf_path
except Exception as e:
logging.error(f"Error during OCR: {e}")
return None
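# Usage sketch (hypothetical file): add a text layer in place; the function
# returns the same path, or None if OCR failed.
#   searchable = ocr_pdf("scanned_contract.pdf")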
def check_pdf_has_readable_text(file_path: str) -> bool:
"""Check if the PDF contains any readable text."""
try:
with open(file_path, "rb") as file:
reader = PdfReader(file)
for page in reader.pages:
text = page.extract_text()
if text:
return True
except Exception as e:
logging.error(f"Error reading {file_path}: {e}")
return False
def check_pdf_metadata_keys(file_path: str, required_keys: list) -> bool:
"""Check if the PDF metadata contains all the required keys."""
try:
with open(file_path, "rb") as file:
reader = PdfReader(file)
metadata = reader.metadata
return all(key in metadata for key in required_keys)
except Exception as e:
logging.error(f"Error reading metadata from {file_path}: {e}")
return False
def check_for_metadata_json(file_path: str) -> bool:
"""Check if the corresponding JSON metadata file exists."""
json_file_path = f"{os.path.splitext(file_path)[0]}.json"
return os.path.exists(json_file_path)
def process_pdf_file(file_path: str) -> str:
"""Process a single PDF file and perform all checks."""
print(f"Processing {file_path}...")
document_name = None # Initialize document_name
has_text = check_pdf_has_readable_text(file_path)
metadata_keys = ["/document_id", "/original_file_name", "/given_document_name"]
    # Perform OCR if no readable text is found
    if not has_text:
        print(f"The PDF {file_path} does not contain readable text. Performing OCR...")
        file_path = ocr_pdf(file_path)  # Update file path if the file was replaced
        if file_path is None:
            logging.error("OCR failed; skipping further checks for this file.")
            return None
# Generate metadata if necessary
if not check_for_metadata_json(file_path):
print(f"The corresponding metadata JSON file does not exist for {file_path}. Performing metadata extraction...")
document_name, metadata = generate_metadata_and_name(file_path)
# Check for required metadata keys
if check_pdf_metadata_keys(file_path, metadata_keys):
print(f"The PDF {file_path} contains the required metadata keys.")
else:
print(f"The PDF {file_path} is missing some required metadata keys.")
return document_name
def process_txt_file(file_path: str) -> str:
"""Process a TXT file by generating metadata if necessary."""
print(f"Processing TXT file {file_path}...")
document_name = None # Initialize document_name
if check_for_metadata_json(file_path):
print(f"The corresponding metadata JSON file exists for {file_path}.")
else:
print(f"The corresponding metadata JSON file does not exist for {file_path}. Generating metadata...")
document_name, metadata = generate_metadata_and_name(file_path)
return document_name
def process_image_file(file_path: str) -> str:
"""Convert an image file to a PDF, perform OCR, and generate metadata."""
print(f"Processing image file {file_path}...")
document_name = None # Initialize document_name
    pdf_path = convert_image_to_pdf(file_path)
    if pdf_path:
        print(f"Converted image to PDF: {pdf_path}. Performing OCR and generating metadata...")
        pdf_path = ocr_pdf(pdf_path)
        if pdf_path:
            # Use the OCR'd PDF, not the original image path (which was deleted)
            document_name, metadata = generate_metadata_and_name(pdf_path)
return document_name
def process_files_in_directory(directory_path: str, only_pdf: bool = False) -> None:
def process():
print(f"Processing files in directory {directory_path}...")
for root, dirs, files in os.walk(directory_path):
for file in files:
file_path = os.path.join(root, file)
if file.endswith(".pdf"):
process_pdf_file(file_path)
elif not only_pdf:
if file.endswith(".txt"):
process_txt_file(file_path)
elif file.lower().endswith(('.png', '.jpg', '.jpeg')):
process_image_file(file_path)
try:
process()
except Exception as e:
print(f"Error encountered: {e}. Retrying...")
try:
process()
except Exception as e:
print(f"Retry failed: {e}")
def update_pdfmetadata(file_path: str, new_metadata: dict) -> None:
"""Updates the metadata of the given PDF file with new keys.
Args:
file_path: The path to the PDF file.
new_metadata: A dictionary of new metadata to add.
"""
# Open the existing PDF
with open(file_path, "rb") as file:
reader = PdfReader(file)
writer = PdfWriter()
writer.append_pages_from_reader(reader)
        # Get existing metadata (may be absent on freshly OCR'd files)
        existing_metadata = reader.metadata or {}
# Update existing metadata with new keys
updated_metadata = {NameObject(key): TextStringObject(value) for key, value in existing_metadata.items()}
for key, value in new_metadata.items():
updated_metadata[NameObject(key)] = TextStringObject(value)
# Add updated metadata
writer.add_metadata(updated_metadata)
# Save the PDF with the updated metadata back to the same file
with open(file_path, "wb") as updated_file:
writer.write(updated_file)
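# Usage sketch (hypothetical values): stamp custom keys into a PDF's info
# dictionary; keys must start with "/" to form valid PDF names.
#   update_pdfmetadata("report.pdf", {"/document_id": "1234-abcd",
#                                     "/given_document_name": "Report_Acme_2021"})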
def generate_metadata_and_name(file_path):
# Load the document content
file_extension = os.path.splitext(file_path)[1].lower()
if file_extension == ".pdf":
loader = PyPDFLoader(file_path, extract_images=False)
elif file_extension == ".txt":
loader = TextLoader(file_path)
else:
raise ValueError("Unsupported file type")
docs = loader.load()
metadata_prompt = ChatPromptTemplate.from_template(metadata_template)
# Create the chain for metadata extraction
metadata_chain = metadata_prompt | metadata_llm
# Invoke the chain with the document content
metadata_result = metadata_chain.invoke({
"context": docs
})
# Extract the content from the result
metadata_content = metadata_result.content
# Parse the JSON part of the content
json_start = metadata_content.find('{')
json_end = metadata_content.rfind('}') + 1
json_content = metadata_content[json_start:json_end]
metadata = json.loads(json_content)
# Ensure all keys have values that are basic lists or primitive types
def collapse_dicts(value):
if isinstance(value, list):
return [item['name'] if isinstance(item, dict) and 'name' in item else item for item in value]
return value
metadata = {key: collapse_dicts(value) for key, value in metadata.items()}
# Format the metadata in a readable format
formatted_metadata = json.dumps(metadata, indent=4, ensure_ascii=False)
naming_prompt = ChatPromptTemplate.from_template(naming_template)
# Create the chain for document naming
naming_chain = naming_prompt | naming_llm
# Invoke the chain with the context and metadata
naming_result = naming_chain.invoke({
"question": "What is the most suitable name for this document based on its content?",
"context": docs,
"metadata": formatted_metadata
})
# Extract the document name from the result
naming_content = naming_result.content
    temp_document_name = naming_content.split('\n')[0].strip()
    # Replace spaces with underscores and strip characters invalid in file names
    document_name = re.sub(r'[\\/:*?"<>|]', '_', temp_document_name.replace(" ", "_"))
# Ensure given_document_name is the first key
ordered_metadata = OrderedDict([("given_document_name", document_name)])
ordered_metadata.update(metadata)
# Generate a unique ID for the document
document_id = str(uuid.uuid4())
# Get file details
file_directory = os.path.dirname(file_path)
original_file_name = os.path.basename(file_path)
# Get file creation and modification dates
file_creation_date = datetime.fromtimestamp(os.path.getctime(file_path)).isoformat()
file_modification_date = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat()
metadata_creation_date = datetime.now().isoformat()
# Append additional metadata
ordered_metadata['document_id'] = document_id
ordered_metadata['file_directory'] = file_directory
ordered_metadata['original_file_name'] = original_file_name
ordered_metadata['file_creation_date'] = file_creation_date
ordered_metadata['file_modification_date'] = file_modification_date
ordered_metadata['metadata_creation_date'] = metadata_creation_date
# Format the metadata in a readable format
formatted_metadata = json.dumps(ordered_metadata, indent=4, ensure_ascii=False)
# Save the metadata to a JSON file with the same name as the document
metadata_file_path = os.path.join(file_directory, f"{document_name}.json")
with open(metadata_file_path, 'w', encoding='utf-8') as f:
f.write(formatted_metadata)
if file_extension == ".pdf":
# Update PDF metadata with relevant keys (only for PDFs)
pdf_metadata = {
"/document_id": document_id,
"/original_file_name": original_file_name,
"/given_document_name": document_name
}
update_pdfmetadata(file_path, pdf_metadata)
# Rename the original PDF file to the new document name
new_file_path = os.path.join(file_directory, f"{document_name}.pdf")
elif file_extension == ".txt":
# Rename the original TXT file to the new document name
new_file_path = os.path.join(file_directory, f"{document_name}.txt")
os.rename(file_path, new_file_path)
return document_name, formatted_metadata
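# Usage sketch (hypothetical file): extract metadata, rename the file, and get
# back the new name plus the formatted metadata JSON string.
#   new_name, meta_json = generate_metadata_and_name("invoice_scan.pdf")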
# Function to save parameters to a file
def save_params(params):
with open(PARAM_FILE, "w") as f:
json.dump(params, f)
def load_params():
# Define default parameters
default_params = {
"collection_name": "docpoi",
"score_threshold": 0.7,
"top_k": 5,
"voiceover_speed": 1.3,
"fireup_speed": 5.0,
"language": "en",
"speaker": "thunder",
"use_voiceover": False, # Default to unchecked
"filter_key": "",
"filter_value": ""
}
params_file = PARAM_FILE
# Attempt to load parameters from the file
try:
with open(params_file, "r") as file:
params = json.load(file)
except (FileNotFoundError, json.JSONDecodeError):
params = {}
# Update missing parameters with default values
for key, value in default_params.items():
if key not in params:
params[key] = value
return params
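# Usage sketch: round-trip the settings file; missing keys are backfilled
# from the defaults above.
#   params = load_params()
#   params["top_k"] = 8
#   save_params(params)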
def initialize_vectorstore(collection_name="docpoi"):
embedding = embed_model
vectorstore = ElasticsearchStore(
es_url="http://localhost:9200", index_name=collection_name, embedding=embedding, strategy=ElasticsearchStore.ExactRetrievalStrategy()
)
namespace = f"elasticsearch/{collection_name}"
record_manager = SQLRecordManager(
namespace, db_url="sqlite:///record_manager_cache.sql"
)
record_manager.create_schema()
return vectorstore, record_manager
# Function to reload the vectorstore
def reload_vectorstore():
global vectorstore, record_manager
vectorstore, record_manager = initialize_vectorstore(params["collection_name"])
return "Vectorstore reloaded with collection: " + params["collection_name"]
def add_to_vectorstore():
loader = DocPOIDirectoryLoader(directory_path=DIRECTORY_PATH)
documents = loader.load()
index(
documents,
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="document_id",
)
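# Usage sketch (assumes Elasticsearch on localhost:9200 and DIRECTORY_PATH
# defined elsewhere in this module): build the store once, then index the
# working directory.
#   vectorstore, record_manager = initialize_vectorstore(params["collection_name"])
#   add_to_vectorstore()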
def reset_vectorstore():