Skip to content

Commit

Permalink
Merge pull request #112 from binayak9932/DB/Airdrop
Browse files Browse the repository at this point in the history
DB/ Created Airdrop table in DB, Added crud methods
  • Loading branch information
djeck1432 authored Oct 26, 2024
2 parents 3777a06 + 4d3114c commit 12d0954
Show file tree
Hide file tree
Showing 5 changed files with 385 additions and 10 deletions.
92 changes: 92 additions & 0 deletions web_app/alembic/versions/e69320e12cc7_add_airdrop_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Add AirDrop model
Revision ID: e69320e12cc7
Revises: a009512f5362
Create Date: 2024-10-25 16:47:37.723379
"""

import logging

import sqlalchemy as sa
from alembic import op
from sqlalchemy.engine.reflection import Inspector

revision = "e69320e12cc7"
down_revision = "a009512f5362"
branch_labels = None
depends_on = None

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def upgrade() -> None:
"""
Perform the upgrade migration to create the 'airdrop' table in the database
if it does not exist.
This migration creates a new table, 'airdrop', with the following columns:
- `id`: Primary key, UUID type, non-nullable.
- `user_id`: Foreign key referencing `user.id`, UUID type, non-nullable.
- `created_at`: Timestamp for when the airdrop was created, DateTime type, non-nullable.
- `amount`: Decimal type, nullable, representing the amount associated with the airdrop.
- `is_claimed`: Boolean type, indicating if the airdrop has been claimed, nullable.
- `claimed_at`: Timestamp for when the airdrop was claimed, DateTime type, nullable.
Additional configuration:
- Foreign key constraint on `user_id` to reference the `user` table.
- Primary key constraint on `id`.
- Index on `user_id` and `is_claimed` for optimized querying.
This function is part of the Alembic migration and is auto-generated.
Adjustments may be made if additional configuration or constraints are needed.
"""

bind = op.get_bind()
inspector = Inspector.from_engine(bind)
if "airdrop" not in inspector.get_table_names():
op.create_table(
"airdrop",
sa.Column("id", sa.UUID(), nullable=False),
sa.Column("user_id", sa.UUID(), nullable=False),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("amount", sa.DECIMAL(), nullable=True),
sa.Column("is_claimed", sa.Boolean(), nullable=True),
sa.Column("claimed_at", sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(
["user_id"],
["user.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_airdrop_user_id"), "airdrop", ["user_id"], unique=False
)
op.create_index(
op.f("ix_airdrop_is_claimed"), "airdrop", ["is_claimed"], unique=False
)
logger.info("Table 'airdrop' created successfully with indexes.")
else:
logger.info("Table 'airdrop' already exists, skipping creation.")


def downgrade() -> None:
"""
Perform the downgrade migration to remove the 'airdrop' table from the database if it exists.
This migration drops the 'airdrop' table and its associated indexes on `user_id`
and `is_claimed`.
It is intended to reverse the changes made in the `upgrade` function, allowing
for a rollback of the database schema to the state before the 'airdrop' table was added.
"""
bind = op.get_bind()
inspector = Inspector.from_engine(bind)
if "airdrop" in inspector.get_table_names():
op.drop_index(op.f("ix_airdrop_is_claimed"), table_name="airdrop")
op.drop_index(op.f("ix_airdrop_user_id"), table_name="airdrop")
op.drop_table("airdrop")
logger.info("Table 'airdrop' and its indexes were dropped successfully.")
else:
logger.info("Table 'airdrop' does not exist, skipping drop.")
72 changes: 64 additions & 8 deletions web_app/db/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

import logging
import uuid
from typing import Type, TypeVar
from datetime import datetime
from decimal import Decimal
from typing import List, Type, TypeVar

from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import scoped_session, sessionmaker
from web_app.db.database import SQLALCHEMY_DATABASE_URL
from web_app.db.models import Base, Position, Status, User
from web_app.db.models import AirDrop, Base, Position, Status, User

logger = logging.getLogger(__name__)
ModelType = TypeVar("ModelType", bound=Base)
Expand Down Expand Up @@ -112,6 +114,16 @@ def delete_object(self, model: Type[Base] = None, obj_id: uuid = None) -> None:
finally:
db.close()

def create_empty_claim(self, user_id: uuid.UUID) -> AirDrop:
"""
Creates a new empty AirDrop instance for the given user_id.
:param user_id: uuid.UUID
:return: AirDrop
"""
airdrop = AirDrop(user_id=user_id)
self.write_to_db(airdrop)
return airdrop


class UserDBConnector(DBConnector):
"""
Expand Down Expand Up @@ -317,17 +329,21 @@ def close_position(self, position_id: uuid) -> Position | None:
self.write_to_db(position)
return position.status

