Skip to content

Commit

Permalink
Adjust executor (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariotaddeucci authored Nov 18, 2024
1 parent fa60686 commit 877c160
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 44 deletions.
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.11
5 changes: 1 addition & 4 deletions examples/jobs/simple_app/simple_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ def get_json(url: str, logger: Logger = None):


@gyjd
def example_parallel_requests(
strategy: str,
logger: Logger = None,
):
def example_parallel_requests(strategy: str, logger: Logger = None):
logger.info("Starting delayed requests test")
start_at = time.monotonic()

Expand Down
4 changes: 2 additions & 2 deletions examples/jobs/simple_app/simple_app.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[gyjd.job]
name = "new_simple_app"
name = "simple_app"
description = "A simple app that does nothing"
script = "simple_app.py"
python_version = "3.11"
dependencies = ["gyjd", "requests"]
dependencies = ["gyjd>0.1.0dev10", "requests<3"]
tags = ["simple", "app"]

[gyjd.job.schedule.cron.default]
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ build-backend = "hatchling.build"

[project]
name = "gyjd"
dynamic = ["version"]
version = "0.1.0dev14"
#dynamic = ["version"]
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.11"
Expand Down
15 changes: 5 additions & 10 deletions src/gyjd/cli/apps/jobs/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,12 @@

@app.command(help="Start dagster server.")
def server(
scripts_path: Annotated[
target: Annotated[
Path,
typer.Option(
exists=True,
file_okay=False,
dir_okay=True,
resolve_path=True,
typer.Argument(
help="Path that contains the scripts to run.",
default="scripts",
),
],
] = Path("scripts"),
):
repos_dir = Path(__file__).parent / "repos"

Expand All @@ -32,7 +27,7 @@ def server(
command.extend(["-f", str(repo)])

envs = os.environ.copy()
envs["GYJD_SCRIPTS_PATH"] = str(scripts_path.absolute())
envs["GYJD_SCRIPTS_PATH"] = str(target.absolute())

process = subprocess.run(command, stdout=None, stderr=None, text=True, env=envs)

Expand All @@ -42,6 +37,6 @@ def server(
@app.command(help="Create python script job.")
def create_script(
name: Annotated[str, typer.Argument(help="Name of the script.")],
python_version: Annotated[str, typer.Option(help="Python version to use.", prompt=True, default="3.11")],
python_version: Annotated[str, typer.Option(help="Python version to use.", prompt=True)] = "3.11",
):
print("Deleting user: Hiro Hamada")
51 changes: 26 additions & 25 deletions src/gyjd/cli/apps/jobs/repos/gyjd_scripts.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
import os
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path

import toml
from dagster import Field, OpExecutionContext, ScheduleDefinition, Shape, job, op, repository


@dataclass
class ScriptConfig:
script_path: str
script_name: str
python_version: str
dependencies: list[str] = field(default_factory=list)


@op(
config_schema=Shape(
{
Expand Down Expand Up @@ -39,20 +48,7 @@ def run_python_script(context: OpExecutionContext):

envs["LOG_FORMATTER"] = f"{logger_prefix} - %(levelname)s - %(message)s"

process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
env=envs,
)

if process.stdout:
for line in process.stdout:
line = line.strip()
if line.startswith(logger_prefix):
level, message = line.removeprefix(logger_prefix).strip().split(" - ", 1)
getattr(context.log, level.lower())(message)
process = subprocess.Popen(command, stdout=None, stderr=None, text=True, env=envs)

exit_code = process.wait()
if exit_code != 0:
Expand All @@ -62,16 +58,16 @@ def run_python_script(context: OpExecutionContext):


# Função para criar um job para cada script
def create_job_for_script(script_name: str, script_path: str):
@job(name=script_name)
def create_job_for_script(config: ScriptConfig):
@job(name=config.script_name)
def dynamic_job():
run_python_script.configured(
{
"script_path": script_path,
"python_version": "3.11",
"dependencies": ["gyjd", "requests<3"],
"script_path": config.script_path,
"python_version": config.python_version,
"dependencies": config.dependencies,
},
name=f"{script_name}_op",
name=f"{config.script_name}_op",
)()

return dynamic_job
Expand All @@ -88,10 +84,16 @@ def generate_definitions(scripts_path: Path):
if "script" not in script_config:
continue

script_path: Path = config_path.parent / script_config["script"]
script_name = script_config.get("name", script_path.stem)
script_path = (scripts_path / config_path).parent / script_config["script"]

config = ScriptConfig(
script_path=str(script_path),
script_name=script_config.get("name", script_path.stem),
python_version=script_config.get("python_version", f"{sys.version_info.major}.{sys.version_info.minor}"),
dependencies=script_config.get("dependencies", []),
)

job = create_job_for_script(script_name, str(script_path))
job = create_job_for_script(config=config)

yield job

Expand All @@ -108,11 +110,10 @@ def generate_definitions(scripts_path: Path):
job=job,
cron_schedule=cron_expression,
execution_timezone=cron_timezone,
name=f"{script_name}_{schedule_name}_schedule",
name=f"{config.script_name}_{schedule_name}_schedule",
)


# Repositório contendo os jobs e schedules
@repository(
name="scripts_repository",
description="Repositório com jobs e schedules para scripts Python em ambientes isolados utilizando uv coordenadamente",
Expand Down
7 changes: 5 additions & 2 deletions src/gyjd/config.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import time
from dataclasses import dataclass, field
from datetime import datetime
from os import getenv
from uuid import uuid4


@dataclass
class LoggerConfig:
name: str = "gyjd"
level: str = "$Env:LOG_LEVEL|INFO"
level: str = field(default_factory=lambda: getenv("LOG_LEVEL", "INFO"))
default_to_console: bool = True
formatter: str = "$Env:LOG_FORMATTER|%(asctime)s - %(name)s - %(levelname)s - %(message)s"
formatter: str = field(
default_factory=lambda: getenv("LOG_FORMAT", "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)


@dataclass
Expand Down

0 comments on commit 877c160

Please sign in to comment.