diff --git a/annotator/.dockerignore b/annotator/.dockerignore
new file mode 100644
index 0000000..7f055ad
--- /dev/null
+++ b/annotator/.dockerignore
@@ -0,0 +1,10 @@
+# Files ignored when building docker images:
+# https://docs.docker.com/engine/reference/builder/#dockerignore-file
+.venv
+venv
+env
+.dockerignore
+Dockerfile
+# frontend/
+.trash
+
diff --git a/annotator/.gitignore b/annotator/.gitignore
new file mode 100644
index 0000000..3590c8e
--- /dev/null
+++ b/annotator/.gitignore
@@ -0,0 +1,7 @@
+**/.trash
+/mongodb-data
+/test
+/resources/company
+/resources/impaakt
+/resources/topics
+/.vscode
diff --git a/annotator/README.md b/annotator/README.md
new file mode 100644
index 0000000..d3b9e30
--- /dev/null
+++ b/annotator/README.md
@@ -0,0 +1,17 @@
+# Impaakt API
+
+## Deployment steps:
+- clone the repository
+- chmod +x ./backend/app/run.sh
+
+- docker compose -f compose.dev.yml up --build -d
+
+## Usage
+[http://localhost:8001/docs](http://localhost:8001/docs)
+
+Create an Impaakt job including a list of candidate sources:
+- Each source must include a URL.
+- For each source, text can optionally be included in the request. For sources without text, the system will attempt to crawl the URL and extract text from either HTML or PDF documents.
+- Impaakt ranking is enabled by default but optional. Only sources with text will be processed.
+- Named entity recognition (NER) is enabled by default but optional. For each source a NER list can be included in the request. For sources without a NER list, the system will attempt to extract entities. Only sources with text will be processed.
+- Company classification is enabled by default but optional. Only sources with a NER list will be processed.
diff --git a/annotator/common-services.yml b/annotator/common-services.yml
new file mode 100644
index 0000000..707e94a
--- /dev/null
+++ b/annotator/common-services.yml
@@ -0,0 +1,41 @@
+version: "3.9"
+services:
+  mongodb:
+    container_name: mongodb
+    image: mongo:latest
+    command: --wiredTigerCacheSizeGB 2
+    ports:
+      - "27017:27017"
+    volumes:
+      - ./mongodb-data:/data/db
+    env_file:
+      - ./images/env
+    restart: always
+
+  redis:
+    container_name: redis
+    image: redis:alpine
+    restart: always
+
+  flower:
+    container_name: flower
+    image: mher/flower:latest
+    command: celery flower
+    env_file:
+      - ./images/env
+    ports:
+      - 5555:5555
+    restart: always
+
+  worker:
+    build:
+      context: .
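+      # Base image shared by the API and the Celery workers; compose.dev.yml extends this
+      # "worker" service for the api, ingress and infer containers.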
+ dockerfile: ./images/pytorch/Dockerfile + volumes: + - ./service/app:/app + - ./resources:/resources + depends_on: + - mongodb + - redis + env_file: + - ./images/env \ No newline at end of file diff --git a/annotator/compose.dev.yml b/annotator/compose.dev.yml new file mode 100644 index 0000000..4e4af6a --- /dev/null +++ b/annotator/compose.dev.yml @@ -0,0 +1,74 @@ +version: '3.9' + +services: + mongodb: + extends: + file: common-services.yml + service: mongodb + deploy: + resources: + reservations: + cpus: '0.1' + memory: '3g' + redis: + extends: + file: common-services.yml + service: redis + flower: + extends: + file: common-services.yml + service: flower + api: + extends: + file: common-services.yml + service: worker + ports: + - "8001:8001" + - "5678:5678" # debug + env_file: + - ./images/env + # command: /app/run.sh + command: python -m debugpy --listen 0.0.0.0:5678 -m uvicorn app.main:app --host 0.0.0.0 --port 8001 --reload + tty: true + + ingress: #ingress worker + extends: + file: common-services.yml + service: worker + ports: + - 5671:5671 #debug + env_file: + - ./images/env + # environment: + # - LOAD_TEXT=TRUE + command: watchmedo auto-restart -d "/app" --recursive -p '*.py' -- python -m debugpy --listen 0.0.0.0:5671 -m celery -A app.main.celery worker --loglevel=info -Q default,ingress --hostname=ingress@%h --concurrency=10 + + + + + infer: # inference worker with GPU support + extends: + file: common-services.yml + service: worker + ports: + - 5672:5672 #debug + env_file: + - ./images/env + command: watchmedo auto-restart -d "/app" --recursive -p '*.py' -- python -m debugpy --listen 0.0.0.0:5672 -m celery -A app.main.celery worker --loglevel=info -Q infer --hostname=infer@%h --pool=solo + + # ulimits: + # stack: 67108864 + # memlock: -1 + # deploy: + # resources: + # reservations: + # devices: + # - driver: nvidia + # count: 1 + # capabilities: [gpu] + # replicas: 1 + deploy: + replicas: 1 +# volumes: +# mongodb-data: +# ./mongodb-data:mongodb-data: diff --git a/annotator/images/.env.swp b/annotator/images/.env.swp new file mode 100644 index 0000000..824f8e0 Binary files /dev/null and b/annotator/images/.env.swp differ diff --git a/annotator/images/env b/annotator/images/env new file mode 100644 index 0000000..5d178f5 --- /dev/null +++ b/annotator/images/env @@ -0,0 +1,23 @@ +#MongoDB +MONGODB_URL=mongodb://mongodb:27017/ +MONGODB_DATABASE=app + +#Celery +CELERY_BROKER_URL=redis://redis:6379 +CELERY_RESULT_BACKEND=redis://redis:6379 +FLOWER_PORT=5555 + +#API +PYTHONPATH=. 
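+# Batch sizes and crawl limits below are read by the API and worker tasks via os.environ.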
+SOURCE_BATCH_SIZE=20
+TEXT_TOKEN_MIN=50
+TEXT_TOKEN_MAX=600
+PDF_PAGES_MAX=20
+TEXT_CRAWL_CONCURRENCY=10
+# TEXT_CRAWL_CONCURRENCY=1
+NODE_OPTIONS="--max-old-space-size=5120"
+TEXT_BATCH_SIZE=200
+IMPAAKT_BATCH_SIZE=20
+ENTITY_BATCH_SIZE=20
+COMPANY_BATCH_SIZE=20
+TOPIC_BATCH_SIZE=20
\ No newline at end of file
diff --git a/annotator/images/pytorch/Dockerfile b/annotator/images/pytorch/Dockerfile
new file mode 100644
index 0000000..879eb43
--- /dev/null
+++ b/annotator/images/pytorch/Dockerfile
@@ -0,0 +1,46 @@
+# FROM pytorch/pytorch:2.1.1-cuda12.1-cudnn8-runtime
+FROM pytorch/pytorch:latest
+
+ENV PIP_CACHE_DIR=/var/cache/buildkit/pip
+ENV PYTHONUNBUFFERED 1
+ENV PYTHONDONTWRITEBYTECODE 1
+
+# Create user
+ARG USERNAME=impaakt
+ARG USER_UID=1000
+ARG USER_GID=$USER_UID
+RUN groupadd --gid $USER_GID $USERNAME \
+    && useradd --uid $USER_UID --gid $USER_GID -m $USERNAME
+
+# Set timezone (UTC by default; override CONTAINER_TIMEZONE at build time)
+ARG CONTAINER_TIMEZONE=Etc/UTC
+RUN ln -snf /usr/share/zoneinfo/$CONTAINER_TIMEZONE /etc/localtime && echo $CONTAINER_TIMEZONE > /etc/timezone
+
+# Enable BuildKit cache
+RUN mkdir -p $PIP_CACHE_DIR
+RUN rm -f /etc/apt/apt.conf.d/docker-clean
+
+RUN --mount=type=cache,target=$PIP_CACHE_DIR \
+    apt-get update \
+    && apt-get install -yqq --no-install-recommends \
+    build-essential \
+    gstreamer1.0-libav \
+    libnss3-tools \
+    libatk-bridge2.0-0 \
+    libcups2-dev \
+    libxkbcommon-x11-0 \
+    libxcomposite-dev \
+    libxrandr2 \
+    libgbm-dev \
+    libgtk-3-0 \
+    && apt-get purge -y --auto-remove -o APT::AutoRemove::RecommendsImportant=false \
+    && rm -rf /var/lib/apt/lists/*
+
+COPY ./images/requirements.txt /requirements.txt
+RUN pip install -r /requirements.txt
+
+WORKDIR /app
+
+USER $USERNAME
+
+RUN python -m nltk.downloader punkt
+RUN playwright install chromium
\ No newline at end of file
diff --git a/annotator/images/requirements.txt b/annotator/images/requirements.txt
new file mode 100644
index 0000000..e424e5b
--- /dev/null
+++ b/annotator/images/requirements.txt
@@ -0,0 +1,42 @@
+celery==5.2.7
+# SQLAlchemy==1.4.46
+watchfiles==0.18.1
+fastapi==0.79.1
+psycopg2-binary==2.9.5
+# alembic==1.9.2
+# atomicwrites==1.4.1
+# attrs==22.2.0
+# bcrypt==4.0.1
+# certifi==2022.12.7
+# cffi==1.15.1
+# email-validator==1.3.1
+# passlib==1.7.4
+# python-jose==3.3.0
+# python-multipart==0.0.5
+gunicorn==20.1.0
+uvicorn==0.20.0
+# Jinja2==3.1.2
+kombu==5.2.4
+# tenacity==8.1.0
+beautifulsoup4==4.12.0
+httpx==0.23.3
+playwright==1.23.1
+playwright-stealth==1.0.5
+# PyMuPDF==1.21.1
+PyMuPDF==1.24.9
+debugpy==1.6.7
+imbalanced-learn==0.10
+transformers==4.28.1
+nltk==3.8.1
+pandas==2.0.0
+redis==4.5.4
+asgiref==3.7.2
+asyncio==3.4.3
+scikit-learn==1.1.2
+# scikit-learn==1.0.2
+# motor==3.3.1
+# beanie==1.22.5
+pymongo==4.6.0
+bunnet==1.2.0
+watchdog==3.0.0
+colored-traceback
\ No newline at end of file
diff --git a/annotator/resources/company.zip b/annotator/resources/company.zip
new file mode 100644
index 0000000..11b9e45
Binary files /dev/null and b/annotator/resources/company.zip differ
diff --git a/annotator/resources/impaakt.zip b/annotator/resources/impaakt.zip
new file mode 100644
index 0000000..ba84c7b
Binary files /dev/null and b/annotator/resources/impaakt.zip differ
diff --git a/annotator/resources/topics.z01 b/annotator/resources/topics.z01
new file mode 100644
index 0000000..82495b9
Binary files /dev/null and b/annotator/resources/topics.z01 differ
diff --git a/annotator/resources/topics.z02 b/annotator/resources/topics.z02
new file mode 100644
index 0000000..3556691
Binary files /dev/null and b/annotator/resources/topics.z02 differ
diff --git a/annotator/resources/topics.z03 b/annotator/resources/topics.z03
new file mode 100644
index 0000000..a4621d8
Binary files /dev/null and b/annotator/resources/topics.z03 differ
diff --git a/annotator/resources/topics.z04 b/annotator/resources/topics.z04
new file mode 100644
index 0000000..00462a2
Binary files /dev/null and b/annotator/resources/topics.z04 differ
diff --git a/annotator/resources/topics.zip b/annotator/resources/topics.zip
new file mode 100644
index 0000000..77c71fc
Binary files /dev/null and b/annotator/resources/topics.zip differ
diff --git a/annotator/service/app/app/api/__init__.py b/annotator/service/app/app/api/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/annotator/service/app/app/api/api_v1/__init__.py b/annotator/service/app/app/api/api_v1/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/annotator/service/app/app/api/api_v1/api.py b/annotator/service/app/app/api/api_v1/api.py
new file mode 100644
index 0000000..2a54b64
--- /dev/null
+++ b/annotator/service/app/app/api/api_v1/api.py
@@ -0,0 +1,6 @@
+from fastapi import APIRouter
+
+from .endpoints import job
+
+api_router = APIRouter()
+api_router.include_router(job.router, prefix="/jobs", tags=["job"])
diff --git a/annotator/service/app/app/api/api_v1/endpoints/__init__.py b/annotator/service/app/app/api/api_v1/endpoints/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/annotator/service/app/app/api/api_v1/endpoints/job.py b/annotator/service/app/app/api/api_v1/endpoints/job.py
new file mode 100644
index 0000000..6b63d90
--- /dev/null
+++ b/annotator/service/app/app/api/api_v1/endpoints/job.py
@@ -0,0 +1,137 @@
+from typing import Any, Dict
+
+from app.models.job import Job
+from app.schemas.job import (
+    JobCreate,
+    JobCreateStatus,
+    JobDetails,
+    JobStatus,
+)
+from app.tasks.job import job_done, job_start
+from app.tasks.text import source_text
+from app.tasks.impaakt import source_impaakt
+from app.tasks.company import source_company
+from app.tasks.entity import source_entity
+from app.tasks.topic import source_topic
+from bunnet import WriteRules
+from celery import chain, group
+from fastapi import APIRouter, HTTPException
+import os
+
+router = APIRouter()
+
+
+@router.post(
+    "",
+    status_code=200,
+    response_model=JobCreateStatus,
+)
+def create_job(
+    *,
+    job_in: JobCreate,
+) -> dict:
+    # ) -> Any:
+    """\
+    Create an Impaakt job including a list of candidate sources.
+    1. Each source must include a URL. \n
+    2. For each source, text can optionally be included in the request. For sources without text, the system will attempt to crawl the URL and extract text from either HTML or PDF documents. Only source texts with at least text_token_min tokens are eligible for Impaakt, entity, company and topic inference.
+    \n
+    3. Impaakt inference is enabled by default but optional. Only sources with text will be processed.
+    4. Entity recognition (NER) is optional. For each source a NER list can be included in the request. For sources without a NER list, the system will attempt to extract entities. \n Only sources with text will be processed. \n
+    5. Company classification is optional. Only sources with a NER list will be processed.
+    6. SASB topic classification is optional. Only sources with a NER list will be processed.
+ + """ + job = Job(**job_in.dict()) + job = job.insert(link_rule=WriteRules.WRITE) + + n = int(os.environ["SOURCE_BATCH_SIZE"]) + + jc = [] # job chain + jc.append(job_start.si(job.id)) + + # parallel processing of source batches + bg = [] # batch group + for b in (job.source[i : i + n] for i in range(0, len(job.source), n)): + # for each source batch; first get text, next run models + bc = [] # batch chain + source_ids = [str(s.id) for s in b] + bc.append(source_text.si(str(job.id), source_ids)) + if job.impaakt_model | job.entity_model | job.company_model | job.entity_model: + mg = [] # model group + if job.impaakt_model: + mg.append(source_impaakt.si(str(job.id), source_ids)) + + if job.entity_model | job.company_model: + cmc = [] # company model chain + if job.entity_model: + cmc.append(source_entity.si(str(job.id), source_ids)) + + if job.company_model: + cmc.append(source_company.si(str(job.id), source_ids)) + mg.append(chain(cmc)) + + if job.topic_model: + mg.append(source_topic.si(str(job.id), source_ids)) + + bc.append(group(mg)) + bg.append(chain(bc)) + jc.append(group(bg)) + jc.append(job_done.si(job.id)) + + chain(jc).apply_async() + + return job + + +@router.get( + "/{id}/status", + status_code=200, + response_model=JobStatus, + response_model_exclude_none=True, +) +def job_status( + *, + id: str, +) -> Any: + """ + Retrieve job status by job_id. + """ + try: + job = ~Job.get(id).project(JobStatus) + + except: + raise HTTPException(status_code=404, detail=f"Job with ID {id} not found") + + return job + + +@router.get( + "/{id}", + status_code=200, + response_model=JobDetails, + response_model_exclude_none=True, +) +def job_details( + *, + id: str, + include_text: bool = False, + include_entity: bool = False, +) -> Any: + """ + Retrieve job by ID\n + Optionally includes text and NER. 
+ """ + try: + job = ~Job.get(id, fetch_links=True) + if not include_text: + for source in job.source: + source.text = None + + if not include_entity: + for source in job.source: + source.entity = None + except: + raise HTTPException(status_code=404, detail=f"Job with ID {id} not found") + + return job diff --git a/annotator/service/app/app/core/__init__.py b/annotator/service/app/app/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/annotator/service/app/app/core/celery.py b/annotator/service/app/app/core/celery.py new file mode 100644 index 0000000..fa9f645 --- /dev/null +++ b/annotator/service/app/app/core/celery.py @@ -0,0 +1,14 @@ +from celery import current_app as current_celery_app + +from app.core.config import settings + + +def create_celery(): + celery_app = current_celery_app + celery_app.config_from_object(settings, namespace="CELERY") + celery_app.conf.broker_transport_options = { + 'priority_steps': list(range(10)), + "queue_order_strategy": "priority", + } + + return celery_app diff --git a/annotator/service/app/app/core/config.py b/annotator/service/app/app/core/config.py new file mode 100644 index 0000000..7369ddd --- /dev/null +++ b/annotator/service/app/app/core/config.py @@ -0,0 +1,83 @@ +import pathlib + +from pydantic import AnyHttpUrl, BaseSettings, EmailStr, validator +from typing import List, Optional, Union +from kombu import Queue +import os + + +# Project Directories +ROOT = pathlib.Path(__file__).resolve().parent.parent + + +def route_task(name, args, kwargs, options, task=None, **kw): + if ":" in name: + queue, _ = name.split(":") + return {"queue": queue} + return {"queue": "default"} + + +class Settings(BaseSettings): + API_V1_STR: str = "/api/v1" + JWT_SECRET: str = "TEST_SECRET_DO_NOT_USE_IN_PROD" + ALGORITHM: str = "HS256" + + # 60 minutes * 24 hours * 8 days = 8 days + ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8 + + # BACKEND_CORS_ORIGINS is a JSON-formatted list of origins + # e.g: '["http://localhost", "http://localhost:4200", "http://localhost:3000", \ + # "http://localhost:8080", "http://local.dockertoolbox.tiangolo.com"]' + BACKEND_CORS_ORIGINS: List[AnyHttpUrl] = [ + "http://localhost:3000", + "http://localhost:8001", # type: ignore + ] + + # Origins that match this regex OR are in the above list are allowed + BACKEND_CORS_ORIGIN_REGEX: Optional[ + str + ] = "https.*\.(netlify.app|herokuapp.com)" # noqa: W605 + + @validator("BACKEND_CORS_ORIGINS", pre=True) + def assemble_cors_origins(cls, v: Union[str, List[str]]) -> Union[List[str], str]: + if isinstance(v, str) and not v.startswith("["): + return [i.strip() for i in v.split(",")] + elif isinstance(v, (list, str)): + return v + raise ValueError(v) + + # SQLALCHEMY_DATABASE_URI: str = os.environ.get("DATABASE_URL", "") + # FIRST_SUPERUSER: EmailStr = "admin@triagecentral.com" + # FIRST_SUPERUSER_PW: str = "secret" + + CELERY_BROKER_URL: str = os.environ.get( + "CELERY_BROKER_URL", "redis://127.0.0.1:6379/0" + ) + CELERY_RESULT_BACKEND: str = os.environ.get( + "CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0" + ) + + CELERY_TASK_DEFAULT_QUEUE = "default" + + # Force all queues to be explicitly listed in `CELERY_TASK_QUEUES` to help prevent typos + CELERY_TASK_CREATE_MISSING_QUEUES = False + + CELERY_TASK_QUEUES: list = ( + # need to define default queue here or exception would be raised + Queue("default"), + Queue("ingress"), + Queue("infer"), + ) + + CELERY_TASK_ROUTES = (route_task,) + + MONGODB_URL: str + MONGODB_DATABASE: str + # MONGODB_COLLECTION: str + 
API_V1_STR: str = "/api/v1" + + class Config: + case_sensitive = True + + +settings = Settings() diff --git a/annotator/service/app/app/main.py b/annotator/service/app/app/main.py new file mode 100644 index 0000000..713f591 --- /dev/null +++ b/annotator/service/app/app/main.py @@ -0,0 +1,116 @@ +import time +import sys +import os +from pathlib import Path + +from fastapi import FastAPI, APIRouter, Request, Depends +from fastapi.templating import Jinja2Templates + +# from sqlalchemy.orm import Session +from fastapi.middleware.cors import CORSMiddleware +from app.core.config import settings + +if sys.stdout.isatty() and os.getenv('TERM'): + import colored_traceback + colored_traceback.add_hook(always=True) + +# from app import crud +# from app.api import deps + +from app.api.api_v1.api import api_router as api_v1_router + +# from app.api.api_v1.api import api_router as api_v2_router +from app.core.config import settings +from app.core.celery import create_celery +from starlette.responses import RedirectResponse + +# from app.db.mongodb_utils import Database +from pymongo import MongoClient + +# from motor.motor_asyncio import AsyncIOMotorClient + +# from beanie import init_beanie +from bunnet import init_bunnet + +from app.models.job import Job +from app.models.source import Source + +BASE_PATH = Path(__file__).resolve().parent +TEMPLATES = Jinja2Templates(directory=str(BASE_PATH / "templates")) + +root_router = APIRouter() + + +def create_app() -> FastAPI: + app = FastAPI( + title="Impaakt API", openapi_url=f"{settings.API_V1_STR}/openapi.json" + ) + app.celery_app = create_celery() + return app + + +app = create_app() +celery = app.celery_app + + +# app = FastAPI(title="Recipe API", openapi_url=f"{settings.API_V1_STR}/openapi.json") + + +# Set all CORS enabled origins +if settings.BACKEND_CORS_ORIGINS: + app.add_middleware( + CORSMiddleware, + allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS], + allow_origin_regex=settings.BACKEND_CORS_ORIGIN_REGEX, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + + +@root_router.get("/", status_code=200) +def root(): + return RedirectResponse(url="/docs") + + +@app.middleware("http") +async def add_process_time_header(request: Request, call_next): + start_time = time.time() + response = await call_next(request) + process_time = time.time() - start_time + response.headers["X-Process-Time"] = str(process_time) + return response + + # Sync + + +client = MongoClient(settings.MONGODB_URL) +init_bunnet(client.app, document_models=[Job, Source]) + + +# @app.on_event("startup") +# async def startup_db_clients(): +# Async +# client = AsyncIOMotorClient(settings.MONGODB_URL) +# await init_beanie(client.app, document_models=[JobAsync, SourceAsync]) + +# Sync +# client = MongoClient(settings.MONGODB_URL) +# init_bunnet(client.app, document_models=[Job, Source]) + + +# @app.on_event("shutdown") +# async def shutdown_db_clients(): +# await DB.disconnect() + + +app.include_router(api_v1_router, prefix=settings.API_V1_STR) +# app.include_router(api_v2_router, prefix=settings.API_V2_STR) +app.include_router(root_router) + + +if __name__ == "__main__": + # Use this for debugging purposes only + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=8001, log_level="debug") diff --git a/annotator/service/app/app/models/job.py b/annotator/service/app/app/models/job.py new file mode 100644 index 0000000..8a3e1cf --- /dev/null +++ b/annotator/service/app/app/models/job.py @@ -0,0 +1,24 @@ +from datetime import datetime +from 
uuid import UUID, uuid4 + +from app.models.source import Source +from bunnet import Document, Link +from pydantic import Field +from pydantic import BaseModel +import os + + +class Job(Document): + id: UUID = Field(default_factory=uuid4, alias="_id") + text_token_min: int | None = int(os.environ["TEXT_TOKEN_MIN"]) + text_token_max: int | None = int(os.environ["TEXT_TOKEN_MAX"]) + impaakt_model: bool | None = True + entity_model: bool | None = True + company_model: bool | None = True + topic_model: bool | None = True + status: str | None = "pending" + job_created: datetime | None = datetime.now() + process_start: datetime | None = None + process_done: datetime | None = None + process_time: int | None = None + source: list[Link[Source]] diff --git a/annotator/service/app/app/models/source.py b/annotator/service/app/app/models/source.py new file mode 100644 index 0000000..dca070b --- /dev/null +++ b/annotator/service/app/app/models/source.py @@ -0,0 +1,29 @@ +from bunnet import Document +from pydantic import BaseModel +from typing import Optional + + +class Entity(BaseModel): + label: str + count: int + + +class Company(BaseModel): + label: str + score: int + + +class Topic(BaseModel): + label: str + score: int + + +class Source(Document): + url: str + status: str | None = None + text: str | None = None + text_token: int | None = None + impaakt: float | None = None + entity: Optional[list[Entity]] + company: Optional[list[Company]] + topic: Optional[list[Topic]] diff --git a/annotator/service/app/app/schemas/job.py b/annotator/service/app/app/schemas/job.py new file mode 100644 index 0000000..8032ac8 --- /dev/null +++ b/annotator/service/app/app/schemas/job.py @@ -0,0 +1,108 @@ +from datetime import datetime +from typing import Dict +from uuid import UUID + +from app.schemas.source import SourceCreate, SourceDetails +from pydantic import BaseModel, Field, root_validator +from bunnet import PydanticObjectId +import os + + +class JobBase(BaseModel): + """Shared fields""" + + +class JobCreate(JobBase): + """Fields provided by client""" + + text_token_min: int | None = int(os.environ["TEXT_TOKEN_MIN"]) + text_token_max: int | None = int(os.environ["TEXT_TOKEN_MAX"]) + impaakt_model: bool | None = True + entity_model: bool | None = True + company_model: bool | None = True + topic_model: bool | None = True + + source: list[SourceCreate] = Field( + example=[ + SourceCreate( + url="https://www.occ.gov/news-issuances/news-releases/2011/nr-occ-2011-47c.pdf" + ), + SourceCreate( + url="https://www.pionline.com/esg/dws-sell-excessive-greenwashing-doubt-citi-analysts-say" + ), + SourceCreate(url="https://www.cnn.com/markets/fear-and-greed"), + SourceCreate( + url="https://time.com/personal-finance/article/how-many-stocks-should-i-own/" + ), + SourceCreate( + url="https://wallethub.com/answers/cc/citibank-credit-balance-refund-2140740558/" + ), + SourceCreate( + url="https://www.cnn.com/2021/02/16/business/citibank-revlon-lawsuit-ruling/index.html" + ), + SourceCreate( + url="https://www.businessinsider.com/citi-analysts-excessive-corporate-leverage-2018-11/" + ), + SourceCreate(url="https://en.wikipedia.org/wiki/Citibank"), + SourceCreate( + url="https://www.propublica.org/article/citi-execs-deeply-sorry-but-dont-blame-us2" + ), + SourceCreate( + url="https://www.cnbc.com/2023/01/11/citi-names-two-asset-classes-to-deploy-excess-cash-for-higher-returns-.html" + ), + SourceCreate( + url="https://www.mckinsey.com/industries/financial-services/our-insights/global-banking-annual-review" + ), + ], + default=[], 
+ ) + + +class JobCreateStatus(JobBase): + "Fields returned to client after creating a new Job" "" + + id: UUID = Field(alias="_id") + text_token_min: int + text_token_max: int + impaakt_model: bool + entity_model: bool + company_model: bool + topic_model: bool + status: str | None + job_created: datetime + + +class JobStatus(JobCreateStatus): + """Status fields returned to client""" + + process_start: datetime | None = None + process_done: datetime | None = None + process_time: int | None = None + + @root_validator + def compute_process_time(cls, values) -> Dict: + process_start = values.get("process_start") + process_done = values.get("process_done") + process_status = values.get("process_status") + + if process_status == "failed": + values["process_time"] = None + + elif process_start: + if process_done: + values["process_time"] = round( + (process_done - process_start).total_seconds(), 2 + ) + else: + values["process_time"] = round( + (datetime.now() - process_start).total_seconds(), 2 + ) + else: + values["process_time"] = None + return values + + +class JobDetails(JobStatus): + """Detail fields returned to client""" + + source: list[SourceDetails] diff --git a/annotator/service/app/app/schemas/source.py b/annotator/service/app/app/schemas/source.py new file mode 100644 index 0000000..60d4c28 --- /dev/null +++ b/annotator/service/app/app/schemas/source.py @@ -0,0 +1,44 @@ +from pydantic import BaseModel, validator + + +class SourceBase(BaseModel): + url: str + text: str | None = None + + +class SourceCreate(SourceBase): + "Fields provided by client" + pass + + +class Entity(BaseModel): + label: str + count: int + + +class Company(BaseModel): + label: str + score: int + + +class Topic(BaseModel): + label: str + score: int + + +class SourceDetails(SourceBase): + "Fields returned to client" + status: str | None = None + text_token: int | None = None + text: str | None = None + impaakt: float | None = None + entity: list[Entity] | None = None + company: list[Company] | None = None + topic: list[Topic] | None = None + + @validator("impaakt") + def is_check(cls, v): + if v: + return round(v, 2) + else: + return v diff --git a/annotator/service/app/app/tasks/__init__.py b/annotator/service/app/app/tasks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/annotator/service/app/app/tasks/company.py b/annotator/service/app/app/tasks/company.py new file mode 100644 index 0000000..a80c8b1 --- /dev/null +++ b/annotator/service/app/app/tasks/company.py @@ -0,0 +1,260 @@ +from transformers import AutoTokenizer, AutoModel +import torch.nn.functional as F +import torch +import os +import json +import pandas as pd +import logging + +cos = torch.nn.CosineSimilarity(dim=1, eps=1e-6) +from dataclasses import dataclass +from difflib import SequenceMatcher +from celery import shared_task +from app.models.job import Job +from app.models.source import Source, Entity, Company, Topic + +# from app.db.session import SessionLocal +# from app import crud +# from app.schemas.company import CompanyCreate + +device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") + +COMPANY_FP = "/resources/company/companies.csv" +COMPANY_LIST = list(pd.read_csv(COMPANY_FP)["sorted_entity"]) + + +class CompanyCandidateData(torch.utils.data.dataset.Dataset): + def __init__(self, urls_NER): + self.urls_NER = urls_NER + NEs_set = set() + for url in self.urls_NER.keys(): + for NE in self.urls_NER[url].keys(): + NEs_set.add(NE) + + self.example = list(NEs_set) + + def __len__(self): + return 
len(self.example) + + def __getitem__(self, i): + return self.example[i] + + +class CompanyData(torch.utils.data.dataset.Dataset): + def __init__(self, company_list): + self.company_list = company_list + + def __len__(self): + return len(self.company_list) + + def __getitem__(self, i): + return self.company_list[i] + + +@dataclass +class CompanyCollator: + def __init__(self, PATH_TO_MODEL="sentence-transformers/all-MiniLM-L6-v2"): + self.tokenizer = AutoTokenizer.from_pretrained(PATH_TO_MODEL) + + def __call__(self, batch): + return self.tokenizer(batch, padding=True, truncation=True, return_tensors="pt") + + +@dataclass +class CompanyCandidateCollator: + def __init__(self, PATH_TO_MODEL="sentence-transformers/all-MiniLM-L6-v2"): + self.tokenizer = AutoTokenizer.from_pretrained(PATH_TO_MODEL) + + def __call__(self, batch): + return batch, self.tokenizer( + batch, padding=True, truncation=True, return_tensors="pt" + ) + + +class CompanyScore(object): + """ + Compute matching ratio & distances and allocate scores to company with respect + to a list of company candidates. + This should be used each time there are new companies or each time NEs have been run on a batch + """ + + def __init__( + self, + urls_NER, + company_list, + batch_size, + PATH_TO_MODEL="sentence-transformers/all-MiniLM-L6-v2", + ): + self.batch_size = batch_size + self.company_data = CompanyData(company_list) + self.company_collator = CompanyCollator(PATH_TO_MODEL) + self.sampler = torch.utils.data.sampler.SequentialSampler(self.company_data) + self.company_dataloader = torch.utils.data.dataloader.DataLoader( + self.company_data, + batch_size=self.batch_size, + sampler=self.sampler, + collate_fn=self.company_collator, + ) + + self.company_candidate_data = CompanyCandidateData(urls_NER) + self.company_candidate_collator = CompanyCandidateCollator() + self.sampler = torch.utils.data.sampler.SequentialSampler( + self.company_candidate_data + ) + self.company_candidate_dataloader = torch.utils.data.dataloader.DataLoader( + self.company_candidate_data, + batch_size=self.batch_size, + sampler=self.sampler, + collate_fn=self.company_candidate_collator, + ) + + self.load() + self.company2vec = [] + for batch in self.company_dataloader: + self.company2vec.append(self.get_vec_representation(batch)) + self.company2vec = torch.cat(self.company2vec) + + def load(self, PATH_TO_MODEL="sentence-transformers/all-MiniLM-L6-v2"): + self.model = AutoModel.from_pretrained(PATH_TO_MODEL).to(device) + + def mean_pooling(self, model_output, attention_mask): + token_embeddings = model_output[ + 0 + ] # First element of model_output contains all token embeddings + input_mask_expanded = ( + attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float() + ) + return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp( + input_mask_expanded.sum(1), min=1e-9 + ) + + def get_vec_representation(self, batch): + with torch.no_grad(): + batch = batch.to(device) + tmp = self.model(**batch) + return F.normalize( + self.mean_pooling(tmp, batch["attention_mask"]), p=2, dim=1 + ) + + def compute_distances(self, NER2vec): + return torch.cat( + [cos(self.company2vec, NER_vec).reshape(1, -1) for NER_vec in NER2vec] + ) + + def compute_seq_matcher_ratio(self, batch): + seq_matcher_ratio = [] + for ne in batch: + seq_matcher_ratio.append( + [ + SequenceMatcher(None, ne, e).ratio() + for e in self.company_dataloader.dataset + ] + ) + return seq_matcher_ratio + + def apply_NE_weighting_formula(self, weight): + 
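+        # Blend the two signals: candidates scoring below 0.5 on both the embedding
+        # similarity and the SequenceMatcher ratio are zeroed out; the rest are averaged,
+        # and averaged scores below 0.5 are dropped as well.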
weight["company_distance"][weight["company_distance"] < 0] = 0 + tmp = torch.tensor(weight["company_distance"] < 0.5, dtype=int) + torch.tensor( + weight["seq_matcher_ratio"] < 0.5, dtype=int + ) + weight["company_distance"] += weight["seq_matcher_ratio"] + weight["company_distance"][tmp == 2] = 0 + weight["company_distance"] /= 2 + weight["company_distance"][weight["company_distance"] < 0.5] = 0 + return weight["company_distance"] + + def apply_URL_weighting_formula(self, url_NER, scores): + NER_count = sum(url_NER.values()) + url_score = 0 + for NE in url_NER.keys(): + url_score += scores[NE] * url_NER[NE] + return url_score / NER_count + + def score_vector2company(self, scores, companies, top_k): + return { + c: s + for s, c in sorted(zip(scores, companies), reverse=True)[:top_k] + if s > 0 + } + + def infer(self, top_k=10): + scores = {} + for [text, batch] in self.company_candidate_dataloader: + seq_matcher_ratio = self.compute_seq_matcher_ratio(text) + company_distance = self.compute_distances( + self.get_vec_representation(batch) + ) + for t, m, d in zip(text, seq_matcher_ratio, company_distance): + scores[t] = self.apply_NE_weighting_formula( + { + "seq_matcher_ratio": torch.tensor(m).to(device), + "company_distance": d, + } + ) + + url_company_scores = {} + for url in self.company_candidate_dataloader.dataset.urls_NER.keys(): + s = self.apply_URL_weighting_formula( + self.company_candidate_dataloader.dataset.urls_NER[url], scores + ) + tmp = self.score_vector2company( + s.tolist(), self.company_dataloader.dataset.company_list, top_k + ) + if tmp: + url_company_scores[url] = tmp + + return url_company_scores + +@shared_task( + name="infer:job_company", + bind=True, + default_retry_delay=30, + max_retries=3, + soft_time_limit=10000, +) +def source_company(self, job_id, source_ids, batch_size=64): + try: + # Load job + job = ~Job.get(job_id) + if job.status == "failed": + return job_id + + # Load sources + sources = [~Source.get(sid) for sid in source_ids] + sources = [ + s for s in sources if s.status != "failed" and len(s.entity) > 0 + ] + + if not sources: + return job_id + + # Prepare data for the model + dummy_json = {} + for source in sources: + dummy_json[source.url] = {e.label: e.count for e in source.entity} + + # Load and run model + model = CompanyScore( + urls_NER=dummy_json, company_list=COMPANY_LIST, batch_size=batch_size + ) + infer = model.infer() + + # Update sources with model inference results + for source in sources: + if source.url in infer: + source.company = [ + Company(label=c[0], score=c[1]) + for c in infer[source.url].items() + ] + source.save() + + return job_id + + except Exception as e: + # Log the exception and mark the job as failed + if 'job' in locals(): + job.status = "failed" + job.save() + logging.error(f"Job {job_id} failed: {e}") + return job_id + diff --git a/annotator/service/app/app/tasks/entity.py b/annotator/service/app/app/tasks/entity.py new file mode 100644 index 0000000..fcad30f --- /dev/null +++ b/annotator/service/app/app/tasks/entity.py @@ -0,0 +1,422 @@ +import json +import logging +import os +import random +from dataclasses import dataclass + +import nltk +import torch +from app.models.job import Job +from app.models.source import Entity, Source +from celery import shared_task +from transformers import AutoModelForTokenClassification, AutoTokenizer + +device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") + + +class NERData(torch.utils.data.dataset.Dataset): + def __init__(self, json_docs, NER_max_sentences=100, 
NER_min_char_sentence=20): + self.example = [] + self.sentence_tokenizer = nltk.data.load("tokenizers/punkt/english.pickle") + abbreviation = [ + "a", + "å", + "Ǻ", + "Å", + "b", + "c", + "d", + "e", + "ɛ", + "f", + "g", + "h", + "i", + "j", + "k", + "l", + "m", + "n", + "o", + "Ö", + "Ø", + "p", + "q", + "r", + "s", + "t", + "u", + "v", + "w", + "x", + "y", + "z", + "µm", + "abs", + "al", + "approx", + "bp", + "ca", + "cap", + "cf", + "co", + "d.p.c", + "dr", + "e.g", + "et", + "etc", + "er", + "eq", + "fig", + "figs", + "h", + "i.e", + "it", + "inc", + "min", + "ml", + "mm", + "mol", + "ms", + "no", + "nt", + "ref", + "r.p.m", + "sci", + "s.d", + "sd", + "sec", + "s.e.m", + "sp", + "ssp", + "st", + "supp", + "vs", + "wt", + ] + self.sentence_tokenizer._params.abbrev_types.update(abbreviation) + random.seed(1234) + i = 0 + for doc in json_docs: + sentences = [ + s + for s in self.text_preprocessing(doc["text"]) + if len(s) > NER_min_char_sentence + ] + if not sentences: + continue + # here we just sample down the number of sentences + sentences = random.sample( + sentences, min([len(sentences), NER_max_sentences]) + ) + for s in sentences: + self.example.append( + {"input_ids": {"sentence": s, "id": i, "url": doc["url"]}} + ) + i += 1 + + def __len__(self): + return len(self.example) + + def __getitem__(self, i): + return self.example[i] + + def text_preprocessing(self, text): + text = " ".join(text.split()) + return [t for t in self.sentence_tokenizer.tokenize(text)] + + +@dataclass +class NERCollator: + def __init__(self, PATH_TO_MODEL="dslim/bert-base-NER"): + self.tokenizer = AutoTokenizer.from_pretrained(PATH_TO_MODEL) + + def __call__(self, batch): + tmp = self.tokenizer( + [i["input_ids"]["sentence"] for i in batch], + add_special_tokens=True, + truncation=True, + return_offsets_mapping=True, + ) + offset_mapping = tmp["offset_mapping"] + from_list, to_list = [], [] + for i in range(len(offset_mapping)): + f, t = zip(*offset_mapping[i]) + from_list.append(list(f)) + to_list.append(list(t)) + del tmp["offset_mapping"] + tmp = self.tokenizer.pad( + tmp, padding="max_length", max_length=512, return_tensors="pt" + ) + offset_mapping_from = ( + torch.tensor([t + [-100] * (512 - len(t)) for t in from_list]) + .reshape([-1, 512]) + .tolist() + ) + offset_mapping_to = ( + torch.tensor([t + [-100] * (512 - len(t)) for t in to_list]) + .reshape([-1, 512]) + .tolist() + ) + sentences = [i["input_ids"]["sentence"] for i in batch] + urls = [i["input_ids"]["url"] for i in batch] + + return ( + [i["input_ids"]["id"] for i in batch], + tmp, + offset_mapping_from, + offset_mapping_to, + sentences, + urls, + ) + + +class NER_model(object): + """ + NER model take a list of text and output a list of potential company NEs. 
+ It should be used each time new documents are added to the index + """ + + def __init__( + self, + json_docs, + batch_size, + NER_max_sentences=100, + NER_min_char_sentence=20, + PATH_TO_MODEL="dslim/bert-base-NER", + ): + self.batch_size = batch_size + self.load(PATH_TO_MODEL) + self.NER_data = NERData(json_docs, NER_max_sentences, NER_min_char_sentence) + self.NER_collator = NERCollator(PATH_TO_MODEL) + self.sampler = torch.utils.data.sampler.SequentialSampler(self.NER_data) + self.NER_dataloader = torch.utils.data.dataloader.DataLoader( + self.NER_data, + batch_size=self.batch_size, + sampler=self.sampler, + collate_fn=self.NER_collator, + ) + + def load(self, PATH_TO_MODEL="dslim/bert-base-NER"): + self.model = AutoModelForTokenClassification.from_pretrained(PATH_TO_MODEL).to( + device + ) + + def infer(self): + urls_NER, NEs_set, index_duplicate = {}, set(), set() + for batch in self.NER_dataloader: + with torch.no_grad(): + ( + ids, + batch, + offset_mapping_from, + offset_mapping_to, + sentences, + urls, + ) = batch + batch.to(device) + preds = self.model(**batch) + preds = torch.nn.functional.softmax(preds["logits"], dim=2) + preds = torch.argmax(preds, dim=2) + + for example_id, o, s_tokens_ids, s_attention_mask, f, t, s, url in zip( + ids, + preds, + batch["input_ids"], + batch["attention_mask"], + offset_mapping_from, + offset_mapping_to, + sentences, + urls, + ): + if example_id not in index_duplicate: + index_duplicate.add(example_id) + else: + continue + + if ( + self.model.config.label2id["B-ORG"] in o + or self.model.config.label2id["I-ORG"] in o + ): + offset_mapping = [ + (from_, to_) + for from_, to_ in zip(f, t) + if from_ != -100 or to_ != -100 + ] + s_tokens_ids = [x for x in s_tokens_ids if x != 0] + s_attention_mask = [x for x in s_attention_mask if x != 0] + o = o[: len(s_attention_mask)] + s_tokens_ids = s_tokens_ids[: len(s_attention_mask)] + s_is_subword = [] + for w in self.NER_collator.tokenizer.convert_ids_to_tokens( + s_tokens_ids + ): + if "##" in w: + s_is_subword.append(1) + else: + s_is_subword.append(0) + + B_ORG = torch.nonzero(o == self.model.config.label2id["B-ORG"]) + for i, b_org in enumerate(B_ORG): + tmp = [] + max_range = B_ORG[i + 1] if len(B_ORG) > i + 1 else len(o) + for idx in range(b_org, max_range): + if ( + o[idx] + in [ + self.model.config.label2id["B-ORG"], + self.model.config.label2id["I-ORG"], + ] + or s_is_subword[idx] + ): + tmp.append(offset_mapping[idx]) + + else: + break + + NE = s[tmp[0][0] : tmp[-1][1]] + if NE == "": + continue + if url not in urls_NER: + urls_NER[url] = {} + if NE not in urls_NER[url]: + urls_NER[url][NE] = 1 + else: + urls_NER[url][NE] += 1 + + return urls_NER + + +@shared_task( + name="infer:job_entity", + bind=True, + default_retry_delay=30, + max_retries=3, + soft_time_limit=10000, +) +def source_entity(self, job_id, source_ids, batch_size=64): + try: + # Load job + job = ~Job.get(job_id) + if job.status == "failed": + return job_id + + # Load sources + sources = [~Source.get(sid) for sid in source_ids] + sources = [s for s in sources if s.status != "failed"] + + if not sources: + return job_id + + # Prepare data for the model + dummy_json = [{"url": source.url, "text": source.text} for source in sources] + + # Load and run model + model = NER_model(json_docs=dummy_json, batch_size=batch_size) + infer = model.infer() + + # Update sources with model inference results + for source in sources: + if source.url in infer: + source.entity = [ + Entity(label=e[0], count=e[1]) for e in infer[source.url].items() 
+                ]
+                source.save()
+
+        return job_id
+
+    except Exception as e:
+        # Log the exception and mark the job as failed
+        if 'job' in locals():
+            job.status = "failed"
+            job.save()
+        logging.error(f"Error processing job {job_id}: {e}")
+        return job_id
+
+
+# def source_entity(self, job_id, source_ids, batch_size=64):
+#     try:
+#         # load job
+#         job = ~Job.get(job_id)
+#         if job.status == "failed":
+#             return job_id
+
+#         # load sources
+#         sources = [~Source.get(sid) for sid in source_ids]
+#         sources = [s for s in sources if s.status != "failed"]
+
+#         # load model
+#         dummy_json = [{"url": source.url, "text": source.text} for source in sources]
+#         model = NER_model(json_docs=dummy_json, batch_size=batch_size)
+
+#         # run model
+#         infer = model.infer()
+
+#         # update sources
+#         for source in sources:
+#             if source.url in infer.keys():
+#                 source.entity = [
+#                     Entity({"label": e[0], "count": e[1]})
+#                     for e in infer[source.url].items()
+#                 ]
+#                 source.save()
+#             # for e in infer[source.url].items():
+#             #     entity_create = EntityCreate(
+#             #         source_id=source.id, entity=e[0], count=e[1]
+#             #     )
+#             #     crud.entity.create(db=db, obj_in=entity_create)
+
+#     except Exception as e:
+#         job.status = "failed"
+#         job.save()
+#         logging.info(f"job {job.id}: failed")
+
+#     return job_id
diff --git a/annotator/service/app/app/tasks/impaakt.py b/annotator/service/app/app/tasks/impaakt.py
new file mode 100644
index 0000000..4f03a3c
--- /dev/null
+++ b/annotator/service/app/app/tasks/impaakt.py
@@ -0,0 +1,48 @@
+import logging
+import os
+import pickle
+
+from app.models.job import Job
+from app.models.source import Source
+from celery import shared_task
+from celery.utils.log import get_logger
+
+MODEL_FP = r"/resources/impaakt/impaakt.pckl"
+
+
+@shared_task(
+    name="infer:source_impaakt",
+    bind=True,
+    default_retry_delay=30,
+    max_retries=3,
+    soft_time_limit=10000,
+)
+def source_impaakt(self, job_id, source_ids):
+    try:
+        # load job
+        job = ~Job.get(job_id)
+        if job.status == "failed":
+            return job_id
+
+        # load sources
+        sources = [~Source.get(sid) for sid in source_ids]
+        sources = [s for s in sources if s.status != "failed"]
+
+        # load model
+        model = pickle.load(open(MODEL_FP, "rb"))
+
+        # run model
+        impaakt_probas = model.predict_proba([s.text for s in sources])
+
+        # update sources
+        for source, ip in zip(sources, impaakt_probas):
+            # probas = model.predict_proba([source.text])
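+            # predict_proba returns a probability pair per source; index 1 (presumably the
+            # positive/Impaakt-relevant class) is scaled to a 0-100 score.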
source.impaakt = ip[1] * 100.0 + source.save() + + except Exception as e: + job.status = "failed" + job.save() + logging.info(f"job {job.id}: failed") + + return job_id diff --git a/annotator/service/app/app/tasks/job.py b/annotator/service/app/app/tasks/job.py new file mode 100644 index 0000000..ee29767 --- /dev/null +++ b/annotator/service/app/app/tasks/job.py @@ -0,0 +1,41 @@ +import logging +from celery import shared_task +from celery.utils.log import get_logger +from datetime import datetime +from app.models.job import Job + +logger = get_logger(__name__) + + +@shared_task( + name="ingress:job_start", + priority=9, + bind=True, + default_retry_delay=30, + max_retries=3, + soft_time_limit=10000, +) +def job_start(self, job_id): + job = ~Job.get(job_id) + job.status = "in progress" + job.process_start = datetime.now() + job.save() + logger.info(f"job_start: {job.id}") + return job_id + + +@shared_task( + name="infer:job_done", + priority=0, + bind=True, + default_retry_delay=30, + max_retries=3, + soft_time_limit=10000, +) +def job_done(self, job_id): + job = ~Job.get(job_id) + job.process_done = datetime.now() + job.status = "done" + job.save() + logger.info(f"job_done: {job.id}") + return job_id diff --git a/annotator/service/app/app/tasks/text.py b/annotator/service/app/app/tasks/text.py new file mode 100644 index 0000000..1a7d026 --- /dev/null +++ b/annotator/service/app/app/tasks/text.py @@ -0,0 +1,455 @@ +import asyncio +import io +import logging +import os +import random +import re +import time +from collections import ChainMap +from random import shuffle + +import fitz +import httpx +from app.models.job import Job +from app.models.source import Source +from asgiref.sync import async_to_sync +from bs4 import BeautifulSoup +from bs4.element import Comment +from celery import shared_task +from playwright.async_api import Playwright, async_playwright +from playwright_stealth import stealth_async + +TEXT_CRAWL_CONCURRENCY = int(os.environ["TEXT_CRAWL_CONCURRENCY"]) +TEXT_TOKEN_MIN = int(os.environ["TEXT_TOKEN_MIN"]) +TEXT_TOKEN_MAX = int(os.environ["TEXT_TOKEN_MAX"]) +PDF_PAGES_MAX = int(os.environ["PDF_PAGES_MAX"]) +URL_REGEX = re.compile(r"""(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'".,<>?«»“”‘’]))""") +MULTIPLE_PERIODS_REGEX = re.compile(r'\.{2,}') +WHITESPACE_REGEX = re.compile(r"(\s\s+|\t+|\n+)") + +def clean_text(text): + """ + Cleans the given text by removing URLs, multiple periods, and extra whitespace. + + Args: + text (str): The text to be cleaned. + + Returns: + str: The cleaned text. + """ + text = URL_REGEX.sub(" ", text) + text = MULTIPLE_PERIODS_REGEX.sub(".", text) + text = WHITESPACE_REGEX.sub(" ", text).strip() + + return text + + +# def clean_text(text): +# text = re.sub( +# r"""(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'".,<>?«»“”‘’]))""", +# " ", +# text, +# ) +# text = re.sub(r'\.{2,}', '.', text) +# text = re.sub(r"(\s\s+|\t+|\n+)", " ", text).strip() + +# return text + + +# def tag_visible(element): +# if element.parent.name in [ +# "style", +# "script", +# "head", +# "title", +# "meta", +# "[document]", +# ]: +# return False +# if isinstance(element, Comment): +# return False +# return True + +def tag_visible(element): + """ + Determines if an HTML element is visible or not. 
+ + Args: + element (BeautifulSoup.Tag): The HTML element to check. + + Returns: + bool: True if the element is visible, False otherwise. + """ + invisible_tags = { + "style", + "script", + "head", + "title", + "meta", + "[document]", + } + + return not (element.parent.name in invisible_tags or isinstance(element, Comment)) + + +# def html2text(html): +# soup = BeautifulSoup(html, "html.parser") +# texts = [] + +# # page title +# if soup.title is not None: +# texts.append(soup.title.string) + +# # # paragraphs +# # paragraphs = soup.find_all("p") +# # for p in paragraphs: +# # if p: +# # text = clean_text(p.get_text()) +# # if len((text).split()) > 20: +# # texts.append(text) + +# visible_texts = soup.findAll(text=True) +# for t in filter(tag_visible, visible_texts): +# t = clean_text(t.strip()) +# if len((t).split()) > 10: +# texts.append(t) + +# # youtube texts +# yt_desc = [] +# yt_desc.extend( +# [ +# description +# for description in soup.find_all( +# "span", +# { +# "class": "yt-core-attributed-string yt-core-attributed-string--white-space-pre-wrap" +# }, +# ) +# ] +# ) +# if yt_desc: +# text = clean_text(yt_desc[-1].get_text()) +# texts.append(text) + +# yt_comments = [] +# yt_comments.extend( +# [ +# comment +# for comment in soup.find_all( +# "yt-formatted-string", {"class": "style-scope ytd-comment-renderer"} +# ) +# ] +# ) +# for c in yt_comments: +# if c: +# text = clean_text(c.get_text()) +# texts.append(text) + +# return " ".join(texts) + +def html2text(html): + """ + Convert HTML content to plain text. + + Args: + html (str): The HTML content to be converted. + + Returns: + str: The plain text extracted from the HTML content. + """ + soup = BeautifulSoup(html, "html.parser") + texts = [] + + # page title + if soup.title is not None: + texts.append(soup.title.string) + + # visible texts + visible_texts = [clean_text(t.strip()) for t in filter(tag_visible, soup.findAll(text=True)) if len(t.split()) > 10] + texts.extend(visible_texts) + + # youtube texts + yt_desc = soup.find_all("span", {"class": "yt-core-attributed-string yt-core-attributed-string--white-space-pre-wrap"}) + if yt_desc: + texts.append(clean_text(yt_desc[-1].get_text())) + + yt_comments = [clean_text(c.get_text()) for c in soup.find_all("yt-formatted-string", {"class": "style-scope ytd-comment-renderer"}) if c] + texts.extend(yt_comments) + + return " ".join(texts) + +# def pdf2text(httpx_PDF_response): +# with io.BytesIO(httpx_PDF_response.content) as pdf_file: +# text = "" +# with fitz.open(filetype="pdf", stream=pdf_file.read()) as doc: +# for p in list(range(0, min(PDF_PAGES_MAX, doc.page_count), 1)): +# # for p in doc[:min(PDF_PAGES_MAX, doc.page_count)]: +# page = doc.load_page(p) +# text = text + " " + str(page.get_text()) +# lines = [] +# for line in text.split("\n"): +# if len(line) > 20: +# lines.append(line.strip()) +# return clean_text(" ".join(lines)) + +def pdf2text(httpx_PDF_response): + """ + Convert a PDF file to text. + + Args: + httpx_PDF_response (httpx.Response): The HTTP response containing the PDF file. + + Returns: + str: The extracted text from the PDF file. 
+ """ + with io.BytesIO(httpx_PDF_response.content) as pdf_file: + with fitz.open(filetype="pdf", stream=pdf_file.read()) as doc: + page_texts = [str(doc.load_page(p).get_text()) for p in range(min(PDF_PAGES_MAX, doc.page_count))] + text = " ".join(page_texts) + + lines = [line.strip() for line in text.split("\n") if len(line) > 20] + return clean_text(" ".join(lines)) + +# async def url2text(url, pw_session, pw_browser, httpx_session, semaphore): +# if url == 'https://www.cnn.com/2023/03/12/investing/stocks-week-ahead/index.html': +# pass +# async with semaphore: +# # async with pw_session: +# page = await pw_browser.new_page() +# await stealth_async(page) +# try: +# # try playwright stealth for html +# pw_response = await page.goto(url, wait_until="domcontentloaded") + +# # if not pw_response.ok: +# # return {url: ""} + +# if "html" in pw_response.headers.get("content-type"): +# # simulate mousescoll, needed to retrieve dynimcally generate youtube content(i.e. description and comments) +# title = await page.title() +# if "youtube" in title.lower(): +# n_scroll = 5 +# for i in range(5): +# page.mouse.wheel(0, 15000) +# time.sleep(0.5) +# i += 1 +# html = await page.content() +# text = html2text(html) +# await page.close() +# if text: +# logging.info(f"ok txt retrieved {url}") +# return {url: text} +# else: +# logging.info(f"no txt retrieved {url}") +# return {url: ""} +# except: +# await page.close() +# # fallback to httpx +# try: +# # async with httpx_session.get(url) as httpx_response: +# httpx_response = await httpx_session.get(url) +# if "html" in httpx_response.headers["content-type"]: +# if httpx_response.text: +# logging.info(f"ok txt retrieved {url}") +# return {url: html2text(httpx_response.text)} +# else: +# # content-type is not always set corrently on response.headers, so we assume PDF content and try to extract text. +# text = pdf2text(httpx_response) +# if text: +# logging.info(f"ok txt retrieved {url}") +# return {url: text} +# else: +# logging.info(f"fail txt retrieved {url}") +# return {url: ""} + +# except Exception as e: +# logging.info(f"fail txt retrieved {url}") +# return {url: ""} + +async def url2text(url, pw_session, pw_browser, httpx_session, semaphore): + """ + Retrieve text content from a given URL. + + Args: + url (str): The URL to retrieve text from. + pw_session: The Playwright session. + pw_browser: The Playwright browser. + httpx_session: The HTTPX session. + semaphore: The semaphore for concurrency control. + + Returns: + dict: A dictionary containing the URL as the key and the retrieved text as the value. + """ + N_SCROLLS = 5 + SCROLL_DISTANCE = 15000 + + async def retrieve_text(content, source): + """ + Retrieve text from the content based on the source type. + + Args: + content: The content to retrieve text from. + source: The source object. + + Returns: + dict: A dictionary containing the URL as the key and the retrieved text as the value. 
+ """ + text = html2text(content) if "html" in source.headers["content-type"] else pdf2text(source) + if text: + logging.info(f"ok txt retrieved {url}") + return {url: text} + else: + logging.info(f"fail txt retrieved {url}") + return {url: ""} + + if url == 'https://www.cnn.com/2023/03/12/investing/stocks-week-ahead/index.html': + pass + + async with semaphore: + page = await pw_browser.new_page() + await stealth_async(page) + try: + pw_response = await page.goto(url, wait_until="domcontentloaded") + if "html" in pw_response.headers.get("content-type"): + title = await page.title() + if "youtube" in title.lower(): + for _ in range(N_SCROLLS): + page.mouse.wheel(0, SCROLL_DISTANCE) + time.sleep(0.5) + html = await page.content() + text = await retrieve_text(html, pw_response) + if not text: + text = "" + return text + except Exception as e: + logging.error(f"Playwright error: {e}") + finally: + await page.close() + + try: + httpx_response = await httpx_session.get(url) + text = await retrieve_text(httpx_response.text, httpx_response) + if not text: + text = "" + return text + + except Exception as e: + logging.error(f"HTTPX error: {e}") + return {url: ""} + + +# async def urls2text(urls, max_crawl_concurrency): +# semaphore = asyncio.Semaphore(max_crawl_concurrency) +# pw_session = await async_playwright().start() +# pw_browser = await pw_session.chromium.launch_persistent_context( +# "/tmp", headless=True, timeout=10000, no_viewport=True, +# ) +# # pw_browser = await pw_session.chromium.launch_persistent_context( +# # "/tmp", headless=True, timeout=10000, +# # ) + +# httpx_session = httpx.AsyncClient() + +# tasks = [ +# url2text(url, pw_session, pw_browser, httpx_session, semaphore) for url in urls +# ] +# list_of_dicts = await asyncio.gather(*tasks) + +# for d in list_of_dicts: +# if not d: +# list_of_dicts.remove(d) + +# result = {k: (v if v is not None else "") for d in list_of_dicts for k, v in d.items()} + +# await pw_browser.close() +# return result + +async def urls2text(urls, max_crawl_concurrency): + """ + Fetches text content from a list of URLs concurrently using Playwright and HTTPX. + + Args: + urls (list): A list of URLs to fetch text content from. + max_crawl_concurrency (int): The maximum number of concurrent crawls. + + Returns: + dict: A dictionary containing the fetched text content, with URLs as keys and text as values. + """ + semaphore = asyncio.Semaphore(max_crawl_concurrency) + + async with async_playwright() as pw_session: + pw_browser = await pw_session.chromium.launch_persistent_context( + "/tmp", headless=True, timeout=10000, no_viewport=True, + ) + + async with httpx.AsyncClient() as httpx_session: + tasks = [ + url2text(url, pw_session, pw_browser, httpx_session, semaphore) for url in urls + ] + list_of_dicts = await asyncio.gather(*tasks) + + list_of_dicts = [d for d in list_of_dicts if d] + result = {k: (v if v is not None else "") for d in list_of_dicts for k, v in d.items()} + + # await pw_browser.close() + return result + + +@shared_task( + name="ingress:job_crawl", + bind=True, + priority=5, + default_retry_delay=30, + max_retries=3, + soft_time_limit=10000, +) +def source_text(self, job_id, source_ids): + """ + Extracts text from the given sources and updates their status and text properties. + + Args: + job_id (int): The ID of the job. + source_ids (list): A list of source IDs. + + Returns: + int: The ID of the job. + + Raises: + Exception: If an error occurs during the text extraction process. 
+ """ + try: + # load job + job = ~Job.get(job_id) + if job.status == "failed": + return job_id + + # load sources + sources = [~Source.get(sid) for sid in source_ids] + sources = [s for s in sources if s.text != ""] + + # get texts + urls = [source.url for source in sources] + shuffle(urls) + url2text = async_to_sync(urls2text)(urls, TEXT_CRAWL_CONCURRENCY) + + # update sources + forbidden_words = {"cloudflare", "robot", "captcha", "cloudfront"} + + for source in sources: + tokens = url2text[source.url].split() + source.text = " ".join(tokens[:TEXT_TOKEN_MAX]) + source.text_token = len(tokens[:TEXT_TOKEN_MAX]) + + if any(word.lower() in forbidden_words for word in tokens) or source.text_token < TEXT_TOKEN_MIN: + source.status = "failed" + else: + source.status = "done" + + source.save() + + except Exception as e: + job.status = "failed" + job.save() + logging.error(f"job {job.id}: failed, source_text, {e}") + + return job_id \ No newline at end of file diff --git a/annotator/service/app/app/tasks/topic.py b/annotator/service/app/app/tasks/topic.py new file mode 100644 index 0000000..7ecdab9 --- /dev/null +++ b/annotator/service/app/app/tasks/topic.py @@ -0,0 +1,86 @@ +import csv +import json +import logging +import os +import pickle +from dataclasses import dataclass +from difflib import SequenceMatcher + +import numpy as np +import pandas as pd +import torch +from celery import shared_task +from app.models.job import Job +from app.models.source import Source, Topic + +MODEL_FP = r"/resources/topics/model_alltopics_dumpv1_oct14" +TOPICS_FP = r"/resources/topics/impaakt_topics_march23.csv" + + +@shared_task( + name="infer:job_topic", + bind=True, + default_retry_delay=30, + max_retries=3, + soft_time_limit=10000, +) +def source_topic(self, job_id, source_ids): + try: + # load job + job = ~Job.get(job_id) + if job.status == "failed": + return job_id + + # load sources + sources = [~Source.get(sid) for sid in source_ids] + sources = [s for s in sources if s.status != "failed"] + + # load model + input_file_topics = csv.DictReader(open(TOPICS_FP)) + mapping_topics = {row["id"]: row["name"] for row in input_file_topics} + + model = pickle.load(open(MODEL_FP, "rb")) + tmodel = model["topic_classifier"] + tfidf = model["content_vector"] + ldv = model["topics"] + + # Create a list of class names from the mapping + class_names = [mapping_topics[cname] for cname in tmodel.classes_] + + # Run model + texts = tfidf.transform(np.array([s.text for s in sources])) + lda = ldv.transform(texts) + + # Get the probability distribution for each source in the batch + topics_proba_batch = tmodel.predict_proba(lda) + + + # class_names = [] + # for cname in tmodel.classes_: + # class_names.append(mapping_topics[cname]) + + # run model + texts = tfidf.transform(np.array([s.text for s in sources])) + lda = ldv.transform(texts) + topics_proba = dict(zip(class_names, tmodel.predict_proba(lda))) + topics_proba_batch = tmodel.predict_proba(lda) + + # Update sources with their corresponding topics and scores + for source, topic_proba in zip(sources, topics_proba_batch): + source.topic = [ + Topic(label=label, score=score * 100.0) + for label, score in zip(class_names, topic_proba) + ] + source.save() + + return job_id + + except Exception as e: + # Handle the exception and log the error + job = Job.get(job_id) + job.status = "failed" + job.save() + logging.error(f"Job {job_id} failed: {e}") + return job_id + +