Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DEV-48 exclude records from updates #13

Merged
merged 11 commits into from
Mar 13, 2024
5 changes: 2 additions & 3 deletions .github/workflows/mkdocs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ jobs:
- uses: actions/setup-python@v5
with:
python-version: 3.x
- name: Install local pkg
run: pip install .[all]
- run: echo "cache_id=$(date --utc '+%V')" >> $GITHUB_ENV
- uses: actions/cache@v4
with:
key: mkdocs-material-${{ env.cache_id }}
path: .cache
restore-keys: |
mkdocs-material-
- run: pip install mkdocs-material
- name: Install local pkg
run: pip install .[all]
- run: mkdocs gh-deploy --force
4 changes: 4 additions & 0 deletions docs/add_vendor.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ def inventory_server_prices_spot(vendor):
pass


def inventory_storage(vendor):
    """Placeholder: storage inventory is not implemented for this vendor yet."""
    pass


def inventory_storage_prices(vendor):
    """Placeholder: storage price inventory is not implemented for this vendor yet."""
    pass

Expand Down
77 changes: 48 additions & 29 deletions src/sc_crawler/cli.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
"""The Spare Cores (SC) Crawler CLI tool.

Provides the `sc-crawler` command and the below subcommands:

- [schema][sc_crawler.cli.schema]
- [pull][sc_crawler.cli.pull]
- [hash][sc_crawler.cli.hash]
"""

import logging
from datetime import datetime, timedelta
from enum import Enum
Expand Down Expand Up @@ -39,8 +48,8 @@
log_levels = list(logging._nameToLevel.keys())
LogLevels = Enum("LOGLEVELS", {k: k for k in log_levels})

supported_tables = [m[10:] for m in dir(Vendor) if m.startswith("inventory_")]
Tables = Enum("TABLES", {k: k for k in supported_tables})
supported_records = [r[10:] for r in dir(Vendor) if r.startswith("inventory_")]
Records = Enum("RECORDS", {k: k for k in supported_records})


@cli.command()
Expand Down Expand Up @@ -74,18 +83,24 @@ def pull(
] = "sqlite:///sc_crawler.db",
include_vendor: Annotated[
List[Vendors],
typer.Option(
help="Filter for specific vendor. Can be specified multiple times."
),
] = [],
typer.Option(help="Enabled data sources. Can be specified multiple times."),
] = [v.id for v in supported_vendors],
exclude_vendor: Annotated[
List[Vendors],
typer.Option(help="Exclude specific vendor. Can be specified multiple times."),
typer.Option(help="Disabled data sources. Can be specified multiple times."),
] = [],
include_records: Annotated[
List[Records],
typer.Option(
help="Database records to be updated. Can be specified multiple times."
),
] = supported_records,
exclude_records: Annotated[
List[Records],
typer.Option(
help="Database records NOT to be updated. Can be specified multiple times."
),
] = [],
update_table: Annotated[
List[Tables],
typer.Option(help="Tables to be updated. Can be specified multiple times."),
] = supported_tables,
log_level: Annotated[
LogLevels, typer.Option(help="Log level threshold.")
] = LogLevels.INFO.value, # TODO drop .value after updating Enum to StrEnum in Python3.11
Expand Down Expand Up @@ -123,24 +138,28 @@ def custom_serializer(x):
logger.addHandler(channel)

# filter vendors
vendors = supported_vendors
vendors = [
vendor
for vendor in vendors
for vendor in supported_vendors
if (
vendor.id in [vendor.value for vendor in include_vendor]
and vendor.id not in [vendor.value for vendor in exclude_vendor]
vendor.id in [iv.value for iv in include_vendor]
and vendor.id not in [ev.value for ev in exclude_vendor]
)
]

    # filter records
records = [r for r in include_records if r not in exclude_records]

engine = create_engine(connection_string, json_serializer=custom_serializer)
SQLModel.metadata.create_all(engine)