def open_position(self, position_id: uuid) -> Position | None:
def open_position(self, position_id: uuid.UUID) -> str | None:
"""
Retrieves a position by its contract address.
:param position_id: uuid
:return: Position | None
Opens a position by updating its status and creating an AirDrop claim.
:param position_id: uuid.UUID
:return: str | None
"""
position = self.get_object(Position, position_id)
if position:
position.status = Status.OPENED.value
self.write_to_db(position)
return position.status
self.create_empty_claim(position.user_id)
return position.status
else:
logger.error(f"Position with ID {position_id} not found")
return None

def get_unique_users_count(self) -> int:
"""
Expand Down Expand Up @@ -360,4 +376,44 @@ def get_total_amounts_for_open_positions(self) -> float | None:
return total_opened_amount
except SQLAlchemyError as e:
logger.error(f"Error calculating total amount for open positions: {e}")
return None
return None


class AirDropDBConnector(DBConnector):
"""
Provides database connection and operations management for the AirDrop model.
"""

def save_claim_data(self, airdrop_id: uuid.UUID, amount: Decimal) -> None:
"""
Updates the AirDrop instance with claim data.
:param airdrop_id: uuid.UUID
:param amount: Decimal
"""
airdrop = self.get_object(AirDrop, airdrop_id)
if airdrop:
airdrop.amount = amount
airdrop.is_claimed = True
airdrop.claimed_at = datetime.now()
self.write_to_db(airdrop)
else:
logger.error(f"AirDrop with ID {airdrop_id} not found")

def get_all_unclaimed(self) -> List[AirDrop]:
"""
Returns all unclaimed AirDrop instances (where is_claimed is False).
:return: List of unclaimed AirDrop instances
"""
with self.Session() as db:
try:

unclaimed_instances = (
db.query(AirDrop).filter_by(is_claimed=False).all()
)
return unclaimed_instances
except SQLAlchemyError as e:
logger.error(
f"Failed to retrieve unclaimed AirDrop instances: {str(e)}"
)
return []
17 changes: 17 additions & 0 deletions web_app/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,20 @@ class Position(Base):
default="pending",
)
start_price = Column(DECIMAL, nullable=False)


class AirDrop(Base):
"""
SQLAlchemy model for the airdrop table.
"""

__tablename__ = "airdrop"

id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
user_id = Column(
UUID(as_uuid=True), ForeignKey("user.id"), index=True, nullable=False
)
created_at = Column(DateTime, nullable=False, default=func.now())
amount = Column(DECIMAL, nullable=True)
is_claimed = Column(Boolean, default=False, index=True)
claimed_at = Column(DateTime, nullable=True)
171 changes: 171 additions & 0 deletions web_app/tests/test_airdrop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
"""
Tests for the AirDropDBConnector class, covering key database operations for airdrops.
Fixtures:
- db_connector: Provides an AirDropDBConnector instance with test user and airdrop data.
Test Cases:
- test_create_empty_claim_positive: Verifies airdrop creation for an existing user.
- test_create_empty_claim_non_existent_user: Checks error handling for invalid user IDs.
- test_save_claim_data_positive: Ensures claim data updates correctly.
- test_save_claim_data_non_existent_airdrop: Confirms logging for invalid airdrop IDs.
- test_get_all_unclaimed_positive: Retrieves unclaimed airdrops.
- test_get_all_unclaimed_after_claiming: Excludes claimed airdrops from unclaimed results.
"""

import uuid
from datetime import datetime
from decimal import Decimal

import pytest
from sqlalchemy.exc import SQLAlchemyError
from web_app.db.crud import AirDropDBConnector
from web_app.db.models import AirDrop, User


@pytest.fixture
def db_connector():
"""
Sets up an AirDropDBConnector with a test user and airdrop record, then cleans
up after the test.
This fixture:
- Initializes an AirDropDBConnector instance.
- Creates and saves a test user and associated airdrop record.
- Yields the connector, user, and airdrop instances for test use.
- Cleans up the database by removing the test user and airdrop after the test.
Yields:
tuple: (AirDropDBConnector, User, AirDrop)
"""
connector = AirDropDBConnector()
test_user = User(wallet_id="test_wallet_id")
connector.write_to_db(test_user)
airdrop = AirDrop(user_id=test_user.id)
connector.write_to_db(airdrop)
yield connector, test_user, airdrop
connector.delete_object(AirDrop, airdrop.id)
connector.delete_object(User, test_user.id)


def test_create_empty_claim_positive(db_connector):
"""
Tests that create_empty_claim successfully creates a new airdrop for an
existing user.
Steps:
- Calls create_empty_claim with a valid user ID.
- Asserts the airdrop is created with the correct user_id and
is initially unclaimed.
Args:
db_connector (fixture): Provides the AirDropDBConnector, test user,
and test airdrop.
"""
connector, test_user, _ = db_connector
new_airdrop = connector.create_empty_claim(test_user.id)
assert new_airdrop is not None
assert new_airdrop.user_id == test_user.id
assert not new_airdrop.is_claimed
connector.delete_object(AirDrop, new_airdrop.id)


def test_create_empty_claim_non_existent_user(db_connector):
"""
Tests that create_empty_claim raises an error when called with
a non-existent user ID.
Steps:
- Generates a fake user ID that does not exist in the database.
- Verifies that calling create_empty_claim with this ID raises
an SQLAlchemyError.
Args:
db_connector (fixture): Provides the AirDropDBConnector
and test setup.
"""
connector, _, _ = db_connector
fake_user_id = uuid.uuid4()
with pytest.raises(SQLAlchemyError):
connector.create_empty_claim(fake_user_id)


def test_save_claim_data_positive(db_connector):
"""
Tests that save_claim_data correctly updates an existing airdrop
with claim details.
Steps:
- Calls save_claim_data with a valid airdrop ID and amount.
- Asserts the airdrop's amount, is_claimed status, and claimed_at
timestamp are updated correctly.
Args:
db_connector (fixture): Provides the AirDropDBConnector, test user,
and test airdrop.
"""
connector, _, airdrop = db_connector
amount = Decimal("100.50")
connector.save_claim_data(airdrop.id, amount)
updated_airdrop = connector.get_object(AirDrop, airdrop.id)
assert updated_airdrop.amount == amount
assert updated_airdrop.is_claimed
assert updated_airdrop.claimed_at is not None


def test_save_claim_data_non_existent_airdrop(db_connector, caplog):
"""
Tests that save_claim_data logs an error when called with a non-existent
airdrop ID.
Steps:
- Generates a fake airdrop ID that is not in the database.
- Calls save_claim_data with this ID and checks that the appropriate
error message is logged.
Args:
db_connector (fixture): Provides the AirDropDBConnector and
test setup.
caplog (fixture): Captures log output for verification.
"""
connector, _, _ = db_connector
fake_airdrop_id = uuid.uuid4()
connector.save_claim_data(fake_airdrop_id, Decimal("50.00"))
assert f"AirDrop with ID {fake_airdrop_id} not found" in caplog.text


def test_get_all_unclaimed_positive(db_connector):
"""
Tests that get_all_unclaimed retrieves unclaimed airdrops correctly.
Steps:
- Calls get_all_unclaimed to fetch unclaimed airdrops.
- Asserts that the test airdrop (unclaimed) is present in the retrieved
list by matching IDs.
Args:
db_connector (fixture): Provides the AirDropDBConnector, test user,
and test airdrop.
"""
connector, _, airdrop = db_connector
unclaimed_airdrops = connector.get_all_unclaimed()
assert any(airdrop.id == unclaimed.id for unclaimed in unclaimed_airdrops)


def test_get_all_unclaimed_after_claiming(db_connector):
"""
Tests that get_all_unclaimed excludes airdrops that have been claimed.
Steps:
- Marks the test airdrop as claimed using save_claim_data.
- Calls get_all_unclaimed to fetch unclaimed airdrops.
- Asserts that the claimed airdrop is not included in the
returned list.
Args:
db_connector (fixture): Provides the AirDropDBConnector,
test user, and test airdrop.
"""
connector, _, airdrop = db_connector
connector.save_claim_data(airdrop.id, Decimal("50.00"))
unclaimed_airdrops = connector.get_all_unclaimed()
assert airdrop not in unclaimed_airdrops
Loading

0 comments on commit 12d0954

Please sign in to comment.