Skip to content
This repository has been archived by the owner on Aug 10, 2023. It is now read-only.

Commit

Permalink
Refactor (#1473)
Browse files Browse the repository at this point in the history
* Use named expression

* Refactor

- Optimize `plugin.test.py`
- Add types and sort imports for `plugin.test.py`
- Remove redundant slice index in `plugin.test.py`
- Raise from previous errors in `V1.py`

* Refactor
  • Loading branch information
danparizher authored Aug 2, 2023
1 parent 54cea93 commit 2c8d3b7
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 74 deletions.
133 changes: 73 additions & 60 deletions src/revChatGPT/V1.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def random_int(min: int, max: int) -> int:
log = logging.getLogger(__name__)


def logger(is_timed: bool):
def logger(is_timed: bool) -> function:
"""Logger decorator
Args:
Expand All @@ -88,7 +88,7 @@ def logger(is_timed: bool):
_type_: decorated function
"""

def decorator(func):
def decorator(func: function) -> function:
wraps(func)

def wrapper(*args, **kwargs):
Expand Down Expand Up @@ -146,9 +146,7 @@ def captcha_solver(images: list[str], challenge_details: dict) -> int:
"Developer instructions: The captcha images have an index starting from 0 from left to right",
)
print("Enter the index of the images that matches the captcha instructions:")
index = int(input())

return index
return int(input())


