diff --git a/.github/workflows/pip_publish.yml b/.github/workflows/pip_publish.yml new file mode 100644 index 0000000..b7a2eeb --- /dev/null +++ b/.github/workflows/pip_publish.yml @@ -0,0 +1,40 @@ +name: Publish Python Package + +on: + push: + branches: + - main + tags: + - 'v*.*.*' + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - name: Check out code + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.9' # or any other version you want to use + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install build twine pytest + + - name: Run tests + run: pytest tests/ + + - name: Build package + run: python -m build + + - name: Publish to PyPI + if: startsWith(github.ref, 'refs/tags') + env: + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }} + run: | + twine upload dist/* diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..4458311 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,20 @@ +# Include the LICENSE file +include LICENSE + +# Include the README and other documentation files +include README.md +include faq.md + +# Include example configurations +# recursive-include example *.yml + +# Include all source files in the package +recursive-include src *.py + +# Include test files +recursive-include tests *.py + +# Exclude unwanted files +global-exclude *.pyc +global-exclude __pycache__ +global-exclude .DS_Store diff --git a/README.md b/README.md index c3f488f..735e161 100644 --- a/README.md +++ b/README.md @@ -1 +1,243 @@ -# aws-resource-scheduler \ No newline at end of file +# AWS Resource Scheduler + +AWS Resource Scheduler is an open-source Python module that automates the start and stop operations for various AWS resources, including EC2 instances, Auto Scaling Groups (ASG), ECS services, RDS databases, and Aurora clusters. 
+ +## Features + +- Create a bundle of resources using names or tags that need to be started or stopped for a project or team. +- Combine resources from multiple regions or accounts. +- The application checks for resources to become healthy before moving to the next resource, allowing you to decide the sequence and dependency of resources. +- Doesn't require any changes to Tags or infrastructure, making it compatible with resources managed by IaC tools like CDK or Terraform. +- Start and stop AWS resources like EC2 instances, RDS databases, Aurora clusters. +- Scale up and down Auto Scaling Groups and ECS services. +- Schedule operations based on predefined configurations. +- Send notifications to Google Chat, Slack, or Microsoft Teams. + +## Installation + +```bash +pip install aws-resource-scheduler +``` + +### Configuration + +Create a configuration like below, based on your need. +You can keep more than one workload in the same file and decide which one to use for the action. + +```yaml +workspaces: + stage: + aws_region: us-west-2 + role_arn: arn:aws:iam::123456789012:role/SchedulerRole + storage: + method: parameter_store # Options: 'parameter_store' or 'dynamodb' to store last min,max,desire value for ecs and asg + dynamodb_table: 'ResourceSchedulerTable' # Required if method is 'dynamodb' + notification: + enable: true + platform: google + webhook_url: https://chat.googleapis.com/v1/spaces/XXX/messages?key=YYY&token=ZZZ + ec2: + name: + - instance1 + - instance2 + tags: + Environment: development + asg: + name: + - asg1 + - asg2 + ecs: + my-cluster: + - service2 + services: + - service1 + tags: + name: service2 + rds: + name: + - db-instance1 + - db-instance2 + aurora: + name: + - aurora-cluster1 + tags: + Environment: development +``` +Use a service like YAML Checker to validate your YAML config. Also use the status action to make sure that you are targeting the correct resources with your tags config. 
+ +### Arguments +-f, --filename: The configuration file +-w, --workspace: The workspace to use from the config file +-r, --resource: Comma-separated list of AWS resources (e.g., ec2, rds, asg, ecs, aurora) +-a, --action: The action to perform (start, stop, status) +-n, --no-wait: Do not wait for resources to reach desired state after starting or stopping +-t, --threads: Number of threads to use for parallel operations (default: 10) + +### Example Usage +To stop EC2 instances, ASG, and ECS services in the stage workspace: +```bash +aws-resource-scheduler -f config-stage.yml -w stage -r ec2,rds,asg,ecs -a stop +``` + +To start EC2 instances, ASG, and ECS services: +```bash +aws-resource-scheduler -f config-stage.yml -w stage -r ec2,asg,ecs -a start +``` + +### IAM Role and Permission + +To securely interact with AWS resources, create an IAM role with the necessary permissions. Follow these steps: + + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "SSMDescribeParameters", + "Effect": "Allow", + "Action": [ + "ssm:DescribeParameters" + ], + "Resource": "*" + }, + { + "Sid": "SSMGetPutParameters", + "Effect": "Allow", + "Action": [ + "ssm:GetParameter", + "ssm:PutParameter" + ], + "Resource": "arn:aws:ssm:*:*:parameter/scheduler/*" + }, + { + "Sid": "EC2DescribeInstances", + "Effect": "Allow", + "Action": [ + "ec2:DescribeInstances", + "ec2:DescribeTags" + ], + "Resource": "*" + }, + { + "Sid": "EC2StartStopInstances", + "Effect": "Allow", + "Action": [ + "ec2:StartInstances", + "ec2:StopInstances" + ], + "Resource": "*" + }, + { + "Sid": "RDSDescribeInstances", + "Effect": "Allow", + "Action": [ + "rds:DescribeDBInstances", + "rds:ListTagsForResource" + ], + "Resource": "*" + }, + { + "Sid": "RDSStartStopInstances", + "Effect": "Allow", + "Action": [ + "rds:StartDBInstance", + "rds:StopDBInstance" + ], + "Resource": "arn:aws:rds:*:*:db:*" + }, + { + "Sid": "RDSDescribeClusters", + "Effect": "Allow", + "Action": [ + "rds:DescribeDBClusters", + 
"rds:ListTagsForResource" + ], + "Resource": "*" + }, + { + "Sid": "RDSStartStopClusters", + "Effect": "Allow", + "Action": [ + "rds:StartDBCluster", + "rds:StopDBCluster" + ], + "Resource": "arn:aws:rds:*:*:cluster:*" + }, + { + "Sid": "AutoScalingDescribe", + "Effect": "Allow", + "Action": [ + "autoscaling:DescribeAutoScalingGroups", + "application-autoscaling:DescribeScalableTargets", + "application-autoscaling:RegisterScalableTarget", + "application-autoscaling:DeregisterScalableTarget", + "application-autoscaling:DescribeScalingPolicies", + "application-autoscaling:PutScalingPolicy" + ], + "Resource": "*" + }, + { + "Sid": "AutoScalingUpdateGroups", + "Effect": "Allow", + "Action": [ + "autoscaling:UpdateAutoScalingGroup" + ], + "Resource": "arn:aws:autoscaling:*:*:autoScalingGroup:*:autoScalingGroupName/*" + }, + { + "Sid": "ECSDescribeServices", + "Effect": "Allow", + "Action": [ + "ecs:DescribeServices", + "ecs:ListTagsForResource", + "ecs:ListServices" + ], + "Resource": "*" + }, + { + "Sid": "ECSUpdateServices", + "Effect": "Allow", + "Action": [ + "ecs:UpdateService" + ], + "Resource": "arn:aws:ecs:*:*:service/*" + }, + { + "Sid": "DynamodbStorage", + "Effect": "Allow", + "Action": [ + "dynamodb:PutItem", + "dynamodb:GetItem", + "dynamodb:UpdateItem" + ], + "Resource": "arn:aws:dynamodb:*:*:table/ResourceSchedulerTable" + } + ] +} +``` + +You can use Start and stop actions are allowed only on instances tagged with scheduler=true. +Other Services (RDS, Auto Scaling Groups, ECS): Similar tag-based restrictions are applied. 
+ +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "EC2StartStopInstances", + "Effect": "Allow", + "Action": [ + "ec2:StartInstances", + "ec2:StopInstances" + ], + "Resource": "*", + "Condition": { + "StringEquals": { + "ec2:ResourceTag/scheduler": "true" + } + } + } + ] +} +``` \ No newline at end of file diff --git a/faq.md b/faq.md new file mode 100644 index 0000000..cc7703a --- /dev/null +++ b/faq.md @@ -0,0 +1,32 @@ +## FAQ for AWS Resource Scheduler +Based on the common issues, available solutions, and user concerns with AWS resource scheduling tools like Lambda-based solutions, AWS Systems Manager Quick Setup, and other methods, here's a detailed FAQ for the AWS Resource Scheduler module: + +1. Why not make scheduling part of Infrastructure as Code (IaC) using tools like Terraform or CDK? +Answer: Integrating scheduling directly into IaC tools like Terraform or CDK can make your setup rigid, especially when schedules need frequent changes. AWS Resource Scheduler decouples the scheduling logic from your core infrastructure, allowing you to modify start/stop schedules without needing to redeploy the entire infrastructure. This flexibility is essential for scenarios like changing business hours or project needs, where start/stop schedules need to adapt quickly. + +2. How is AWS Resource Scheduler different from using AWS Lambda functions with tags? +Answer: Lambda functions can be configured to start/stop instances based on tags, but this typically requires you to write custom scripts and manage the invocation frequency, such as through CloudWatch Events​(Amazon Web Services, Inc.). AWS Resource Scheduler provides a more user-friendly, centralized configuration using YAML files and can manage multiple AWS services (e.g., EC2, RDS, ECS) across regions and accounts with a unified approach. It abstracts the scripting and maintenance of the Lambda logic. + +3. How does AWS Resource Scheduler compare to AWS Systems Manager Quick Setup? 
+Answer: AWS Systems Manager Quick Setup is an easy way to set up schedules for EC2 instances based on tags​(Amazon AWS Docs). However, it is limited primarily to EC2 and focuses on simple start/stop actions. AWS Resource Scheduler offers broader resource management capabilities, such as handling ASG scaling, ECS service updates, and Aurora clusters. Additionally, it supports complex dependencies between resources, making it suitable for environments where multiple services need coordinated actions. + +4. Why not just use a Lambda function for scheduling? +Answer: While Lambda functions are a good option for lightweight, custom automation, they can become complex to manage when you need to coordinate multiple services or manage configurations across different environments. AWS Resource Scheduler handles these complexities by offering a structured way to manage schedules, including configuration files and support for both SSM Parameter Store and DynamoDB for storing state information. + +5. Can AWS Resource Scheduler manage resources across multiple AWS accounts? +Answer: Yes, AWS Resource Scheduler can manage resources across multiple accounts by leveraging AWS IAM roles for cross-account access. This allows a centralized scheduler to control resources in various accounts without needing individual scripts or Lambda functions in each account. + +6. How does it handle the startup sequence of resources with dependencies? +Answer: AWS Resource Scheduler checks the health and readiness of each resource before proceeding to the next. For example, if an RDS database must be available before starting an application hosted on EC2, the scheduler ensures this sequence is respected. This is more efficient than simple tag-based or Lambda solutions, which may require custom logic for such dependencies. + +7. What if I already have scripts for starting/stopping instances, why should I switch? +Answer: Custom scripts can be difficult to maintain as the environment grows. 
AWS Resource Scheduler offers a more maintainable solution by using a configuration-based approach, allowing you to change settings without modifying code. It also provides logging and error handling out of the box, making troubleshooting simpler. +8. Can AWS Resource Scheduler be used in a serverless setup? + +Answer: Yes, AWS Resource Scheduler can run from AWS Lambda when packaged into a zip file, allowing you to execute the scheduling logic without maintaining servers. This setup can reduce costs and simplifies deployment, while still benefiting from the centralized configuration. + +9. How does AWS Resource Scheduler handle configuration changes? +Answer: Configuration changes can be made directly to the YAML file or by using a versioned configuration stored in an S3 bucket. This flexibility makes it easy to update schedules without needing code changes or redeployments. For example, changing business hours can be done by editing the ci/cd pipeline, cron job or event. + +10. Why use DynamoDB instead of Parameter Store for storing state information? +Answer: Parameter Store is suitable for smaller environments where you don't need to manage a large number of records or perform complex queries. DynamoDB, on the other hand, is ideal for scaling to thousands of resources and offers more control over data access patterns. Using DynamoDB allows for centralized state storage, making it possible to manage the scheduler across multiple AWS accounts without replicating configuration data in each account. \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..5829744 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,37 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "aws-resource-scheduler" +version = "0.1.0" +description = "An open-source solution to start/stop AWS EC2, Autoscaling Group, RDS, Aurora, ECS, and Fargate." 
+readme = "README.md" +requires-python = ">=3.8" +license = { text = "MIT" } +authors = [ + { name="Nitin Bhadauria", email="nitinb@cloudstaff.com" }, +] +maintainers = [ + { name="Nitin Bhadauria", email="nitinb@cloudstaff.com" }, +] +keywords = ["aws", "scheduler", "automation", "ec2", "rds", "asg", "aurora"] +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", +] +dependencies = [ + "boto3>=1.20.0", + "PyYAML>=5.4", + "requests>=2.28.2", +] + +[project.urls] +Homepage = "https://github.com/cloudstaff-apps/aws-resource-scheduler" +Repository = "https://github.com/cloudstaff-apps/aws-resource-scheduler" +Documentation = "https://github.com/cloudstaff-apps/aws-resource-scheduler#readme" +BugTracker = "https://github.com/cloudstaff-apps/aws-resource-scheduler/issues" + +[project.scripts] +aws-resource-scheduler = "aws_resource_scheduler.scheduler:main" diff --git a/src/aws_resource_scheduler/__init__.py b/src/aws_resource_scheduler/__init__.py new file mode 100644 index 0000000..c69c5f2 --- /dev/null +++ b/src/aws_resource_scheduler/__init__.py @@ -0,0 +1,3 @@ +from aws_resource_scheduler.scheduler import main + +__all__ = ['main'] \ No newline at end of file diff --git a/src/aws_resource_scheduler/scheduler.py b/src/aws_resource_scheduler/scheduler.py new file mode 100644 index 0000000..04634b4 --- /dev/null +++ b/src/aws_resource_scheduler/scheduler.py @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- + +import logging +from datetime import datetime +from aws_resource_scheduler.utils.asg import AsgModule +from aws_resource_scheduler.utils.rds import RdsModule +from aws_resource_scheduler.utils.ec2 import Ec2Module +from aws_resource_scheduler.utils.aurora import AuroraModule +from aws_resource_scheduler.utils.ecs import EcsModule +from aws_resource_scheduler.utils.common import parse_arguments, evaluate, aws_login, send_chat_notification, Storage, ParameterStoreStorage, 
DynamoDBStorage + +def main(args=None): + """ + Main function to parse arguments, evaluate configuration, and perform actions + on AWS resources such as EC2, ASG, RDS, Aurora, and ECS. Also sends notifications + to the specified chat platform (Google Chat, Slack, Teams) if enabled. + """ + # Setup logging + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + + if args is None: + # Parse command-line arguments and fetch configuration + args = parse_arguments() + + workspace, resources, action, workspace_name, no_wait, threads = evaluate(args) + + # Determine storage method + storage_config = workspace.get('storage', {}) + storage_method = storage_config.get('method', 'parameter_store') + if storage_method == 'dynamodb': + table_name = storage_config.get('dynamodb_table') + if not table_name: + logging.error("DynamoDB table name not specified in configuration.") + exit(1) + + # Initialize response containers for different AWS resources + rds_scheduler_resp = [] + asg_scheduler_resp = [] + ec2_scheduler_resp = [] + aurora_scheduler_resp = [] + ecs_scheduler_resp = [] + + # Check if 'notification' block exists and then handle the notification logic + notification_settings = workspace.get("notification", {}) + notification_enabled = notification_settings.get("enable", False) + platform = notification_settings.get("platform", "google") # Default to Google Chat + webhook_url = notification_settings.get("webhook_url") + + try: + client = aws_login(workspace) + # Initialize storage handler + if storage_method == 'dynamodb': + storage = DynamoDBStorage(session=client, table_name=table_name) + else: + storage = ParameterStoreStorage(session=client) + for res in resources: + logging.info(f"Process started for {res} at: {datetime.now()}") + + try: + if res == "rds" and res in workspace: + rds_module = RdsModule(session=client, no_wait=no_wait, threads=threads) + attributes = ['DBInstanceIdentifier', 'DBInstanceStatus', 
'DBInstanceClass', 'Engine', 'Endpoint'] + rds_scheduler_resp = rds_module.schedule_rds(workspace[res], action, instance_attributes=attributes) + + elif res == "asg" and res in workspace: + asg_module = AsgModule(session=client, storage=storage, workspace_name=workspace_name, no_wait=no_wait, threads=threads) + asg_scheduler_resp = asg_module.main_scheduler_asg(workspace[res], action) + + elif res == "ec2" and res in workspace: + ec2_module = Ec2Module(session=client, no_wait=no_wait, threads=threads) + attributes = ['InstanceId', 'InstanceType', 'State', 'PrivateIpAddress', 'PublicIpAddress'] + ec2_scheduler_resp = ec2_module.schedule_ec2_instances(workspace[res], action, instance_attributes=attributes) + + elif res == "aurora" and res in workspace: + aurora_module = AuroraModule(session=client, no_wait=no_wait, threads=threads) + cluster_attributes = ['DBClusterIdentifier', 'Status', 'Engine', 'Endpoint'] + aurora_scheduler_resp = aurora_module.schedule_aurora(workspace[res], action, cluster_attributes=cluster_attributes) + + elif res == "ecs" and res in workspace: + ecs_module = EcsModule(session=client, storage=storage, workspace_name=workspace_name, no_wait=no_wait, threads=threads) + ecs_scheduler_resp = ecs_module.main_scheduler_ecs(workspace[res], action) + + else: + logging.warning(f"Resource '{res}' is not supported or not configured in the workspace.") + + logging.info(f"Process ended for {res} at: {datetime.now()}") + + except Exception as e: + logging.exception(f"An error occurred while processing resource {res}: {e}") + if notification_enabled and webhook_url: + send_chat_notification(platform, webhook_url, f"An error occurred while processing resource {res}: {e}") + + except Exception as e: + logging.exception(f"An error occurred during AWS session setup or overall execution: {e}") + if notification_enabled and webhook_url: + send_chat_notification(platform, webhook_url, f"An error occurred during AWS session setup or overall execution: {e}") + 
return + + # Prepare the summary of actions performed on each AWS resource + data_lines = [] + + if rds_scheduler_resp: + data_lines.append(f"--------Details about RDS Total: {len(rds_scheduler_resp)} -------") + for instance in rds_scheduler_resp: + line = ", ".join(f"{key}: {value}" for key, value in instance.items()) + data_lines.append(line) + + if asg_scheduler_resp: + data_lines.append(f"--------Details about ASG Total: {len(asg_scheduler_resp)} -------") + if action == 'status': + for asg in asg_scheduler_resp: + line = ", ".join(f"{key}: {value}" for key, value in asg.items()) + data_lines.append(line) + else: + data_lines.extend(asg_scheduler_resp) + + if ec2_scheduler_resp: + data_lines.append(f"--------Details about EC2 Total: {len(ec2_scheduler_resp)} -------") + for instance in ec2_scheduler_resp: + line = ", ".join(f"{key}: {value}" for key, value in instance.items()) + data_lines.append(line) + + if aurora_scheduler_resp: + data_lines.append(f"--------Details about Aurora Total: {len(aurora_scheduler_resp)} -------") + for instance in aurora_scheduler_resp: + line = ", ".join(f"{key}: {value}" for key, value in instance.items()) + data_lines.append(line) + + if ecs_scheduler_resp: + data_lines.append(f"--------Details about ECS Total: {len(ecs_scheduler_resp)} -------") + if action == 'status': + for service in ecs_scheduler_resp: + line = ", ".join(f"{key}: {value}" for key, value in service.items()) + data_lines.append(line) + else: + data_lines.extend(ecs_scheduler_resp) + + summary = "\n".join(data_lines) + logging.info(summary) + + # Send the summary as a chat notification if enabled + if notification_enabled and webhook_url: + send_chat_notification(platform, webhook_url, summary) + + +if __name__ == '__main__': + main() diff --git a/src/aws_resource_scheduler/utils/__init__.py b/src/aws_resource_scheduler/utils/__init__.py new file mode 100644 index 0000000..11efa9c --- /dev/null +++ b/src/aws_resource_scheduler/utils/__init__.py @@ -0,0 +1,22 
@@ +# Expose utility classes and functions +from aws_resource_scheduler.utils.asg import AsgModule +from aws_resource_scheduler.utils.rds import RdsModule +from aws_resource_scheduler.utils.ec2 import Ec2Module +from aws_resource_scheduler.utils.aurora import AuroraModule +from aws_resource_scheduler.utils.ecs import EcsModule +from aws_resource_scheduler.utils.common import parse_arguments, evaluate, aws_login, send_chat_notification, Storage, ParameterStoreStorage, DynamoDBStorage + +__all__ = [ + 'AsgModule', + 'RdsModule', + 'Ec2Module', + 'AuroraModule', + 'EcsModule', + 'parse_arguments', + 'evaluate', + 'aws_login', + 'send_chat_notification', + 'Storage', + 'ParameterStoreStorage', + 'DynamoDBStorage' +] diff --git a/src/aws_resource_scheduler/utils/asg.py b/src/aws_resource_scheduler/utils/asg.py new file mode 100644 index 0000000..f368b6d --- /dev/null +++ b/src/aws_resource_scheduler/utils/asg.py @@ -0,0 +1,191 @@ +# -*- coding: utf-8 -*- +import logging +import time +from concurrent.futures import ThreadPoolExecutor + +class AsgModule: + """ + Manages Auto Scaling Groups (ASG) actions such as start, stop, and status operations. + """ + + def __init__(self, session, storage, workspace_name, no_wait=False, threads=10): + self.session = session + self.client = self.session.client('autoscaling') + self.storage = storage + self.scheduler_summary_message = [] + self.workspace_name = workspace_name + self.no_wait = no_wait + self.threads = threads + + def get_asg_by_name(self, asg_name): + """ + Retrieves ASG details by name. 
+ """ + try: + response = self.client.describe_auto_scaling_groups( + AutoScalingGroupNames=[asg_name] + ) + asg = response['AutoScalingGroups'][0] + data = { + "asg_name": asg_name, + "min_size": asg['MinSize'], + "max_size": asg['MaxSize'], + "desired_capacity": asg['DesiredCapacity'], + "instance_count": len(asg['Instances']), + "status": asg['Status'] if 'Status' in asg else 'InService', + } + return {"success": True, "data": data} + except Exception as e: + logging.error(f"Failed to get ASG by name: {e}") + return {"success": False, "data": f"Scheduler failed for: {asg_name}. Please check it."} + + def get_asg_by_tags(self, tags): + """ + Retrieves a list of ASGs that match the given tags. + """ + paginator = self.client.get_paginator('describe_auto_scaling_groups') + asg_list = [] + + for page in paginator.paginate(): + for asg in page['AutoScalingGroups']: + asg_tags = {tag['Key']: tag['Value'] for tag in asg.get('Tags', [])} + if all(asg_tags.get(k) == v for k, v in tags.items()): + asg_list.append(asg['AutoScalingGroupName']) + return asg_list + + def main_scheduler_asg(self, data, action): + """ + Main function to handle ASG actions (start, stop, status). + """ + name_asg_list = data.get('name', []) + tag_asg_list = [] + + if 'tags' in data: + tag_asg_list = self.get_asg_by_tags(data['tags']) + + final_asg_list = list(set(name_asg_list + tag_asg_list)) + asg_data_list = [] + + for asg in final_asg_list: + resp = self.get_asg_by_name(asg) + if resp['success']: + asg_data_list.append(resp['data']) + + # Return the length based on actual unique ASG data + if action == 'status': + return asg_data_list + + self.update_asg(asg_data_list, action) + return self.scheduler_summary_message + + def update_asg(self, asg_details_list, action): + """ + Updates the ASGs based on the action (start, stop). 
+ """ + with ThreadPoolExecutor(max_workers=self.threads) as executor: + if action == "start": + executor.map(self.start_asg, asg_details_list) + elif action == "stop": + executor.map(self.stop_asg, asg_details_list) + + def start_asg(self, asg_data): + asg_name = asg_data['asg_name'] + + # Check current ASG state before starting + if asg_data['desired_capacity'] > 0: + logging.info(f"ASG {asg_name} is already running with desired capacity: {asg_data['desired_capacity']}.") + self.scheduler_summary_message.append(f"ASG {asg_name} is already in a running state.") + return + + # Read desired capacity, min_size, max_size from storage + parameter_name = f"/scheduler/{self.workspace_name}/asg/{asg_name}" + value = self.storage.read_state(parameter_name) + + if value and len(value) == 4: + _, desired_capacity, min_size, max_size = value + desired_capacity = int(desired_capacity) + min_size = int(min_size) + max_size = int(max_size) + else: + logging.error(f"No stored data found for ASG {asg_name}. 
Cannot start.") + self.scheduler_summary_message.append(f"ASG {asg_name} cannot be started due to missing stored data.") + return + + # Update the ASG to the desired settings + self.client.update_auto_scaling_group( + AutoScalingGroupName=asg_name, + MinSize=min_size, + MaxSize=max_size, + DesiredCapacity=desired_capacity + ) + if not self.no_wait: + self.check_instance_start_status(asg_data) + else: + self.scheduler_summary_message.append(f"ASG {asg_name} start initiated (no-wait mode).") + + def stop_asg(self, asg_data): + asg_name = asg_data['asg_name'] + + # Check current ASG state before stopping + if asg_data['desired_capacity'] == 0: + logging.info(f"ASG {asg_name} is already stopped with desired capacity: 0.") + self.scheduler_summary_message.append(f"ASG {asg_name} is already in a stopped state.") + return + + # Store current desired capacity, min_size, max_size to storage + parameter_name = f"/scheduler/{self.workspace_name}/asg/{asg_name}" + desired_capacity = str(asg_data['desired_capacity']) + min_size = str(asg_data['min_size']) + max_size = str(asg_data['max_size']) + value = [asg_name, desired_capacity, min_size, max_size] + self.storage.write_state(parameter_name, value) + + # Update the ASG to zero to stop it + self.client.update_auto_scaling_group( + AutoScalingGroupName=asg_name, + MinSize=0, MaxSize=0, DesiredCapacity=0 + ) + if not self.no_wait: + self.check_instance_stop_status(asg_data) + else: + self.scheduler_summary_message.append(f"ASG {asg_name} stop initiated (no-wait mode).") + + def check_instance_start_status(self, asg_data): + """ + Check the start status of instances in the ASG. 
+ """ + asg_name = asg_data['asg_name'] + desired_count = asg_data['desired_capacity'] + instance_count = 0 + + while instance_count < desired_count: + instance_list = self.get_asg_instance_health(asg_name) + instance_count = sum(1 for instance in instance_list if instance['LifecycleState'] == "InService") + time.sleep(10) + + self.scheduler_summary_message.append(f"ASG {asg_name} is fully started!") + + def check_instance_stop_status(self, asg_data): + """ + Check the stop status of instances in the ASG. + """ + asg_name = asg_data['asg_name'] + while True: + response = self.client.describe_auto_scaling_groups( + AutoScalingGroupNames=[asg_name] + ) + instances = response['AutoScalingGroups'][0]['Instances'] + if not instances: + break + time.sleep(10) + self.scheduler_summary_message.append(f"ASG {asg_name} is fully stopped!") + + def get_asg_instance_health(self, asg_name): + """ + Retrieves the health status of instances in the ASG. + """ + response = self.client.describe_auto_scaling_groups( + AutoScalingGroupNames=[asg_name] + ) + asg = response['AutoScalingGroups'][0] + return asg.get('Instances', []) diff --git a/src/aws_resource_scheduler/utils/aurora.py b/src/aws_resource_scheduler/utils/aurora.py new file mode 100644 index 0000000..d715ef4 --- /dev/null +++ b/src/aws_resource_scheduler/utils/aurora.py @@ -0,0 +1,124 @@ +# -*- coding: utf-8 -*- +import logging +import time +from concurrent.futures import ThreadPoolExecutor + + +class AuroraModule: + """ + Manages Aurora cluster scheduling tasks such as start, stop, and checking cluster status. + """ + + def __init__(self, session, no_wait=False, threads=10): + """ + Initializes the Aurora module with a provided AWS session. 
+ """ + self.session = session + self.client = self.session.client('rds') + self.scheduler_summary_message = [] + self.no_wait = no_wait + self.threads = threads + + def describe_aurora_clusters(self, cluster_identifiers=None, tags=None, cluster_attributes=None): + """ + Describes the Aurora clusters based on identifiers or tags. + """ + response = self.client.describe_db_clusters() + aurora_clusters = [] + + for cluster in response['DBClusters']: + if cluster_identifiers and cluster['DBClusterIdentifier'] not in cluster_identifiers: + continue + if tags: + response_tags = self.client.list_tags_for_resource(ResourceName=cluster['DBClusterArn']) + tags_dict = {tag['Key']: tag['Value'] for tag in response_tags.get('TagList', [])} + if not all(tags_dict.get(k) == v for k, v in tags.items()): + continue + + cluster_data = {'DBClusterIdentifier': cluster['DBClusterIdentifier']} + if cluster_attributes: + for attribute in cluster_attributes: + if attribute == 'Endpoint': + cluster_data['Endpoint'] = cluster.get('Endpoint') + else: + cluster_data[attribute] = cluster.get(attribute) + aurora_clusters.append(cluster_data) + + return aurora_clusters + + def wait_status(self, cluster_identifier, expected_status): + while True: + status = self.cluster_status(cluster_identifier) + if status == expected_status: + return + logging.info(f"Waiting for {cluster_identifier} to reach {expected_status} state...") + time.sleep(30) + + def start_cluster(self, cluster_identifier): + """ + Starts the specified Aurora cluster if not already started. 
+ """ + current_status = self.cluster_status(cluster_identifier) + if current_status in ["available", "starting"]: + self.scheduler_summary_message.append(f"{cluster_identifier} is already Available or Starting.") + return + self.client.start_db_cluster(DBClusterIdentifier=cluster_identifier) + if not self.no_wait: + self.wait_status(cluster_identifier, "available") + self.scheduler_summary_message.append(f"{cluster_identifier} started successfully.") + else: + self.scheduler_summary_message.append(f"{cluster_identifier} start initiated (no-wait mode).") + + def stop_cluster(self, cluster_identifier): + """ + Stops the specified Aurora cluster if not already stopped. + """ + current_status = self.cluster_status(cluster_identifier) + if current_status in ["stopped", "stopping"]: + self.scheduler_summary_message.append(f"{cluster_identifier} is already in Stopping or Stopped state.") + return + self.client.stop_db_cluster(DBClusterIdentifier=cluster_identifier) + if not self.no_wait: + self.wait_status(cluster_identifier, "stopped") + self.scheduler_summary_message.append(f"{cluster_identifier} stopped successfully.") + else: + self.scheduler_summary_message.append(f"{cluster_identifier} stop initiated (no-wait mode).") + + def cluster_status(self, cluster_identifier): + response = self.client.describe_db_clusters( + DBClusterIdentifier=cluster_identifier + ) + return response['DBClusters'][0]['Status'] + + def schedule_aurora(self, data, action, cluster_attributes=None): + aurora_clusters = [] + + if 'name' in data: + identifiers = data['name'] if isinstance(data['name'], list) else [data['name']] + aurora_clusters.extend(self.describe_aurora_clusters(cluster_identifiers=identifiers, cluster_attributes=cluster_attributes)) + + if 'tags' in data: + aurora_clusters.extend(self.describe_aurora_clusters(tags=data['tags'], cluster_attributes=cluster_attributes)) + + unique_clusters = {cluster['DBClusterIdentifier']: cluster for cluster in aurora_clusters} + + if action 
== 'status': + return list(unique_clusters.values()) + + with ThreadPoolExecutor(max_workers=self.threads) as executor: + if action == "stop": + executor.map(self.stop_cluster, unique_clusters.keys()) + elif action == "start": + executor.map(self.start_cluster, unique_clusters.keys()) + + # Collect cluster details for summary + cluster_details = [] + for cluster_id in unique_clusters.keys(): + cluster_info = { + 'DBClusterIdentifier': cluster_id, + 'action': action, + 'Status': self.cluster_status(cluster_id) + } + cluster_details.append(cluster_info) + + return cluster_details diff --git a/src/aws_resource_scheduler/utils/common.py b/src/aws_resource_scheduler/utils/common.py new file mode 100644 index 0000000..61dd29a --- /dev/null +++ b/src/aws_resource_scheduler/utils/common.py @@ -0,0 +1,183 @@ +# -*- coding: utf-8 -*- +import yaml +import argparse +import boto3 +import random +import requests +import logging +from botocore.exceptions import ClientError + +def get_config(file): + """ + Reads and returns the content of the YAML configuration file. + """ + try: + with open(file, 'r') as f: + config = yaml.safe_load(f) + return config + except Exception as e: + logging.error(f"Unable to open the YAML config file {file}. Error: {str(e)}") + return None + +def parse_arguments(): + """ + Parses command-line arguments for the script. 
+ """ + parser = argparse.ArgumentParser(description="AWS Resource Scheduler") + parser.add_argument("-f", "--filename", dest="file", help="Name of config file", required=True) + parser.add_argument("-w", "--workspace", dest="workspace", help="Workspace name", required=True) + parser.add_argument("-r", "--resource", dest="resource", help="Comma-separated AWS resources (e.g., rds, ec2, asg)", required=True) + parser.add_argument("-a", "--action", dest="action", choices=['start', 'stop', 'status'], help="Action to perform", required=True) + parser.add_argument("-n", "--no-wait", dest="no_wait", action='store_true', help="Do not wait for resources to reach desired state after starting or stopping") + parser.add_argument("-t", "--threads", dest="threads", type=int, default=10, help="Number of threads to use for parallel operations (default: 10)") + + args = parser.parse_args() + logging.info(f"Config Filename: {args.file}, Workspace: {args.workspace}, Resource list: {args.resource.split(',')}, Action: {args.action}, No Wait: {args.no_wait}, Threads: {args.threads}") + return args + +def evaluate(args): + """ + Processes the parsed arguments and retrieves the workspace configuration from the YAML file. + """ + environment = args.workspace + resources = args.resource.split(',') + action = args.action + config_yaml = args.file + no_wait = args.no_wait + threads = args.threads + + config = get_config(config_yaml) + if not config: + logging.error("Configuration could not be loaded.") + exit(1) + + config_workspace = config.get('workspaces', {}).get(environment) + if not config_workspace: + logging.error(f"Workspace '{environment}' not found in the configuration file.") + exit(1) + + return config_workspace, resources, action, args.workspace, no_wait, threads + +def aws_login(workspace): + """ + Creates an AWS session using the provided workspace configuration. 
+ """ + region = workspace.get("aws_region") + session = boto3.Session(region_name=region) + + if "role_arn" in workspace: + sts = session.client("sts") + role = workspace["role_arn"] + try: + response = sts.assume_role(RoleArn=role, RoleSessionName=f"scheduler_{random.randint(1, 999)}") + session = boto3.Session( + aws_access_key_id=response['Credentials']['AccessKeyId'], + aws_secret_access_key=response['Credentials']['SecretAccessKey'], + aws_session_token=response['Credentials']['SessionToken'], + region_name=region + ) + except Exception as e: + logging.error(f"Failed to assume role: {str(e)}") + exit(1) + return session + +def send_chat_notification(platform, webhook_url, message): + """ + Sends a notification to the specified platform (Slack, Google Chat, Teams) using a webhook. + """ + if platform == "teams": + payload = { + "@type": "MessageCard", + "@context": "http://schema.org/extensions", + "text": message + } + else: + payload = {"text": message} + + headers = { + "Content-Type": "application/json" + } + + try: + response = requests.post(webhook_url, json=payload, headers=headers) + if response.status_code == 200: + logging.info(f"{platform.capitalize()} notification sent successfully.") + return True + else: + logging.error(f"Failed to send {platform.capitalize()} notification. 
Status code: {response.status_code}") + return False + except Exception as e: + logging.error(f"An error occurred while sending the {platform.capitalize()} notification: {str(e)}") + return False + +# Storage classes +class Storage: + def read_state(self, key): + raise NotImplementedError + + def write_state(self, key, value): + raise NotImplementedError + +class ParameterStoreStorage(Storage): + def __init__(self, session): + self.client = session.client('ssm') + + def read_state(self, key): + try: + response = self.client.get_parameter(Name=key) + value = response['Parameter']['Value'].split(',') + logging.info(f"Parameter {key} read from Parameter Store.") + return value + except self.client.exceptions.ParameterNotFound: + logging.info(f"Parameter {key} not found.") + return [] + except ClientError as e: + logging.error(f"Failed to read parameter {key}: {str(e)}") + return [] + + def write_state(self, key, value): + try: + self.client.put_parameter( + Name=key, + Value=",".join(value), + Type="StringList", + Overwrite=True + ) + logging.info(f"Parameter {key} updated in Parameter Store.") + except ClientError as e: + logging.error(f"Failed to write parameter {key}: {str(e)}") + +class DynamoDBStorage(Storage): + def __init__(self, session, table_name): + self.client = session.client('dynamodb') + self.table_name = table_name + + def read_state(self, key): + try: + response = self.client.get_item( + TableName=self.table_name, + Key={'ResourceKey': {'S': key}} + ) + if 'Item' in response: + value = response['Item']['Value']['S'].split(',') + logging.info(f"Item {key} read from DynamoDB table {self.table_name}.") + return value + else: + logging.info(f"Item {key} not found in DynamoDB table {self.table_name}.") + return [] + except ClientError as e: + logging.error(f"Failed to read item {key} from DynamoDB: {str(e)}") + return [] + + def write_state(self, key, value): + try: + self.client.put_item( + TableName=self.table_name, + Item={ + 'ResourceKey': {'S': key}, 
+ 'Value': {'S': ",".join(value)} + } + ) + logging.info(f"Item {key} updated in DynamoDB table {self.table_name}.") + except ClientError as e: + logging.error(f"Failed to write item {key} to DynamoDB: {str(e)}") diff --git a/src/aws_resource_scheduler/utils/ec2.py b/src/aws_resource_scheduler/utils/ec2.py new file mode 100644 index 0000000..c253694 --- /dev/null +++ b/src/aws_resource_scheduler/utils/ec2.py @@ -0,0 +1,148 @@ +# src/aws_resource_scheduler/utils/ec2.py + +import logging +import time +from concurrent.futures import ThreadPoolExecutor +from botocore.exceptions import ClientError + +class Ec2Module: + """ + Manages EC2 instances' start, stop, and scheduling operations. + """ + + def __init__(self, session, no_wait=False, threads=10): + """ + Initializes the EC2 module with the provided AWS session. + """ + self.session = session + self.client = self.session.client('ec2') + self.scheduler_summary_message = [] + self.no_wait = no_wait + self.threads = threads + + def describe_ec2_instances(self, instance_ids=None, tags=None, instance_attributes=None): + """ + Describes EC2 instances based on instance IDs or tags. 
+ """ + filters = [] + if tags: + filters.extend([{'Name': f'tag:{k}', 'Values': [v]} for k, v in tags.items()]) + + try: + response = self.client.describe_instances( + InstanceIds=instance_ids or [], + Filters=filters + ) + except ClientError as e: + logging.error(f"Failed to describe EC2 instances: {str(e)}") + return [] + + ec2_instances = [] + for reservation in response['Reservations']: + for instance in reservation['Instances']: + instance_data = {'InstanceId': instance['InstanceId']} + if instance_attributes: + for attribute in instance_attributes: + if attribute == 'State': + instance_data['State'] = instance['State']['Name'] + else: + instance_data[attribute] = instance.get(attribute) + ec2_instances.append(instance_data) + + return ec2_instances + + def wait_status(self, instance_id, expected_status): + """ + Wait until the EC2 instance reaches the desired status. + """ + while True: + status = self.instance_status(instance_id) + if status == expected_status: + return + time.sleep(15) + + def start_instance(self, instance_id): + """ + Start the specified EC2 instance if not already running. + """ + try: + current_status = self.instance_status(instance_id) + if current_status in ["running", "pending"]: + self.scheduler_summary_message.append(f"{instance_id} is already Running or Starting.") + return + self.client.start_instances(InstanceIds=[instance_id]) + if not self.no_wait: + self.wait_status(instance_id, "running") + self.scheduler_summary_message.append(f"{instance_id} started successfully.") + else: + self.scheduler_summary_message.append(f"{instance_id} start initiated (no-wait mode).") + except ClientError as e: + logging.error(f"Failed to start EC2 instance {instance_id}: {str(e)}") + self.scheduler_summary_message.append(f"Error starting {instance_id}: {str(e)}") + + def stop_instance(self, instance_id): + """ + Stop the specified EC2 instance if not already stopped. 
+ """ + try: + current_status = self.instance_status(instance_id) + if current_status in ["stopped", "stopping"]: + self.scheduler_summary_message.append(f"{instance_id} is already in Stopping or Stopped state.") + return + self.client.stop_instances(InstanceIds=[instance_id]) + if not self.no_wait: + self.wait_status(instance_id, "stopped") + self.scheduler_summary_message.append(f"{instance_id} stopped successfully.") + else: + self.scheduler_summary_message.append(f"{instance_id} stop initiated (no-wait mode).") + except ClientError as e: + logging.error(f"Failed to stop EC2 instance {instance_id}: {str(e)}") + self.scheduler_summary_message.append(f"Error stopping {instance_id}: {str(e)}") + + def instance_status(self, instance_id): + """ + Check the current status of the instance. + """ + try: + response = self.client.describe_instances(InstanceIds=[instance_id]) + return response['Reservations'][0]['Instances'][0]['State']['Name'] + except ClientError as e: + logging.error(f"Failed to get status for EC2 instance {instance_id}: {str(e)}") + return "unknown" + + def schedule_ec2_instances(self, data, action, instance_attributes=None): + """ + Main EC2 scheduler function to handle instance start/stop/status based on the configuration. 
+ """ + ec2_instances = [] + + if 'name' in data: + instance_ids = data['name'] if isinstance(data['name'], list) else [data['name']] + ec2_instances.extend(self.describe_ec2_instances(instance_ids=instance_ids, instance_attributes=instance_attributes)) + + if 'tags' in data: + ec2_instances.extend(self.describe_ec2_instances(tags=data['tags'], instance_attributes=instance_attributes)) + + unique_instances = {instance['InstanceId']: instance for instance in ec2_instances} + + if action == 'status': + return list(unique_instances.values()) + + with ThreadPoolExecutor(max_workers=self.threads) as executor: + if action == "stop": + executor.map(self.stop_instance, unique_instances.keys()) + elif action == "start": + executor.map(self.start_instance, unique_instances.keys()) + + # Collect instance details for summary + instance_details = [] + for instance_id in unique_instances.keys(): + status = self.instance_status(instance_id) + instance_info = { + 'InstanceId': instance_id, + 'action': action, + 'Status': status + } + instance_details.append(instance_info) + + return instance_details diff --git a/src/aws_resource_scheduler/utils/ecs.py b/src/aws_resource_scheduler/utils/ecs.py new file mode 100644 index 0000000..93ed1b1 --- /dev/null +++ b/src/aws_resource_scheduler/utils/ecs.py @@ -0,0 +1,189 @@ +import logging +import time +from concurrent.futures import ThreadPoolExecutor + +class EcsModule: + def __init__(self, session, storage, workspace_name, no_wait=False, threads=10): + self.session = session + self.client = self.session.client('ecs') + self.application_autoscaling_client = self.session.client('application-autoscaling') + self.storage = storage + self.scheduler_summary_message = [] + self.workspace_name = workspace_name + self.no_wait = no_wait + self.threads = threads + + def get_services_by_tags(self, cluster_name, tags): + paginator = self.client.get_paginator('list_services') + service_arns = [] + for page in paginator.paginate(cluster=cluster_name): + 
service_arns.extend(page['serviceArns']) + + services = [] + for i in range(0, len(service_arns), 10): + batch_arns = service_arns[i:i + 10] + response = self.client.describe_services(cluster=cluster_name, services=batch_arns) + for service in response['services']: + response_tags = self.client.list_tags_for_resource(resourceArn=service['serviceArn']) + service_tags = {tag['key']: tag['value'] for tag in response_tags.get('tags', [])} + if all(service_tags.get(k) == v for k, v in tags.items()): + services.append(service['serviceName']) + return services + + def get_ecs_service_status(self, cluster_name, service_name): + response = self.client.describe_services(cluster=cluster_name, services=[service_name]) + service_data = response['services'][0] + return { + "cluster_name": cluster_name, + "service_name": service_name, + "desired_count": service_data['desiredCount'], + "running_count": service_data['runningCount'], + "status": service_data['status'], + "launch_type": service_data.get('launchType', 'UNKNOWN'), + "task_definition": service_data['taskDefinition'], + } + + def main_scheduler_ecs(self, ecs_config, action): + service_data_list = [] + + for cluster_name, config in ecs_config.items(): + services = config.get('services', []) + tags = config.get('tags', {}) + + # Add services specified by name + for service_name in services: + service_data = self.get_ecs_service_status(cluster_name, service_name) + service_data_list.append(service_data) + + # Add services discovered via tags + if tags: + tagged_services = self.get_services_by_tags(cluster_name, tags) + for service_name in tagged_services: + service_data = self.get_ecs_service_status(cluster_name, service_name) + service_data_list.append(service_data) + + if action == 'status': + return service_data_list + + with ThreadPoolExecutor(max_workers=self.threads) as executor: + if action == "start": + executor.map(self.safe_execution, [self.start_ecs_service] * len(service_data_list), service_data_list) + elif 
action == "stop": + executor.map(self.safe_execution, [self.stop_ecs_service] * len(service_data_list), service_data_list) + + return self.scheduler_summary_message + + def safe_execution(self, func, service_data): + try: + func(service_data) + except Exception as e: + service_name = service_data.get('service_name', 'Unknown') + cluster_name = service_data.get('cluster_name', 'Unknown') + logging.error(f"An error occurred while processing service {service_name} in cluster {cluster_name}: {str(e)}") + self.scheduler_summary_message.append(f"Failed to process service {service_name} in cluster {cluster_name}.") + + def start_ecs_service(self, service_data): + cluster_name = service_data['cluster_name'] + service_name = service_data['service_name'] + parameter_name = f"/scheduler/{self.workspace_name}/ecs/{cluster_name}/{service_name}" + stored_data = self.storage.read_state(parameter_name) + + if not stored_data or len(stored_data) < 3: + logging.error(f"No stored data found for ECS service {service_name} in cluster {cluster_name}. 
Cannot start.") + self.scheduler_summary_message.append(f"ECS service {service_name} cannot be started due to missing stored data.") + return + + desired_count = int(stored_data[2]) + min_capacity = int(stored_data[3]) if len(stored_data) > 3 else None + max_capacity = int(stored_data[4]) if len(stored_data) > 4 else None + + current_status = self.get_ecs_service_status(cluster_name, service_name) + if current_status['running_count'] == desired_count: + logging.info(f"Service {service_name} in cluster {cluster_name} is already at desired count.") + self.scheduler_summary_message.append(f"Service {service_name} in cluster {cluster_name} is already at desired count.") + return + + if min_capacity is not None and max_capacity is not None: + self.update_scaling_policy(cluster_name, service_name, min_capacity, max_capacity) + + self.update_ecs_service(cluster_name, service_name, desired_count) + if not self.no_wait: + self.wait_for_service(cluster_name, service_name, desired_count) + self.scheduler_summary_message.append(f"Service {service_name} in cluster {cluster_name} started successfully.") + else: + self.scheduler_summary_message.append(f"Service {service_name} in cluster {cluster_name} start initiated (no-wait mode).") + + def stop_ecs_service(self, service_data): + cluster_name = service_data['cluster_name'] + service_name = service_data['service_name'] + parameter_name = f"/scheduler/{self.workspace_name}/ecs/{cluster_name}/{service_name}" + + current_status = self.get_ecs_service_status(cluster_name, service_name) + if current_status['running_count'] == 0: + logging.info(f"Service {service_name} in cluster {cluster_name} is already stopped.") + self.scheduler_summary_message.append(f"Service {service_name} in cluster {cluster_name} is already stopped.") + return + + scaling_policy = self.describe_scaling_policy(cluster_name, service_name) + + if scaling_policy: + stored_data = [ + str(cluster_name), + str(service_name), + str(current_status['desired_count']), + 
str(scaling_policy['min_capacity']), + str(scaling_policy['max_capacity']) + ] + else: + stored_data = [str(cluster_name), str(service_name), str(current_status['desired_count'])] + + self.storage.write_state(parameter_name, stored_data) + + if scaling_policy: + self.update_scaling_policy(cluster_name, service_name, min_capacity=0, max_capacity=0) + + self.update_ecs_service(cluster_name, service_name, 0) + if not self.no_wait: + self.wait_for_service(cluster_name, service_name, 0) + self.scheduler_summary_message.append(f"Service {service_name} in cluster {cluster_name} stopped successfully.") + else: + self.scheduler_summary_message.append(f"Service {service_name} in cluster {cluster_name} stop initiated (no-wait mode).") + + def update_ecs_service(self, cluster_name, service_name, desired_count): + self.client.update_service( + cluster=cluster_name, + service=service_name, + desiredCount=desired_count + ) + logging.info(f"Service {service_name} in cluster {cluster_name} updated to desired count: {desired_count}") + + def wait_for_service(self, cluster_name, service_name, expected_running_count): + while True: + status_data = self.get_ecs_service_status(cluster_name, service_name) + if status_data['running_count'] == expected_running_count: + return + time.sleep(15) + + def describe_scaling_policy(self, cluster_name, service_name): + response = self.application_autoscaling_client.describe_scalable_targets( + ServiceNamespace='ecs', + ResourceIds=[f'service/{cluster_name}/{service_name}'] + ) + + if response['ScalableTargets']: + scalable_target = response['ScalableTargets'][0] + return { + "min_capacity": scalable_target.get('MinCapacity', 0), + "max_capacity": scalable_target.get('MaxCapacity', 0) + } + return {} + + def update_scaling_policy(self, cluster_name, service_name, min_capacity, max_capacity): + self.application_autoscaling_client.register_scalable_target( + ServiceNamespace='ecs', + ResourceId=f'service/{cluster_name}/{service_name}', + 
class RdsModule:
    """
    Manages RDS instance start, stop, and scheduling operations.
    """

    def __init__(self, session, no_wait=False, threads=10):
        """
        Initializes the RDS module with the provided AWS session.
        """
        self.session = session
        self.client = self.session.client('rds')
        self.scheduler_summary_message = []
        self.no_wait = no_wait
        self.threads = threads

    def describe_rds_instances(self, instance_identifiers=None, tags=None, instance_attributes=None):
        """
        Describes RDS instances selected by identifier list and/or tag filters.

        :return: list of dicts, each with 'DBInstanceIdentifier' plus any
                 requested attributes.
        """
        matched = []
        for page in self.client.get_paginator('describe_db_instances').paginate():
            for db in page['DBInstances']:
                if instance_identifiers and db['DBInstanceIdentifier'] not in instance_identifiers:
                    continue
                if tags:
                    try:
                        tag_response = self.client.list_tags_for_resource(ResourceName=db['DBInstanceArn'])
                    except ClientError as err:
                        # Skip instances whose tags cannot be read.
                        logging.error(f"Failed to list tags for RDS instance {db['DBInstanceIdentifier']}: {str(err)}")
                        continue
                    found_tags = {t['Key']: t['Value'] for t in tag_response.get('TagList', [])}
                    if any(found_tags.get(key) != value for key, value in tags.items()):
                        continue

                record = {'DBInstanceIdentifier': db['DBInstanceIdentifier']}
                for attribute in (instance_attributes or []):
                    if attribute == 'Endpoint':
                        # Endpoint is a structure; expose just its address.
                        record['Endpoint'] = db.get('Endpoint', {}).get('Address')
                    else:
                        record[attribute] = db.get(attribute)
                matched.append(record)

        return matched

    def wait_status(self, instance_identifier, expected_status):
        """
        Polls every 15 seconds until the instance reports the expected status.
        """
        while self.instance_status(instance_identifier) != expected_status:
            time.sleep(15)

    def start_instance(self, instance_identifier):
        """
        Starts the specified RDS instance unless already available/starting.
        """
        try:
            state = self.instance_status(instance_identifier)
            if state in ("available", "starting"):
                self.scheduler_summary_message.append(f"{instance_identifier} is already Available or Starting.")
                return
            self.client.start_db_instance(DBInstanceIdentifier=instance_identifier)
            if self.no_wait:
                self.scheduler_summary_message.append(f"{instance_identifier} start initiated (no-wait mode).")
            else:
                self.wait_status(instance_identifier, "available")
                self.scheduler_summary_message.append(f"{instance_identifier} started successfully.")
        except ClientError as err:
            logging.error(f"Failed to start RDS instance {instance_identifier}: {str(err)}")
            self.scheduler_summary_message.append(f"Error starting {instance_identifier}: {str(err)}")

    def stop_instance(self, instance_identifier):
        """
        Stops the specified RDS instance unless already stopped/stopping.
        """
        try:
            state = self.instance_status(instance_identifier)
            if state in ("stopped", "stopping"):
                self.scheduler_summary_message.append(f"{instance_identifier} is already in Stopping or Stopped state.")
                return
            self.client.stop_db_instance(DBInstanceIdentifier=instance_identifier)
            if self.no_wait:
                self.scheduler_summary_message.append(f"{instance_identifier} stop initiated (no-wait mode).")
            else:
                self.wait_status(instance_identifier, "stopped")
                self.scheduler_summary_message.append(f"{instance_identifier} stopped successfully.")
        except ClientError as err:
            logging.error(f"Failed to stop RDS instance {instance_identifier}: {str(err)}")
            self.scheduler_summary_message.append(f"Error stopping {instance_identifier}: {str(err)}")

    def instance_status(self, instance_identifier):
        """
        Returns the instance's DBInstanceStatus, or "unknown" on API error.
        """
        try:
            result = self.client.describe_db_instances(
                DBInstanceIdentifier=instance_identifier
            )
            return result['DBInstances'][0]['DBInstanceStatus']
        except ClientError as err:
            logging.error(f"Failed to get status for RDS instance {instance_identifier}: {str(err)}")
            return "unknown"

    def schedule_rds(self, data, action, instance_attributes=None):
        """
        Resolves instances from the config ('name' and/or 'tags') and applies
        the requested action ('start', 'stop' or 'status') to each in parallel.
        """
        found = []

        if 'name' in data:
            names = data['name']
            id_list = names if isinstance(names, list) else [names]
            found += self.describe_rds_instances(instance_identifiers=id_list, instance_attributes=instance_attributes)

        if 'tags' in data:
            found += self.describe_rds_instances(tags=data['tags'], instance_attributes=instance_attributes)

        # De-duplicate instances matched by both name and tags.
        unique = {}
        for record in found:
            unique[record['DBInstanceIdentifier']] = record

        if action == 'status':
            return list(unique.values())

        handler = {'start': self.start_instance, 'stop': self.stop_instance}.get(action)
        if handler is not None:
            with ThreadPoolExecutor(max_workers=self.threads) as pool:
                pool.map(handler, unique.keys())

        # Summarize the post-action state of every targeted instance.
        return [
            {'DBInstanceIdentifier': iid, 'action': action, 'Status': self.instance_status(iid)}
            for iid in unique
        ]
# tests/test_scheduler.py

import unittest
from unittest.mock import patch, MagicMock

from aws_resource_scheduler.scheduler import main
# Fix: the previous imports (write_to_parameter_store / read_from_parameter_store)
# do not exist in common.py and made the whole module fail to import,
# which aborted the CI `pytest tests/` step.
from aws_resource_scheduler.utils.common import aws_login, ParameterStoreStorage
from aws_resource_scheduler.utils.ecs import EcsModule


class TestScheduler(unittest.TestCase):
    """Unit tests for the scheduler entry point and shared utilities."""

    @patch('aws_resource_scheduler.utils.common.boto3.Session')
    def test_aws_login(self, mock_boto3_session):
        # aws_login should return the session built for the configured region.
        mock_session = MagicMock()
        mock_boto3_session.return_value = mock_session
        workspace = {"aws_region": "us-west-2"}
        session = aws_login(workspace)
        self.assertEqual(session, mock_session)

    @patch('aws_resource_scheduler.utils.common.boto3.Session')
    def test_aws_login_with_role(self, mock_boto3_session):
        # When role_arn is provided, aws_login must assume the role via STS.
        mock_session = MagicMock()
        mock_sts = mock_session.client.return_value
        mock_boto3_session.return_value = mock_session

        workspace = {"aws_region": "us-west-2", "role_arn": "arn:aws:iam::123456789012:role/SchedulerRole"}
        aws_login(workspace)

        mock_sts.assume_role.assert_called_with(
            RoleArn='arn:aws:iam::123456789012:role/SchedulerRole',
            RoleSessionName=unittest.mock.ANY
        )

    def test_parameter_store_write(self):
        # Exercise the real storage class: values are comma-joined into a
        # single StringList parameter.
        storage = ParameterStoreStorage(MagicMock())
        storage.write_state("/scheduler/test", ['asg1', '0', '1', '2'])
        storage.client.put_parameter.assert_called_once_with(
            Name="/scheduler/test",
            Value="asg1,0,1,2",
            Type="StringList",
            Overwrite=True
        )

    def test_parameter_store_read(self):
        storage = ParameterStoreStorage(MagicMock())
        storage.client.get_parameter.return_value = {'Parameter': {'Value': 'asg1,0,1,2'}}
        self.assertEqual(storage.read_state("/scheduler/test"), ['asg1', '0', '1', '2'])

    @patch('aws_resource_scheduler.utils.ec2.Ec2Module.schedule_ec2_instances')
    @patch('aws_resource_scheduler.utils.asg.AsgModule.main_scheduler_asg')
    @patch('aws_resource_scheduler.utils.rds.RdsModule.schedule_rds')
    @patch('aws_resource_scheduler.utils.ecs.EcsModule.main_scheduler_ecs')
    @patch('aws_resource_scheduler.utils.common.aws_login')
    @patch('aws_resource_scheduler.utils.common.parse_arguments')
    @patch('aws_resource_scheduler.utils.common.evaluate')
    @patch('aws_resource_scheduler.utils.common.send_chat_notification')
    def test_main_function(self, mock_send_chat, mock_evaluate, mock_parse_args, mock_aws_login,
                           mock_ecs_scheduler, mock_rds_scheduler, mock_asg_scheduler, mock_ec2_scheduler):
        # NOTE(review): these patch targets assume scheduler.main resolves the
        # helpers through the common module at call time; if scheduler.py uses
        # `from ... import name`, patch 'aws_resource_scheduler.scheduler.<name>'
        # instead — verify against scheduler.py's import style.
        mock_parse_args.return_value = MagicMock()

        # Mock evaluation of config
        mock_evaluate.return_value = (
            {"aws_region": "us-west-2", "asg": {"name": ["asg1"]}, "ec2": {"name": ["ec2-1"]},
             "ecs": {"name": ["ecs1"]}, "rds": {"name": ["rds1"]}},
            ["ec2", "asg", "ecs", "rds"],
            "stop",
            "workspace_name",
            False,
            10
        )

        # Mock AWS session login
        mock_session = MagicMock()
        mock_aws_login.return_value = mock_session

        # Mock scheduler functions
        mock_asg_scheduler.return_value = ["asg1 stopped"]
        mock_ec2_scheduler.return_value = [{"InstanceId": "ec2-1", "State": "stopped"}]
        mock_ecs_scheduler.return_value = ["ecs1 stopped"]
        mock_rds_scheduler.return_value = ["rds1 stopped"]

        # Run the main function
        main()

        # Assertions for EC2
        mock_ec2_scheduler.assert_called_once_with(
            {"name": ["ec2-1"]},
            "stop",
            instance_attributes=['InstanceId', 'InstanceType', 'State', 'PrivateIpAddress', 'PublicIpAddress']
        )

        # Assertions for ASG
        mock_asg_scheduler.assert_called_once_with({"name": ["asg1"]}, "stop")

        # Assertions for ECS
        mock_ecs_scheduler.assert_called_once_with({"name": ["ecs1"]}, "stop")

        # Assertions for RDS
        mock_rds_scheduler.assert_called_once_with({"name": ["rds1"]}, "stop", instance_attributes=['DBInstanceIdentifier', 'DBInstanceStatus', 'DBInstanceClass', 'Engine', 'Endpoint'])

        # Verify the notification was sent
        mock_send_chat.assert_called()

    @patch('aws_resource_scheduler.utils.ecs.EcsModule.start_ecs_service')
    @patch('aws_resource_scheduler.utils.ecs.EcsModule.stop_ecs_service')
    @patch('aws_resource_scheduler.utils.ecs.EcsModule.get_ecs_service_status')
    def test_safe_execution(self, mock_get_status, mock_stop, mock_start):
        # safe_execution must swallow the exception and record a summary line.
        mock_get_status.return_value = {
            "cluster_name": "test-cluster",
            "service_name": "test-service",
            "desired_count": 1,
            "running_count": 1,
            "status": "ACTIVE",
            "launch_type": "FARGATE",
            "task_definition": "test-task-def"
        }
        mock_stop.side_effect = Exception("Test Exception")

        ecs_module = EcsModule(MagicMock(), MagicMock(), "workspace", no_wait=False, threads=2)
        service_data = {
            "cluster_name": "test-cluster",
            "service_name": "test-service"
        }
        ecs_module.safe_execution(ecs_module.stop_ecs_service, service_data)

        # Fix: assertIn on a list checks element equality, not substrings, so
        # the full message must be matched exactly.
        self.assertIn(
            "Failed to process service test-service in cluster test-cluster.",
            ecs_module.scheduler_summary_message
        )


if __name__ == '__main__':
    unittest.main()