diff --git a/pyproject.toml b/pyproject.toml index 8c1e58ac..3791345b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,11 @@ numpy = "~=1.26" umap-learn = "~=0.5" scikit-learn = "~=1.5" nltk = "~=3.9" +sacrebleu = "^2.4.3" +pytest-testmon = "^2.1.1" vocos = "~=0.1" +deepeval = "^1.2.2" +textstat = "^0.7.4" iso639 = "~=0.1" nest-asyncio = "~=1.5" pylangacq = "~=0.19" @@ -129,10 +133,7 @@ target-version = "py310" [tool.ruff.lint] select = ["ANN", "D", "E", "F", "I"] -ignore = [ - "ANN101", # self should not be annotated. - "ANN102" # cls should not be annotated. -] +ignore = [] fixable = ["ALL"] unfixable = [] dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" @@ -163,7 +164,7 @@ skip = [ "*.cha", "*.ipynb" ] -ignore-words-list = ["senselab", "nd", "astroid", "wil", "SER", "te"] +ignore-words-list = ["senselab", "nd", "astroid", "wil", "SER", "te", "ROUGE", "rouge"] [build-system] requires = ["poetry-core>=1.0.0", "poetry-dynamic-versioning>=1.0.0,<2.0.0"] diff --git a/src/senselab/text/tasks/evaluate_conversation/__init__.py b/src/senselab/text/tasks/evaluate_conversation/__init__.py new file mode 100644 index 00000000..6a9953da --- /dev/null +++ b/src/senselab/text/tasks/evaluate_conversation/__init__.py @@ -0,0 +1,7 @@ +"""senselab project integrates deepeval for evaluating conversations. + +Using an api.py script to interface with deep_eval.py, +which includes a custom ROUGE metric for comprehensive evaluation. +The ScriptLine class standardizes input data, and unit tests ensure accurate functionality, +making Senselab a robust wrapper for deepeval and other tools. +""" diff --git a/src/senselab/text/tasks/evaluate_conversation/api.py b/src/senselab/text/tasks/evaluate_conversation/api.py new file mode 100644 index 00000000..196f3b78 --- /dev/null +++ b/src/senselab/text/tasks/evaluate_conversation/api.py @@ -0,0 +1,22 @@ +"""This module provides the API for the senselab text evaluation.""" + +from typing import Dict, List + +from senselab.utils.data_structures.script_line import ScriptLine + +from .deep_eval import evaluate_conversation + + +def evaluate_chat(script_lines: List[ScriptLine]) -> Dict: + """Evaluate chat using the provided script lines and metrics. + + Args: + script_lines (List[ScriptLine]): A list of script lines to evaluate. + + Returns: + dict: The standardized result with overall score and metrics. + """ + metrics = ["rouge1", "rouge2", "rougeL"] # Define the metrics you want to use + result = evaluate_conversation(script_lines, metrics) + standardized_result = {"metrics": result["metrics"]} + return standardized_result diff --git a/src/senselab/text/tasks/evaluate_conversation/deep_eval.py b/src/senselab/text/tasks/evaluate_conversation/deep_eval.py new file mode 100644 index 00000000..5496e13d --- /dev/null +++ b/src/senselab/text/tasks/evaluate_conversation/deep_eval.py @@ -0,0 +1,33 @@ +"""deep_eval.py.""" + +from typing import Dict, List + +from senselab.utils.data_structures.script_line import ScriptLine + +from .metrics import Rouge + + +def evaluate_conversation(script_lines: List[ScriptLine], metrics: List[str]) -> Dict: + """Evaluate a conversation based on the provided script lines and metrics. + + Args: + script_lines (List[ScriptLine]): A list of script lines to evaluate. + metrics (List[str]): A list of metrics to use for evaluation. + + Returns: + dict: The evaluation result containing detailed metrics. 
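+
+    Example (illustrative; the scores shown are placeholders rather than real output):
+        >>> from senselab.utils.data_structures.script_line import ScriptLine
+        >>> lines = [
+        ...     ScriptLine(text="I live in USA", speaker="agent"),
+        ...     ScriptLine(text="I live in KSA", speaker="user"),
+        ... ]
+        >>> evaluate_conversation(lines, ["rouge1", "rouge2", "rougeL"])
+        {'metrics': [{'rouge1': ..., 'rouge2': ..., 'rougeL': ...}]}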
+ """ + if not script_lines: + return {"metrics": []} + references: List[str] = [line.text for line in script_lines if line.speaker == "agent" and line.text is not None] + hypotheses: List[str] = [line.text for line in script_lines if line.speaker == "user" and line.text is not None] + + if not references or not hypotheses: + return {"metrics": []} + + metric_instance = Rouge() + scores = metric_instance(references, hypotheses) + + metrics_results = [{metric: score.get(metric, 0.0) for metric in metrics} for score in scores] + + return {"metrics": metrics_results} diff --git a/src/senselab/text/tasks/evaluate_conversation/metrics.py b/src/senselab/text/tasks/evaluate_conversation/metrics.py new file mode 100644 index 00000000..2a949c0f --- /dev/null +++ b/src/senselab/text/tasks/evaluate_conversation/metrics.py @@ -0,0 +1,56 @@ +"""Metrics to assess performance on tutor response. + +Functions named as ``*_score`` return a scalar value to maximize: the higher +the better. + +Function named as ``*_error`` or ``*_loss`` return a scalar value to minimize: +the lower the better. + +All other functions are value-independent. +""" + +from typing import Dict, List + +import sacrebleu as sb +import textstat +from deepeval.metrics import GEval +from deepeval.test_case import LLMTestCaseParams +from rouge_score import rouge_scorer +from sacrebleu.metrics import BLEUScore + + +def Rouge(*args: List, **kwargs: Dict) -> rouge_scorer.RougeScorer: + """Wrapper for rouge_scorer's RougeScorer class.""" + return rouge_scorer.RougeScorer(*args, **kwargs) + + +Rouge.__doc__ = rouge_scorer.RougeScorer.__doc__ + + +def sentence_bleu_sacre(*args: List) -> List[BLEUScore]: + """Wrapper for sacrebleu's sentence_bleu function.""" + return [sb.sentence_bleu(str(item), str(ref)) for item, ref in args] + + +sentence_bleu_sacre.__doc__ = sb.sentence_bleu.__doc__ + + +def word_count(*args: List, **kwargs: Dict) -> int: + """Wrapper for textstat's lexicon_count function.""" + return textstat.lexicon_count(*args, **kwargs) + + +word_count.__doc__ = textstat.lexicon_count.__doc__ + + +correctness_metric = GEval( + name="Correctness", + criteria="Determine whether the actual output is factually correct based on the expected output.", + # NOTE: you can only provide either criteria or evaluation_steps, and not both + evaluation_steps=[ + "Check whether the facts in 'actual output' contradicts any facts in 'expected output'", + "You should also heavily penalize omission of detail", + "Vague language, or contradicting OPINIONS, are OK", + ], + evaluation_params=[LLMTestCaseParams.INPUT, LLMTestCaseParams.ACTUAL_OUTPUT], +) diff --git a/src/senselab/text/tasks/llms/__init__.py b/src/senselab/text/tasks/llms/__init__.py new file mode 100644 index 00000000..c5fcbf59 --- /dev/null +++ b/src/senselab/text/tasks/llms/__init__.py @@ -0,0 +1,3 @@ +""".. include:: ./doc.md""" # noqa: D415 + +__version__ = "1.0.0" diff --git a/src/senselab/text/tasks/llms/doc.md b/src/senselab/text/tasks/llms/doc.md new file mode 100644 index 00000000..3fae1acf --- /dev/null +++ b/src/senselab/text/tasks/llms/doc.md @@ -0,0 +1,188 @@ +# LLMs + + +## Overview +This module provides the API for making LLM calls in senselab. + +This project focuses on ingesting and processing data, utilizing language models, and handling transcript data. It provides utilities for parsing unstructured text and generating meaningful insights using a combination of custom functions and pre-trained models. 
+
+## Structure
+The project contains the following main components:
+
+- `transcript_manager.py`: Handles data ingestion and preprocessing tasks.
+
+- `llm.py`: Integrates language-model-related functionality.
+
+- `process_transcript_example.py`: Demonstrates how to process transcript data using the methods provided in this package.
+
+
+## transcript_manager.py
+
+The `transcript_manager` module provides a data manager for handling interactions with a large language model (LLM). It supports loading transcripts, converting JSON data into scriptline objects, and extracting conversation data in a format that can be used to query potential AI responses.
+
+### Class: `Transcript`
+
+The `Transcript` class manages message data for interactions with an LLM. It provides methods to load transcripts, convert JSON transcript data into a usable format, and extract conversation segments for AI response opportunities. To use it, initialize it with a valid transcript path. The transcript data is loaded and stored as a list of scriptlines, which can then be printed in a readable format, counted in tokens, and passed to the `LLM` class in llm.py.
+
+### Attributes:
+- **`scriptlines (List[ScriptLine])`**: A list of `ScriptLine` objects representing the conversation. See the documentation in senselab/utils/data_structures/script_line.py.
+
+### Methods
+
+#### 1. `__init__(self, transcript_path: Path) -> None`
+
+Initializes the `Transcript` with a path to the JSON transcript file. Loads the transcript and converts it into scriptline objects.
+
+**Parameters:**
+- `transcript_path (Path)`: The path to the JSON transcript file.
+
+
+#### 2. `print_human_readable(self) -> None`
+
+Prints the scriptlines attribute in a human-readable format, where each message is displayed with its speaker and content.
+
+
+#### 3. `extract_response_opportunities(self) -> List[List[ScriptLine]]`
+
+Extracts consecutive sublists from the message list, ending after every 'user' response. These sublists can be used to compare AI responses to human responses over the course of a conversation.
+
+**Returns:**
+- `List[List[ScriptLine]]`: A list of consecutive sublists of messages, each ending with a 'user' message.
+
+Example:
+```python
+response_opportunities = transcript.extract_response_opportunities()
+```
+
+
+#### 4. `convert_json_to_scriptlines(self, json_obj: Dict) -> List[ScriptLine]`
+
+Converts transcript segments from a JSON object into a list of `ScriptLine` objects, where each scriptline contains the text and speaker. This method also maps "teacher" to "assistant" and "kid" to "user".
+
+**Parameters:**
+- `json_obj (Dict)`: The JSON object containing the conversation segments.
+
+  The input JSON object should have the following structure:
+  ```
+  {
+      "segments": [
+          {
+              "start": <float>,
+              "end": <float>,
+              "text": <str>,
+              "words": [
+                  {
+                      "word": <str>,
+                      "start": <float>,
+                      "end": <float>,
+                      "score": <float>,
+                      "speaker": [kid|teacher]
+                  },
+                  ...
+              ],
+              "speaker": [kid|teacher]
+          },
+          ...
+      ]
+  }
+  ```
+
+**Returns:**
+- `List[ScriptLine]`: A list of `ScriptLine` objects representing the conversation.
+
+**Raises:**
+- `ValueError`: If the input JSON structure is invalid or contains an unknown speaker role.
+
+
+#### 5. `get_num_tokens(self) -> int`
+
+Returns the total number of tokens in the stored scriptlines, computed with the OpenAI GPT-4o tokenizer.
+
+**Returns:**
+- `int`: Number of tokens in the transcript.
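+
+For illustration, here is a minimal sketch of the speaker mapping performed by `convert_json_to_scriptlines`. The words and timings below are made up, and `transcript` is assumed to be a loaded `Transcript` instance (see the Example Usage below):
+
+```python
+segments = {
+    "segments": [
+        {
+            "start": 0.0,
+            "end": 1.0,
+            "text": "hello there",
+            "words": [
+                {"word": "hello", "start": 0.0, "end": 0.5, "score": 1.0, "speaker": "teacher"},
+                {"word": "there", "start": 0.6, "end": 1.0, "score": 1.0, "speaker": "kid"},
+            ],
+            "speaker": "teacher",
+        }
+    ]
+}
+
+# "teacher" words are grouped into an "assistant" scriptline, "kid" words into a "user" scriptline
+scriptlines = transcript.convert_json_to_scriptlines(segments)
+# -> [ScriptLine(speaker="assistant", text="hello"), ScriptLine(speaker="user", text="there")]
+```
+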
+---
+
+## Example Usage
+
+```python
+from pathlib import Path
+from transcript_manager import Transcript
+
+# Initialize the manager with the path to a transcript
+transcript = Transcript(Path("transcript.json"))
+
+# Print the conversation in a human-readable format
+transcript.print_human_readable()
+
+# Extract response opportunities from the conversation
+response_opportunities = transcript.extract_response_opportunities()
+
+# Get the number of tokens used in the conversation
+num_tokens = transcript.get_num_tokens()
+
+print(f"Total tokens: {num_tokens}")
+```
+---
+
+
+
+### Class: `LLM`
+
+The `LLM` class abstracts the interaction with different large language models (LLMs) such as `llama3-8b`, `llama3-70b`, and `gpt-4o`. It is designed to start a server for model interaction, handle inputs, and produce outputs based on the selected model.
+
+Note that some models (like `gpt-4o`) are called through external endpoints, while others (like `llama3-8b`) are hosted locally and need to be initialized first. Depending on the model, the `call` function sends requests either to an external server or to a locally hosted server.
+
+#### Attributes:
+- **`_model_name (str)`**: The name of the model being used (e.g., `"llama3-70b"`).
+- **`_serving_url (str)`**: The URL of the server that requests are sent to.
+- **`_tokenizer (AutoTokenizer)`**: Tokenizer for the selected model.
+
+---
+
+#### Methods
+
+##### 1. `__init__(self, model_name: str) -> None`
+
+Initializes the `LLM` instance with the specified model name, setting up the necessary client and tokenizer.
+
+**Parameters:**
+- `model_name (str)`: The name of the model to initialize.
+---
+
+##### 2. `start_server(self, num_gpus: int, timeout: int = 700) -> Optional[Popen]`
+
+Starts a VLLM server with the specified number of GPUs, serving the selected local model. The server enables tensor parallelism to manage large models efficiently.
+
+**Parameters:**
+- `num_gpus (int)`: The number of GPUs to initialize the model with.
+- `timeout (int)`: Time, in seconds, to wait for the server to start before giving up. Default is `700`.
+
+**Returns:**
+- `Optional[Popen]`: A `Popen` object representing the running server process, or `None` if no GPU is available or the server does not come up within the timeout.
+---
+
+##### 3. `call(self, messages: List[ScriptLine], system_instruction: Optional[str] = "", max_tokens: Optional[int] = 100, temperature: Optional[float] = 0.3, measure: Optional[bool] = False) -> LLMResponse`
+
+Sends a series of messages to the model server and returns the model's output. The `system_instruction` parameter provides additional context for the model, while the `measure` flag enables token and latency measurements.
+
+**Parameters:**
+- `messages (List[ScriptLine])`: The conversation history as `ScriptLine` objects; each scriptline's `speaker` and `text` provide the role and content of a message.
+- `system_instruction (Optional[str])`: Instruction for the system. Default is an empty string.
+- `max_tokens (Optional[int])`: Maximum number of tokens for the output. Default is `100`.
+- `temperature (Optional[float])`: Sampling temperature, controlling randomness. Default is `0.3`.
+- `measure (Optional[bool])`: If `True`, measures latency and token usage. Default is `False`.
+
+**Returns:**
+- `LLMResponse`: An object containing the response content, latency, and token information (if the `measure` flag is set to `True`). See the documentation at senselab/utils/data_structures/llm_response.py.
+ +### Example Usage + +``` +llm = LLM("llama3-70b") + +llm.start_server(num_gpus=4) + +messages = [{"role": "user", "content": "Tell me a joke."}] +response = llm.call(messages, system_instruction="You are a friendly assistant") +print(response.content) +``` +--- diff --git a/src/senselab/text/tasks/llms/llm.py b/src/senselab/text/tasks/llms/llm.py new file mode 100644 index 00000000..102e1a72 --- /dev/null +++ b/src/senselab/text/tasks/llms/llm.py @@ -0,0 +1,166 @@ +"""This module provides a wrapper for invoking various Large Language Models (LLMs). + +Classes: + LLM: A unified interface for interacting with various LLMs. +""" + +import os +import time +from subprocess import PIPE, Popen, check_output +from typing import List, Optional, Tuple + +import requests +import torch +from openai import OpenAI +from transformers import AutoTokenizer # type: ignore + +from senselab.utils.data_structures.llm_response import LLMResponse +from senselab.utils.data_structures.script_line import ScriptLine + + +class LLM: + """Wrapper for invoking various LLMs. + + This class provides a unified interface for interacting with LLMs, + running on a vllm server at localhost:8000. + + Parameters: + ----------- + model_name : str + The name of the model to use. This is a required argument. Options: + - "llama3-8b" + - "llama3-70b" + - "gpt-4o" + + Methods: + -------- + call(messages: List[Dict], system_instruction: Optional[str] = "", + max_tokens: Optional[int] = 100, temperature: Optional[float] = 0.3) -> str: + Invokes the model with the given message and system instruction. + start_server(num_gpus: int, base_url: str) -> None: + Starts the VLLM server with the specified number of GPUs. + """ + + def __init__(self: "LLM", model_name: str) -> None: + """Initializes the LLM instance with a model name and OpenAI client. + + Args: + model_name (str): The name of the model to use. + """ + self._model_name, self._serving_url = self._get_model(model_name) + + self._tokenizer = AutoTokenizer.from_pretrained(self._model_name) + + self._client = OpenAI(base_url=self._serving_url) + + def start_server(self: "LLM", num_gpus: int, timeout: int = 700) -> Optional[Popen]: + """Starts the VLLM server with the specified number of GPUs and logs the output. + + Args: + num_gpus (int): The number of GPUs to use for tensor parallelism in the VLLM server. + base_url (str): The base URL of the VLLM server, from which the host and port are extracted. + timeout (int): Time, in seconds, to wait for the server to start before termination. 
+ + Returns: + Popen instance from subprocess module + """ + if torch.cuda.is_available(): + host = check_output("hostname -I | awk '{print $1}'", shell=True, text=True).strip() + port = 8000 + command = f"vllm serve {self._model_name} --host {host} --port {port} --tensor-parallel-size {num_gpus}" + self._serving_url = f"http://{host}:{port}/v1" + + # Run the server in the background + process = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, text=True) + + # Wait for the server to start + start_time = time.time() + while time.time() - start_time < timeout: + try: + response = requests.get(self._serving_url, timeout=5) + if response.status_code == 200: + print("Server is up and running with a 200 response!") + break + except requests.ConnectionError: + pass + time.sleep(5) + else: + print(f"Server did not respond with a 200 status code within {timeout} seconds.") + process.terminate() + return None + + self._client = OpenAI(base_url=self._serving_url, api_key="EMPTY") + print(f"Serving on Host: {host}\tPort: {port}") + return process + else: + print("Please migrate to a compute node with GPU resources.") + return None + + def call( + self: "LLM", + messages: List[ScriptLine], + system_instruction: Optional[str] = "", + max_tokens: Optional[int] = 100, + temperature: Optional[float] = 0.3, + measure: Optional[bool] = False, + ) -> LLMResponse: + """Invokes the model with a given message and system instruction. + + Args: + messages (List[ScriptLine]): Conversation history. + system_instruction (Optional[str]): The system instruction for the model. + max_tokens (Optional[int]): Maximum number of tokens to generate. + temperature (Optional[float]): Sampling temperature ranging between 0 and 2. + measure (Optional[bool]): Whether to measure token counts and latency. + + Returns: + LLMResponse: dataclass with model's response, with token counts and latency if measured. + """ + openai_messages = [{"role": msg.speaker, "content": msg.text} for msg in messages] + + if system_instruction: + system_message = {"role": "system", "content": system_instruction} # type: ignore + openai_messages.insert(0, system_message) # type: ignore + + in_tokens = out_tokens = latency = None + + # initialize latency measurements + if measure: + in_tokens = sum(len(self._tokenizer.encode(message["content"])) for message in openai_messages) + start_time = time.time() + + completion = self._client.chat.completions.create( + model=self._model_name, + messages=openai_messages, # type: ignore[arg-type] + max_tokens=max_tokens, + temperature=temperature, + ) + content = completion.choices[0].message.content + + if measure: + latency = time.time() - start_time + out_tokens = len(self._tokenizer.encode(content)) + + return LLMResponse(speaker="AI", text=content, latency=latency, in_tokens=in_tokens, out_tokens=out_tokens) + + def _get_model(self: "LLM", model: str) -> Tuple[str, str]: + """Maps a model name to the corresponding model identifier and url. + + Args: + model (str): The name of the model. + + Returns: + Tuple[str,str]: 1) model identifier 2) URL + + Raises: + ValueError: If the model name is unsupported. 
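+
+        Example (illustrative):
+            >>> llm._get_model("gpt-4o")
+            ('gpt-4o', 'https://api.openai.com/v1')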
+ """ + model_mapping = { + "llama3-70b": ("meta-llama/Meta-Llama-3.1-70B-Instruct", f"http://{os.getenv('VLLM_IP_ADDRESS')}:8000/v1"), + "llama3-8b": ("meta-llama/Meta-Llama-3.1-8B-Instruct", f"http://{os.getenv('VLLM_IP_ADDRESS')}:8000/v1"), + "gpt-4o": ("gpt-4o", "https://api.openai.com/v1"), + } + if model in model_mapping: + return model_mapping[model] + available_options = ",\n\t".join(model_mapping.keys()) + raise ValueError(f"Unsupported model. Available options: \n\t{available_options}") diff --git a/src/senselab/text/tasks/llms/process_transcript_example.py b/src/senselab/text/tasks/llms/process_transcript_example.py new file mode 100644 index 00000000..a2c3a825 --- /dev/null +++ b/src/senselab/text/tasks/llms/process_transcript_example.py @@ -0,0 +1,138 @@ +"""Example usage of llms directory to process AI responses from transcript.""" + +import os +import pickle +import time +from pathlib import Path +from typing import Generator, List + +from tqdm import tqdm + +from senselab.text.tasks.llms.llm import LLM +from senselab.utils.data_structures.llm_response import LLMResponse +from senselab.utils.data_structures.transcript_input import TranscriptInput +from senselab.utils.data_structures.transcript_output import TranscriptOutput + + +def generate_ai_conversation( + transcript_path: Path, prompt_path: Path, temp: float, model_name: str, measure: bool, cache_path: Path, llm: LLM +) -> TranscriptOutput: + """Generates an AI conversation based on transcript and prompt data. + + Args: + transcript_path (Path): Path to the transcript file. + prompt_path (Path): Path to the prompt file. + temp (float): Temperature parameter for the LLM. + model_name (str): Name of the model to use. + measure (bool): Whether to measure performance (e.g., tokens, latency). + cache_path (Path): Path to store the cached responses. + llm (LLM): instantiated model being used. + + Returns: + TranscriptOutput: The resulting transcript and data as a `TranscriptOutput` object. 
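+
+    Example (illustrative paths and values; see the __main__ block at the bottom of this file for real usage):
+        >>> llm = LLM("llama3-70b")
+        >>> output = generate_ai_conversation(
+        ...     transcript_path=Path("transcript.json"),
+        ...     prompt_path=Path("prompt.txt"),
+        ...     temp=0.5,
+        ...     model_name="llama3-70b",
+        ...     measure=False,
+        ...     cache_path=Path("transcript_cache.pkl"),
+        ...     llm=llm,
+        ... )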
+ """ + manager = TranscriptInput(transcript_path) + + with open(prompt_path, "r") as f: + system_instruction = f.read() + + all_messages = manager.extract_response_opportunities() + + # Check if cached responses already exist + if cache_path.exists(): # type: ignore + with open(cache_path, "rb") as f: # type: ignore + responses = pickle.load(f) # type: ignore + print(f"Loaded cached responses for {transcript_path.name}") + else: + responses = [ + llm.call( + messages=messages, + system_instruction=system_instruction, + max_tokens=200, + temperature=temp, + measure=measure, + ) + for messages in tqdm(all_messages, desc=f"Processing: {transcript_path.name}") + ] + + with open(cache_path, "wb") as f: # type: ignore + pickle.dump(responses, f) # type: ignore + + def response_gen() -> Generator[LLMResponse, None, None]: + """Generates responses from the cached or newly generated data.""" + yield from responses + + gen = response_gen() + + conversation = [] + + for i, message in enumerate(manager.scriptlines): + content = message.text + if message.speaker == "assistant": + conversation.append({"speaker": "Tutor", "text": content}) + if i > 0: + response_content = next(gen) + conversation.append(response_content.to_dict()) + else: + conversation.append({"speaker": "Student", "text": content}) + + return TranscriptOutput( + temp=temp, model=model_name, prompt=prompt_path.name, transcript=transcript_path.name, data=conversation + ) + + +def generate_all_transcripts( + transcript_dir: Path, prompt_path: Path, temp: float, model_name: str, measure: bool, cache_dir: Path, llm: LLM +) -> List[TranscriptOutput]: + """Generates AI conversations for all transcripts in a directory. + + Args: + transcript_dir (Path): Directory containing transcript files. + prompt_path (Path): Path to the prompt file. + temp (float): Temperature parameter for the LLM. + model_name (str): Name of the model to use. + measure (bool): Whether to measure performance (e.g., tokens, latency). + cache_dir (Path): Directory to store cached responses. + llm (LLM): instantiated model being used. + + Returns: + List[TranscriptOutput]: A list of `TranscriptOutput` objects. 
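+
+    Example (illustrative paths; mirrors the __main__ block at the bottom of this file):
+        >>> outputs = generate_all_transcripts(
+        ...     transcript_dir=Path("transcripts"),
+        ...     prompt_path=Path("prompt.txt"),
+        ...     temp=0.5,
+        ...     model_name="llama3-70b",
+        ...     measure=False,
+        ...     cache_dir=Path("cache"),
+        ...     llm=LLM("llama3-70b"),
+        ... )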
+ """ + outputs = [] + for transcript_path in transcript_dir.iterdir(): + cache_path = cache_dir / f"{transcript_path.stem}_cache.pkl" + outputs.append( + generate_ai_conversation(transcript_path, prompt_path, temp, model_name, measure, cache_path, llm) + ) + return outputs + + +if __name__ == "__main__": + transcript_dir = Path("/home/goshdam/to_do") + prompt_path = Path("/home/goshdam/prompts/V2_1038.txt") + temp = 0.5 + model_name = "llama3-70b" + llm = LLM(model_name) + + timeout = 700 # in seconds + poll_interval = 5 # interval to check in seconds + start_time = time.time() + + while os.getenv("VLLM_STATUS") != "Running": + elapsed_time = time.time() - start_time + if elapsed_time > timeout: + raise TimeoutError(f"Timed out after {timeout} seconds waiting for VLLM_STATUS to be 'Running'.") + time.sleep(poll_interval) + + output_path = Path("/home/goshdam/outputs/ai_outputs") + cache_dir = Path("/home/goshdam/outputs/cache") + measure = False + + cache_dir.mkdir(parents=True, exist_ok=True) + + outputs = generate_all_transcripts(transcript_dir, prompt_path, temp, model_name, measure, cache_dir, llm) + + for output in outputs: + output.save_to_json(output_path / f"{output.transcript}.json") + + print(f"Successfully saved all {len(outputs)} outputs to {output_path}") diff --git a/src/senselab/utils/data_structures/device.py b/src/senselab/utils/data_structures/device.py index d6606d15..9c9f7275 100644 --- a/src/senselab/utils/data_structures/device.py +++ b/src/senselab/utils/data_structures/device.py @@ -9,9 +9,9 @@ class DeviceType(Enum): """Device types for PyTorch operations.""" - CPU: str = "cpu" - CUDA: str = "cuda" - MPS: str = "mps" + CPU = "cpu" + CUDA = "cuda" + MPS = "mps" DTYPE_MAP = {DeviceType.CPU: torch.float32, DeviceType.CUDA: torch.float16, DeviceType.MPS: torch.float32} diff --git a/src/senselab/utils/data_structures/llm_response.py b/src/senselab/utils/data_structures/llm_response.py new file mode 100644 index 00000000..8cc93eb2 --- /dev/null +++ b/src/senselab/utils/data_structures/llm_response.py @@ -0,0 +1,23 @@ +"""This module contains the definition of the LLMResponse object.""" + +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class LLMResponse: + """Represents a response from a language model.""" + + speaker: str + text: str + latency: Optional[float] + in_tokens: Optional[int] + out_tokens: Optional[int] + + def to_dict(self: "LLMResponse") -> dict: + """Return a dictionary representation of the response. + + Returns: + dict: A dictionary representation of the response. + """ + return self.__dict__ diff --git a/src/senselab/utils/data_structures/script_line.py b/src/senselab/utils/data_structures/script_line.py index 79ad1b72..6f8ed12b 100644 --- a/src/senselab/utils/data_structures/script_line.py +++ b/src/senselab/utils/data_structures/script_line.py @@ -88,6 +88,14 @@ def get_chunks(self) -> Optional[List["ScriptLine"]]: """ return self.chunks + def __repr__(self) -> str: + """Return a string representation of the ScriptLine object. + + Returns: + str: A formatted string with the object's attributes. + """ + return f"" + @classmethod def from_dict(cls, d: Dict[str, Any]) -> "ScriptLine": """Create a ScriptLine instance from a dictionary. 
diff --git a/src/senselab/utils/data_structures/transcript_input.py b/src/senselab/utils/data_structures/transcript_input.py new file mode 100644 index 00000000..55b15692 --- /dev/null +++ b/src/senselab/utils/data_structures/transcript_input.py @@ -0,0 +1,177 @@ +"""This module provides a data manager for handling interactions with a LLM.""" + +import json +from pathlib import Path +from typing import Dict, List + +import tiktoken + +from senselab.utils.data_structures.script_line import ScriptLine + + +class TranscriptInput: + """Manages message data for interactions with a LLM. + + Provides methods to load transcripts, convert JSON data to message objects, + and generate data from a human conversation to query potential AI responses. + + Attributes: + scriptlines (List[Scriptline]): A list of Scriptline objects. + + Methods: + __init__(transcript_path: Path) -> None: Initializes the manager with a transcript file path. + print_human_readable() -> None: Prints messages in a readable format. + extract_response_opportunities() -> List[List[Scriptline]]: Extracts sublists ending with user input. + get_num_tokens()-> int: total number of tokens in transcript + _load_transcript(json_path: Path) -> Dict: Loads a JSON transcript from a file. + convert_json_to_scriptlines(json_obj: Dict) -> List[ScriptLine]: Converts transcript format to LLM format. + """ + + def __init__(self: "TranscriptInput", transcript_path: Path) -> None: + """Initializes the manager with a transcript file path. + + Args: + transcript_path (Path): The path to the JSON transcript file. + """ + if not transcript_path.exists(): + raise ValueError("Transcript path not found!") + json_obj = self._load_transcript(transcript_path) + self.scriptlines = self.convert_json_to_scriptlines(json_obj) + + def print_human_readable(self: "TranscriptInput") -> None: + """Prints the stored scriptlines in a human-readable format.""" + for message in self.scriptlines: + print(f"{message.speaker}:\t\t{message.text}\n") + + def get_num_tokens(self: "TranscriptInput") -> int: + """Returns the total number of OpenAI tokens in the conversation. + + Returns: + int: number of tokens + """ + c = 0 + encoding = tiktoken.encoding_for_model("gpt-4o") + for message in self.scriptlines: + if message.text: + c += len(encoding.encode(message.text)) + return c + + def extract_response_opportunities(self: "TranscriptInput") -> List[List[ScriptLine]]: + """Extract consecutive sublists from the messages list, ending after every 'user' response. + + This is used to compare AI responses to a human's response + over the course of a conversation, where the AI has the previous, + natural conversation before making its own response. + + Returns: + List[ScriptLine]: A list of sublists, each starting from the + beginning of the messages list and ending with the next + sequential message where the role is "user". + """ + sublists = [] + + for i, message in enumerate(self.scriptlines): + if message.speaker == "user": + sublist = self.scriptlines[0 : i + 1] + sublists.append(sublist) + + return sublists + + @staticmethod + def _load_transcript(json_path: Path) -> Dict: + """Load a JSON transcript from the specified file path. + + This static method reads a JSON file from the provided file path and + returns the loaded JSON object. + + Args: + json_path (Path): The file path to the JSON transcript file. + + Returns: + Dict: The JSON object loaded from the file. 
+ """ + with open(json_path, "r", encoding="utf-8") as file: + data = json.load(file) + + return data + + @staticmethod + def convert_json_to_scriptlines(json_obj: Dict) -> List[ScriptLine]: + """Converts transcript segments to list of ScriptLine objects. + + The input JSON object should have the following structure: + { + "segments": [ + { + "start": , + "end": , + "text": , + "words": [ + { + "word": , + "start": , + "end": , + "score": , + "speaker": [kid|teacher] + }, + ... + ], + "speaker": [kid|teacher] + }, + ... + ] + } + + + The conversion will map the "teacher" speaker role to "assistant" and the "kid" speaker + role to "user". + + Args: + json_obj (Dict): The input JSON object containing conversation segments. + + Returns: + List[ScriptLine]: See src/senselab/utils/data_structures/script_line.py + + Raises: + ValueError: If the input JSON structure is invalid or contains an unknown speaker role. + """ + # Ensure valid JSON structure + if not (isinstance(json_obj, dict) and isinstance(json_obj.get("segments"), list)): + raise ValueError("Invalid JSON structure: must be a dictionary with a 'segments' list") + + scriptlines = [] + current_role: str = "" + current_content: List[str] = [] + + for segment in json_obj["segments"]: + # Validate segment structure + if not all(key in segment for key in ("words",)): + raise ValueError(f"Invalid segment structure: {segment}") + + for word_obj in segment["words"]: + if not all(key in word_obj for key in ("word", "speaker")): + raise ValueError(f"Invalid word structure: {word_obj}") + + word = word_obj["word"] + speaker = word_obj["speaker"] + + if speaker == "teacher": + role = "assistant" + elif speaker == "kid": + role = "user" + else: + continue + + if role != current_role: + if current_content: + scriptlines.append(ScriptLine(text=" ".join(current_content), speaker=current_role)) + + current_role = role + current_content = [word] + else: + current_content.append(word) + + if current_content: + scriptlines.append(ScriptLine(text=" ".join(current_content), speaker=current_role)) + + return scriptlines diff --git a/src/senselab/utils/data_structures/transcript_output.py b/src/senselab/utils/data_structures/transcript_output.py new file mode 100644 index 00000000..d16650c4 --- /dev/null +++ b/src/senselab/utils/data_structures/transcript_output.py @@ -0,0 +1,45 @@ +"""This module contains the definition of the TranscriptOutput object.""" + +import json +from dataclasses import dataclass +from pathlib import Path +from typing import Union + + +@dataclass +class TranscriptOutput: + """Represents an output from an AI conversation transcript.""" + + temp: float + model: str + prompt: str + transcript: str + data: list[dict] # list[dict[speaker, text, latency, in_tokens, out_tokens]] + + def __str__(self: "TranscriptOutput") -> str: + """Return a formatted string representation of the transcript. + + Returns: + str: A formatted string representing the transcript. + """ + output = "" + for item in self.data: + output += f"{item['speaker']}: {item['text']}\n\n" + return output + + def to_json(self: "TranscriptOutput") -> str: + """Return a JSON representation of the transcript. + + Returns: + str: A JSON representation of the transcript. + """ + return json.dumps(self.__dict__) + + def save_to_json(self: "TranscriptOutput", path: Union[str, Path]) -> None: + """Save the JSON representation of the transcript to a file. + + Args: + path (str | Path): The path to save the JSON file. 
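+
+        Example (illustrative path):
+            >>> output.save_to_json("outputs/transcript.json")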
+ """ + with open(path, "w") as f: + json.dump(self.__dict__, f) diff --git a/src/tests/text/tasks/evaluate_conversation_test.py b/src/tests/text/tasks/evaluate_conversation_test.py new file mode 100644 index 00000000..22868a36 --- /dev/null +++ b/src/tests/text/tasks/evaluate_conversation_test.py @@ -0,0 +1,37 @@ +"""Unit tests for evaluating chat functionality.""" + +from typing import List + +import pytest + +from senselab.text.tasks.evaluate_conversation.api import evaluate_chat +from senselab.utils.data_structures.script_line import ScriptLine + + +@pytest.fixture +def script_lines() -> List[ScriptLine]: + """Fixture for providing sample script lines. + + Returns: + List[ScriptLine]: A list of sample script lines. + """ + return [ + ScriptLine(text="Mazen speaks Arabic", speaker="agent"), + ScriptLine(text="Mazen speaks Arabic", speaker="user"), + ScriptLine(text="I live in USA", speaker="agent"), + ScriptLine(text="I live in KSA", speaker="user"), + ] + + +def test_evaluate_chat(script_lines: List[ScriptLine]) -> None: + """Test the evaluate_chat function. + + Args: + script_lines (List[ScriptLine]): A list of script lines to evaluate. + + Asserts: + The evaluation result is not None and contains overall score and metrics. + """ + result = evaluate_chat(script_lines) + assert result is not None + assert "metrics" in result diff --git a/src/tests/text/tasks/transcript_manager_test.py b/src/tests/text/tasks/transcript_manager_test.py new file mode 100644 index 00000000..1a86a561 --- /dev/null +++ b/src/tests/text/tasks/transcript_manager_test.py @@ -0,0 +1,103 @@ +"""Test cases for the transcript_input data structure class.""" + +import json +import os +from pathlib import Path +from typing import List + +import pytest + +from senselab.utils.data_structures.script_line import ScriptLine +from senselab.utils.data_structures.transcript_input import TranscriptInput + +if os.getenv("GITHUB_ACTIONS") != "true": + + @pytest.fixture + def sample_json_obj() -> dict: + """Fixture for a sample JSON object representing conversation segments.""" + return { + "segments": [ + { + "start": 0.0, + "end": 1.0, + "words": [ + {"word": "uh", "start": 0.0, "end": 0.5, "score": 1.0, "speaker": "kid"}, + {"word": "hello", "start": 0.6, "end": 1.0, "score": 1.0, "speaker": "teacher"}, + ], + "speaker": "kid", + }, + { + "start": 1.0, + "end": 2.0, + "words": [ + {"word": "world", "start": 1.0, "end": 1.5, "score": 1.0, "speaker": "teacher"}, + {"word": "namaste", "start": 1.6, "end": 2.0, "score": 1.0, "speaker": "teacher"}, + ], + "speaker": "teacher", + }, + { + "start": 2.0, + "end": 3.0, + "words": [ + {"word": "kemosabe", "start": 2.0, "end": 2.5, "score": 1.0, "speaker": "teacher"}, + {"word": "hi", "start": 2.6, "end": 2.8, "score": 1.0, "speaker": "kid"}, + {"word": "there", "start": 2.9, "end": 3.0, "score": 1.0, "speaker": "kid"}, + ], + "speaker": "kid", + }, + ] + } + + @pytest.fixture + def sample_transcript(tmp_path: Path, sample_json_obj: dict) -> Path: + """Fixture to create a sample transcript file.""" + transcript_file = tmp_path / "transcript.json" + with transcript_file.open("w") as f: + json.dump(sample_json_obj, f) + return transcript_file + + @pytest.fixture + def expected_messages() -> List[ScriptLine]: + """Fixture for the expected list of message objects.""" + return [ + ScriptLine(speaker="user", text="uh"), + ScriptLine(speaker="assistant", text="hello world namaste kemosabe"), + ScriptLine(speaker="user", text="hi there"), + ] + + def 
test_convert_json_to_messages(sample_json_obj: dict, expected_messages: List[ScriptLine]) -> None: + """Test the conversion of JSON conversation segments to message objects.""" + result = TranscriptInput.convert_json_to_scriptlines(sample_json_obj) + assert result == expected_messages + + def test_missing_word_or_speaker_field() -> None: + """Test behavior when word or speaker field is missing from the segment.""" + invalid_json = { + "segments": [ + { + "start": 0.0, + "end": 1.0, + "words": [{"word": "hello"}], # Missing speaker + "speaker": "teacher", + } + ] + } + with pytest.raises(ValueError, match="Invalid word structure"): + TranscriptInput.convert_json_to_scriptlines(invalid_json) + + def test_get_num_tokens(sample_transcript: Path) -> None: + """Test the ability of the program to return the correct number of expected tokens.""" + transcript = TranscriptInput(sample_transcript) # Initialize the transcript + result = transcript.get_num_tokens() # Get the token count + assert result == 10 + + def test_response_opportunities_extraction(sample_transcript: Path) -> None: + """Test the extraction of response opportunities.""" + transcript = TranscriptInput(sample_transcript) + print(transcript) + opportunities = transcript.extract_response_opportunities() + print(opportunities) + + assert len(opportunities) == 2, "Expected two response opportunities" + assert opportunities[0][-1].speaker == "user", "Expected last message to be first message from user" + assert opportunities[1][-1].speaker == "user", "Expected last message to be second message from 'user'" diff --git a/tutorials/llms.ipynb b/tutorials/llms.ipynb new file mode 100644 index 00000000..67bbf3ba --- /dev/null +++ b/tutorials/llms.ipynb @@ -0,0 +1,27 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# LLMs\n", + "\n", + "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/sensein/senselab/blob/main/tutorials/llms.ipynb)\n", + "\n", + "This tutorial demonstrates how to use `senselab` for using LLMs." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}