
Commit

[td] Fixed
tommydangerous committed May 2, 2024
1 parent 930ac9f commit 59d4db3
Showing 11 changed files with 663 additions and 35 deletions.
8 changes: 4 additions & 4 deletions conditionals/data_preparation/nlp.py
@@ -3,8 +3,8 @@
# from transformers import BertTokenizer
# from nltk.corpus import stopwords

nlp_spacy = spacy.load('en_core_web_lg')
# nlp_spacy.add_pipe('textrank')
nlp = spacy.load('en_core_web_lg')
# nlp.add_pipe('textrank')

# stopwords_set = set(stopwords.words('english'))

@@ -13,7 +13,7 @@

@factory
def nlp(*args, **kwargs):
    return nlp_spacy
    return nlp


@factory
@@ -25,4 +25,4 @@ def stop_words(*args, **kwargs) -> set:
@factory
def tokenizer(*args, **kwargs):
    pass
    # return bert_tokenizer
    # return bert_tokenizer
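
For reference, blocks elsewhere in this repo consume these factory outputs through the factory_items_mapping keyword argument (the pattern appears in transformers/topics/model.py further down). A minimal sketch of that consumption, assuming the mapping key mirrors the module path and each entry unpacks to a (value, metadata) pair as in the existing code:

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def transform(documents, *args, **kwargs):
    # Hypothetical consumer block; the key and the two-element unpacking follow
    # the pattern already used in transformers/topics/model.py.
    factory_items_mapping = kwargs.get('factory_items_mapping')
    nlp, _ = factory_items_mapping['data_preparation/nlp']
    # Run the shared spaCy pipeline over each document's text (document[1]).
    return [[document[0], nlp(document[1])] for document in documents]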
7 changes: 0 additions & 7 deletions data_loaders/daring_waterfall.yaml

This file was deleted.

15 changes: 9 additions & 6 deletions data_loaders/remote_blocks/load.py
@@ -8,7 +8,6 @@
@data_loader
def load_data(*args, **kwargs):
    sample = int(kwargs.get('sample', 2))
    dry_run = sample >= 1

    outputs = kwargs.get('remote_blocks')

@@ -26,12 +25,8 @@ def load_data(*args, **kwargs):

    df = pd.DataFrame()
    arr = []

    if dry_run:
        outputs = outputs[:sample]

    print(f'sample: {sample}')
    print(f'dry_run: {dry_run}')
    print(f'sample: {sample}')
    print(f'outputs: {len(outputs)}')

    for dfs in outputs:
@@ -59,8 +54,16 @@ def load_data(*args, **kwargs):

    if len(arr) >= 1:
        print(f'arr: {len(arr)}')
        if sample >= 1:
            return arr[:sample]

        return arr

    print(f'df: {len(df)}')

    if sample >= 1:
        df = df.iloc[:sample]
        print(f'df: {len(df)}')
        return df

    return df
4 changes: 2 additions & 2 deletions pipelines/data_preparation_transformer/metadata.yaml
@@ -14,12 +14,12 @@ blocks:
  language: python
  name: remote_blocks/load
  retry_config: null
  status: executed
  status: updated
  timeout: null
  type: data_loader
  upstream_blocks: []
  uuid: remote_blocks/load
- all_upstream_blocks_executed: true
- all_upstream_blocks_executed: false
  color: null
  configuration:
    dynamic: true
Empty file.
68 changes: 68 additions & 0 deletions pipelines/training_set_llm/metadata.yaml
@@ -0,0 +1,68 @@
blocks:
- all_upstream_blocks_executed: true
  color: null
  configuration:
    dynamic: true
    file_path: data_loaders/remote_blocks/load.py
    file_source:
      path: data_loaders/remote_blocks/load.py
  downstream_blocks:
  - topics/model
  executor_config: null
  executor_type: local_python
  has_callback: false
  language: python
  name: remote_blocks/load
  retry_config: null
  status: executed
  timeout: null
  type: data_loader
  upstream_blocks: []
  uuid: remote_blocks/load
- all_upstream_blocks_executed: true
  color: null
  configuration:
    file_source:
      path: transformers/topics/model.py
  downstream_blocks: []
  executor_config: null
  executor_type: local_python
  has_callback: false
  language: python
  name: topics/model
  retry_config: null
  status: updated
  timeout: null
  type: transformer
  upstream_blocks:
  - remote_blocks/load
  uuid: topics/model
cache_block_output_in_memory: false
callbacks: []
concurrency_config: {}
conditionals: []
created_at: '2024-05-01 08:28:59.123845+00:00'
data_integration: null
description: Training set to fine tune LLMs.
executor_config: {}
executor_count: 1
executor_type: null
extensions: {}
name: training set/LLM
notification_config: {}
remote_variables_dir: null
retry_config: {}
run_pipeline_in_one_process: false
settings:
  triggers: null
spark_config: {}
tags:
- llm
type: python
uuid: training_set_llm
variables:
  remote_source_block_uuid: reduce/dataframe
  remote_source_pipeline_uuid: data_preparation_data_loader
  sample: 40
variables_dir: /root/.mage_data/llm_orchestration
widgets: []
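
The variables block above is what data_loaders/remote_blocks/load.py reads at runtime; Mage passes pipeline variables through to blocks as keyword arguments, which is why the loader calls kwargs.get('sample', 2). A minimal, self-contained sketch of how the sample variable trims the loader's output, mirroring the df.iloc[:sample] branch above (the toy DataFrame is illustrative only):

import pandas as pd


def apply_sample(df: pd.DataFrame, **kwargs) -> pd.DataFrame:
    # Same logic as the loader: a positive 'sample' caps the number of rows returned.
    sample = int(kwargs.get('sample', 2))  # this pipeline sets sample: 40
    if sample >= 1:
        df = df.iloc[:sample]
    return df


print(len(apply_sample(pd.DataFrame({'x': range(100)}), sample=40)))  # prints 40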
1 change: 1 addition & 0 deletions requirements.txt
@@ -8,6 +8,7 @@ faiss-cpu==1.8.0
gensim==4.3.2
hnswlib==0.8.0
neo4j==5.19.0
openai==1.25.0
# python -m nltk.downloader stopwords punkt wordnet
# nltk==3.8.1
pgvector==0.2.5
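
The new openai pin supports the summarize_and_infer_topics_openai helper that transformers/topics/model.py (further down) imports from default_repo.llm_orchestration.utils.topic_summary_processor; the helper itself is not part of this diff. A hypothetical sketch of what such a helper could look like with the OpenAI 1.x client follows — the model name, prompt wording, and verbosity handling are all assumptions, chosen only to match how the caller indexes res['choices'][0]['message']['content']:

from openai import OpenAI


def summarize_and_infer_topics_openai(text: str, verbosity: int = 0) -> list:
    # Hypothetical sketch; the real helper lives outside this diff.
    client = OpenAI()  # reads OPENAI_API_KEY from the environment
    prompt = (
        'Split the following document into chunks, assign each chunk a topic, '
        'and respond with JSON shaped like '
        '{"chunks": [{"sentence": "...", "topic": "..."}]}.\n\n'
        f'{text}'
    )
    response = client.chat.completions.create(
        model='gpt-4-turbo',  # assumption; any chat model that returns JSON fits the caller
        messages=[{'role': 'user', 'content': prompt}],
        response_format={'type': 'json_object'},
    )
    if verbosity >= 2:
        print(response.usage)
    # The caller indexes the result like a dict, so serialize the pydantic
    # response object before returning it in a one-element list.
    return [response.model_dump()]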
75 changes: 75 additions & 0 deletions transformers/consolidated_summary_refinement.py
@@ -0,0 +1,75 @@
import requests
import json
from typing import List, Dict, Any


def construct_combined_prompt(data: List[Dict[str, Any]]) -> str:
    """
    Constructs a single, comprehensive prompt for the LLM based on multiple summaries and topics.
    """
    combined_prompt_parts = []
    for item in data:
        summary, topics = item.get('summary', ''), item.get('topics', [])
        topics_str = ", ".join(topics)
        prompt_part = f"Summary: {summary} Topics: {topics_str}."
        combined_prompt_parts.append(prompt_part)

    combined_prompt = " ".join(combined_prompt_parts) + \
        " Please adjust the summaries for accuracy and suggest adjusted topics if necessary. " \
        "Return as a JSON list of dictionaries with keys 'topics', 'summary', " \
        "'summary_adjusted', and 'topics_adjusted'."
    return combined_prompt


def call_llm_api_with_combined_prompt(prompt: str) -> List[Dict[str, Any]]:
    """
    Calls the LLM API with the combined prompt and returns the API's response.
    """
    url = "https://sorcery.mage.ai:8000/api/v1/generations"
    payload = {
        "model": "deepseek-ai/deepseek-coder-6.7b-base",
        "texts": [prompt]
    }
    headers = {'Content-Type': 'application/json'}
    response = requests.post(url, headers=headers, data=json.dumps(payload))

    if response.status_code == 200:
        response_data = response.json()
        generation_output = response_data.get('generations', ["{}"])[0]
        try:
            output_data = json.loads(generation_output)
        except json.JSONDecodeError:
            output_data = [{"error": "Failed to decode LLM output as JSON.", "response": generation_output}]
        return output_data
    else:
        return [{"error": "Failed to receive a successful response from the LLM API.", "status_code": response.status_code}]


def update_with_new_keys(existing_data: List[Dict[str, Any]], new_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Updates the existing list of dictionaries with new keys and values from the LLM response.
    """
    # Assuming the order and count of dictionaries in both existing_data and new_data are aligned
    # and correspond to each other.
    for original, update in zip(existing_data, new_data):
        # This loop assumes each dictionary in new_data corresponds to and should update the dictionary in existing_data
        original.update(update)
    return existing_data


@transform
def process(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Transforms the input data by calling the LLM API with a single, combined prompt constructed from all items, processing the response,
    and integrating the new data with the existing data.
    """
    prompt = construct_combined_prompt(data)

    new_data = call_llm_api_with_combined_prompt(prompt)

    # Assuming new_data correctly maps back to the original list of dictionaries
    # and contains additional keys for updates.
    # Now, call the update_with_new_keys function to merge the new data with the existing data.
    updated_data = update_with_new_keys(data, new_data)

    return updated_data
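
A quick usage sketch of the helpers above, with made-up input data, showing the shape the block expects per item ('summary' plus a 'topics' list) and how update_with_new_keys merges the keys the LLM is asked to add back into the originals:

sample_data = [
    {'summary': 'Orchestrate pipelines with dynamic blocks.', 'topics': ['orchestration', 'dynamic blocks']},
    {'summary': 'Store embeddings in pgvector for retrieval.', 'topics': ['embeddings', 'pgvector']},
]

# Each item contributes "Summary: ... Topics: ...", followed by the instruction to
# return a JSON list with 'topics', 'summary', 'summary_adjusted', 'topics_adjusted'.
prompt = construct_combined_prompt(sample_data)
print(prompt)

# Stand-in for the LLM's answer; update_with_new_keys merges it positionally.
adjusted = [
    {'summary_adjusted': 'Orchestrate pipelines with dynamic blocks in Mage.', 'topics_adjusted': ['orchestration']},
    {'summary_adjusted': 'Store embeddings in pgvector for similarity search.', 'topics_adjusted': ['pgvector']},
]
merged = update_with_new_keys(sample_data, adjusted)
print(merged[0]['summary_adjusted'])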
65 changes: 49 additions & 16 deletions transformers/topics/model.py
@@ -1,20 +1,53 @@
from typing import Dict, List, Union
import json
from typing import Any, Dict, List, Union

from default_repo.llm_orchestration.models.topics import get_train_transform
import pandas as pd

from default_repo.llm_orchestration.utils.topic_summary_processor import summarize_and_infer_topics_openai


@transformer
def transform(documents: List[List[Union[str, Dict]]], *args, **kwargs):
    factory_items_mapping = kwargs.get('factory_items_mapping')
    nlp, _ = factory_items_mapping['data_preparation/nlp']

    data = get_train_transform(
        nlp,
        documents=[document[1] for document in documents],
        execution_partition=kwargs.get('execution_partition'),
        train=kwargs.get('train', 1) == 1,
    )

    return [
        data,
    ]
def transform(document: Dict[str, Any], *args, **kwargs) -> pd.DataFrame:
    """
    Transform a single document, represented as a dictionary, to include topics
    and a summary generated from its content.

    :param document: A dictionary containing at least 'document_id', 'document', and 'metadata'.
    :param args: Additional positional arguments (unused in this example).
    :param kwargs: Additional keyword arguments (unused in this example).
    :return: A pandas DataFrame with one row per extracted chunk, carrying the chunk text,
        its inferred topic, and the original document fields.
    """
    document_id = document.get('document_id', '')
    doc_text = document.get('document', '')
    metadata = document.get('metadata', {})

    print(document_id)

    rows = []
    topics = {}

    responses = summarize_and_infer_topics_openai(doc_text, verbosity=2)
    for res in responses:
        chunks = list(json.loads(res['choices'][0]['message']['content']).values())[0]
        print(f'chunks: {len(chunks)}')

        for topic_chunk in chunks:
            print(topic_chunk)
            chunk = topic_chunk.get('sentence') or topic_chunk.get('text')
            topic = topic_chunk['topic']

            topics[topic] = topics.get(topic) or []
            topics[topic].append(chunk)

            row = dict(chunk=chunk, topic=topic)
            row.update(document)
            rows.append(row)

    print(f'rows: {len(rows)}')
    print(f'topics: {len(topics)}')
    for topic, chunks in topics.items():
        print(f'\t{topic}: {len(chunks)}')

    df = pd.DataFrame(rows)

    return df
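
The parsing above assumes each response behaves like a dict-shaped chat completion whose message content is a JSON object with a single key holding a list of chunk dicts ('sentence' or 'text', plus 'topic'). A minimal, self-contained illustration with a hand-built response, showing the rows that end up in the DataFrame (all values are made up):

import json

import pandas as pd

fake_response = {
    'choices': [{
        'message': {
            'content': json.dumps({
                'chunks': [
                    {'sentence': 'Mage orchestrates data pipelines.', 'topic': 'orchestration'},
                    {'text': 'pgvector stores embeddings.', 'topic': 'vector storage'},
                ]
            })
        }
    }]
}

# Same indexing as the transformer: take the first (and only) value of the JSON object.
chunks = list(json.loads(fake_response['choices'][0]['message']['content']).values())[0]
rows = [dict(chunk=c.get('sentence') or c.get('text'), topic=c['topic']) for c in chunks]
print(pd.DataFrame(rows))
#                                chunk           topic
# 0  Mage orchestrates data pipelines.   orchestration
# 1        pgvector stores embeddings.  vector storage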
