Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sprint 2 #11

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
venv
venv
.idea
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
# pymongo-api

## Схема решения
[task1.drawio](./task1.drawio)

## Как запустить

Запускаем mongodb и приложение

```shell
cd sharding-repl-cache
docker compose up -d
```

Заполняем mongodb данными

```shell
cd sharding-repl-cache
./scripts/mongo-init.sh
./scripts/redis-init.sh
```

## Как проверить
Expand All @@ -32,4 +38,6 @@ curl --silent http://ifconfig.me

## Доступные эндпоинты

Список доступных эндпоинтов, swagger http://<ip виртуальной машины>:8080/docs
Список доступных эндпоинтов, swagger http://<ip виртуальной машины>:8080/docs

**ВНИМАНИЕ!** Сделайте запрос на основной эндпоинт / ДВАЖДЫ (в силу особенностей подключения к кластеру для получения данных о топологии из подключения необходимо сделать повторный запрос при первоначальном запуске)
2 changes: 1 addition & 1 deletion api_app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def decorator(func):
@app.on_event("startup")
async def startup():
if REDIS_URL:
redis = aioredis.from_url(REDIS_URL, encoding="utf8", decode_responses=True)
redis = aioredis.RedisCluster.from_url(REDIS_URL, encoding="utf8", decode_responses=True)
FastAPICache.init(RedisBackend(redis), prefix="api:cache")


Expand Down
37 changes: 37 additions & 0 deletions mongo-sharding-repl/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# pymongo-api

## Как запустить

Запускаем mongodb и приложение

```shell
cd mongo-sharding
docker compose up -d
```

Заполняем mongodb данными

```shell
cd mongo-sharding
./scripts/mongo-init.sh
```

## Как проверить

### Если вы запускаете проект на локальной машине

Откройте в браузере http://localhost:8080

### Если вы запускаете проект на предоставленной виртуальной машине

Узнать белый ip виртуальной машины

```shell
curl --silent http://ifconfig.me
```

Откройте в браузере http://<ip виртуальной машины>:8080

## Доступные эндпоинты

Список доступных эндпоинтов, swagger http://<ip виртуальной машины>:8080/docs
10 changes: 10 additions & 0 deletions mongo-sharding-repl/api_app/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM python:3.12.1-slim
WORKDIR /app
EXPOSE 8080
COPY requirements.txt ./
# Устанавливаем зависимости python не пересобирая их
RUN pip install --no-cache --no-cache-dir -r requirements.txt
# Копирование кода приложения
COPY app.py /app/
ENTRYPOINT ["uvicorn"]
CMD ["app:app", "--host", "0.0.0.0", "--port", "8080"]
216 changes: 216 additions & 0 deletions mongo-sharding-repl/api_app/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
import json
import logging
import os
import time
from typing import List, Optional

import motor.motor_asyncio
from bson import ObjectId
from fastapi import Body, FastAPI, HTTPException, status
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
from logmiddleware import RouterLoggingMiddleware, logging_config
from pydantic import BaseModel, ConfigDict, EmailStr, Field
from pydantic.functional_validators import BeforeValidator
from pymongo import errors
from redis import asyncio as aioredis
from typing_extensions import Annotated

# Configure JSON logging
logging.config.dictConfig(logging_config)
logger = logging.getLogger(__name__)

app = FastAPI()
app.add_middleware(
RouterLoggingMiddleware,
logger=logger,
)

DATABASE_URL = os.environ["MONGODB_URL"]
DATABASE_NAME = os.environ["MONGODB_DATABASE_NAME"]
REDIS_URL = os.getenv("REDIS_URL", None)


def nocache(*args, **kwargs):
def decorator(func):
return func

return decorator


if REDIS_URL:
cache = cache
else:
cache = nocache


client = motor.motor_asyncio.AsyncIOMotorClient(DATABASE_URL)
db = client[DATABASE_NAME]

# Represents an ObjectId field in the database.
# It will be represented as a `str` on the model so that it can be serialized to JSON.
PyObjectId = Annotated[str, BeforeValidator(str)]


@app.on_event("startup")
async def startup():
if REDIS_URL:
redis = aioredis.from_url(REDIS_URL, encoding="utf8", decode_responses=True)
FastAPICache.init(RedisBackend(redis), prefix="api:cache")


class UserModel(BaseModel):
"""
Container for a single user record.
"""

id: Optional[PyObjectId] = Field(alias="_id", default=None)
age: int = Field(...)
name: str = Field(...)


class UserCollection(BaseModel):
"""
A container holding a list of `UserModel` instances.
"""

