Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update integration tests add test by new flieds or refactory #554

Merged
merged 6 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Create .env file
run: |
Expand Down Expand Up @@ -98,7 +98,7 @@ jobs:

- name: Upload Docker Logs on Failure
if: failure()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: docker-logs
path: docker-logs.txt
9 changes: 8 additions & 1 deletion web_app/db/crud/position.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def _get_user_by_wallet_id(self, wallet_id: str) -> User | None:

def get_positions_by_wallet_id(
self, wallet_id: str, start: int = 0, limit: int = 10
) -> list:
) -> list[dict]:
"""
Retrieves paginated positions for a user by their wallet ID
and returns them as a list of dictionaries.
Expand Down Expand Up @@ -525,3 +525,10 @@ def get_extra_deposits_by_position_id(self, position_id: UUID) -> list[ExtraDepo
return extra_deposits


def delete_all_extra_deposits(self, position_id: UUID) -> None:
"""
Delete all extra deposits for a position.
"""
with self.Session() as db:
db.query(ExtraDeposit).filter(ExtraDeposit.position_id == position_id).delete()
db.commit()
80 changes: 33 additions & 47 deletions web_app/test_integration/test_close_position.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import asyncio
import logging
from datetime import datetime
from typing import Any, Dict

Expand All @@ -11,6 +12,11 @@
from web_app.contract_tools.mixins.dashboard import DashboardMixin
from web_app.db.crud import AirDropDBConnector, PositionDBConnector, UserDBConnector
from web_app.db.models import Status
from web_app.test_integration.utils import with_temp_user

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

user_db = UserDBConnector()
airdrop = AirDropDBConnector()
Expand Down Expand Up @@ -62,50 +68,30 @@ def test_close_position(self, form_data: Dict[str, Any]) -> None:
amount = form_data["amount"]
multiplier = form_data["multiplier"]

# Check if the user already exists before trying to create
existing_user = position_db.get_user_by_wallet_id(wallet_id)
if not existing_user:
position_db.create_user(wallet_id)

position = position_db.create_position(
wallet_id=wallet_id,
token_symbol=token_symbol,
amount=amount,
multiplier=multiplier,
)

assert (
position.status == Status.PENDING
), "Position status should be 'pending' upon creation"

current_prices = asyncio.run(DashboardMixin.get_current_prices())
assert (
token_symbol in current_prices
), f"Token {token_symbol} missing in current prices"
position.start_price = current_prices[token_symbol]
position.created_at = datetime.utcnow()

position_status = position_db.open_position(position.id, current_prices)
print(f"Position created: {position_status}")
assert (
position_status == Status.OPENED
), "Position status should be 'opened' after updating"

print(
f"Position {position.id} successfully opened with status '{position.status}'."
)

close_result = position_db.close_position(position.id)
assert close_result, "Close operation should succeed."

position = position_db.get_position_by_id(position.id)
assert (
position.status == Status.CLOSED
), "Position status should be 'closed' after close operation"

# Clean up - delete the position and user
user = position_db.get_user_by_wallet_id(wallet_id)
airdrop.delete_all_users_airdrop(user.id)
position_db.delete_position(position)
if not position_db.get_positions_by_wallet_id(wallet_id, 0, 1):
position_db.delete_user_by_wallet_id(wallet_id)
with with_temp_user(wallet_id):
# Create position
position = position_db.create_position(
wallet_id=wallet_id,
token_symbol=token_symbol,
amount=amount,
multiplier=multiplier,
)
# Open position
current_prices = asyncio.run(DashboardMixin.get_current_prices())
position_status = position_db.open_position(position.id, current_prices)
assert (
position_status == Status.OPENED
), "Position status should be 'opened' after updating"
logger.info(
f"Position {position.id} successfully opened with status '{position.status}'."
)

# Close position
close_result = position_db.close_position(position.id)
assert close_result, "Close operation should succeed."

position = position_db.get_position_by_id(position.id)
assert (
position.status == Status.CLOSED
), "Position status should be 'closed' after close operation"
assert position.closed_at is not None, "Position should have closed_at timestamp"
75 changes: 42 additions & 33 deletions web_app/test_integration/test_create_position.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@
"""

import asyncio
import logging
import pytest
from typing import Dict, Any
from datetime import datetime
from web_app.db.crud import PositionDBConnector, AirDropDBConnector
from web_app.contract_tools.mixins.dashboard import DashboardMixin
from web_app.db.models import Status
from web_app.test_integration.utils import with_temp_user

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

position_db = PositionDBConnector()
airdrop = AirDropDBConnector()
Expand Down Expand Up @@ -74,38 +80,41 @@ def test_create_position(self, form_data: Dict[str, Any]) -> None:
multiplier = form_data["multiplier"]
borrowing_token = form_data["borrowing_token"]

existing_user = position_db.get_user_by_wallet_id(wallet_id)
if not existing_user:
position_db.create_user(wallet_id)

position = position_db.create_position(
wallet_id=wallet_id,
token_symbol=token_symbol,
amount=amount,
multiplier=multiplier,
)
assert (
position.status == Status.PENDING
), "Position status should be 'pending' upon creation"

current_prices = asyncio.run(DashboardMixin.get_current_prices())
assert (
position.token_symbol in current_prices
), "Token price missing in current prices"
position.start_price = current_prices[position.token_symbol]
position.created_at = datetime.utcnow()

print(
f"Position {position.id} created successfully with status '{position.status}'."
)
with with_temp_user(wallet_id):
position = position_db.create_position(
wallet_id=wallet_id,
token_symbol=token_symbol,
amount=amount,
multiplier=multiplier,
)
assert (
position.status == Status.PENDING
), "Position status should be 'pending' upon creation"
assert (
position.is_protection is False
), "Position should not have protection by default"

position_status = position_db.open_position(position.id, current_prices)
assert (
position_status == Status.OPENED
), "Position status should be 'opened' after updating"
print(f"Position {position.id} successfully opened.")
logger.info(
f"Position {position.id} created successfully with status '{position.status}'."
)

user = position_db.get_user_by_wallet_id(wallet_id)
airdrop.delete_all_users_airdrop(user.id)
position_db.delete_all_user_positions(user.id)
position_db.delete_user_by_wallet_id(wallet_id)
# Open position
current_prices = asyncio.run(DashboardMixin.get_current_prices())
assert (
position.token_symbol in current_prices
), "Token price missing in current prices"
position_status = position_db.open_position(position.id, current_prices)
assert (
position_status == Status.OPENED
), "Position status should be 'opened' after updating"
logger.info(f"Position {position.id} successfully opened.")
# Verify position attributes after opening
position = position_db.get_position_by_id(position.id)
assert position is not None, "Position not found in database before opening"
assert position.status == Status.OPENED, "Position status should be 'opened'"
assert (
position.start_price == current_prices[token_symbol]
), "Start price should be the token price"
assert (
position.created_at is not None
), "Position should have a created_at timestamp"
Loading