Skip to content

Commit

Permalink
orjson dump and formatting for debug messages
Browse files Browse the repository at this point in the history
  • Loading branch information
cyyeh committed Jul 4, 2024
1 parent 4114b1a commit 585c35b
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 31 deletions.
11 changes: 8 additions & 3 deletions wren-ai-service/src/pipelines/ask/followup_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pathlib import Path
from typing import Any, List

import orjson
from hamilton import base
from hamilton.experimental.h_async import AsyncDriver
from haystack import Document
Expand Down Expand Up @@ -138,7 +139,9 @@ def prompt(
prompt_builder: PromptBuilder,
) -> dict:
logger.debug(f"query: {query}")
logger.debug(f"documents: {documents}")
logger.debug(
f"documents: {orjson.dumps(documents, option=orjson.OPT_INDENT_2).decode()}"
)
logger.debug(f"history: {history}")
return prompt_builder.run(
query=query, documents=documents, history=history, alert=alert
Expand All @@ -147,13 +150,15 @@ def prompt(

@async_timer
async def generate(prompt: dict, generator: Any) -> dict:
logger.debug(f"prompt: {prompt}")
logger.debug(f"prompt: {orjson.dumps(prompt, option=orjson.OPT_INDENT_2).decode()}")
return await generator.run(prompt=prompt.get("prompt"))


@async_timer
async def post_process(generate: dict, post_processor: GenerationPostProcessor) -> dict:
logger.debug(f"generate: {generate}")
logger.debug(
f"generate: {orjson.dumps(generate, option=orjson.OPT_INDENT_2).decode()}"
)
return await post_processor.run(generate.get("replies"))


Expand Down
15 changes: 11 additions & 4 deletions wren-ai-service/src/pipelines/ask/generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pathlib import Path
from typing import Any, Dict, List

import orjson
from hamilton import base
from hamilton.experimental.h_async import AsyncDriver
from haystack import Document
Expand Down Expand Up @@ -98,22 +99,28 @@ def prompt(
prompt_builder: PromptBuilder,
) -> dict:
logger.debug(f"query: {query}")
logger.debug(f"documents: {documents}")
logger.debug(f"exclude: {exclude}")
logger.debug(
f"documents: {orjson.dumps(documents, option=orjson.OPT_INDENT_2).decode()}"
)
logger.debug(
f"exclude: {orjson.dumps(exclude, option=orjson.OPT_INDENT_2).decode()}"
)
return prompt_builder.run(
query=query, documents=documents, exclude=exclude, alert=alert
)


@async_timer
async def generate(prompt: dict, generator: Any) -> dict:
logger.debug(f"prompt: {prompt}")
logger.debug(f"prompt: {orjson.dumps(prompt, option=orjson.OPT_INDENT_2).decode()}")
return await generator.run(prompt=prompt.get("prompt"))


@async_timer
async def post_process(generate: dict, post_processor: GenerationPostProcessor) -> dict:
logger.debug(f"generate: {generate}")
logger.debug(
f"generate: {orjson.dumps(generate, option=orjson.OPT_INDENT_2).decode()}"
)
return await post_processor.run(generate.get("replies"))


Expand Down
12 changes: 8 additions & 4 deletions wren-ai-service/src/pipelines/ask/historical_question.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pathlib import Path
from typing import Any, Dict, List, Optional

import orjson
from hamilton import base
from hamilton.experimental.h_async import AsyncDriver
from haystack import Document, component
Expand Down Expand Up @@ -64,21 +65,24 @@ async def embedding(query: str, embedder: Any) -> dict:
@async_timer
async def retrieval(embedding: dict, retriever: Any) -> dict:
res = await retriever.run(query_embedding=embedding.get("embedding"))
documents = res.get("documents")
return dict(documents=documents)
return dict(documents=res.get("documents"))


@timer
def filtered_documents(retrieval: dict, score_filter: ScoreFilter) -> dict:
logger.debug(f"retrieval: {retrieval}")
logger.debug(
f"retrieval: {orjson.dumps(retrieval, option=orjson.OPT_INDENT_2).decode()}"
)
return score_filter.run(documents=retrieval.get("documents"))


@timer
def formatted_output(
filtered_documents: dict, output_formatter: OutputFormatter
) -> dict:
logger.debug(f"filtered_documents: {filtered_documents}")
logger.debug(
f"filtered_documents: {orjson.dumps(filtered_documents, option=orjson.OPT_INDENT_2).decode()}"
)
return output_formatter.run(documents=filtered_documents.get("documents"))


Expand Down
15 changes: 11 additions & 4 deletions wren-ai-service/src/pipelines/ask/sql_correction.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pathlib import Path
from typing import Any, Dict, List

import orjson
from hamilton import base
from hamilton.experimental.h_async import AsyncDriver
from haystack import Document
Expand Down Expand Up @@ -64,8 +65,12 @@ def prompt(
alert: str,
prompt_builder: PromptBuilder,
) -> dict:
logger.debug(f"documents: {documents}")
logger.debug(f"invalid_generation_results: {invalid_generation_results}")
logger.debug(
f"documents: {orjson.dumps(documents, option=orjson.OPT_INDENT_2).decode()}"
)
logger.debug(
f"invalid_generation_results: {orjson.dumps(invalid_generation_results, option=orjson.OPT_INDENT_2).decode()}"
)
return prompt_builder.run(
documents=documents,
invalid_generation_results=invalid_generation_results,
Expand All @@ -75,13 +80,15 @@ def prompt(

@async_timer
async def generate(prompt: dict, generator: Any) -> dict:
logger.debug(f"prompt: {prompt}")
logger.debug(f"prompt: {orjson.dumps(prompt, option=orjson.OPT_INDENT_2).decode()}")
return await generator.run(prompt=prompt.get("prompt"))


@async_timer
async def post_process(generate: dict, post_processor: GenerationPostProcessor) -> dict:
logger.debug(f"generate: {generate}")
logger.debug(
    f"generate: {orjson.dumps(generate, option=orjson.OPT_INDENT_2).decode()}"
)
return await post_processor.run(generate.get("replies"))


Expand Down
6 changes: 4 additions & 2 deletions wren-ai-service/src/pipelines/ask_details/generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,15 @@ def prompt(sql: str, prompt_builder: PromptBuilder) -> dict:

@async_timer
async def generate(prompt: dict, generator: Any) -> dict:
logger.debug(f"prompt: {prompt}")
logger.debug(f"prompt: {orjson.dumps(prompt, option=orjson.OPT_INDENT_2).decode()}")
return await generator.run(prompt=prompt.get("prompt"))


@async_timer
async def post_process(generate: dict, post_processor: GenerationPostProcessor) -> dict:
logger.debug(f"generate: {generate}")
logger.debug(
f"generate: {orjson.dumps(generate, option=orjson.OPT_INDENT_2).decode()}"
)
return await post_processor.run(generate.get("replies"))


Expand Down
28 changes: 21 additions & 7 deletions wren-ai-service/src/pipelines/indexing/indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,51 +323,65 @@ def clean_document_store(mdl_str: str, cleaner: DocumentCleaner) -> Dict[str, An
def validate_mdl(
clean_document_store: Dict[str, Any], validator: MDLValidator
) -> Dict[str, Any]:
logger.debug(f"input in validate_mdl: {clean_document_store}")
logger.debug(
f"input in validate_mdl: {orjson.dumps(clean_document_store, option=orjson.OPT_INDENT_2).decode()}"
)
mdl = clean_document_store.get("mdl")
res = validator.run(mdl=mdl)
return dict(mdl=res["mdl"])


@timer
def convert_to_ddl(mdl: Dict[str, Any], ddl_converter: DDLConverter) -> Dict[str, Any]:
logger.debug(f"input in convert_to_ddl: {mdl}")
logger.debug(
f"input in convert_to_ddl: {orjson.dumps(mdl, option=orjson.OPT_INDENT_2).decode()}"
)
return ddl_converter.run(mdl=mdl)


@async_timer
async def embed_ddl(
convert_to_ddl: Dict[str, Any], ddl_embedder: Any
) -> Dict[str, Any]:
logger.debug(f"input in embed_ddl: {convert_to_ddl}")
logger.debug(
f"input in embed_ddl: {orjson.dumps(convert_to_ddl, option=orjson.OPT_INDENT_2).decode()}"
)
return await ddl_embedder.run(documents=convert_to_ddl["documents"])


@timer
def write_ddl(embed_ddl: Dict[str, Any], ddl_writer: DocumentWriter) -> None:
logger.debug(f"input in write_ddl: {embed_ddl}")
logger.debug(
f"input in write_ddl: {orjson.dumps(embed_ddl, option=orjson.OPT_INDENT_2).decode()}"
)
return ddl_writer.run(documents=embed_ddl["documents"])


@timer
def convert_to_view(
mdl: Dict[str, Any], view_converter: ViewConverter
) -> Dict[str, Any]:
logger.debug(f"input in convert_to_view: {mdl}")
logger.debug(
f"input in convert_to_view: {orjson.dumps(mdl, option=orjson.OPT_INDENT_2).decode()}"
)
return view_converter.run(mdl=mdl)


@async_timer
async def embed_view(
convert_to_view: Dict[str, Any], view_embedder: Any
) -> Dict[str, Any]:
logger.debug(f"input in embed_view: {convert_to_view}")
logger.debug(
f"input in embed_view: {orjson.dumps(convert_to_view, option=orjson.OPT_INDENT_2).decode()}"
)
return await view_embedder.run(documents=convert_to_view["documents"])


@timer
def write_view(embed_view: Dict[str, Any], view_writer: DocumentWriter) -> None:
logger.debug(f"input in write_view: {embed_view}")
logger.debug(
f"input in write_view: {orjson.dumps(embed_view, option=orjson.OPT_INDENT_2).decode()}"
)
return view_writer.run(documents=embed_view["documents"])


Expand Down
14 changes: 10 additions & 4 deletions wren-ai-service/src/pipelines/sql_explanation/generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,9 @@ def run(self, replies: List[str]) -> Dict[str, Any]:
def preprocess(
sql_analysis_results: List[dict], pre_processor: SQLAnalysisPreprocessor
) -> List[dict]:
logger.debug(f"sql_analysis_results: {sql_analysis_results}")
logger.debug(
f"sql_analysis_results: {orjson.dumps(sql_analysis_results, option=orjson.OPT_INDENT_2).decode()}"
)
return pre_processor.run(sql_analysis_results)


Expand All @@ -243,7 +245,9 @@ def prompt(
) -> dict:
logger.debug(f"question: {question}")
logger.debug(f"sql: {sql}")
logger.debug(f"preprocess: {preprocess}")
logger.debug(
f"preprocess: {orjson.dumps(preprocess, option=orjson.OPT_INDENT_2).decode()}"
)
logger.debug(f"sql_summary: {sql_summary}")
logger.debug(f"full_sql: {full_sql}")
return prompt_builder.run(
Expand All @@ -257,13 +261,15 @@ def prompt(

@async_timer
async def generate(prompt: dict, generator: Any) -> dict:
logger.debug(f"prompt: {prompt}")
logger.debug(f"prompt: {orjson.dumps(prompt, option=orjson.OPT_INDENT_2).decode()}")
return await generator.run(prompt=prompt.get("prompt"))


@timer
def post_process(generate: dict, post_processor: GenerationPostProcessor) -> dict:
logger.debug(f"generate: {generate}")
logger.debug(
f"generate: {orjson.dumps(generate, option=orjson.OPT_INDENT_2).decode()}"
)
return post_processor.run(generate.get("replies"))


Expand Down
12 changes: 9 additions & 3 deletions wren-ai-service/src/pipelines/sql_regeneration/generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ def sql_regeneration_prompt(
preprocess: Dict[str, Any],
sql_regeneration_prompt_builder: PromptBuilder,
) -> dict:
logger.debug(f"preprocess: {preprocess}")
logger.debug(
f"preprocess: {orjson.dumps(preprocess, option=orjson.OPT_INDENT_2).decode()}"
)
return sql_regeneration_prompt_builder.run(results=preprocess["results"])


Expand All @@ -114,7 +116,9 @@ async def sql_regeneration_generate(
sql_regeneration_prompt: dict,
sql_regeneration_generator: Any,
) -> dict:
logger.debug(f"sql_regeneration_prompt: {sql_regeneration_prompt}")
logger.debug(
f"sql_regeneration_prompt: {orjson.dumps(sql_regeneration_prompt, option=orjson.OPT_INDENT_2).decode()}"
)
return await sql_regeneration_generator.run(
prompt=sql_regeneration_prompt.get("prompt")
)
Expand All @@ -125,7 +129,9 @@ def sql_regeneration_post_process(
sql_regeneration_generate: dict,
sql_regeneration_post_processor: SQLRegenerationPostProcessor,
) -> dict:
logger.debug(f"sql_regeneration_generate: {sql_regeneration_generate}")
logger.debug(
f"sql_regeneration_generate: {orjson.dumps(sql_regeneration_generate, option=orjson.OPT_INDENT_2).decode()}"
)
return sql_regeneration_post_processor.run(
replies=sql_regeneration_generate.get("replies"),
)
Expand Down

0 comments on commit 585c35b

Please sign in to comment.