From cbe263df924d8ccd00c58ac1be4ec95208615272 Mon Sep 17 00:00:00 2001 From: Alex Strick van Linschoten Date: Wed, 23 Oct 2024 15:31:50 +0200 Subject: [PATCH] Fix / update LLM Complete Guide (side quest) (#134) * remove extra step invocation * update requirements * fixes and updates * update chunking pipeline logic * handle JSON changes * formatting * add a typos.toml * update typos.toml * move typos.toml to repo root --- .typos.toml | 53 ++++++++-------- .../pipelines/generate_chunk_questions.py | 4 +- llm-complete-guide/requirements.txt | 2 +- llm-complete-guide/run.py | 12 ++++ llm-complete-guide/steps/eval_retrieval.py | 2 +- .../steps/finetune_embeddings.py | 18 +++++- llm-complete-guide/steps/hf_dataset_loader.py | 3 - llm-complete-guide/steps/populate_index.py | 61 ++++++++++++------- llm-complete-guide/steps/synthetic_data.py | 44 ++++++++----- llm-complete-guide/steps/url_scraper.py | 13 ++-- .../steps/url_scraping_utils.py | 32 +++++++++- llm-complete-guide/steps/web_url_loader.py | 16 ++--- 12 files changed, 171 insertions(+), 89 deletions(-) diff --git a/.typos.toml b/.typos.toml index 9398d36d..6a44caa4 100644 --- a/.typos.toml +++ b/.typos.toml @@ -1,37 +1,36 @@ [files] extend-exclude = [ - "*.csv", - "sign-language-detection-yolov5/*", - "orbit-user-analysis/steps/report.py", - "customer-satisfaction/pipelines/deployment_pipeline.py", - "customer-satisfaction/streamlit_app.py", - "nba-pipeline/Building and Using An MLOPs Stack With ZenML.ipynb", - "customer-satisfaction/tests/data_test.py", - "end-to-end-computer-vision/**/*.ipynb", - "classifier-e2e/run_skip_basics.ipynb", - "classifier-e2e/run_full.ipynb", - "classifier-e2e/run_skip_basics.ipynb", - "classifier-e2e/run_full.ipynb", - "classifier-e2e/run_skip_basics.ipynb" + "*.json", + "*.js", + "*.ipynb", ] [default.extend-identifiers] -# HashiCorp = "HashiCorp" -connexion = "connexion" -preprocesser = "preprocesser" -Preprocesser = "Preprocesser" +HashiCorp = "HashiCorp" +NDArray = "NDArray" +K_Scatch = "K_Scatch" +MCAGA1UECgwZQW1hem9uIFdlYiBTZXJ2aWNlcywgSW5jLjETMBEGA1UECwwKQW1h = "MCAGA1UECgwZQW1hem9uIFdlYiBTZXJ2aWNlcywgSW5jLjETMBEGA1UECwwKQW1h" +VQQGEwJVUzEQMA4GA1UEBwwHU2VhdHRsZTETMBEGA1UECAwKV2FzaGluZ3RvbjEi = "VQQGEwJVUzEQMA4GA1UEBwwHU2VhdHRsZTETMBEGA1UECAwKV2FzaGluZ3RvbjEi" +MDEyOk9yZ2FuaXphdGlvbjg4Njc2OTU1 = "MDEyOk9yZ2FuaXphdGlvbjg4Njc2OTU1" [default.extend-words] -# aks = "aks" -GOES = "GOES" -lenght = "lenght" -preprocesser = "preprocesser" -Preprocesser = "Preprocesser" -Implicitly = "Implicitly" -fo = "fo" -mapp = "mapp" -polution = "polution" -magent = "magent" +# Don't correct the surname "Teh" +aks = "aks" +hashi = "hashi" +womens = "womens" +prepend = "prepend" +prepended = "prepended" +goes = "goes" +bare = "bare" +prepending = "prepending" +prev = "prev" +creat = "creat" +ret = "ret" +daa = "daa" +arange = "arange" +cachable = "cachable" +OT = "OT" +cll = "cll" [default] locale = "en-us" diff --git a/llm-complete-guide/pipelines/generate_chunk_questions.py b/llm-complete-guide/pipelines/generate_chunk_questions.py index bc331ef8..432aed97 100644 --- a/llm-complete-guide/pipelines/generate_chunk_questions.py +++ b/llm-complete-guide/pipelines/generate_chunk_questions.py @@ -19,10 +19,10 @@ from zenml.client import Client -@pipeline +@pipeline(enable_cache=False) def generate_chunk_questions(): """Pipeline to generate questions from chunks.""" - local_setting = ExternalArtifact(value=True) + local_setting = ExternalArtifact(value=False) client = Client() docs_with_embeddings = client.get_artifact_version( name_id_or_prefix="documents_with_embeddings" diff --git a/llm-complete-guide/requirements.txt b/llm-complete-guide/requirements.txt index 3ed7f976..b4ba919f 100644 --- a/llm-complete-guide/requirements.txt +++ b/llm-complete-guide/requirements.txt @@ -17,7 +17,7 @@ tiktoken umap-learn matplotlib pyarrow -rerankers[all] +rerankers[flashrank] datasets # optional requirements for S3 artifact store diff --git a/llm-complete-guide/run.py b/llm-complete-guide/run.py index 02d923b1..f457de64 100644 --- a/llm-complete-guide/run.py +++ b/llm-complete-guide/run.py @@ -42,6 +42,7 @@ from materializers.document_materializer import DocumentMaterializer from pipelines import ( finetune_embeddings, + generate_chunk_questions, generate_synthetic_data, llm_basic_rag, llm_eval, @@ -145,6 +146,13 @@ default=False, help="Whether to use the reranker.", ) +@click.option( + "--chunks", + "chunks", + is_flag=True, + default=False, + help="Generate chunks for Hugging Face dataset", +) def main( rag: bool = False, evaluation: bool = False, @@ -157,6 +165,7 @@ def main( dummyembeddings: bool = False, argilla: bool = False, reranked: bool = False, + chunks: bool = False, ): """Main entry point for the pipeline execution. @@ -170,6 +179,7 @@ def main( local (bool): If `True`, the local LLM via Ollama will be used. embeddings (bool): If `True`, the embeddings will be fine-tuned. argilla (bool): If `True`, the Argilla annotations will be used. + chunks (bool): If `True`, the chunks pipeline will be run. """ pipeline_args = {"enable_cache": not no_cache} embeddings_finetune_args = { @@ -201,6 +211,8 @@ def main( finetune_embeddings.with_options(**embeddings_finetune_args)() if dummyembeddings: chunking_experiment.with_options(**pipeline_args)() + if chunks: + generate_chunk_questions.with_options(**pipeline_args)() if __name__ == "__main__": diff --git a/llm-complete-guide/steps/eval_retrieval.py b/llm-complete-guide/steps/eval_retrieval.py index 68eeb136..d9100e25 100644 --- a/llm-complete-guide/steps/eval_retrieval.py +++ b/llm-complete-guide/steps/eval_retrieval.py @@ -198,7 +198,7 @@ def perform_retrieval_evaluation( if all(url_ending not in url for url in urls): logging.error( - f"Failed for question: {question}. Expected URL ending: {url_ending}. Got: {urls}" + f"Failed for question: {question}. Expected URL containing: {url_ending}. Got: {urls}" ) failures += 1 diff --git a/llm-complete-guide/steps/finetune_embeddings.py b/llm-complete-guide/steps/finetune_embeddings.py index f3c20273..1114825f 100644 --- a/llm-complete-guide/steps/finetune_embeddings.py +++ b/llm-complete-guide/steps/finetune_embeddings.py @@ -373,7 +373,14 @@ def visualize_results( color="red", ) for i, v in enumerate(finetuned_values): - ax.text(v - 1.5, i - height / 2, f"{v:.1f}", va="center", ha="right", color="white") + ax.text( + v - 1.5, + i - height / 2, + f"{v:.1f}", + va="center", + ha="right", + color="white", + ) ax.barh( [i + height / 2 for i in y], base_values, @@ -382,7 +389,14 @@ def visualize_results( color="blue", ) for i, v in enumerate(base_values): - ax.text(v - 1.5, i + height / 2, f"{v:.1f}", va="center", ha="right", color="white") + ax.text( + v - 1.5, + i + height / 2, + f"{v:.1f}", + va="center", + ha="right", + color="white", + ) ax.set_xlabel("Scores (%)") ax.set_title("Evaluation Results") diff --git a/llm-complete-guide/steps/hf_dataset_loader.py b/llm-complete-guide/steps/hf_dataset_loader.py index 07be3d14..5615ba4a 100644 --- a/llm-complete-guide/steps/hf_dataset_loader.py +++ b/llm-complete-guide/steps/hf_dataset_loader.py @@ -29,6 +29,3 @@ def load_hf_dataset() -> ( train_dataset = load_dataset(DATASET_NAME_DEFAULT, split="train") test_dataset = load_dataset(DATASET_NAME_DEFAULT, split="test") return train_dataset, test_dataset - - -load_hf_dataset() diff --git a/llm-complete-guide/steps/populate_index.py b/llm-complete-guide/steps/populate_index.py index fc54ef5e..741bcbd8 100644 --- a/llm-complete-guide/steps/populate_index.py +++ b/llm-complete-guide/steps/populate_index.py @@ -19,9 +19,10 @@ # https://www.timescale.com/blog/postgresql-as-a-vector-database-create-store-and-query-openai-embeddings-with-pgvector/ # for providing the base implementation for this indexing functionality +import json import logging import math -from typing import Annotated, List +from typing import Annotated from constants import ( CHUNK_OVERLAP, @@ -41,16 +42,16 @@ @step def preprocess_documents( - documents: List[Document], -) -> Annotated[List[Document], ArtifactConfig(name="split_chunks")]: + documents: str, +) -> Annotated[str, ArtifactConfig(name="split_chunks")]: """ - Preprocesses a list of documents by splitting them into chunks. + Preprocesses a JSON string of documents by splitting them into chunks. Args: - documents (List[Document]): A list of documents to be preprocessed. + documents (str): A JSON string containing a list of documents to be preprocessed. Returns: - Annotated[List[Document], ArtifactConfig(name="split_chunks")]: A list of preprocessed documents annotated with an ArtifactConfig. + Annotated[str, ArtifactConfig(name="split_chunks")]: A JSON string containing a list of preprocessed documents annotated with an ArtifactConfig. Raises: Exception: If an error occurs during preprocessing. @@ -64,10 +65,17 @@ def preprocess_documents( }, ) + # Parse the JSON string into a list of Document objects + document_list = [Document(**doc) for doc in json.loads(documents)] + split_docs = split_documents( - documents, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP + document_list, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP ) - return split_docs + + # Convert the list of Document objects back to a JSON string + split_docs_json = json.dumps([doc.__dict__ for doc in split_docs]) + + return split_docs_json except Exception as e: logger.error(f"Error in preprocess_documents: {e}") raise @@ -75,10 +83,8 @@ def preprocess_documents( @step def generate_embeddings( - split_documents: List[Document], -) -> Annotated[ - List[Document], ArtifactConfig(name="documents_with_embeddings") -]: + split_documents: str, +) -> Annotated[str, ArtifactConfig(name="documents_with_embeddings")]: """ Generates embeddings for a list of split documents using a SentenceTransformer model. @@ -86,7 +92,7 @@ def generate_embeddings( split_documents (List[Document]): A list of Document objects that have been split into chunks. Returns: - Annotated[List[Document], ArtifactConfig(name="embeddings")]: The list of Document objects with generated embeddings, annotated with an ArtifactConfig. + Annotated[str, ArtifactConfig(name="documents_with_embeddings")]: A JSON string containing the Document objects with generated embeddings, annotated with an ArtifactConfig. Raises: Exception: If an error occurs during the generation of embeddings. @@ -95,20 +101,28 @@ def generate_embeddings( model = SentenceTransformer(EMBEDDINGS_MODEL) log_artifact_metadata( - artifact_name="embeddings", + artifact_name="documents_with_embeddings", metadata={ "embedding_type": EMBEDDINGS_MODEL, "embedding_dimensionality": EMBEDDING_DIMENSIONALITY, }, ) - document_texts = [doc.page_content for doc in split_documents] + # Parse the JSON string into a list of Document objects + document_list = [ + Document(**doc) for doc in json.loads(split_documents) + ] + + document_texts = [doc.page_content for doc in document_list] embeddings = model.encode(document_texts) - for doc, embedding in zip(split_documents, embeddings): - doc.embedding = embedding + for doc, embedding in zip(document_list, embeddings): + doc.embedding = embedding.tolist() + + # Convert the list of Document objects to a JSON string + documents_json = json.dumps([doc.__dict__ for doc in document_list]) - return split_documents + return documents_json except Exception as e: logger.error(f"Error in generate_embeddings: {e}") raise @@ -116,7 +130,7 @@ def generate_embeddings( @step def index_generator( - documents: List[Document], + documents: str, ) -> None: """Generates an index for the given documents. @@ -126,7 +140,7 @@ def index_generator( using the cosine distance measure. Args: - documents (List[Document]): The list of Document objects with generated embeddings. + documents (str): A JSON string containing the Document objects with generated embeddings. Raises: Exception: If an error occurs during the index generation. @@ -155,11 +169,14 @@ def index_generator( register_vector(conn) + # Parse the JSON string into a list of Document objects + document_list = [Document(**doc) for doc in json.loads(documents)] + # Insert data only if it doesn't already exist - for doc in documents: + for doc in document_list: content = doc.page_content token_count = doc.token_count - embedding = doc.embedding.tolist() + embedding = doc.embedding filename = doc.filename parent_section = doc.parent_section url = doc.url diff --git a/llm-complete-guide/steps/synthetic_data.py b/llm-complete-guide/steps/synthetic_data.py index db3b7ebf..f61604b7 100644 --- a/llm-complete-guide/steps/synthetic_data.py +++ b/llm-complete-guide/steps/synthetic_data.py @@ -14,16 +14,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List +import json +import logging +from typing import Annotated import pandas as pd from datasets import Dataset from huggingface_hub import create_repo from litellm import completion from structures import Document -from zenml import step +from zenml import ArtifactConfig, step from zenml.client import Client +logger = logging.getLogger(__name__) + LOCAL_MODEL = "ollama/mixtral" @@ -36,7 +40,7 @@ def generate_question(chunk: str, local: bool = False) -> str: Returns: Generated question. """ - model = LOCAL_MODEL if local else "gpt-3.5-turbo" + model = LOCAL_MODEL if local else "gpt-4o" response = completion( model=model, messages=[ @@ -52,28 +56,37 @@ def generate_question(chunk: str, local: bool = False) -> str: @step def generate_questions_from_chunks( - docs_with_embeddings: List[Document], + docs_with_embeddings: str, local: bool = False, -) -> List[Document]: + logging_interval: int = 10, +) -> Annotated[str, ArtifactConfig(name="synthetic_questions")]: """Generate questions from chunks. Args: - docs_with_embeddings: List of documents with embeddings. + docs_with_embeddings: JSON string containing a list of Document objects with embeddings. local: Whether to run the pipeline with a local LLM. Returns: - List of documents with generated questions added. + JSON string containing a list of documents with generated questions added. """ - for doc in docs_with_embeddings: + document_list = [ + Document(**doc) for doc in json.loads(docs_with_embeddings) + ] + + for i, doc in enumerate(document_list, 1): doc.generated_questions = [generate_question(doc.page_content, local)] + if i % logging_interval == 0: + logger.info( + f"Progress: {i}/{len(document_list)} documents processed" + ) + logger.info( + f"Generated question for document {i}: {doc.generated_questions[0]}" + ) - assert all(doc.generated_questions for doc in docs_with_embeddings) + assert all(doc.generated_questions for doc in document_list) # Convert List[Document] to DataFrame - df = pd.DataFrame([doc.__dict__ for doc in docs_with_embeddings]) - - # Convert numpy arrays to lists - df["embedding"] = df["embedding"].apply(lambda x: x.tolist()) + df = pd.DataFrame([doc.__dict__ for doc in document_list]) # upload the parquet file to a private dataset on the huggingface hub client = Client() @@ -83,14 +96,15 @@ def generate_questions_from_chunks( "zenml/rag_qa_embedding_questions", token=hf_token, exist_ok=True, - private=True, repo_type="dataset", ) + # add an extra `__pydantic_initialised__` column to the dataframe + df["__pydantic_initialised__"] = True + dataset = Dataset.from_pandas(df) dataset.push_to_hub( repo_id="zenml/rag_qa_embedding_questions", - private=True, token=hf_token, create_pr=True, ) diff --git a/llm-complete-guide/steps/url_scraper.py b/llm-complete-guide/steps/url_scraper.py index 213c6994..af7fb2a3 100644 --- a/llm-complete-guide/steps/url_scraper.py +++ b/llm-complete-guide/steps/url_scraper.py @@ -12,10 +12,11 @@ # or implied. See the License for the specific language governing # permissions and limitations under the License. -from typing import List + +import json from typing_extensions import Annotated -from zenml import log_artifact_metadata, step +from zenml import ArtifactConfig, log_artifact_metadata, step from steps.url_scraping_utils import get_all_pages @@ -25,17 +26,16 @@ def url_scraper( docs_url: str = "https://docs.zenml.io", repo_url: str = "https://github.com/zenml-io/zenml", website_url: str = "https://zenml.io", -) -> Annotated[List[str], "urls"]: +) -> Annotated[str, ArtifactConfig(name="urls")]: """Generates a list of relevant URLs to scrape. Args: docs_url: URL to the documentation. repo_url: URL to the repository. - release_notes_url: URL to the release notes. website_url: URL to the website. Returns: - List of URLs to scrape. + JSON string containing a list of URLs to scrape. """ # We comment this out to make this pipeline faster # examples_readme_urls = get_nested_readme_urls(repo_url) @@ -44,8 +44,9 @@ def url_scraper( # all_urls = docs_urls + website_urls + examples_readme_urls all_urls = docs_urls log_artifact_metadata( + artifact_name="urls", metadata={ "count": len(all_urls), }, ) - return all_urls + return json.dumps(all_urls) diff --git a/llm-complete-guide/steps/url_scraping_utils.py b/llm-complete-guide/steps/url_scraping_utils.py index c27eb97c..5adc42a5 100644 --- a/llm-complete-guide/steps/url_scraping_utils.py +++ b/llm-complete-guide/steps/url_scraping_utils.py @@ -48,6 +48,18 @@ def is_valid_url(url: str, base: str) -> bool: return not re.search(version_pattern, url) +def strip_query_params(url: str) -> str: + """Strip query parameters from a URL. + + Args: + url (str): The URL to strip query parameters from. + + Returns: + str: The URL without query parameters. + """ + return url.split("?")[0] + + def get_all_pages(url: str) -> List[str]: """ Retrieve all pages with the same base as the given URL. @@ -60,10 +72,23 @@ def get_all_pages(url: str) -> List[str]: """ logger.info(f"Scraping all pages from {url}...") base_url = urlparse(url).netloc - pages = crawl(url, base_url) - logger.info(f"Found {len(pages)} pages.") + + # Use a queue-based approach instead of recursion + pages = set() + queue = [url] + while queue: + current_url = queue.pop(0) + if current_url not in pages: + pages.add(current_url) + links = get_all_links(current_url, base_url) + queue.extend(links) + sleep(1 / RATE_LIMIT) # Rate limit the requests + + stripped_pages = [strip_query_params(page) for page in pages] + + logger.info(f"Found {len(stripped_pages)} pages.") logger.info("Done scraping pages.") - return list(pages) + return list(stripped_pages) def crawl(url: str, base: str, visited: Set[str] = None) -> Set[str]: @@ -118,6 +143,7 @@ def get_all_links(url: str, base: str) -> List[str]: parsed_url = urlparse(full_url) cleaned_url = parsed_url._replace(fragment="").geturl() if is_valid_url(cleaned_url, base): + print(cleaned_url) links.append(cleaned_url) logger.debug(f"Found {len(links)} valid links from {url}") diff --git a/llm-complete-guide/steps/web_url_loader.py b/llm-complete-guide/steps/web_url_loader.py index 404312a5..fe523ebe 100644 --- a/llm-complete-guide/steps/web_url_loader.py +++ b/llm-complete-guide/steps/web_url_loader.py @@ -12,7 +12,8 @@ # or implied. See the License for the specific language governing # permissions and limitations under the License. -from typing import Annotated, List +import json +from typing import Annotated from structures import Document from unstructured.partition.html import partition_html @@ -23,18 +24,19 @@ @step def web_url_loader( - urls: List[str], -) -> Annotated[List[Document], ArtifactConfig(name="documents_from_urls")]: + urls: str, +) -> Annotated[str, ArtifactConfig(name="documents_from_urls")]: """Loads documents from a list of URLs. Args: - urls: List of URLs to load documents from. + urls: JSON string containing a list of URL strings to load documents from. Returns: - List of custom Document objects. + JSON string containing a list of custom Document objects. """ + url_list = json.loads(urls) documents = [] - for url in urls: + for url in url_list: elements = partition_html(url=url) text = "\n\n".join([str(el) for el in elements]) @@ -48,4 +50,4 @@ def web_url_loader( ) documents.append(document) - return documents + return json.dumps([doc.__dict__ for doc in documents])