pbars = ProgressPanel()
with Live(pbars.panels):
# show CLI arguments in the Metadata panel
pbars.metadata.append(Text("Update target(s): ", style="bold"))
pbars.metadata.append(Text(", ".join([x.value for x in update_table]) + "\n"))
pbars.metadata.append(Text("Data sources: ", style="bold"))
pbars.metadata.append(Text(", ".join([x.id for x in vendors]) + " "))
pbars.metadata.append(Text("Updating records: ", style="bold"))
pbars.metadata.append(Text(", ".join([x.value for x in records]) + "\n"))
pbars.metadata.append(Text("Connection type: ", style="bold"))
pbars.metadata.append(Text(connection_string.split(":")[0]))
pbars.metadata.append(Text(" Cache: ", style="bold"))
Expand Down Expand Up @@ -170,29 +189,29 @@ def custom_serializer(x):
vendor.progress_tracker = VendorProgressTracker(
vendor=vendor, progress_panel=pbars
)
vendor.progress_tracker.start_vendor(n=len(update_table))
if Tables.compliance_frameworks in update_table:
vendor.progress_tracker.start_vendor(n=len(records))
if Records.compliance_frameworks in records:
vendor.inventory_compliance_frameworks()
if Tables.datacenters in update_table:
if Records.datacenters in records:
vendor.inventory_datacenters()
if Tables.zones in update_table:
if Records.zones in records:
vendor.inventory_zones()
if Tables.servers in update_table:
if Records.servers in records:
vendor.inventory_servers()
if Tables.server_prices in update_table:
if Records.server_prices in records:
vendor.inventory_server_prices()
if Tables.server_prices_spot in update_table:
if Records.server_prices_spot in records:
vendor.inventory_server_prices_spot()
if Tables.storages in update_table:
if Records.storages in records:
vendor.inventory_storages()
if Tables.storage_prices in update_table:
if Records.storage_prices in records:
vendor.inventory_storage_prices()
if Tables.traffic_prices in update_table:
if Records.traffic_prices in records:
vendor.inventory_traffic_prices()
if Tables.ipv4_prices in update_table:
if Records.ipv4_prices in records:
vendor.inventory_ipv4_prices()
# reset current step name
vendor.progress_tracker.update_vendor(step="")
vendor.progress_tracker.update_vendor(step="")
session.merge(vendor)
session.commit()

Expand Down
4 changes: 2 additions & 2 deletions src/sc_crawler/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,15 @@ def advance_vendor(self, by: int = 1) -> None:
Args:
by: Number of steps to advance.
"""
self.vendors.update(self.vendors.task_ids[0], advance=by)
self.vendors.update(self.vendors.task_ids[-1], advance=by)

def update_vendor(self, **kwargs) -> None:
"""Update the vendor's progress bar.

Useful fields:
- `step`: Name of the currently running step to be shown on the progress bar.
"""
self.vendors.update(self.vendors.task_ids[0], **kwargs)
self.vendors.update(self.vendors.task_ids[-1], **kwargs)

def start_task(self, name: str, n: int) -> TaskID:
"""Starts a progress bar in the list of current jobs.
Expand Down
3 changes: 2 additions & 1 deletion src/sc_crawler/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ class ScModel(SQLModel, metaclass=ScMetaModel):
"""Custom extensions to SQLModel objects and tables.

Extra features:

- auto-generated table names using snake_case,
- support for hashing table rows,
- reuse description field of tables/columns as SQL comment,
- automatically append observed_at column.
- automatically append `observed_at` column.
"""

@declared_attr # type: ignore
Expand Down
35 changes: 18 additions & 17 deletions src/sc_crawler/vendors/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ def inventory_datacenters(vendor):
def inventory_zones(vendor):
"""List all available AWS availability zones."""
vendor.progress_tracker.start_task(
name="Scanning datacenters for zones", n=len(vendor.datacenters)
name="Scanning datacenter(s) for zone(s)", n=len(vendor.datacenters)
)

def get_zones(datacenter: Datacenter, vendor: Vendor) -> List[dict]:
Expand Down Expand Up @@ -711,15 +711,15 @@ def inventory_servers(vendor):
# TODO drop this in favor of pricing.get_products, as it has info e.g. on instanceFamily
# although other fields are messier (e.g. extract memory from string)
vendor.progress_tracker.start_task(
name="Scanning datacenters for servers", n=len(vendor.datacenters)
name="Scanning datacenter(s) for server(s)", n=len(vendor.datacenters)
)