users: List[UserModel]


@app.get("/")
async def root():
topology_description = client.topology_description
read_preference = client.client_options.read_preference
topology_type = topology_description.topology_type_name
replicaset_name = topology_description.replica_set_name

collection_names = await db.list_collection_names()
collections = {}
pipeline = [
{ "$collStats": { "storageStats": {} } },
{ "$group": { "_id": "$shard", "documents_count": { "$sum": "$storageStats.count" } } }
]
for collection_name in collection_names:
collection = db.get_collection(collection_name)
counts_by_shards = []
cursor = collection.aggregate(pipeline)
async for document in cursor:
counts_by_shards.append(document)
collections[collection_name] = {
"documents_count": await collection.count_documents({}),
"documents_count_by_shards": counts_by_shards
}
try:
replica_status = await client.admin.command("replSetGetStatus")
replica_status = json.dumps(replica_status, indent=2, default=str)
except errors.OperationFailure:
replica_status = "No Replicas"

shards = {}
if topology_type == "Sharded":
shards_list = await client.admin.command("listShards")
shards = {}
for shard in shards_list.get("shards", {}):
shard_name = shard["_id"]
shard_host = shard["host"]
shards[shard_name] = {"host": shard_host}

shard_hosts = shard_host.split("/")[1]
shard_client = motor.motor_asyncio.AsyncIOMotorClient(f"mongodb://{shard_hosts}")
try:
# Get replica set status
repl_set_status = await shard_client.admin.command("replSetGetStatus")
# Count the replicas (members of the replica set)
replica_count = len(repl_set_status.get("members", []))
shards[shard_name]["replica_count"] = replica_count
except Exception as e:
shards[shard_name]["replica_count"] = f"Error: {str(e)}"
finally:
shard_client.close()

cache_enabled = False
if REDIS_URL:
cache_enabled = FastAPICache.get_enable()

return {
"mongo_topology_type": topology_type,
"mongo_replicaset_name": replicaset_name,
"mongo_db": DATABASE_NAME,
"read_preference": str(read_preference),
"mongo_nodes": client.nodes,
"mongo_primary_host": client.primary,
"mongo_secondary_hosts": client.secondaries,
# "mongo_address": client.address, # неприменимо при использовании нескольких роутеров, вызывает ошибку
"mongo_is_primary": client.is_primary,
"mongo_is_mongos": client.is_mongos,
"collections": collections,
"shards": shards,
"cache_enabled": cache_enabled,
"status": "OK",
}


@app.get("/{collection_name}/count")
async def collection_count(collection_name: str):
collection = db.get_collection(collection_name)
items_count = await collection.count_documents({})
# status = await client.admin.command('replSetGetStatus')
# import ipdb; ipdb.set_trace()
return {"status": "OK", "mongo_db": DATABASE_NAME, "items_count": items_count}


@app.get(
"/{collection_name}/users",
response_description="List all users",
response_model=UserCollection,
response_model_by_alias=False,
)
@cache(expire=60 * 1)
async def list_users(collection_name: str):
"""
List all of the user data in the database.
The response is unpaginated and limited to 1000 results.
"""
time.sleep(1)
collection = db.get_collection(collection_name)
return UserCollection(users=await collection.find().to_list(1000))


@app.get(
"/{collection_name}/users/{name}",
response_description="Get a single user",
response_model=UserModel,
response_model_by_alias=False,
)
async def show_user(collection_name: str, name: str):
"""
Get the record for a specific user, looked up by `name`.
"""

collection = db.get_collection(collection_name)
if (user := await collection.find_one({"name": name})) is not None:
return user

raise HTTPException(status_code=404, detail=f"User {name} not found")


@app.post(
"/{collection_name}/users",
response_description="Add new user",
response_model=UserModel,
status_code=status.HTTP_201_CREATED,
response_model_by_alias=False,
)
async def create_user(collection_name: str, user: UserModel = Body(...)):
"""
Insert a new user record.

A unique `id` will be created and provided in the response.
"""
collection = db.get_collection(collection_name)
new_user = await collection.insert_one(
user.model_dump(by_alias=True, exclude=["id"])
)
created_user = await collection.find_one({"_id": new_user.inserted_id})
return created_user
6 changes: 6 additions & 0 deletions mongo-sharding-repl/api_app/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
fastapi==0.110.2
uvicorn[standard]==0.29.0
motor==3.5.0
redis==4.4.2
fastapi-cache2==0.2.0
logmiddleware==0.0.4
Loading