diff --git a/.flake8 b/.flake8 index ee8bc97..15d0f7e 100644 --- a/.flake8 +++ b/.flake8 @@ -1,5 +1,5 @@ [flake8] -ignore = DAR +ignore = DAR,W503 max-line-length = 88 docstring-convention = google per-file-ignores = diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a553751..96356a8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,11 +18,11 @@ jobs: matrix: python-version: - "3.8" - - "3.9" - - "3.10" - - "3.11" - - "3.12" - - "3.13" + # - "3.9" + # - "3.10" + # - "3.11" + # - "3.12" + # - "3.13" steps: - name: Checkout code @@ -58,6 +58,6 @@ jobs: env: TAP_PULUMI_CLOUD_TOKEN: ${{ secrets.TAP_PULUMI_CLOUD_TOKEN }} TAP_PULUMI_CLOUD_ORGANIZATIONS: ${{ secrets.TAP_PULUMI_CLOUD_ORGANIZATIONS }} - TAP_PULUMI_CLOUD_START_DATE: ${{ secrets.TAP_PULUMI_CLOUD_START_DATE }} + TAP_PULUMI_CLOUD_START_DATE: "2023-01-01T00:00:00Z" run: | nox diff --git a/meltano.yml b/meltano.yml index 3e50ee1..d424d82 100644 --- a/meltano.yml +++ b/meltano.yml @@ -21,6 +21,7 @@ plugins: kind: password label: API Token description: API Token for Pulumi Cloud + sensitive: true - name: requests_cache.enabled kind: boolean label: Enable Requests Cache @@ -29,9 +30,14 @@ plugins: kind: object label: Requests Cache Config description: Configuration for requests cache - repository: https://github.com/edgarrmondragon/tap-pulumi-cloud + - name: start_date + kind: date_iso8601 + value: 2024-01-01T00:00:00+00:00 + label: Start Date + description: Start date config: organizations: [meltano] + repository: https://github.com/edgarrmondragon/tap-pulumi-cloud loaders: - name: target-jsonl variant: andyh1203 diff --git a/tap_pulumi_cloud/audit_logs.py b/tap_pulumi_cloud/audit_logs.py new file mode 100644 index 0000000..1c65eb7 --- /dev/null +++ b/tap_pulumi_cloud/audit_logs.py @@ -0,0 +1,217 @@ +"""Stream type classes for tap-pulumi-cloud.""" + +from __future__ import annotations + +import typing as t +from datetime import datetime, timezone +from typing 
import TYPE_CHECKING + +if TYPE_CHECKING: + from requests import Response + from singer_sdk.helpers.types import Context +from singer_sdk import metrics +from singer_sdk import typing as th +from singer_sdk.helpers.jsonpath import extract_jsonpath +from singer_sdk.pagination import BaseAPIPaginator + +from tap_pulumi_cloud.client import _OrgPartitionedStream + + +class AuditLogsPaginator(BaseAPIPaginator[t.Optional[str]]): + """Paginator class for APIs returning a pagination token in the response body.""" + + def __init__( + self, + jsonpath: str, + since: int, + *args: t.Any, + **kwargs: t.Any, + ) -> None: + """Create a new paginator. + + Args: + jsonpath: A JSONPath expression. + since: Start date for the audit logs. + args: Paginator positional arguments for base class. + kwargs: Paginator keyword arguments for base class. + """ + super().__init__(None, *args, **kwargs) + self._jsonpath = jsonpath + self._since = since + + def get_next(self, response: Response) -> str | None: + """Get the next page token. + + Args: + response: API response object. + + Returns: + The next page token. + """ + all_matches = extract_jsonpath(self._jsonpath, response.json()) + matched = next(all_matches, None) + if matched is None or int(matched) < self._since: + return None + return matched + + +class AuditLogs(_OrgPartitionedStream): + """Stream Audit Logs.""" + + name = "audit_logs" + path = "/api/orgs/{org_name}/auditlogs" + primary_keys: t.Sequence[str] = ["org_name", "timestamp", "event", "description"] + records_jsonpath = "$.auditLogEvents[*]" + replication_key = "timestamp" + is_sorted = False + + schema = th.PropertiesList( + th.Property( + "org_name", th.StringType, description="The name of the organization." 
+ ), + th.Property( + "timestamp", + th.DateTimeType, + description="The timestamp of the audit log event.", + ), + th.Property( + "source_ip", + th.StringType, + description="The source IP of the audit log event.", + ), + th.Property( + "event", th.StringType, description="The event of the audit log event." + ), + th.Property( + "description", + th.StringType, + description="The description of the audit log event.", + ), + th.Property( + "user", + th.ObjectType( + th.Property("name", th.StringType, description="The name of the user."), + th.Property( + "github_login", + th.StringType, + description="The GitHub login of the user.", + ), + th.Property( + "avatar_url", + th.StringType, + description="The avatar URL of the user.", + ), + ), + description="The user of the audit log event.", + ), + th.Property( + "token_id", + th.StringType, + description="The token id associated with this event.", + ), + th.Property( + "token_name", + th.StringType, + description="The token name associated with this event.", + ), + th.Property( + "req_org_admin", + th.BooleanType, + description="Required organization admin role.", + ), + th.Property( + "req_stack_admin", th.BooleanType, description="Required stack admin role." + ), + th.Property( + "auth_failure", + th.BooleanType, + description="Event was the result of an authentication check failure.", + ), + ).to_dict() + + def get_new_paginator(self, context: Context | None) -> BaseAPIPaginator: + """Get a fresh paginator for this API endpoint. + + Returns: + A paginator instance. + """ + return AuditLogsPaginator( + self.next_page_token_jsonpath, + self.get_starting_timestamp(context).timestamp(), + ) + + def request_records(self, context: Context | None) -> t.Iterable[dict]: + """Request records from REST endpoint(s), returning response records. + + If pagination is detected, pages will be recursed automatically. + + Args: + context: Stream partition or context dictionary. + + Yields: + An item for every record in the response. 
+ """ + paginator = self.get_new_paginator(context) + decorated_request = self.request_decorator(self._request) + pages = 0 + + with metrics.http_request_counter(self.name, self.path) as request_counter: + request_counter.context = context + + while not paginator.finished: + prepared_request = self.prepare_request( + context, + next_page_token=paginator.current_value, + ) + resp = decorated_request(prepared_request, context) + request_counter.increment() + self.update_sync_costs(prepared_request, resp, context) + records = iter(self.parse_response(resp)) + try: + first_record = next(records) + except StopIteration: + self.logger.info( + "Pagination stopped after %d pages because no records were " + "found in the last response", + pages, + ) + break + yield first_record + yield from records + pages += 1 + + paginator.advance(resp) + + def get_url_params( + self, + context: dict | None, + next_page_token: str | None, + ) -> dict[str, t.Any]: + """Return a dictionary of URL query parameters. + + Args: + context: The stream sync context. + next_page_token: A token for the next page of results. + + Returns: + A dictionary of URL query parameters. 
+ """ + params = {"pageSize": 100} + since = round(self.get_starting_timestamp(context).timestamp()) + if next_page_token: + until = int(next_page_token) + else: + until = round(self.get_replication_key_signpost(context).timestamp()) + params["startTime"] = since + params["endTime"] = until + return params + + def post_process( + self, + row: dict, + context: dict | None = None, + ) -> dict | None: + """Post-process a row of data.""" + row = super().post_process(row, context) or {} + row["timestamp"] = datetime.fromtimestamp(row["timestamp"], tz=timezone.utc) + return row diff --git a/tap_pulumi_cloud/client.py b/tap_pulumi_cloud/client.py index fad65ee..4be6cfe 100644 --- a/tap_pulumi_cloud/client.py +++ b/tap_pulumi_cloud/client.py @@ -2,11 +2,16 @@ from __future__ import annotations -from typing import Any +import typing as t +from http import HTTPStatus +from typing import TYPE_CHECKING, Any +if TYPE_CHECKING: + import requests import humps from singer_sdk import RESTStream from singer_sdk.authenticators import APIKeyAuthenticator +from singer_sdk.exceptions import FatalAPIError, RetriableAPIError from singer_sdk.helpers._typing import TypeConformanceLevel @@ -15,6 +20,7 @@ class PulumiCloudStream(RESTStream): url_base = "https://api.pulumi.com" next_page_token_jsonpath = "$.continuationToken" # noqa: S105 + tolerated_http_errors: t.Sequence[int] = [] TYPE_CONFORMANCE_LEVEL = TypeConformanceLevel.ROOT_ONLY @@ -60,7 +66,7 @@ def get_url_params( Returns: Mapping of URL query parameters. """ - params: dict = {} + params: dict = {"pageSize": 100} if next_page_token: params["continuationToken"] = next_page_token return params @@ -72,3 +78,76 @@ def post_process( ) -> dict | None: """Post-process a row of data.""" return humps.decamelize(row) + + def parse_response(self, response: requests.Response) -> t.Iterable[dict]: + """Parse the response and return an iterator of result records. 
+ + Args: + response: A raw :class:`requests.Response` + + Yields: + One item for every item found in the response. + """ + if response.status_code in self.tolerated_http_errors: + return [] + return super().parse_response(response) + + def validate_response(self, response: requests.Response) -> None: + """Validate HTTP response. + + Checks for error status codes and whether they are fatal or retriable. + + In case an error is deemed transient and can be safely retried, then this + method should raise an :class:`singer_sdk.exceptions.RetriableAPIError`. + By default this applies to 5xx error codes, along with values set in: + :attr:`~singer_sdk.RESTStream.extra_retry_statuses` + + In case an error is unrecoverable raises a + :class:`singer_sdk.exceptions.FatalAPIError`. By default, this applies to + 4xx errors, excluding values found in: + :attr:`~singer_sdk.RESTStream.extra_retry_statuses` + + Tap developers are encouraged to override this method if their APIs use HTTP + status codes in non-conventional ways, or if they communicate errors + differently (e.g. in the response body). + + .. image:: ../images/200.png + + Args: + response: A :class:`requests.Response` object. + + Raises: + FatalAPIError: If the request is not retriable. + RetriableAPIError: If the request is retriable. 
+ """ + if response.status_code in self.tolerated_http_errors: + msg = ( + f"{response.status_code} Tolerated Status Code " + f"(Reason: {response.reason}) for path: {response.request.url}" + ) + self.logger.info(msg) + return + + if ( + response.status_code in self.extra_retry_statuses + or response.status_code >= HTTPStatus.INTERNAL_SERVER_ERROR + ): + msg = self.response_error_message(response) + raise RetriableAPIError(msg, response) + + if ( + HTTPStatus.BAD_REQUEST + <= response.status_code + < HTTPStatus.INTERNAL_SERVER_ERROR + ): + msg = self.response_error_message(response) + raise FatalAPIError(msg) + + +class _OrgPartitionedStream(PulumiCloudStream): + """Base class for streams that are partitioned by organization.""" + + @property + def partitions(self) -> list[dict] | None: + """List of organizations to sync.""" + return [{"org_name": org} for org in self.config["organizations"]] diff --git a/tap_pulumi_cloud/environments.py b/tap_pulumi_cloud/environments.py new file mode 100644 index 0000000..81fba66 --- /dev/null +++ b/tap_pulumi_cloud/environments.py @@ -0,0 +1,70 @@ +"""Stream type classes for tap-pulumi-cloud.""" + +from __future__ import annotations + +import typing as t + +from singer_sdk import typing as th + +from tap_pulumi_cloud.client import _OrgPartitionedStream + + +class Environments(_OrgPartitionedStream): + """Stream Organization Environments.""" + + name = "environments" + path = "/api/preview/environments/{org_name}" + primary_keys: t.Sequence[str] = ["org_name", "name"] + records_jsonpath = "$.environments[*]" + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + ), + th.Property( + "project", + th.StringType, + description="The project associated with this environment.", + ), + th.Property("name", th.StringType, description="The name of the environment."), + th.Property( + "created", + th.DateTimeType, + description="The timestamp when the environment was created.", + ), + th.Property( + "modified", + 
th.DateTimeType, + description="The timestamp when the environment was last modified.", + ), + th.Property( + "tags", + th.ObjectType( + th.Property( + "*", # Wildcard to allow for any key in the tags object + th.StringType, + ) + ), + description="A dictionary of tags associated with the environment.", + ), + ).to_dict() + + def get_child_context( + self, + record: dict, + context: dict | None, # noqa: ARG002 + ) -> dict | None: + """Return a context object for child streams. + + Args: + record: A record from this stream. + context: The stream sync context. + + Returns: + A context object for child streams. + """ + return { + "environment_name": record["name"], + "org_name": record["org_name"], + } diff --git a/tap_pulumi_cloud/organizations.py b/tap_pulumi_cloud/organizations.py new file mode 100644 index 0000000..2561d64 --- /dev/null +++ b/tap_pulumi_cloud/organizations.py @@ -0,0 +1,538 @@ +"""Stream type classes for tap-pulumi-cloud.""" + +from __future__ import annotations + +import typing as t + +from singer_sdk import typing as th + +from tap_pulumi_cloud.client import PulumiCloudStream, _OrgPartitionedStream + + +class OrganizationMembers(_OrgPartitionedStream): + """Organization members stream.""" + + name = "organization_members" + path = "/api/orgs/{org_name}/members" + primary_keys: t.Sequence[str] = ["org_name", "user_github_login"] + records_jsonpath = "$.members[*]" + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The name of the organization that owns the stack.", + ), + th.Property( + "role", + th.StringType, + description="The role of the user in the organization.", + ), + th.Property( + "user_github_login", + th.StringType, + description="The github login of the user.", + ), + th.Property( + "user_name", + th.StringType, + description="The name of the user.", + ), + th.Property( + "user_avatar_url", + th.StringType, + description="The name of the user.", + ), + th.Property( + "created", + 
th.DateTimeType, + description="The time the user was added to the organization.", + ), + th.Property( + "known_to_pulumi", + th.BooleanType, + ), + th.Property( + "virtual_admin", + th.BooleanType, + ), + ).to_dict() + + def get_url_params( + self, + context: dict | None, + next_page_token: str | None, + ) -> dict[str, t.Any]: + """Return a dictionary of URL query parameters. + + Args: + context: The stream sync context. + next_page_token: A token for the next page of results. + + Returns: + A dictionary of URL query parameters. + """ + params = super().get_url_params(context, next_page_token) + params["type"] = "backend" + return params + + def post_process(self, row: dict, context: dict | None = None) -> dict | None: + """Post-process a row. + + Args: + row: A row. + context: The stream sync context. + + Returns: + The processed row. + """ + new_row = super().post_process(row, context) + if new_row: + new_row["user_name"] = new_row["user"].pop("name") + new_row["user_github_login"] = new_row["user"].pop("github_login") + new_row["user_avatar_url"] = new_row["user"].pop("avatar_url") + return new_row + + +class OrganizationTeams(_OrgPartitionedStream): + """Organization teams stream.""" + + name = "organization_teams" + path = "/api/orgs/{org_name}/teams" + primary_keys: t.Sequence[str] = ["org_name", "name"] + records_jsonpath = "$.teams[*]" + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The name of the organization that owns the team.", + ), + th.Property( + "kind", + th.StringType, + description="The kind of team.", + ), + th.Property( + "name", + th.StringType, + description="The name of the team.", + ), + th.Property( + "display_name", + th.StringType, + description="The display name of the team.", + ), + th.Property( + "description", + th.StringType, + description="The description of the team.", + ), + th.Property( + "user_role", + th.StringType, + description="The default user role of the team members.", + ), + 
).to_dict() + + def get_child_context( + self, + record: dict, + context: dict | None, # noqa: ARG002 + ) -> dict | None: + """Return a context object for child streams. + + Args: + record: A record from this stream. + context: The stream sync context. + + Returns: + A context object for child streams. + """ + return {"org_name": record["org_name"], "team_name": record["name"]} + + +class OrganizationTeamsMembers(PulumiCloudStream): + """Organization team members stream.""" + + name = "organization_team_members" + path = "/api/orgs/{org_name}/teams/{team_name}" + primary_keys: t.Sequence[str] = ["org_name", "team_name", "github_login"] + records_jsonpath = "$.members[*]" + + parent_stream_type = OrganizationTeams + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The name of the organization that owns the team.", + ), + th.Property( + "team_name", + th.StringType, + description="The name of the team.", + ), + th.Property( + "role", + th.StringType, + description="The role of the user in the team.", + ), + th.Property( + "name", + th.StringType, + description="The name of the user.", + ), + th.Property( + "github_login", + th.StringType, + description="The GitHub login of the user.", + ), + th.Property( + "avatar_url", + th.StringType, + description="The URL of the user's avatar.", + ), + ).to_dict() + + +class OrganizationTeamsStacks(PulumiCloudStream): + """Organization team stacks stream.""" + + name = "organization_team_stacks" + path = "/api/orgs/{org_name}/teams/{team_name}" + primary_keys: t.Sequence[str] = [ + "org_name", + "team_name", + "project_name", + "stack_name", + ] + records_jsonpath = "$.stacks[*]" + + parent_stream_type = OrganizationTeams + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The name of the organization that owns the team.", + ), + th.Property( + "team_name", + th.StringType, + description="The name of the team.", + ), + th.Property( + 
"project_name", + th.StringType, + description="The name of the project.", + ), + th.Property( + "stack_name", + th.StringType, + description="The name of the stack.", + ), + th.Property( + "permissions", + th.IntegerType, + description=( + "Permissions for the stack: " + "None = 0, Read = 101, Write = 102, Admin = 103." + ), + ), + ).to_dict() + + +class OrganizationTeamsEnvironments(PulumiCloudStream): + """Organization team environments stream.""" + + name = "organization_team_environments" + path = "/api/orgs/{org_name}/teams/{team_name}" + primary_keys: t.Sequence[str] = ["org_name", "team_name", "env_name"] + records_jsonpath = "$.environments[*]" + + parent_stream_type = OrganizationTeams + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The name of the organization that owns the team.", + ), + th.Property( + "team_name", + th.StringType, + description="The name of the team.", + ), + th.Property( + "project_name", + th.StringType, + description="The name of the project.", + ), + th.Property( + "env_name", + th.StringType, + description="The name of the environment.", + ), + th.Property( + "permission", + th.StringType, + description="Permissions for the environment.", + ), + ).to_dict() + + +class OrganizationTeamsAccessTokens(PulumiCloudStream): + """Organization team access tokens stream.""" + + name = "organization_team_access_tokens" + path = "/api/orgs/{org_name}/teams/{team_name}/tokens" + primary_keys: t.Sequence[str] = ["org_name", "team_name", "id"] + records_jsonpath = "$.tokens[*]" + + parent_stream_type = OrganizationTeams + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The name of the organization that owns the team.", + ), + th.Property( + "team_name", + th.StringType, + description="The name of the team.", + ), + th.Property( + "id", + th.StringType, + description="The ID of the token.", + ), + th.Property( + "description", + th.StringType, + 
description="The description of the token.", + ), + th.Property( + "expires", + th.IntegerType, + description="The expiration time of the token.", + ), + th.Property( + "last_used", + th.IntegerType, + description="The time the token was last used.", + ), + th.Property("name", th.StringType, description="The name of the token"), + ).to_dict() + + +class OrganizationAccessTokens(_OrgPartitionedStream): + """Organization access tokens stream.""" + + name = "organization_access_tokens" + path = "/api/orgs/{org_name}/tokens" + primary_keys: t.Sequence[str] = ["org_name", "id"] + records_jsonpath = "$.tokens[*]" + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The name of the organization that owns the token.", + ), + th.Property( + "id", + th.StringType, + description="The ID of the token.", + ), + th.Property( + "description", + th.StringType, + description="The description of the token.", + ), + th.Property( + "expires", + th.IntegerType, + description="The expiration time of the token.", + ), + th.Property( + "last_used", + th.IntegerType, + description="The time the token was last used.", + ), + th.Property("name", th.StringType, description="The name of the token"), + ).to_dict() + + +class OrganizationOidcIssuers(_OrgPartitionedStream): + """Organization OIDC issuers stream.""" + + name = "organization_oidc_issuers" + path = "/api/orgs/{org_name}/oidc/issuers" + primary_keys: t.Sequence[str] = ["org_name", "id"] + records_jsonpath = "$.oidcIssuers[*]" + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + ), + th.Property( + "id", th.StringType, description="The unique identifier for the Issuer." 
+ ), + th.Property("name", th.StringType, description="The name of the Issuer."), + th.Property("url", th.StringType, description="The issuer URL."), + th.Property("issuer", th.StringType, description="The issuer URL"), + th.Property( + "created", + th.DateTimeType, + description="The timestamp when the Issuer was created.", + ), + th.Property( + "modified", + th.DateTimeType, + description="The timestamp when the Issuer was last modified.", + ), + ).to_dict() + + def get_child_context( + self, + record: dict, + context: dict | None, # noqa: ARG002 + ) -> dict | None: + """Return a context object for child streams. + + Args: + record: A record from this stream. + context: The stream sync context. + + Returns: + A context object for child streams. + """ + return { + "org_name": record["org_name"], + "issuer_id": record["id"], + } + + +class OrganizationOidcIssuersPolicies(PulumiCloudStream): + """OIDC Issuer Policy details Stream.""" + + name = "organization_oidc_issuers_policies" + path = "/api/orgs/{org_name}/auth/policies/oidcissuers/{issuer_id}" + primary_keys: t.Sequence[str] = ["org_name", "issuer_id", "id"] + parent_stream_type = OrganizationOidcIssuers + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + ), + th.Property( + "issuer_id", + th.StringType, + description="The unique identifier for the OIDC Issuer.", + ), + th.Property( + "id", th.StringType, description="The unique identifier for the policy." + ), + th.Property( + "version", th.IntegerType, description="The version number of the policy." + ), + th.Property( + "created", + th.DateTimeType, + description="The timestamp when the policy was created.", + ), + th.Property( + "modified", + th.DateTimeType, + description="The timestamp when the policy was last modified.", + ), + th.Property( + "policies", + th.ArrayType( + th.ObjectType( + th.Property( + "decision", + th.StringType, + description=( + "The decision made by the policy," + "'e.g., 'allow' or 'deny'." 
+ ), + ), + th.Property( + "tokenType", + th.StringType, + description="The type of token associated with the policy.", + ), + th.Property( + "authorizedPermissions", + th.ArrayType(th.StringType), + description="The permissions authorized by the policy.", + ), + th.Property( + "rules", + th.ObjectType( + th.Property( + "*", # Wildcard to allow for any key + th.StringType, + ) + ), + description="Dynamic set of rules applied by the policy.", + ), + ) + ), + description="List of policies within the OIDC Issuer.", + ), + ).to_dict() + + +class OrganizationAgentPools(_OrgPartitionedStream): + """Organization Agent Pools Stream.""" + + name = "organization_agent_pools" + path = "/api/orgs/{org_name}/agent-pools" + primary_keys: t.Sequence[str] = ["org_name", "id"] + records_jsonpath = "$.agentPools[*]" + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The name of the Agent Pool organization.", + ), + th.Property( + "created", + th.IntegerType, + description=( + "The timestamp when the Agent Pool " + "was created, in milliseconds - epoch." + ), + ), + th.Property( + "id", th.StringType, description="The unique identifier for the Agent Pool." + ), + th.Property("name", th.StringType, description="The Agent Pool name."), + th.Property( + "description", th.StringType, description="The Agent Pool description." + ), + th.Property( + "last_seen", + th.IntegerType, + description=( + "The timestamp when the Agent Pool " + "was seen for the last time, in milliseconds - epoch." + ), + ), + th.Property( + "status", th.StringType, description="The current status of the Agent Pool." 
+ ), + th.Property( + "last_deployment", + th.StringType, + description="The last deployment associated with the Agent Pool.", + required=False, + ), + ).to_dict() diff --git a/tap_pulumi_cloud/policies.py b/tap_pulumi_cloud/policies.py new file mode 100644 index 0000000..87cf1a8 --- /dev/null +++ b/tap_pulumi_cloud/policies.py @@ -0,0 +1,309 @@ +"""Stream type classes for tap-pulumi-cloud.""" + +from __future__ import annotations + +import typing as t + +from singer_sdk import typing as th + +from tap_pulumi_cloud.client import PulumiCloudStream, _OrgPartitionedStream + + +class PolicyGroupsList(_OrgPartitionedStream): + """Auxiliar stream to get Organization Policy Groups names.""" + + name = "policy_groups_list" + path = "/api/orgs/{org_name}/policygroups" + primary_keys: t.Sequence[str] = ["org_name", "name"] + records_jsonpath = "$.policyGroups[*]" + selected_by_default = False + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + ), + th.Property( + "name", + th.StringType, + ), + th.Property( + "is_org_default", + th.BooleanType, + ), + th.Property( + "num_stacks", + th.IntegerType, + ), + th.Property( + "num_enabled_policy_packs", + th.IntegerType, + ), + ).to_dict() + + def get_child_context( + self, + record: dict, + context: dict | None, # noqa: ARG002 + ) -> dict | None: + """Return a context object for child streams. + + Args: + record: A record from this stream. + context: The stream sync context. + + Returns: + A context object for child streams. 
+ """ + return { + "policy_group_name": record["name"], + "org_name": record["org_name"], + "num_enabled_policy_packs": record["num_enabled_policy_packs"], + "num_stacks": record["num_stacks"], + } + + +class PolicyGroups(PulumiCloudStream): + """Organization Policy Groups.""" + + name = "policy_groups" + path = "/api/orgs/{org_name}/policygroups/{policy_group_name}" + primary_keys: t.Sequence[str] = ["org_name", "policy_group_name"] + + parent_stream_type = PolicyGroupsList + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The Organization name.", + ), + th.Property( + "policy_group_name", + th.StringType, + description="The Policy group name.", + ), + th.Property( + "num_stacks", + th.IntegerType, + description="The amount of stacks asociated to the policy group.", + ), + th.Property( + "num_enabled_policy_packs", + th.IntegerType, + description="The amount of enabled Policy Packs in the Policy Group .", + ), + th.Property( + "is_org_default", + th.BooleanType, + ), + th.Property( + "applied_policy_packs", + th.ArrayType( + th.ObjectType( + th.Property("name", th.StringType), + th.Property("displayName", th.StringType), + th.Property("version", th.IntegerType), + th.Property("versionTag", th.StringType), + th.Property( + "config", + th.ObjectType( + th.Property( + "all", + th.ObjectType( + th.Property("enforcementLevel", th.StringType) + ), + ), + th.Property( + "prohibited-public-internet", + th.ObjectType( + th.Property("enforcementLevel", th.StringType) + ), + ), + th.Property( + "s3-bucket-replication-enabled", + th.ObjectType( + th.Property("enforcementLevel", th.StringType) + ), + ), + th.Property( + "s3-no-public-read", + th.ObjectType( + th.Property("enforcementLevel", th.StringType) + ), + ), + ), + ), + ) + ), + description="Policy Packs list with configuration details.", + ), + ).to_dict() + + +class PolicyPacks(_OrgPartitionedStream): + """Policy Packs, versions and version tags.""" + + path = 
"/api/orgs/{org_name}/policypacks" + name = "policy_packs" + primary_keys: t.Sequence[str] = ["org_name", "name"] + records_jsonpath = "$.policyPacks[*]" + selected_by_default = False + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The name of the organization.", + ), + th.Property( + "name", + th.StringType, + description="The name of the policy pack.", + ), + th.Property( + "display_name", + th.StringType, + description="The display name of the policy pack.", + ), + th.Property( + "versions", + th.ArrayType(th.IntegerType), + description="List of versions available for the policy pack.", + ), + th.Property( + "version_tags", + th.ArrayType(th.StringType), + description="List of version tags corresponding to the versions.", + ), + ).to_dict() + + def get_child_context( + self, + record: dict, + context: dict | None, # noqa: ARG002 + ) -> dict | None: + """Return a context object for child streams. + + Args: + record: A record from this stream. + context: The stream sync context. + + Returns: + A context object for child streams. 
+ """ + return { + "policy_pack_name": record["name"], + "org_name": record["org_name"], + } + + +class LatestPolicyPacks(PulumiCloudStream): + """Latest Policy Pack with complete Policy details.""" + + name = "policy_pack_detailed" + path = "/api/orgs/{org_name}/policypacks/{policy_pack_name}/latest" + + """Version is included in the primary key, so when a new version is created, + the latest status of the older versions will be retained.""" + primary_keys: t.Sequence[str] = ["org_name", "policy_pack_name", "version"] + + parent_stream_type = PolicyPacks + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The name of the organization.", + ), + th.Property( + "policy_pack_name", + th.StringType, + description="The name of the policy pack.", + ), + th.Property( + "display_name", + th.StringType, + description="The display name of the policy pack.", + ), + th.Property( + "version", th.IntegerType, description="The version of the policy pack." + ), + th.Property( + "version_tag", + th.StringType, + description="The version tag of the policy pack.", + ), + th.Property( + "policies", + th.ArrayType( + th.ObjectType( + th.Property( + "name", th.StringType, description="The name of the policy." + ), + th.Property( + "displayName", + th.StringType, + description="The display name of the policy.", + ), + th.Property( + "description", + th.StringType, + description="A description of the policy.", + ), + th.Property( + "enforcementLevel", + th.StringType, + description="The enforcement level of the policy.", + ), + th.Property( + "message", + th.StringType, + description="The message associated with the policy.", + ), + th.Property( + "configSchema", + th.ObjectType( + th.Property( + "properties", + th.ObjectType( + th.Property( + "enforcementLevel", + th.ObjectType( + th.Property( + "enum", + th.ArrayType(th.StringType), + description=( + "possible " "enforcement levels." 
+ ), + ), + th.Property( + "type", + th.StringType, + description=( + "The type of " + "the enforcement Level." + ), + ), + ), + ) + ), + ), + th.Property( + "type", + th.StringType, + description="The type of the config schema.", + ), + ), + description="Configuration schema for the policy.", + ), + ) + ), + description="List of policies within the policy pack.", + ), + th.Property( + "applied", + th.BooleanType, + description="Indicates whether the policy pack is applied.", + ), + ).to_dict() diff --git a/tap_pulumi_cloud/rum.py b/tap_pulumi_cloud/rum.py new file mode 100644 index 0000000..a0e8878 --- /dev/null +++ b/tap_pulumi_cloud/rum.py @@ -0,0 +1,30 @@ +"""Stream type classes for tap-pulumi-cloud.""" + +from __future__ import annotations + +import typing as t + +from singer_sdk import typing as th + +from tap_pulumi_cloud.client import _OrgPartitionedStream + + +class RumUsageDaily(_OrgPartitionedStream): + """RUM Usage Stream.""" + + name = "daily_rum_usage" + path = "/api/orgs/{org_name}/resources/summary?granularity=daily&lookbackDays=365" + primary_keys: t.Sequence[str] = ["org_name", "year", "month", "day"] + records_jsonpath = "$.summary[*]" + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + ), + th.Property("year", th.IntegerType, description="The year of the RUM usage."), + th.Property("month", th.IntegerType, description="The month of the RUM usage."), + th.Property("day", th.IntegerType, description="The day of the RUM usage."), + th.Property("resources", th.IntegerType, description="Daily RUM usage."), + th.Property("resource_hours", th.IntegerType, description="Hourly RUM usage."), + ).to_dict() diff --git a/tap_pulumi_cloud/stacks.py b/tap_pulumi_cloud/stacks.py new file mode 100644 index 0000000..044e72e --- /dev/null +++ b/tap_pulumi_cloud/stacks.py @@ -0,0 +1,970 @@ +"""Stream type classes for tap-pulumi-cloud.""" + +from __future__ import annotations + +import typing as t + +from singer_sdk import typing as th + 
+from tap_pulumi_cloud.client import PulumiCloudStream, _OrgPartitionedStream + + +class Stacks(_OrgPartitionedStream): + """Stacks stream.""" + + name = "stacks" + path = "/api/user/stacks" + primary_keys: t.Sequence[str] = ["org_name", "project_name", "stack_name"] + records_jsonpath = "$.stacks[*]" + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The name of the organization that owns the stack.", + ), + th.Property( + "project_name", + th.StringType, + description="The name of the project that contains the stack.", + ), + th.Property( + "stack_name", + th.StringType, + description="The name of the stack.", + ), + th.Property( + "last_update", + th.IntegerType, + description="The last time the stack was updated.", + ), + th.Property( + "resource_count", + th.IntegerType, + description="The number of resources in the stack.", + ), + ).to_dict() + + def get_url_params( + self, + context: dict | None, + next_page_token: str | None, + ) -> dict[str, t.Any]: + """Get URL query parameters. + + Args: + context: Stream sync context. + next_page_token: Next offset. + + Returns: + A dictionary of URL query parameters. + """ + params = super().get_url_params(context, next_page_token) + + if context: + params["organization"] = context["org_name"] + + return params + + def get_child_context( + self, + record: dict, + context: dict | None, # noqa: ARG002 + ) -> dict | None: + """Return a context object for child streams. + + Args: + record: A record from this stream. + context: The stream sync context. + + Returns: + A context object for child streams.
+ """ + return { + "org_name": record["org_name"], + "project_name": record["project_name"], + "stack_name": record["stack_name"], + } + + +class StackDetails(PulumiCloudStream): + """Stack details stream.""" + + name = "stack_details" + path = "/api/stacks/{org_name}/{project_name}/{stack_name}" + primary_keys: t.Sequence[str] = ["org_name", "project_name", "stack_name"] + parent_stream_type = Stacks + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The name of the organization that owns the stack.", + ), + th.Property( + "project_name", + th.StringType, + description="The name of the project that contains the stack.", + ), + th.Property( + "current_operation", + th.ObjectType( + th.Property( + "kind", + th.StringType, + description="The kind of operation.", + ), + th.Property( + "author", + th.StringType, + description="The author of the operation.", + ), + th.Property( + "started", + th.IntegerType, + description="The time the operation started.", + ), + ), + description="The name of the current operation being ran.", + ), + th.Property( + "active_update", + th.StringType, + description="The ID of the active update.", + ), + th.Property( + "tags", + th.ObjectType(), + description="The tags associated with the stack.", + ), + th.Property( + "stack_name", + th.StringType, + description="The name of the stack.", + ), + th.Property( + "version", + th.IntegerType, + description="The ID of the update.", + ), + ).to_dict() + + +class StackUpdates(PulumiCloudStream): + """Stack updates stream.""" + + name = "stack_updates" + path = "/api/stacks/{org_name}/{project_name}/{stack_name}/updates" + primary_keys: t.Sequence[str] = [ + "org_name", + "project_name", + "stack_name", + "version", + ] + records_jsonpath = "$.updates[*]" + + parent_stream_type = Stacks + + def get_url_params( + self, + context: dict | None, + next_page_token: str | None, + ) -> dict[str, t.Any]: + """Get URL query parameters. 
+ + Args: + context: Stream sync context. + next_page_token: Next offset. + + Returns: + A dictionary of URL query parameters. + """ + params = super().get_url_params(context, next_page_token) + + if context: + params["output-type"] = "service" + + return params + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The name of the organization that owns the stack.", + ), + th.Property( + "project_name", + th.StringType, + description="The name of the project that contains the stack.", + ), + th.Property( + "stack_name", + th.StringType, + description="The name of the stack.", + ), + th.Property( + "info", + th.ObjectType( + th.Property( + "kind", + th.StringType, + description="The kind of update.", + ), + th.Property( + "start_time", + th.IntegerType, + description="The time the update started.", + ), + th.Property( + "message", + th.StringType, + description="The message associated with the update.", + ), + th.Property( + "environment", + th.ObjectType(), + description="The environment configuration present at the update.", + ), + th.Property( + "config", + th.ObjectType(), + description="The config associated with the update.", + ), + th.Property( + "result", + th.StringType, + description="The result of the update.", + ), + th.Property( + "end_time", + th.IntegerType, + description="The time the update ended.", + ), + th.Property( + "resource_changes", + th.ObjectType(), + description="The resource changes associated with the update.", + ), + ), + description="The information associated with the update.", + ), + th.Property( + "update_id", + th.StringType, + description="The ID of the update.", + ), + th.Property( + "github_commit_info", + th.ObjectType( + th.Property( + "slug", + th.StringType, + description="The slug of the commit.", + ), + th.Property( + "sha", + th.StringType, + description="The SHA of the commit.", + ), + th.Property( + "url", + th.StringType, + description="The URL of the commit.", + ), + 
th.Property( + "author", + th.ObjectType( + th.Property( + "name", + th.StringType, + description="The name of the author.", + ), + th.Property( + "github_login", + th.StringType, + description="The GitHub login of the author.", + ), + th.Property( + "avatar_url", + th.StringType, + description="The avatar URL of the author.", + ), + ), + description=( + "The information associated " "with the author of the commit." + ), + ), + ), + description="The information associated with the GitHub commit.", + ), + th.Property( + "version", + th.IntegerType, + description="The numeric sequence of the update.", + ), + th.Property( + "latest_version", + th.IntegerType, + description="The latest version for this stack.", + ), + th.Property( + "requested_by", + th.ObjectType( + th.Property( + "name", + th.StringType, + description="The name of the requester.", + ), + th.Property( + "github_login", + th.StringType, + description="The GitHub login of the requester.", + ), + th.Property( + "avatar_url", + th.StringType, + description="The avatar URL of the requester.", + ), + ), + description="The information associated with the requester.", + ), + th.Property( + "policy_packs", + th.ArrayType( + th.ObjectType( + th.Property( + "name", + th.StringType, + description="The name of the policy pack.", + ), + th.Property( + "display_name", + th.StringType, + description="The display name of the policy pack.", + ), + th.Property( + "version", + th.IntegerType, + description="The version of the policy pack.", + ), + th.Property( + "version_tag", + th.StringType, + description="The version tag of the policy pack.", + ), + th.Property( + "config", + th.ObjectType(), + description="The configuration of the policy pack.", + ), + ), + ), + description="The policy packs associated with the update.", + ), + ).to_dict() + + +class StackPreviews(StackUpdates): + """Stack previews stream.""" + + name = "stack_previews" + path = 
"/api/stacks/{org_name}/{project_name}/{stack_name}/updates/latest/previews" + primary_keys: t.Sequence[str] = [ + "org_name", + "project_name", + "stack_name", + "version", + ] + records_jsonpath = "$.updates[*]" + tolerated_http_errors: t.Sequence[int] = [504] + + parent_stream_type = Stacks + + # Schema same as StackUpdates, inherited + + +class StackResources(PulumiCloudStream): + """Stack resources stream.""" + + name = "stack_resources" + path = "/api/stacks/{org_name}/{project_name}/{stack_name}/export" + primary_keys: t.Sequence[str] = ["org_name", "project_name", "stack_name", "urn"] + records_jsonpath = "$.deployment.resources[*]" + + parent_stream_type = Stacks + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The name of the organization that owns the stack.", + ), + th.Property( + "project_name", + th.StringType, + description="The name of the project that contains the stack.", + ), + th.Property( + "stack_name", + th.StringType, + description="The name of the stack.", + ), + th.Property( + "urn", + th.StringType, + description="The URN of the resource.", + ), + th.Property( + "type", + th.StringType, + description="The type of the resource.", + ), + th.Property( + "id", + th.StringType, + description="The ID of the resource.", + ), + th.Property( + "custom", + th.BooleanType, + description=( + "Is it a custom resource?; a cloud resource " + "managed by a resource provider such as AWS," + "Microsoft Azure, Google Cloud or Kubernetes." 
+ ), + ), + th.Property( + "created", + th.StringType, + description="The time the resource was created.", + ), + th.Property( + "modified", + th.StringType, + description="The time the resource was last modified.", + ), + th.Property( + "inputs", + th.ObjectType(), + description="The inputs used for this resource.", + ), + th.Property( + "outputs", + th.ObjectType(), + description="The outputs generated by this resource.", + ), + th.Property( + "protect", + th.BooleanType, + description="The resource is protected for deletion", + ), + th.Property( + "dependencies", + th.ArrayType(th.StringType), + description="The dependencies of the resource.", + ), + th.Property( + "parent", + th.StringType, + description="Parent resource of this resource.", + ), + th.Property( + "property_dependencies", + th.ObjectType(), + description="The property dependencies of the resource.", + ), + ).to_dict() + + +class StackPolicyGroups(PulumiCloudStream): + """Stack policy groups stream.""" + + name = "stack_policy_groups" + path = "/api/stacks/{org_name}/{project_name}/{stack_name}/policygroups" + primary_keys: t.Sequence[str] = ["org_name", "project_name", "stack_name", "name"] + records_jsonpath = "$.policyGroups[*]" + + parent_stream_type = Stacks + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The name of the organization that owns the stack.", + ), + th.Property( + "project_name", + th.StringType, + description="The name of the project that contains the stack.", + ), + th.Property( + "stack_name", + th.StringType, + description="The name of the stack.", + ), + th.Property( + "name", + th.StringType, + description="The name of the policy group.", + ), + th.Property( + "is_org_default", + th.BooleanType, + description="Is the policy group the default for the organization.", + ), + th.Property( + "num_stacks", + th.IntegerType, + description="The number of stacks the policy group is applied to.", + ), + th.Property( +
"num_enabled_policy_packs", + th.IntegerType, + description="The number of policy packs enabled in the policy group.", + ), + ).to_dict() + + +class StackPolicyPacks(PulumiCloudStream): + """Stack policy packs stream.""" + + name = "stack_policy_packs" + path = "/api/stacks/{org_name}/{project_name}/{stack_name}/policypacks" + primary_keys: t.Sequence[str] = ["org_name", "project_name", "stack_name", "name"] + records_jsonpath = "$.requiredPolicies[*]" + + parent_stream_type = Stacks + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The name of the organization that owns the stack.", + ), + th.Property( + "project_name", + th.StringType, + description="The name of the project that contains the stack.", + ), + th.Property( + "stack_name", + th.StringType, + description="The name of the stack.", + ), + th.Property( + "name", + th.StringType, + description="The name of the policy pack.", + ), + th.Property( + "version", + th.IntegerType, + description="Version of the policy pack applied to this stack.", + ), + th.Property( + "version_tag", + th.StringType, + description="Version tag of the policy pack applied to this stack.", + ), + th.Property( + "display_name", + th.StringType, + description="Display name of the policy pack applied to this stack.", + ), + th.Property( + "pack_location", + th.StringType, + description="Location of the policy pack applied to this stack.", + ), + th.Property( + "config", + th.ObjectType(), + description="The configuration of the policy pack applied to this stack.", + ), + ).to_dict() + + +class StackDeployments(PulumiCloudStream): + """Stack deployments stream.""" + + name = "stack_deployments" + path = "/api/stacks/{org_name}/{project_name}/{stack_name}/deployments" + primary_keys: t.Sequence[str] = ["org_name", "project_name", "stack_name", "id"] + records_jsonpath = "$.deployments[*]" + + parent_stream_type = Stacks + + schema = th.PropertiesList( + th.Property( + "org_name", +
th.StringType, + description="The name of the organization that owns the stack.", + ), + th.Property( + "project_name", + th.StringType, + description="The name of the project that contains the stack.", + ), + th.Property( + "stack_name", + th.StringType, + description="The name of the stack.", + ), + th.Property( + "id", + th.StringType, + description="The ID of the deployment.", + ), + th.Property( + "created", + th.StringType, + description="The time the deployment was created.", + ), + th.Property( + "modified", + th.StringType, + description="The time the deployment was last modified.", + ), + th.Property( + "status", + th.StringType, + description="The status of the deployment.", + ), + th.Property( + "version", + th.IntegerType, + description="The version of the deployment.", + ), + th.Property( + "requested_by", + th.ObjectType( + th.Property( + "name", + th.StringType, + description="The name of the requester.", + ), + th.Property( + "github_login", + th.StringType, + description="The GitHub login of the requester.", + ), + th.Property( + "avatar_url", + th.StringType, + description="The avatar URL of the requester.", + ), + th.Property( + "email", + th.StringType, + description="The email of the requester.", + ), + ), + description="The information associated with the requester.", + ), + th.Property( + "paused", + th.BooleanType, + description="Is the deployment paused.", + ), + th.Property( + "pulumi_operation", + th.StringType, + description="The operation performed in the deployment.", + ), + th.Property( + "updates", + th.ArrayType( + th.ObjectType( + th.Property( + "id", + th.StringType, + description="The ID of the update.", + ), + th.Property( + "version", + th.IntegerType, + description="The version of the update.", + ), + th.Property( + "start_time", + th.IntegerType, + description="The time the update started.", + ), + th.Property( + "end_time", + th.IntegerType, + description="The time the update ended.", + ), + th.Property( + "result", + 
th.StringType, + description="The result of the update.", + ), + th.Property( + "kind", + th.StringType, + description="The kind of update.", + ), + th.Property( + "message", + th.StringType, + description="The message associated with the update.", + ), + th.Property( + "environment", + th.ObjectType(), + description=( + "The environment configuration " "present at the update." + ), + ), + ), + ), + description="The updates associated with the deployment.", + ), + th.Property( + "jobs", + th.ArrayType( + th.ObjectType( + th.Property( + "status", + th.StringType, + description="The status of the job.", + ), + th.Property( + "started", + th.StringType, + description="The time the job started.", + ), + th.Property( + "last_updated", + th.StringType, + description="The time the job was last updated.", + ), + th.Property( + "steps", + th.ArrayType( + th.ObjectType( + th.Property( + "name", + th.StringType, + description="The name of the step.", + ), + th.Property( + "status", + th.StringType, + description="The status of the step.", + ), + th.Property( + "started", + th.StringType, + description="The time the step started.", + ), + th.Property( + "last_updated", + th.StringType, + description="The time the step was last updated.", + ), + ), + ), + description="The steps of the job.", + ), + ), + ), + description="The jobs associated with the deployment.", + ), + th.Property( + "initiator", + th.StringType, + description="The initiator of the deployment.", + ), + ).to_dict() + + +class StackSchedules(PulumiCloudStream): + """Stack schedules stream.""" + + name = "stack_schedules" + path = "/api/stacks/{org_name}/{project_name}/{stack_name}/deployments/schedules" + primary_keys: t.Sequence[str] = ["org_name", "project_name", "stack_name", "id"] + parent_stream_type = Stacks + records_jsonpath = "$.schedules[*]" + + schema = th.PropertiesList( + th.Property( + "org_name", + th.StringType, + description="The name of the organization that owns the stack.", + ), + th.Property( +
"project_name", + th.StringType, + description="The name of the project that contains the stack.", + ), + th.Property( + "stack_name", + th.StringType, + description="The name of the stack.", + ), + th.Property( + "id", + th.StringType, + description="The unique identifier for the scheduled task.", + ), + th.Property( + "org_id", + th.StringType, + description="The organization ID associated with the task.", + ), + th.Property( + "schedule_cron", + th.StringType, + description="The cron expression defining the schedule for the task.", + ), + th.Property( + "next_execution", + th.DateTimeType, + description="The timestamp for the next scheduled execution.", + ), + th.Property( + "paused", + th.BooleanType, + description="Indicates whether the task is paused.", + ), + th.Property( + "kind", th.StringType, description="The kind of task, e.g., 'deployment'." + ), + th.Property( + "definition", + th.ObjectType( + th.Property( + "programID", + th.StringType, + description="The ID of the program associated with the task.", + ), + th.Property( + "request", + th.ObjectType( + th.Property( + "inheritSettings", + th.BooleanType, + description=( + "Indicates whether to inherit " + "settings from the program." + ), + ), + th.Property( + "operation", + th.StringType, + description=( + "The operation to be performed, " + "e.g., 'detect-drift'." 
+ ), + ), + th.Property( + "operationContext", + th.ObjectType( + th.Property( + "*", # Wildcard to allow for any key + th.ObjectType( + th.Property( + "*", # Wildcard to allow for any key + th.StringType, + ) + ), + ) + ), + ), + ), + ), + ), + description="Definition of the scheduled.", + ), + th.Property( + "created", + th.DateTimeType, + description="The timestamp when the task was created.", + ), + th.Property( + "modified", + th.DateTimeType, + description="The timestamp when the task was last modified.", + ), + th.Property( + "lastExecuted", + th.DateTimeType, + description="The timestamp when the task was last executed.", + ), + ).to_dict() + + def get_child_context( + self, + record: dict, + context: dict | None, # noqa: ARG002 + ) -> dict | None: + """Return a context object for child streams. + + Args: + record: A record from this stream. + context: The stream sync context. + + Returns: + A context object for child streams. + """ + return { + "org_name": record["org_name"], + "project_name": record["project_name"], + "stack_name": record["stack_name"], + "scheduled_action_id": record["id"], + } + + +class StackScheduledDeploymentHistory(PulumiCloudStream): + """Stack schedules deployment history stream.""" + + name = "stack_schedules_deployment_history" + path = ( + "/api/stacks/{org_name}/{project_name}/{stack_name}" + "/deployments/schedules/{scheduled_action_id}/history" + ) + primary_keys: t.Sequence[str] = ["org_name", "project_name", "stack_name", "id"] + parent_stream_type = StackSchedules + records_jsonpath = "$.scheduleHistoryEvents[*]" + + schema = th.PropertiesList( + th.Property( + "id", + th.StringType, + description="The unique identifier for the execution record.", + ), + th.Property( + "scheduled_action_id", + th.StringType, + description=( + "The ID of the scheduled action " "associated with this execution."
+ ), + ), + th.Property( + "executed", + th.DateTimeType, + description="The timestamp when the scheduled action was executed.", + ), + th.Property( + "version", + th.IntegerType, + description="The version number of the execution.", + ), + th.Property( + "result", th.StringType, description="The result of the execution." + ), + th.Property( + "org_name", + th.StringType, + description="The name of the organization that owns the stack.", + ), + th.Property( + "project_name", + th.StringType, + description="The name of the project that contains the stack.", + ), + th.Property("stack_name", th.StringType, description="The name of the stack."), + ).to_dict() diff --git a/tap_pulumi_cloud/streams.py b/tap_pulumi_cloud/streams.py deleted file mode 100644 index bef05b9..0000000 --- a/tap_pulumi_cloud/streams.py +++ /dev/null @@ -1,292 +0,0 @@ -"""Stream type classes for tap-pulumi-cloud.""" - -from __future__ import annotations - -import typing as t - -from singer_sdk import typing as th - -from tap_pulumi_cloud.client import PulumiCloudStream - - -class _OrgPartitionedStream(PulumiCloudStream): - """Base class for streams that are partitioned by organization.""" - - @property - def partitions(self) -> list[dict] | None: - """List of organizations to sync.""" - return [{"org_name": org} for org in self.config["organizations"]] - - -class Stacks(_OrgPartitionedStream): - """Users stream.""" - - name = "stacks" - path = "/api/user/stacks" - primary_keys = ["org_name", "project_name", "stack_name"] - records_jsonpath = "$.stacks[*]" - - schema = th.PropertiesList( - th.Property( - "org_name", - th.StringType, - description="The name of the organization that owns the stack.", - ), - th.Property( - "project_name", - th.StringType, - description="The name of the project that contains the stack.", - ), - th.Property( - "stack_name", - th.StringType, - description="The name of the stack.", - ), - th.Property( - "last_update", - th.IntegerType, - description="The last time the 
stack was updated.", - ), - th.Property( - "resource_count", - th.IntegerType, - description="The number of resources in the stack.", - ), - ).to_dict() - - def get_url_params( - self, - context: dict | None, - next_page_token: str | None, - ) -> dict[str, t.Any]: - """Get URL query parameters. - - Args: - context: Stream sync context. - next_page_token: Next offset. - - Returns: - A dictionary of URL query parameters. - """ - params = super().get_url_params(context, next_page_token) - - if context: - params["organization"] = context["org_name"] - - return params - - def get_child_context( - self, - record: dict, - context: dict | None, # noqa: ARG002 - ) -> dict | None: - """Return a context object for child streams. - - Args: - record: A record from this stream. - context: The stream sync context. - - Returns: - A context object for child streams. - """ - return { - "org_name": record["org_name"], - "project_name": record["project_name"], - "stack_name": record["stack_name"], - } - - -class StackUpdates(PulumiCloudStream): - """Stack updates stream.""" - - name = "stack_updates" - path = "/api/stacks/{org_name}/{project_name}/{stack_name}/updates" - primary_keys = ["org_name", "project_name", "stack_name", "version"] - records_jsonpath = "$.updates[*]" - - parent_stream_type = Stacks - - schema = th.PropertiesList( - th.Property( - "version", - th.IntegerType, - description="The ID of the update.", - ), - th.Property( - "org_name", - th.StringType, - description="The name of the organization that owns the stack.", - ), - th.Property( - "project_name", - th.StringType, - description="The name of the project that contains the stack.", - ), - th.Property( - "stack_name", - th.StringType, - description="The name of the stack.", - ), - th.Property( - "start_time", - th.IntegerType, - description="The time the update started.", - ), - th.Property( - "end_time", - th.IntegerType, - description="The time the update ended.", - ), - th.Property( - "kind", - th.StringType, 
- description="The kind of update.", - ), - th.Property( - "message", - th.StringType, - description="The message associated with the update.", - ), - th.Property( - "environment", - th.ObjectType(), - description="The environment associated with the update.", - ), - th.Property( - "config", - th.ObjectType(), - description="The config associated with the update.", - ), - th.Property( - "result", - th.StringType, - description="The result of the update.", - ), - th.Property("resource_changes", th.ObjectType()), - th.Property("resource_count", th.IntegerType), - ).to_dict() - - -class OrganizationMembers(_OrgPartitionedStream): - """Organization members stream.""" - - name = "organization_members" - path = "/api/orgs/{org_name}/members" - primary_keys = ["org_name", "user_name"] - records_jsonpath = "$.members[*]" - - schema = th.PropertiesList( - th.Property( - "org_name", - th.StringType, - description="The name of the organization that owns the stack.", - ), - th.Property( - "role", - th.StringType, - description="The role of the user in the organization.", - ), - th.Property( - "user_name", - th.StringType, - description="The name of the user.", - ), - th.Property( - "user", - th.ObjectType( - th.Property( - "github_login", - th.StringType, - description="The GitHub login of the user.", - ), - th.Property( - "avatar_url", - th.StringType, - description="The URL of the user's avatar.", - ), - ), - description="The user object.", - ), - th.Property( - "created", - th.DateTimeType, - description="The time the user was added to the organization.", - ), - th.Property( - "known_to_pulumi", - th.BooleanType, - ), - th.Property( - "virtual_admin", - th.BooleanType, - ), - ).to_dict() - - def get_url_params( - self, - context: dict | None, - next_page_token: str | None, - ) -> dict[str, t.Any]: - """Return a dictionary of URL query parameters. - - Args: - context: The stream sync context. - next_page_token: A token for the next page of results. 
- - Returns: - A dictionary of URL query parameters. - """ - params = super().get_url_params(context, next_page_token) - params["type"] = "backend" - return params - - def post_process(self, row: dict, context: dict | None = None) -> dict | None: - """Post-process a row. - - Args: - row: A row. - context: The stream sync context. - - Returns: - The processed row. - """ - new_row = super().post_process(row, context) - if new_row: - new_row["user_name"] = new_row["user"].pop("name") - return new_row - - -class OrganizationTeams(_OrgPartitionedStream): - """Organization teams stream.""" - - name = "organization_teams" - path = "/api/orgs/{org_name}/teams" - primary_keys = ["org_name", "name"] - records_jsonpath = "$.teams[*]" - - schema = th.PropertiesList( - th.Property( - "org_name", - th.StringType, - ), - th.Property( - "kind", - th.StringType, - ), - th.Property( - "name", - th.StringType, - ), - th.Property( - "display_name", - th.StringType, - ), - th.Property( - "description", - th.StringType, - ), - th.Property( - "user_role", - th.StringType, - ), - ).to_dict() diff --git a/tap_pulumi_cloud/tap.py b/tap_pulumi_cloud/tap.py index 5edda18..5055aac 100644 --- a/tap_pulumi_cloud/tap.py +++ b/tap_pulumi_cloud/tap.py @@ -8,7 +8,15 @@ from singer_sdk import Stream, Tap from singer_sdk import typing as th -from tap_pulumi_cloud import streams +from tap_pulumi_cloud import ( + audit_logs, + environments, + organizations, + policies, + rum, + stacks, + webhooks, +) class TapPulumiCloud(Tap): @@ -33,6 +41,7 @@ class TapPulumiCloud(Tap): "start_date", th.DateTimeType, description="Earliest datetime to get data from", + required=True, ), th.Property( "requests_cache", @@ -78,8 +87,35 @@ def discover_streams(self) -> list[Stream]: A list of Pulumi Cloud streams. 
""" return [ - streams.Stacks(tap=self), - streams.StackUpdates(tap=self), - streams.OrganizationMembers(tap=self), - streams.OrganizationTeams(tap=self), + stacks.Stacks(tap=self), + stacks.StackDetails(tap=self), + stacks.StackUpdates(tap=self), + stacks.StackResources(tap=self), + stacks.StackPolicyGroups(tap=self), + stacks.StackPolicyPacks(tap=self), + stacks.StackPreviews(tap=self), + stacks.StackDeployments(tap=self), + stacks.StackSchedules(tap=self), + stacks.StackScheduledDeploymentHistory(tap=self), + organizations.OrganizationMembers(tap=self), + organizations.OrganizationTeams(tap=self), + # organizations.OrganizationAccessTokens(tap=self), + organizations.OrganizationTeamsMembers(tap=self), + organizations.OrganizationTeamsStacks(tap=self), + organizations.OrganizationTeamsEnvironments(tap=self), + organizations.OrganizationTeamsAccessTokens(tap=self), + # organizations.OrganizationOidcIssuers(tap=self), + # organizations.OrganizationOidcIssuersPolicies(tap=self), + # organizations.OrganizationAgentPools(tap=self), + policies.PolicyGroupsList(tap=self), + policies.PolicyGroups(tap=self), + policies.PolicyPacks(tap=self), + policies.LatestPolicyPacks(tap=self), + rum.RumUsageDaily(tap=self), + environments.Environments(tap=self), + # webhooks.OrganizationWebhooks(tap=self), + # webhooks.OrganizationWebhookDeliveries(tap=self), + webhooks.StackWebhooks(tap=self), + webhooks.StackWebhookDeliveries(tap=self), + # audit_logs.AuditLogs(tap=self), ] diff --git a/tap_pulumi_cloud/webhooks.py b/tap_pulumi_cloud/webhooks.py new file mode 100644 index 0000000..55fd74a --- /dev/null +++ b/tap_pulumi_cloud/webhooks.py @@ -0,0 +1,254 @@ +"""Stream type classes for tap-pulumi-cloud.""" + +from __future__ import annotations + +import typing as t + +from singer_sdk import typing as th + +from tap_pulumi_cloud import stacks +from tap_pulumi_cloud.client import PulumiCloudStream, _OrgPartitionedStream + + +class OrganizationWebhooks(_OrgPartitionedStream): + """Stream 
Organization Webhooks.""" + + name = "organization_webhooks" + path = "/api/orgs/{org_name}/hooks" + primary_keys: t.Sequence[str] = ["org_name", "name"] + records_jsonpath = "$[*]" + + schema = th.PropertiesList( + th.Property( + "org_name", th.StringType, description="The name of the organization." + ), + th.Property("name", th.StringType, description="The name of the webhook."), + th.Property( + "display_name", + th.StringType, + description="The display name of the webhook.", + ), + th.Property( + "payload_url", + th.StringType, + description="The URL to which the webhook will send payloads.", + ), + th.Property( + "format", + th.StringType, + description="The format of the webhook payload, e.g. raw, slack, ms_teams.", + ), + th.Property( + "active", th.BooleanType, description="Whether the webhook is active." + ), + ).to_dict() + + def get_child_context( + self, + record: dict, + context: dict | None, # noqa: ARG002 + ) -> dict | None: + """Return a context object for child streams. + + Args: + record: A record from this stream. + context: The stream sync context. + + Returns: + A context object for child streams. + """ + return { + "webhook_name": record["name"], + "org_name": record["org_name"], + } + + +class OrganizationWebhookDeliveries(PulumiCloudStream): + """Organization Webhook deliveries stream.""" + + name = "organization_webhook_deliveries" + path = "/api/orgs/{org_name}/hooks/{webhook_name}/deliveries" + primary_keys: t.Sequence[str] = ["org_name", "webhook_name", "id"] + records_jsonpath = "$[*]" + + parent_stream_type = OrganizationWebhooks + + schema = th.PropertiesList( + th.Property( + "org_name", th.StringType, description="The name of the organization." + ), + th.Property( + "webhook_name", th.StringType, description="The name of the webhook." 
+ ), + th.Property("id", th.StringType, description="The ID of the delivery."), + th.Property("kind", th.StringType, description="The kind of the delivery."), + th.Property( + "payload", th.StringType, description="The payload of the delivery." + ), + th.Property( + "timestamp", th.IntegerType, description="The timestamp of the delivery." + ), + th.Property( + "duration", th.IntegerType, description="The duration of the delivery." + ), + th.Property( + "request_url", th.StringType, description="The URL of the request." + ), + th.Property( + "request_headers", th.StringType, description="The headers of the request." + ), + th.Property( + "response_code", + th.IntegerType, + description="The response code of the delivery.", + ), + th.Property( + "response_headers", + th.StringType, + description="The headers of the response.", + ), + th.Property( + "response_body", th.StringType, description="The body of the response." + ), + ).to_dict() + + +class StackWebhooks(PulumiCloudStream): + """Stream Stack Webhooks.""" + + name = "stack_webhooks" + path = "/api/stacks/{org_name}/{project_name}/{stack_name}/hooks" + primary_keys: t.Sequence[str] = ["org_name", "project_name", "stack_name", "name"] + records_jsonpath = "$[*]" + + parent_stream_type = stacks.Stacks + + schema = th.PropertiesList( + th.Property( + "org_name", th.StringType, description="The name of the organization." + ), + th.Property( + "project_name", + th.StringType, + description="The name of the project.", + ), + th.Property( + "stack_name", + th.StringType, + description="The name of the stack.", + ), + th.Property("name", th.StringType, description="The name of the webhook."), + th.Property( + "display_name", + th.StringType, + description="The display name of the webhook.", + ), + th.Property( + "payload_url", + th.StringType, + description="The URL to which the webhook will send payloads.", + ), + th.Property( + "format", + th.StringType, + description="The format of the webhook payload, e.g.
raw, slack, ms_teams.", + ), + th.Property( + "filters", + th.ArrayType(th.StringType), + description="The filters for the webhook.", + ), + th.Property( + "active", th.BooleanType, description="Whether the webhook is active." + ), + ).to_dict() + + def get_child_context( + self, + record: dict, + context: dict | None, # noqa: ARG002 + ) -> dict | None: + """Return a context object for child streams. + + Args: + record: A record from this stream. + context: The stream sync context. + + Returns: + A context object for child streams. + """ + return { + "org_name": record["org_name"], + "project_name": record["project_name"], + "stack_name": record["stack_name"], + "webhook_name": record["name"], + } + + +class StackWebhookDeliveries(PulumiCloudStream): + """Stack Webhook deliveries stream.""" + + name = "stack_webhook_deliveries" + path = ( + "/api/stacks/{org_name}/{project_name}/{stack_name}" + "/hooks/{webhook_name}/deliveries" + ) + primary_keys: t.Sequence[str] = [ + "org_name", + "project_name", + "stack_name", + "webhook_name", + "id", + ] + records_jsonpath = "$[*]" + + parent_stream_type = StackWebhooks + + schema = th.PropertiesList( + th.Property( + "org_name", th.StringType, description="The name of the organization." + ), + th.Property( + "project_name", + th.StringType, + description="The name of the project.", + ), + th.Property( + "stack_name", + th.StringType, + description="The name of the stack.", + ), + th.Property( + "webhook_name", th.StringType, description="The name of the webhook." + ), + th.Property("id", th.StringType, description="The ID of the delivery."), + th.Property("kind", th.StringType, description="The kind of the delivery."), + th.Property( + "payload", th.StringType, description="The payload of the delivery." + ), + th.Property( + "timestamp", th.IntegerType, description="The timestamp of the delivery." + ), + th.Property( + "duration", th.IntegerType, description="The duration of the delivery." 
+ ), + th.Property( + "request_url", th.StringType, description="The URL of the request." + ), + th.Property( + "request_headers", th.StringType, description="The headers of the request." + ), + th.Property( + "response_code", + th.IntegerType, + description="The response code of the delivery.", + ), + th.Property( + "response_headers", + th.StringType, + description="The headers of the response.", + ), + th.Property( + "response_body", th.StringType, description="The body of the response." + ), + ).to_dict()