def search_servers(datacenter: Datacenter, vendor: Optional[Vendor]) -> List[dict]:
instance_types = []
if datacenter.status == "active":
instance_types = _boto_describe_instance_types(datacenter.id)
if vendor:
vendor.log(f"{len(instance_types)} servers found in {datacenter.id}.")
vendor.log(f"{len(instance_types)} server(s) found in {datacenter.id}.")
if vendor:
vendor.progress_tracker.advance_task()
return instance_types
Expand All @@ -729,14 +729,14 @@ def search_servers(datacenter: Datacenter, vendor: Optional[Vendor]) -> List[dic
instance_types = list(chain.from_iterable(products))

vendor.log(
f"{len(instance_types)} servers found in {len(vendor.datacenters)} regions."
f"{len(instance_types)} server(s) found in {len(vendor.datacenters)} regions."
)
instance_types = list({p["InstanceType"]: p for p in instance_types}.values())
vendor.log(f"{len(instance_types)} unique servers found.")
vendor.log(f"{len(instance_types)} unique server(s) found.")
vendor.progress_tracker.hide_task()

vendor.progress_tracker.start_task(
name="Preprocessing servers", n=len(instance_types)
name="Preprocessing server(s)", n=len(instance_types)
)
servers = []
for instance_type in instance_types:
Expand All @@ -748,7 +748,7 @@ def search_servers(datacenter: Datacenter, vendor: Optional[Vendor]) -> List[dic

def inventory_server_prices(vendor):
vendor.progress_tracker.start_task(
name="Searching for ondemand server_prices", n=None
name="Searching for ondemand server_price(s)", n=None
)
products = _boto_get_products(
service_code="AmazonEC2",
Expand All @@ -772,7 +772,7 @@ def inventory_server_prices(vendor):

server_prices = []
vendor.progress_tracker.start_task(
name="Preprocess ondemand server_prices", n=len(products)
name="Preprocess ondemand server_price(s)", n=len(products)
)
for product in products:
try:
Expand Down Expand Up @@ -807,7 +807,7 @@ def inventory_server_prices(vendor):

def inventory_server_prices_spot(vendor):
vendor.progress_tracker.start_task(
name="Scanning datacenters for spot server_prices", n=len(vendor.datacenters)
name="Scanning datacenters for spot server_price(s)", n=len(vendor.datacenters)
)

def get_spot_prices(datacenter: Datacenter, vendor: Vendor) -> List[dict]:
Expand All @@ -833,7 +833,7 @@ def get_spot_prices(datacenter: Datacenter, vendor: Vendor) -> List[dict]:

server_prices = []
vendor.progress_tracker.start_task(
name="Preprocess spot server_prices", n=len(products)
name="Preprocess spot server_price(s)", n=len(products)
)
for product in products:
try:
Expand Down Expand Up @@ -976,7 +976,7 @@ def get_attr(key: str) -> float:

def inventory_storage_prices(vendor):
vendor.progress_tracker.start_task(
name="Searching for Storage Prices", n=len(storage_manual_data)
name="Searching for storage_price(s)", n=len(storage_manual_data)
)
with ThreadPoolExecutor(max_workers=8) as executor:
products = executor.map(
Expand All @@ -992,7 +992,7 @@ def inventory_storage_prices(vendor):
datacenters = scmodels_to_dict(vendor.datacenters, keys=["name", "aliases"])

vendor.progress_tracker.start_task(
name="Preprocessing storage_prices", n=len(products)
name="Preprocessing storage_price(s)", n=len(products)
)
prices = []
for product in products:
Expand Down Expand Up @@ -1024,7 +1024,7 @@ def inventory_traffic_prices(vendor):
for direction in list(TrafficDirection):
loc_dir = "toLocation" if direction == TrafficDirection.IN else "fromLocation"
vendor.progress_tracker.start_task(
name=f"Searching for {direction.value} Traffic prices", n=None
name=f"Searching for {direction.value} traffic_price(s)", n=None
)
products = _boto_get_products(
service_code="AWSDataTransfer",
Expand All @@ -1034,7 +1034,8 @@ def inventory_traffic_prices(vendor):
)
vendor.log(f"Found {len(products)} {direction.value} traffic_price(s).")
vendor.progress_tracker.update_task(
description=f"Syncing {direction.value} Traffic prices", total=len(products)
description=f"Syncing {direction.value} traffic_price(s)",
total=len(products),
)
items = []
for product in products:
Expand Down Expand Up @@ -1062,7 +1063,7 @@ def inventory_traffic_prices(vendor):


def inventory_ipv4_prices(vendor):
vendor.progress_tracker.start_task(name="Searching for IPv4 prices", n=None)
vendor.progress_tracker.start_task(name="Searching for ipv4_price(s)", n=None)
products = _boto_get_products(
service_code="AmazonVPC",
filters={
Expand All @@ -1072,7 +1073,7 @@ def inventory_ipv4_prices(vendor):
)
vendor.log(f"Found {len(products)} ipv4_price(s).")
vendor.progress_tracker.update_task(
description="Syncing IPv4 prices", total=len(products)
description="Syncing ipv4_price(s)", total=len(products)
)
# lookup tables
datacenters = scmodels_to_dict(vendor.datacenters, keys=["name", "aliases"])
Expand All @@ -1081,7 +1082,7 @@ def inventory_ipv4_prices(vendor):
try:
datacenter = datacenters[product["product"]["attributes"]["location"]]
except KeyError as e:
vendor.log("Datacenter not found: %s" % str(e), DEBUG)
vendor.log("datacenter not found: %s" % str(e), DEBUG)
continue
price = _extract_ondemand_price(product["terms"])
items.append(
Expand Down
4 changes: 4 additions & 0 deletions src/sc_crawler/vendors/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ def inventory_server_prices_spot(vendor):
pass


def inventory_storages(vendor):
    """Placeholder: GCP storage inventory is not implemented yet."""
    pass


def inventory_storage_prices(vendor):
    """Placeholder: GCP storage price inventory is not implemented yet."""
    pass

Expand Down
Loading