From 6f53553a5229fb8cf7c24eb5d7fc98c0105e5a80 Mon Sep 17 00:00:00 2001
From: Luciano Silveira
Date: Sat, 24 Aug 2024 16:14:22 -0300
Subject: [PATCH] Fix/Confluence upload using split_strategy (#22)

* better metadata processing using split_strategy option

* downgrade nltk to 3.8.1
---
 amazon_s3/s3reader.py                     |  2 +-
 atlassian_confluence/confluence_config.md |  4 +-
 poetry.lock                               | 10 +--
 pyproject.toml                            |  1 +
 saia_ingest/ingestor.py                   | 74 ++++++++++++++++-------
 saia_ingest/profile_utils.py              |  4 +-
 6 files changed, 65 insertions(+), 30 deletions(-)

diff --git a/amazon_s3/s3reader.py b/amazon_s3/s3reader.py
index 8f632ae..3bbd950 100644
--- a/amazon_s3/s3reader.py
+++ b/amazon_s3/s3reader.py
@@ -396,7 +396,7 @@ def get_files(self) -> list[str]:
             self.download_s3_file(self.key, temp_dir, file_paths)
             count = 1
         elif self.keys:
-            logging.getLogger().info(f"keys: '{self.keys.length}'")
+            logging.getLogger().info(f"keys: '{len(self.keys)}'")
             for key in self.keys:
                 self.download_s3_file(key, temp_dir, file_paths)
             count = len(self.keys)
diff --git a/atlassian_confluence/confluence_config.md b/atlassian_confluence/confluence_config.md
index fa58e99..775fd83 100644
--- a/atlassian_confluence/confluence_config.md
+++ b/atlassian_confluence/confluence_config.md
@@ -13,7 +13,9 @@ confluence:
   include_attachments: !!bool true|false (default)
   include_children: !!bool true|false (default)
   cloud: !!bool true|false (default)
-  namespace: !!str 'namespace name' # Must match the associated RAG assistant, check the index section
+  download_dir: !!str path to a folder where metadata is stored (mandatory for delta ingestion)
+  split_strategy: !!str None | id (create an id.json for each page)
+  namespace: !!str 'namespace name' # Must match the associated RAG assistant, check the index section (deprecated)
 saia:
   base_url: !!str 'string' # GeneXus Enterprise AI Base URL
   api_token: !!str 'string'
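For context on the two new configuration keys, a minimal driver sketch (not part of the patch) shows how a file written against the template above could be read and validated; it assumes PyYAML is available and mirrors the split_strategy/download_dir check that ingest_confluence performs further down:

import os
import yaml  # PyYAML; the repository reads the same file through its own get_yaml_config helper

def load_confluence_settings(config_path: str) -> dict:
    """Read the confluence section and validate the new split_strategy/download_dir pair."""
    with open(config_path, 'r', encoding='utf8') as f:
        config = yaml.safe_load(f)

    confluence = config.get('confluence', {})
    split_strategy = confluence.get('split_strategy', None)
    download_dir = confluence.get('download_dir', None)

    # When splitting per page id, download_dir is mandatory and must already exist,
    # matching the check added to ingest_confluence in this patch.
    if split_strategy == 'id' and (download_dir is None or not os.path.exists(download_dir)):
        raise Exception(f"Download directory {download_dir} does not exist")

    return {'split_strategy': split_strategy, 'download_dir': download_dir}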
diff --git a/poetry.lock b/poetry.lock
index 0a80c3b..6aa8ffa 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2494,13 +2494,13 @@ test = ["pytest (>=7.2)", "pytest-cov (>=4.0)"]

 [[package]]
 name = "nltk"
-version = "3.9"
+version = "3.8.1"
 description = "Natural Language Toolkit"
 optional = false
-python-versions = ">=3.8"
+python-versions = ">=3.7"
 files = [
-    {file = "nltk-3.9-py3-none-any.whl", hash = "sha256:d17863e861bb33ac617893329d71d06a3dfb7e3eb9ee0b8105281c53944a45a1"},
-    {file = "nltk-3.9.tar.gz", hash = "sha256:e98acac454407fa38b76cccb29208d377731cf7fab68f323754a3681f104531f"},
+    {file = "nltk-3.8.1-py3-none-any.whl", hash = "sha256:fd5c9109f976fa86bcadba8f91e47f5e9293bd034474752e92a520f81c93dda5"},
+    {file = "nltk-3.8.1.zip", hash = "sha256:1834da3d0682cba4f2cede2f9aad6b0fafb6461ba451db0efb6f9c39798d64d3"},
 ]

 [package.dependencies]
@@ -4618,4 +4618,4 @@ multidict = ">=4.0"
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.9,<3.12"
-content-hash = "613d7d338a08f532d6a308d0e54374e5f9308d9f7a1025df8e8c5928ab8d8019"
+content-hash = "97b12ef8d0a1c44eaf6c1bbcbded2907dfe33f5557191cc896125977109ca320"
diff --git a/pyproject.toml b/pyproject.toml
index 4087ad8..b2fa2d7 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -83,6 +83,7 @@ pydrive = "^1.3.1"
 docx2txt = "^0.8"
 azure-identity = "^1.17.1"
 azure-keyvault-secrets = "^4.8.0"
+nltk = "3.8.1"

 [tool.poetry.group.dev.dependencies]
 pylint = "^3.1.0"
diff --git a/saia_ingest/ingestor.py b/saia_ingest/ingestor.py
index 2b6d8cf..1d4871e 100644
--- a/saia_ingest/ingestor.py
+++ b/saia_ingest/ingestor.py
@@ -58,9 +58,9 @@ def check_valid_profile(rag_api, profile_name):
         logging.getLogger().error(f"Invalid profile {profile_name}")
     return ret

-def save_to_file(lc_documents, prefix='module'):
+def save_to_file(lc_documents, prefix='module', path=None, name=None):
     try:
-        debug_folder = os.path.join(os.getcwd(), 'debug')
+        debug_folder = os.path.join(os.getcwd(), 'debug') if path is None else path
         create_folder(debug_folder)

         serialized_docs = []
@@ -73,7 +73,7 @@ def save_to_file(lc_documents, prefix='module'):
         now = datetime.now()
         formatted_timestamp = now.strftime("%Y%m%d%H%M%S") # Format the datetime object as YYYYMMDDHHMMSS

-        filename = '%s_%s.json' % (prefix, formatted_timestamp)
+        filename = '%s_%s.json' % (prefix, formatted_timestamp) if name is None else name
         file_path = os.path.join(debug_folder, filename)
         with open(file_path, 'w', encoding='utf8') as json_file:
             json.dump(serialized_docs, json_file, ensure_ascii=False, indent=4)
@@ -178,7 +178,7 @@ def ingest_jira(
         ret = False
     finally:
         return ret
-    
+
 def ingest_confluence(
     configuration: str,
     timestamp: datetime = None,
@@ -187,6 +187,8 @@
     ret = True
     start_time = time.time()
     try:
+        message_response = ""
+
         config = get_yaml_config(configuration)
         confluence_level = config.get('confluence', {})
         user_name = confluence_level.get('email', None)
@@ -198,15 +200,17 @@
         include_children = confluence_level.get('include_children', None)
         cloud = confluence_level.get('cloud', None)
         confluence_namespace = confluence_level.get('namespace', None)
+        download_dir = confluence_level.get('download_dir', None)
+        split_strategy = confluence_level.get('split_strategy', None)

         embeddings_level = config.get('embeddings', {})
-        openapi_key = embeddings_level.get('openapi_key', None)
+        openapi_key = embeddings_level.get('openapi_key', '')
         chunk_size = embeddings_level.get('chunk_size', None)
         chunk_overlap = embeddings_level.get('chunk_overlap', None)
         embeddings_model = embeddings_level.get('model', 'text-embedding-ada-002')

         vectorstore_level = config.get('vectorstore', {})
-        vectorstore_api_key = vectorstore_level.get('api_key', None)
+        vectorstore_api_key = vectorstore_level.get('api_key', '')

         os.environ['OPENAI_API_KEY'] = openapi_key
         os.environ['CONFLUENCE_USERNAME'] = user_name
@@ -218,12 +222,12 @@

         documents = []
         if page_ids is not None:
-          try:
-            list_documents = load_documents(loader, page_ids=page_ids, include_attachments=include_attachments, include_children=include_children)
-            for item in list_documents:
-                documents.append(item)
-          except Exception as e:
-            logging.getLogger().error(f"Error processing {page_ids}: {e}")
+            try:
+                list_documents = load_documents(loader, page_ids=page_ids, include_attachments=include_attachments, include_children=include_children, timestamp=timestamp)
+                for item in list_documents:
+                    documents.append(item)
+            except Exception as e:
+                logging.getLogger().error(f"Error processing {page_ids}: {e}")
         elif space_keys is not None:
             for key in space_keys:
                 try:
@@ -235,9 +239,21 @@
                     logging.getLogger().error(f"Error processing {key}: {e}")
                     continue

-        lc_documents = split_documents(documents, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
-
-        docs_file = save_to_file(lc_documents, prefix='confluence')
+        if split_strategy is not None:
+            if split_strategy == 'id':
+                ids = []
+                if not os.path.exists(download_dir):
+                    raise Exception(f"Download directory {download_dir} does not exist")
+                for doc in documents:
+                    lc_documents = split_documents([doc], chunk_size=chunk_size, chunk_overlap=chunk_overlap)
+                    metadata = doc.metadata
+                    doc_id = metadata.get("id", "")
+                    name = f"{doc_id}.json.custom"
+                    docs_file = save_to_file(lc_documents, prefix='confluence', path=download_dir, name=name)
+                    ids.append(docs_file)
+        else:
+            lc_documents = split_documents(documents, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
+            docs_file = save_to_file(lc_documents, prefix='confluence')

         # Saia
         saia_level = config.get('saia', {})
@@ -245,22 +261,36 @@
         saia_api_token = saia_level.get('api_token', None)
         saia_profile = saia_level.get('profile', None)
         upload_operation_log = saia_level.get('upload_operation_log', False)
+        max_parallel_executions = saia_level.get('max_parallel_executions', 5)

         if saia_base_url is not None:
             ragApi = RagApi(saia_base_url, saia_api_token, saia_profile)

-            target_file = f"{docs_file}.custom"
-            shutil.copyfile(docs_file, target_file)
+            if split_strategy is None:
+                doc_count = 1
+                target_file = f"{docs_file}.custom"
+                shutil.copyfile(docs_file, target_file)

-            response_body = ragApi.upload_document_with_metadata_file(target_file) # ToDo check .metadata
-            if response_body is None:
-                logging.getLogger().error("Error uploading document")
-                return False
+                _ = ragApi.upload_document_with_metadata_file(target_file)
+            else:
+                saia_file_ids_to_delete = search_failed_to_delete(ids)
+                with concurrent.futures.ThreadPoolExecutor(max_workers=max_parallel_executions) as executor:
+                    futures = [executor.submit(ragApi.delete_profile_document, id, saia_profile) for id in saia_file_ids_to_delete]
+                    concurrent.futures.wait(futures)
+
+                doc_count = 0
+                for doc_id_path in ids:
+                    ret = saia_file_upload(saia_base_url, saia_api_token, saia_profile, doc_id_path, False)
+                    if not ret:
+                        message_response += f"Error uploading document {doc_id_path} {ret}\n"
+                    else:
+                        doc_count += 1
+                        message_response += f"{doc_id_path}\n"

             if upload_operation_log:
                 end_time = time.time()
-                message_response = f"bulk ingest ({end_time - start_time:.2f}s)"
+                message_response += f"bulk ingest {doc_count} items ({end_time - start_time:.2f}s)"
                 ret = operation_log_upload(saia_base_url, saia_api_token, saia_profile, "ALL", message_response, 0)
         else:
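The heart of the change is the new 'id' split strategy in ingest_confluence above: rather than one combined dump, each Confluence page is chunked on its own and written to download_dir as <page id>.json.custom, and the collected paths then drive the per-document upload. A simplified, self-contained sketch of that behaviour (documents reduced to plain dicts, split_text standing in for the repository's split_documents helper) might look like:

import json
import os

def split_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 100) -> list[str]:
    """Naive fixed-size splitter, used only to keep the example self-contained."""
    step = max(chunk_size - chunk_overlap, 1)
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

def save_per_page(documents: list[dict], download_dir: str) -> list[str]:
    """Write one '<page id>.json.custom' file per page, mirroring the 'id' split strategy."""
    ids = []
    if not os.path.exists(download_dir):
        raise Exception(f"Download directory {download_dir} does not exist")
    for doc in documents:
        chunks = split_text(doc["page_content"])
        doc_id = doc["metadata"].get("id", "")
        file_path = os.path.join(download_dir, f"{doc_id}.json.custom")
        with open(file_path, 'w', encoding='utf8') as f:
            json.dump([{"page_content": c, "metadata": doc["metadata"]} for c in chunks], f, ensure_ascii=False, indent=4)
        ids.append(file_path)
    return ids

# e.g. save_per_page([{"page_content": "...", "metadata": {"id": "12345"}}], "./confluence_metadata")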
diff --git a/saia_ingest/profile_utils.py b/saia_ingest/profile_utils.py
index e9e0a19..88311ee 100644
--- a/saia_ingest/profile_utils.py
+++ b/saia_ingest/profile_utils.py
@@ -79,7 +79,9 @@ def file_upload(
         with open(file_path + '.saia.metadata', 'w') as file:
             file.write(json.dumps(response_body, indent=2))
         end_time = time.time()
-        message_response = f"{response_body['indexStatus']}, {file_name},{response_body['name']},{response_body['id']},{end_time - start_time:.2f} seconds"
+        metadata_elements = response_body.get('metadata', [])
+        metadata_count_items = f",{len(metadata_elements)}" if len(metadata_elements) > 0 else ""
+        message_response = f"{response_body['indexStatus']}, {file_name},{response_body['name']},{response_body['id']}{metadata_count_items},{end_time - start_time:.2f} seconds"
         logging.getLogger().info(message_response)
     except Exception as e:
         if e.response['Error']['Code'] == '401':
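When split_strategy is set, the upload path in ingestor.py above first deletes any previously failed documents in parallel and then uploads each per-id file, accumulating a success count and per-file lines into message_response. A stripped-down sketch of that pattern (delete_one and upload_one are placeholders for the ragApi.delete_profile_document and saia_file_upload calls) is:

import concurrent.futures

def delete_one(doc_id: str) -> None:   # placeholder for ragApi.delete_profile_document
    print(f"deleting {doc_id}")

def upload_one(path: str) -> bool:     # placeholder for saia_file_upload
    print(f"uploading {path}")
    return True

def reingest(ids_to_delete: list[str], files: list[str], max_parallel_executions: int = 5) -> str:
    message_response = ""
    # Delete stale/failed documents concurrently, capped at max_parallel_executions workers.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_parallel_executions) as executor:
        futures = [executor.submit(delete_one, doc_id) for doc_id in ids_to_delete]
        concurrent.futures.wait(futures)

    # Upload sequentially, keeping a per-file log line and a success count.
    doc_count = 0
    for path in files:
        if upload_one(path):
            doc_count += 1
            message_response += f"{path}\n"
        else:
            message_response += f"Error uploading document {path}\n"
    message_response += f"bulk ingest {doc_count} items"
    return message_response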