diff --git a/dags/annual_reports/annual_reports_categories.py b/dags/annual_reports/annual_reports_categories.py index b593853..d48fb86 100644 --- a/dags/annual_reports/annual_reports_categories.py +++ b/dags/annual_reports/annual_reports_categories.py @@ -1,4 +1,4 @@ -import datetime +from datetime import datetime import pendulum from airflow.decorators import dag, task @@ -11,7 +11,7 @@ from sqlalchemy.sql import func from tenacity import retry_if_exception_type, stop_after_attempt -current_year = datetime.datetime.now().year +current_year = datetime.now().year years = list(range(2004, current_year + 1)) @@ -37,19 +37,21 @@ def fetch_categories_report_count(year, **kwargs): @sqlalchemy_task(conn_id="superset") def populate_categories_report_count(entry, session, **kwargs): for year, subjects in entry.items(): + full_date_str = f"{year}-01-01" + year_date = datetime.strptime(full_date_str, "%Y-%m-%d").date() for category, count in subjects.items(): record = ( session.query(Categories) - .filter_by(category=category, year=year) + .filter_by(category=category, year=year_date) .first() ) if record: record.count = int(count) - record.year = int(year) + record.year = year_date record.updated_at = func.now() else: new_record = Categories( - year=int(year), + year=year_date, category=category, count=int(count), ) diff --git a/dags/annual_reports/anuual_reports_publications.py b/dags/annual_reports/anuual_reports_publications.py index 086e761..9a84183 100644 --- a/dags/annual_reports/anuual_reports_publications.py +++ b/dags/annual_reports/anuual_reports_publications.py @@ -1,4 +1,4 @@ -import datetime +from datetime import datetime import pendulum from airflow.decorators import dag, task @@ -11,7 +11,7 @@ from sqlalchemy.sql import func from tenacity import retry_if_exception_type, stop_after_attempt -current_year = datetime.datetime.now().year +current_year = datetime.now().year years = list(range(2004, current_year + 1)) @@ -39,18 +39,20 @@ def process_results(results, 
session, **kwargs): populate_journal_report_count(journals, year, session) def populate_publication_report_count(publications, year, session, **kwargs): - record = session.query(Publications).filter_by(year=year).first() + full_date_str = f"{year}-01-01" + year_date = datetime.strptime(full_date_str, "%Y-%m-%d").date() + record = session.query(Publications).filter_by(year=year_date).first() if record: record.publications = publications["publications"] record.journals = publications["journals"] record.contributions = publications["contributions"] record.theses = publications["theses"] record.rest = publications["rest"] - record.year = year + record.year = year_date record.updated_at = func.now() else: new_record = Publications( - year=year, + year=year_date, publications=publications["publications"], journals=publications["journals"], contributions=publications["contributions"], @@ -60,18 +62,22 @@ def populate_publication_report_count(publications, year, session, **kwargs): session.add(new_record) def populate_journal_report_count(journals, year, session, **kwargs): + full_date_str = f"{year}-01-01" + year_date = datetime.strptime(full_date_str, "%Y-%m-%d").date() for journal, count in journals.items(): record = ( - session.query(Journals).filter_by(year=year, journal=journal).first() + session.query(Journals) + .filter_by(year=year_date, journal=journal) + .first() ) if record: record.journal = journal record.count = count - record.year = year + record.year = year_date record.updated_at = func.now() else: new_record = Journals( - year=year, + year=year_date, journal=journal, count=count, ) diff --git a/dags/common/models/annual_reports/annual_reports.py b/dags/common/models/annual_reports/annual_reports.py index 4571e49..8206020 100644 --- a/dags/common/models/annual_reports/annual_reports.py +++ b/dags/common/models/annual_reports/annual_reports.py @@ -1,4 +1,4 @@ -from sqlalchemy import Column, DateTime, Integer, String, func +from sqlalchemy import Column, 
Date, DateTime, Integer, String, func from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() @@ -8,7 +8,7 @@ class Publications(Base): __tablename__ = "annual_reports_publications" id = Column(Integer, primary_key=True) - year = Column(Integer, nullable=False) + year = Column(Date, nullable=False) publications = Column(Integer, nullable=False) journals = Column(Integer, nullable=False) contributions = Column(Integer, nullable=False) @@ -24,7 +24,7 @@ class Categories(Base): id = Column(Integer, primary_key=True) category = Column(String, nullable=False) count = Column(Integer, nullable=False) - year = Column(Integer, nullable=False) + year = Column(Date, nullable=False) created_at = Column(DateTime, default=func.now()) updated_at = Column(DateTime, default=func.now()) @@ -35,6 +35,6 @@ class Journals(Base): id = Column(Integer, primary_key=True) journal = Column(String, nullable=False) count = Column(Integer, nullable=False) - year = Column(Integer, nullable=False) + year = Column(Date, nullable=False) created_at = Column(DateTime, default=func.now()) updated_at = Column(DateTime, default=func.now()) diff --git a/dags/migrations/versions/db8ba01db969_change_year_column_type_from_integer_to_.py b/dags/migrations/versions/db8ba01db969_change_year_column_type_from_integer_to_.py new file mode 100644 index 0000000..d95e1bc --- /dev/null +++ b/dags/migrations/versions/db8ba01db969_change_year_column_type_from_integer_to_.py @@ -0,0 +1,126 @@ +"""Change year column type from Integer to Date in annual reports tables + +Revision ID: db8ba01db969 +Revises: fc3ffc0db6db +Create Date: 2024-08-15 16:09:05.042861 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.sql import column, table + +# revision identifiers, used by Alembic. 
revision: str = "db8ba01db969"
down_revision: Union[str, None] = "fc3ffc0db6db"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

# Tables whose ``year`` column is converted by this revision.  The order is
# the order in which every migration step is applied.
_ANNUAL_REPORT_TABLES = (
    "annual_reports_publications",
    "annual_reports_journals",
    "annual_reports_categories",
)


def upgrade() -> None:
    """Convert ``year`` from Integer to Date on the annual-report tables.

    The conversion goes through a temporary column so existing rows keep
    their data:

    1. add a nullable ``year_temp`` Date column,
    2. fill it with January 1st of the stored integer year,
    3. drop the Integer ``year`` column,
    4. rename ``year_temp`` to ``year`` and make it NOT NULL.
    """
    for table_name in _ANNUAL_REPORT_TABLES:
        op.add_column(
            table_name, sa.Column("year_temp", sa.Date(), nullable=True)
        )

    conn = op.get_bind()
    for table_name in _ANNUAL_REPORT_TABLES:
        # Lightweight table construct: only the columns the UPDATE touches.
        report = table(
            table_name,
            column("year", sa.Integer),
            column("year_temp", sa.Date),
        )
        # Builds the string "<year>-01-01" in SQL and converts it to a date.
        conn.execute(
            report.update().values(
                year_temp=sa.func.date(
                    sa.func.concat(
                        op.inline_literal(""), report.c.year, "-01-01"
                    )
                )
            )
        )

    for table_name in _ANNUAL_REPORT_TABLES:
        op.drop_column(table_name, "year")

    for table_name in _ANNUAL_REPORT_TABLES:
        op.alter_column(
            table_name,
            "year_temp",
            new_column_name="year",
            existing_type=sa.Date(),
            nullable=False,
        )


def downgrade() -> None:
    """Convert ``year`` back from Date to Integer (exact reverse of upgrade).

    Mirrors :func:`upgrade`: a nullable Integer ``year_temp`` column is
    added and filled with the calendar year of the Date value, the Date
    ``year`` column is dropped, and ``year_temp`` is renamed to ``year``
    and made NOT NULL.  Month and day information is discarded.

    The previous implementation tried to add a second column named
    ``year`` (which already exists after ``upgrade``) and then drop a
    ``year_temp`` column that no longer exists, so it could not run.
    """
    for table_name in _ANNUAL_REPORT_TABLES:
        # Must be nullable while existing rows are being back-filled.
        op.add_column(
            table_name, sa.Column("year_temp", sa.Integer(), nullable=True)
        )

    conn = op.get_bind()
    for table_name in _ANNUAL_REPORT_TABLES:
        report = table(
            table_name,
            column("year", sa.Date),
            column("year_temp", sa.Integer),
        )
        # ``sa.extract`` renders the SQL-standard ``EXTRACT(year FROM ...)``;
        # ``sa.func.extract("year", ...)`` would emit a plain function call
        # with the field name as a string argument instead.  The explicit
        # cast makes the value match the Integer target column.
        conn.execute(
            report.update().values(
                year_temp=sa.cast(
                    sa.extract("year", report.c.year), sa.Integer
                )
            )
        )

    for table_name in _ANNUAL_REPORT_TABLES:
        op.drop_column(table_name, "year")

    for table_name in _ANNUAL_REPORT_TABLES:
        op.alter_column(
            table_name,
            "year_temp",
            new_column_name="year",
            existing_type=sa.Integer(),
            nullable=False,
        )