diff --git a/examples/gpt_functions.py b/examples/gpt_functions.py new file mode 100644 index 0000000..7d66131 --- /dev/null +++ b/examples/gpt_functions.py @@ -0,0 +1,131 @@ +import requests +from PIL import Image +from io import BytesIO + +import json + +from creds import oai_token, oai_organization +from openai_api.src.openai_api.dalle import DALLE +from leonardo_api.leonardo_sync import Leonardo +from page_retriever import PageRetriever +from pytest_runner import run_tests + + +doc_engine = PageRetriever('https://wwakabobik.github.io/') + + +def get_weather(city, units): + base_url = "http://api.openweathermap.org/data/2.5/weather" + params = { + "q": city, + "appid": "93171b03384f92ee3c55873452a49c7c", + "units": units + } + response = requests.get(base_url, params=params) + data = response.json() + return data + + +def get_current_weather(location, unit="metric"): + """Get the current weather in a given location""" + owm_info = get_weather(location, units=unit) + weather_info = { + "location": location, + "temperature": owm_info["main"]["temp"], + "unit": unit, + "forecast": owm_info["weather"][0]["description"], + "wind": owm_info["wind"]["speed"] + } + return json.dumps(weather_info) + + +def draw_image_using_dalle(prompt): + dalle = DALLE(auth_token=oai_token, organization=oai_organization) + image = dalle.create_image_url(prompt) + url_dict = {'image_url': image[0]} + response = requests.get(image[0]) + img = Image.open(BytesIO(response.content)) + img.show() + return json.dumps(url_dict) + + +def draw_image(prompt): + leonardo = Leonardo(auth_token='a0178171-c67f-4922-afb3-458f24ecef1a') + leonardo.get_user_info() + response = leonardo.post_generations(prompt=prompt, num_images=1, guidance_scale=5, + model_id='e316348f-7773-490e-adcd-46757c738eb7', width=1024, height=768) + response = leonardo.wait_for_image_generation(generation_id=response['sdGenerationJob']['generationId']) + url_dict = {'image_url': response[0]['url']} + response = requests.get(url_dict['image_url']) + img = Image.open(BytesIO(response.content)) + img.show() + return json.dumps(url_dict) + + +gpt_functions = [ + { + "name": "draw_image", + "description": "Draws image using user prompt. Returns url of image.", + "parameters": { + "type": "object", + "properties": { + "prompt": { + "type": "string", + "description": "Prompt, the description, what should be drawn and how", + }, + }, + "required": ["prompt"], + }, + }, + { + "name": "get_current_weather", + "description": "Get the current weather in a given location", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}, + }, + "required": ["location"], + }, + }, + { + "name": "get_page_code", + "description": "Get page code to generate locators and tests", + "parameters": { + "type": "object", + "properties": { + "url": { + "type": "string", + "description": "The URL of the page to get the code from" + } + }, + "required": [] + } + }, + { + "name": "get_tests_results", + "description": "Get the results of the tests", + "parameters": { + "type": "object", + "properties": { + "test_files": { + "type": "array", + "items": { + "type": "string" + }, + "description": "The list of test files to run" + } + }, + "required": [] + } + } + ] + +gpt_functions_dict = {'get_current_weather': get_current_weather, + 'draw_image': draw_image, + 'get_page_code': doc_engine.get_body_without_scripts, + 'get_tests_results': run_tests('tests/test_example.py')} \ No newline at end of file diff --git a/examples/page_retriever.py b/examples/page_retriever.py new file mode 100644 index 0000000..f3861f5 --- /dev/null +++ b/examples/page_retriever.py @@ -0,0 +1,108 @@ +import re +import time + +from bs4 import BeautifulSoup +from selenium import webdriver +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.common.by import By +from webdriver_manager.chrome import ChromeDriverManager + + +class PageRetriever: + """The PageRetriever class is for managing an instance of the PageRetriever.""" + def __init__(self, url=''): + """ + General init. + + :param url: URL of the page. + """ + options = Options() + options.add_argument("--headless") + self.driver = webdriver.Chrome(ChromeDriverManager().install(), options=options) + self.url = url + + def set_url(self, url): + """ + Set the url. + + :param url: URL of the page. + """ + self.url = url + + def get_page(self): + """ + Get the page content from the url. + + :returns: HTML content of the page. + """ + return self.get_page_content(self.url) + + def get_body(self): + """ + Get the body content of the page. + + :returns: Body content of the page. + """ + return self.extract_body_content(self.get_page()) + + def get_body_without_scripts(self): + """ + Get the body content of the page without tags. + + :returns: Body content of the page without tags. + """ + return self.remove_script_tags(self.get_body()) + + def get_page_content(self, url): + """ + Get the page content from the url. + + :param url: URL of the page. + :returns: HTML content of the page. + """ + self.driver.get(url) + + WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.TAG_NAME, 'body'))) + + start_time = time.time() + while True: + network_activity = self.driver.execute_script( + "return window.performance.getEntriesByType('resource').filter(item => " + "item.initiatorType == 'xmlhttprequest' && item.duration == 0)" + ) + if not network_activity or time.time() - start_time > 30: # Таймаут в 30 секунд + break + + content = self.driver.page_source + self.driver.quit() + + return content + + @staticmethod + def extract_body_content(html_content): + """ + Extract the body content from the html_content. + + :param html_content: HTML content of the page. + :returns: Body content of the page. + """ + soup = BeautifulSoup(html_content, 'html.parser') + body_content = soup.body + + return str(body_content) + + @staticmethod + def remove_script_tags(input_content): + """ + Remove all tags from the input_content. + + :param input_content: HTML content of the page. + :returns: Body content of the page without tags. + """ + pattern_1 = re.compile(r'.*?', re.DOTALL) + pattern_2 = re.compile(r'.*?', re.DOTALL) + output = re.sub(pattern_1, '', input_content) + output = re.sub(pattern_2, '', output) + return output diff --git a/examples/pom_case_generator.py b/examples/pom_case_generator.py new file mode 100644 index 0000000..56d1305 --- /dev/null +++ b/examples/pom_case_generator.py @@ -0,0 +1,82 @@ +import os + +from urllib.parse import urlparse, unquote + + +class PomTestCaseGenerator: + """ Class for generating test files and page objects from json data """ + def __init__(self, url=''): + """ + General init. + + :param url: URL of the page. + """ + self.url = url + + def set_url(self, url): + """ + Set the url. + + :param url: URL of the page. + """ + self.url = url + + def ___create_pom_file(self, file_name, page_objects, url='', pom_folder='pom'): + """ + Create page object model file. + + :param file_name: Name of the file. + :param page_objects: List of page objects. + :param url: URL of the page. + :param pom_folder: Folder for page object model files. + """ + if not url: + url = self.url + if not os.path.exists(pom_folder): + os.makedirs(pom_folder) + with open(f'{pom_folder}/page_{file_name}.py', 'w', encoding='utf-8') as pom_file: + pom_file.write('from selenium.webdriver.common.by import By\n') + pom_file.write('from selenium.webdriver.support.ui import WebDriverWait\n') + pom_file.write('from selenium.webdriver.support import expected_conditions as EC\n\n\n') + pom_file.write(f'class Page{"".join(word.capitalize() for word in file_name.split("_"))}:\n') + pom_file.write(f' def __init__(self, driver):\n') + pom_file.write(f' self.url = "{url}"\n') + pom_file.write(f' self.driver = driver\n\n') + for method in page_objects: + pom_file.write(f' {method}\n\n') + + @staticmethod + def ___create_test_file(file_name, tests, pom_folder='pom', tests_folder='tests'): + """ + Create test file. + + :param file_name: Name of the file. + :param tests: List of tests. + :param pom_folder: Folder for page object model files. + :param tests_folder: Folder for test files. + """ + with open(f'{tests_folder}/test_{file_name}.py', 'w') as test_file: + test_file.write('import pytest\n\n') + test_file.write(f'from {pom_folder}.{os.path.splitext(f"page_{file_name}")[0]} import Page' + f'{"".join(word.capitalize() for word in file_name.split("_"))}\n\n\n') + test_file.write('@pytest.fixture(scope="module")\n') + test_file.write('def page(driver):\n') + test_file.write(f' page_under_test = Page{"".join(word.capitalize() for word in file_name.split("_"))}(driver)\n') + test_file.write(f' driver.get(page_under_test.url)\n') + test_file.write(f' return page_under_test\n\n\n') + for test in tests: + test_file.write(f'{test}\n\n\n') + + def create_files_from_json(self, json_data, url=''): + """ + Create test and page object model files from json data. + + :param json_data: JSON data. + :param url: URL of the page. + """ + if not url: + url = self.url + parsed_url = urlparse(unquote(url)) + file_name = parsed_url.path.strip('/').replace('/', '_') or 'index' + self.___create_test_file(file_name, json_data['tests'], pom_folder='..pom') + self.___create_pom_file(file_name, json_data['page_objects'], url) diff --git a/examples/pytest_runner.py b/examples/pytest_runner.py new file mode 100644 index 0000000..6dc8395 --- /dev/null +++ b/examples/pytest_runner.py @@ -0,0 +1,36 @@ +import json +import pytest +from pytest_jsonreport.plugin import JSONReport + + +def run_tests(test_files): + pytest.main(["-q", "--json-report", "--json-report-file=test_report.json"] + test_files) + + with open('test_report.json', encoding='utf-8') as json_file: + data = json.load(json_file) + + # Форматируем результаты + results = { + "passed": [], + "failed": [], + "error": [], + "failure details": {} + } + + for test in data['tests']: + if test['outcome'] == 'passed': + results["passed"].append(test['nodeid']) + elif test['outcome'] == 'failed': + results["failed"].append(test['nodeid']) + results["failure details"][test['nodeid']] = test['longrepr'] + page_html = next((prop[1] for prop in test['user_properties'] if prop[0] == 'page_html'), None) + results["failed_pages"][test['nodeid']] = page_html + elif test['outcome'] == 'error': + results["error"].append(test['nodeid']) + results["failure details"][test['nodeid']] = test['longrepr'] + page_html = next((prop[1] for prop in test['user_properties'] if prop[0] == 'page_html'), None) + results["failed_pages"][test['nodeid']] = page_html + + json_results = json.dumps(results) + + return json_results diff --git a/examples/test_gpt.py b/examples/test_gpt.py new file mode 100644 index 0000000..83356e8 --- /dev/null +++ b/examples/test_gpt.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +""" +Filename: chatgpt.py +Author: Iliya Vereshchagin +Copyright (c) 2023. All rights reserved. + +Created: 25.08.2023 +Last Modified: 25.08.2023 + +Description: +This file contains testing procedures for ChatGPt experiments +""" + +import string +import sys + +import asyncio + +from utils.audio_recorder import AudioRecorder +from utils.transcriptors import CustomTranscriptor +from utils.tts import CustomTTS + +from creds import oai_token, oai_organization +from openai_api.src.openai_api.chatgpt import ChatGPT + + +gpt = ChatGPT(auth_token=oai_token, organization=oai_organization, model="gpt-3.5-turbo") +gpt.max_tokens = 200 +gpt.stream = True + + +tts = CustomTTS(method="google", lang="en") + +# queues +prompt_queue = asyncio.Queue() +tts_queue = asyncio.Queue() + + +async def ask_chat(user_input): + full_response = "" + word = "" + async for response in gpt.str_chat(user_input): + for char in response: + word += char + if char in string.whitespace or char in string.punctuation: + if word: + await prompt_queue.put(word) + word = "" + sys.stdout.write(char) + sys.stdout.flush() + full_response += char + print("\n") + return full_response + + +async def tts_task(): + limit = 5 + empty_counter = 0 + while True: + if prompt_queue.empty(): + empty_counter += 1 + if empty_counter >= 3: + limit = 5 + empty_counter = 0 + words = [] + # Get all available words + limit_counter = 0 + while len(words) < limit: + try: + word = await asyncio.wait_for(prompt_queue.get(), timeout=0.5) + words.extend(word.split()) + if len(words) >= limit: + break + except asyncio.TimeoutError: + limit_counter += 1 + if limit_counter >= 10: + limit = 1 + + # If we have at least limit words or queue was empty 3 times, process them + if len(words) >= limit: + text = " ".join(words) + await tts.process(text) + limit = 1 + + +async def tts_sentence_task(): + punctuation_marks = ".?!,;:" + sentence = "" + while True: + try: + word = await asyncio.wait_for(prompt_queue.get(), timeout=0.5) + sentence += " " + word + # If the last character is a punctuation mark, process the sentence + if sentence[-1] in punctuation_marks: + await tts_queue.put(sentence) + sentence = "" + except Exception as error: + pass + + +async def tts_worker(): + while True: + try: + sentence = await tts_queue.get() + if sentence: + await tts.process(sentence) + tts_queue.task_done() + except Exception as error: + pass + + +async def get_user_input(): + while True: + try: + user_input = input() + if user_input.lower() == "[done]": + break + else: + await ask_chat(user_input) + except KeyboardInterrupt: + break + + +async def main(): + asyncio.create_task(tts_sentence_task()) + asyncio.create_task(tts_worker()) + method = "google" + + while True: + try: + if "google" not in method: + file_path = AudioRecorder().listen() + with open(file_path, "rb") as f: + transcript = await gpt.transcript(file=f, language="en") + else: + transcript = CustomTranscriptor(language="en-US").transcript() + pass + if transcript: + print(f"User: {transcript}") + #translate = CustomTranslator(source='ru', target='en').translate(transcript) + #print(translate) + response = await ask_chat(transcript) + except KeyboardInterrupt: + break + + +asyncio.run(main()) +