CAPTCHA_URL = getenv("CAPTCHA_URL", "https://bypass.churchless.tech/captcha/")
Expand Down Expand Up @@ -198,41 +196,39 @@ def get_arkose_token(
if resp.status_code != 200:
raise Exception("Failed to verify captcha")
return resp_json.get("token")
else:
working_endpoints: list[str] = []
# Check uptime for different endpoints via gatus
resp2: list[dict] = requests.get(
"https://stats.churchless.tech/api/v1/endpoints/statuses?page=1"
).json()
for endpoint in resp2:
# print(endpoint.get("name"))
if endpoint.get("group") != "Arkose Labs":
continue
# Check the last 5 results
results: list[dict] = endpoint.get("results", [])[-5:-1]
# print(results)
if not results:
print(f"Endpoint {endpoint.get('name')} has no results")
continue
# Check if all the results are up
if all(result.get("success") == True for result in results):
working_endpoints.append(endpoint.get("name"))
if not working_endpoints:
print("No working endpoints found. Please solve the captcha manually.")
return get_arkose_token(download_images=True, captcha_supported=True)
# Choose a random endpoint
endpoint = random.choice(working_endpoints)
resp: requests.Response = requests.get(endpoint)
if resp.status_code != 200:
if resp.status_code != 511:
raise Exception("Failed to get captcha token")
else:
print("Captcha required. Please solve the captcha manually.")
return get_arkose_token(download_images=True, captcha_supported=True)
try:
return resp.json().get("token")
except Exception:
return resp.text
working_endpoints: list[str] = []
# Check uptime for different endpoints via gatus
resp2: list[dict] = requests.get(
"https://stats.churchless.tech/api/v1/endpoints/statuses?page=1",
).json()
for endpoint in resp2:
# print(endpoint.get("name"))
if endpoint.get("group") != "Arkose Labs":
continue
# Check the last 5 results
results: list[dict] = endpoint.get("results", [])[-5:-1]
# print(results)
if not results:
print(f"Endpoint {endpoint.get('name')} has no results")
continue
# Check if all the results are up
if all(result.get("success") is True for result in results):
working_endpoints.append(endpoint.get("name"))
if not working_endpoints:
print("No working endpoints found. Please solve the captcha manually.")
return get_arkose_token(download_images=True, captcha_supported=True)
# Choose a random endpoint
endpoint = random.choice(working_endpoints)
resp: requests.Response = requests.get(endpoint)
if resp.status_code != 200:
if resp.status_code != 511:
raise Exception("Failed to get captcha token")
print("Captcha required. Please solve the captcha manually.")
return get_arkose_token(download_images=True, captcha_supported=True)
try:
return resp.json().get("token")
except Exception:
return resp.text


class Chatbot:
Expand Down Expand Up @@ -315,8 +311,8 @@ def __init__(
else:
self.session.proxies.update(proxies)

self.conversation_id = conversation_id or config.get("conversation_id", None)
self.parent_id = parent_id or config.get("parent_id", None)
self.conversation_id = conversation_id or config.get("conversation_id")
self.parent_id = parent_id or config.get("parent_id")
self.conversation_mapping = {}
self.conversation_id_prev_queue = []
self.parent_id_prev_queue = []
Expand Down Expand Up @@ -496,7 +492,7 @@ def __write_cache(self, info: dict) -> None:
json.dump(info, open(self.cache_path, "w", encoding="utf-8"), indent=4)

@logger(is_timed=False)
def __read_cache(self):
def __read_cache(self) -> dict[str, dict[str, str]]:
try:
cached = json.load(open(self.cache_path, encoding="utf-8"))
except (FileNotFoundError, json.decoder.JSONDecodeError):
Expand Down Expand Up @@ -543,7 +539,7 @@ def __send_request(
# print(f"Arkose token obtained: {data['arkose_token']}")
except Exception as e:
print(e)
raise e
raise

cid, pid = data["conversation_id"], data["parent_message_id"]
message = ""
Expand Down Expand Up @@ -598,10 +594,12 @@ def __send_request(
author = {}
if line.get("message"):
author = metadata.get("author", {}) or line["message"].get("author", {})
if line["message"].get("content"):
if line["message"]["content"].get("parts"):
if len(line["message"]["content"]["parts"]) > 0:
message_exists = True
if (
line["message"].get("content")
and line["message"]["content"].get("parts")
and len(line["message"]["content"]["parts"]) > 0
):
message_exists = True
message: str = (
line["message"]["content"]["parts"][0] if message_exists else ""
)
Expand Down Expand Up @@ -643,7 +641,7 @@ def post_messages(
messages: list[dict],
conversation_id: str | None = None,
parent_id: str | None = None,
plugin_ids: list = [],
plugin_ids: list = None,
model: str | None = None,
auto_continue: bool = False,
timeout: float = 360,
Expand All @@ -670,6 +668,8 @@ def post_messages(
"citations": list[dict],
}
"""
if plugin_ids is None:
plugin_ids = []
if parent_id and not conversation_id:
raise t.Error(
source="User",
Expand Down Expand Up @@ -734,7 +734,7 @@ def ask(
conversation_id: str | None = None,
parent_id: str = "",
model: str = "",
plugin_ids: list = [],
plugin_ids: list = None,
auto_continue: bool = False,
timeout: float = 360,
**kwargs,
Expand All @@ -759,6 +759,8 @@ def ask(
"recipient": str,
}
"""
if plugin_ids is None:
plugin_ids = []
messages = [
{
"id": str(uuid.uuid4()),
Expand Down Expand Up @@ -1048,7 +1050,12 @@ def rollback_conversation(self, num: int = 1) -> None:
self.parent_id = self.parent_id_prev_queue.pop()

@logger(is_timed=True)
def get_plugins(self, offset: int = 0, limit: int = 250, status: str = "approved"):
def get_plugins(
self,
offset: int = 0,
limit: int = 250,
status: str = "approved",
) -> dict[str, str]:
"""
Get plugins
:param offset: Integer. Offset (Only supports 0)
Expand All @@ -1062,7 +1069,7 @@ def get_plugins(self, offset: int = 0, limit: int = 250, status: str = "approved
return json.loads(response.text)

@logger(is_timed=True)
def install_plugin(self, plugin_id: str):
def install_plugin(self, plugin_id: str) -> None:
"""
Install plugin by ID
:param plugin_id: String. ID of plugin
Expand Down Expand Up @@ -1170,10 +1177,12 @@ async def __send_request(
"author",
{},
)
if line["message"].get("content"):
if line["message"]["content"].get("parts"):
if len(line["message"]["content"]["parts"]) > 0:
message_exists = True
if (
line["message"].get("content")
and line["message"]["content"].get("parts")
and len(line["message"]["content"]["parts"]) > 0
):
message_exists = True
message: str = (
line["message"]["content"]["parts"][0] if message_exists else ""
)
Expand Down Expand Up @@ -1214,7 +1223,7 @@ async def post_messages(
messages: list[dict],
conversation_id: str | None = None,
parent_id: str | None = None,
plugin_ids: list = [],
plugin_ids: list = None,
model: str | None = None,
auto_continue: bool = False,
timeout: float = 360,
Expand Down Expand Up @@ -1243,6 +1252,8 @@ async def post_messages(
"citations": list[dict],
}
"""
if plugin_ids is None:
plugin_ids = []
if parent_id and not conversation_id:
raise t.Error(
source="User",
Expand Down Expand Up @@ -1306,7 +1317,7 @@ async def ask(
conversation_id: str | None = None,
parent_id: str = "",
model: str = "",
plugin_ids: list = [],
plugin_ids: list = None,
auto_continue: bool = False,
timeout: int = 360,
**kwargs,
Expand Down Expand Up @@ -1334,6 +1345,8 @@ async def ask(
}
"""

if plugin_ids is None:
plugin_ids = []
messages = [
{
"id": str(uuid.uuid4()),
Expand Down Expand Up @@ -1663,7 +1676,7 @@ def handle_commands(command: str) -> bool:
elif command == "!exit":
if isinstance(chatbot.session, httpx.AsyncClient):
chatbot.session.aclose()
exit()
sys.exit()
else:
return False
return True
Expand Down Expand Up @@ -1717,7 +1730,7 @@ def handle_commands(command: str) -> bool:
print()

except (KeyboardInterrupt, EOFError):
exit()
sys.exit()
except Exception as exc:
error = t.CLIError("command line program unknown error")
raise error from exc
Expand Down
21 changes: 10 additions & 11 deletions src/revChatGPT/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,20 @@ def verify() -> None:
if int(__import__("platform").python_version_tuple()[0]) < 3:
error = t.NotAllowRunning("Not available Python version")
raise error
elif int(__import__("platform").python_version_tuple()[1]) < 9:
if int(__import__("platform").python_version_tuple()[1]) < 9:
error = t.NotAllowRunning(
f"Not available Python version: {__import__('platform').python_version()}",
)
raise error
else:
if (
int(__import__("platform").python_version_tuple()[1]) < 10
and int(__import__("platform").python_version_tuple()[0]) == 3
):
__import__("warnings").warn(
UserWarning(
"The current Python is not a recommended version, 3.10+ is recommended",
),
)
if (
int(__import__("platform").python_version_tuple()[1]) < 10
and int(__import__("platform").python_version_tuple()[0]) == 3
):
__import__("warnings").warn(
UserWarning(
"The current Python is not a recommended version, 3.10+ is recommended",
),
)


verify()
2 changes: 1 addition & 1 deletion src/revChatGPT/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
__all__ = ()


def main():
def main() -> None:
"""
main function for CLI
"""
Expand Down
3 changes: 2 additions & 1 deletion src/revChatGPT/typings.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
"""
A module that contains all the types used in this project
"""

import os
import platform
from enum import Enum
from typing import Union


python_version = [each for each in platform.python_version_tuple()]
python_version = list(platform.python_version_tuple())
SUPPORT_ADD_NOTES = int(python_version[0]) >= 3 and int(python_version[1]) >= 11


Expand Down
2 changes: 1 addition & 1 deletion tests/test_recipient.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
config = json.load(open("/home/acheong/.config/revChatGPT/config.json"))


async def main():
async def main() -> None:
chatbot = AsyncChatbot(config)
async for message in chatbot.ask("Hello, how are you?"):
print(message.get("message"))
Expand Down

0 comments on commit 2c8d3b7

Please sign in to comment.