feat: Large overhaul adding a more complete view of all the different entities in Pulumi Cloud #185

Open. Wants to merge 44 commits into base: main.

Commits (44)
6aa29db  Split streams into orgs/stacks (sicarul, Aug 12, 2024)
9b0dba4  Merge pull request #1 from pulumi/branch-pablo (sicarul, Aug 12, 2024)
3a0f156  Add stack details and resources (sicarul, Aug 12, 2024)
4688cae  Merge pull request #2 from pulumi/update-stacks (sicarul, Aug 12, 2024)
62350d8  Add stack policy groups and policy packs (sicarul, Aug 12, 2024)
8e7b490  Merge pull request #3 from pulumi/add-stack-policies (sicarul, Aug 12, 2024)
d402558  add policy group streams (lucascrespo88, Aug 12, 2024)
c798a46  rename policy group streams to remove organization prefix (lucascrespo88, Aug 12, 2024)
d36ced2  Merge pull request #4 from pulumi/add_policy_pack_streams (lucascrespo88, Aug 12, 2024)
3903044  Add stack previews and change updates to service format (sicarul, Aug 13, 2024)
9033232  add policy pack and enviroment streams (lucascrespo88, Aug 13, 2024)
5db6bfe  Merge pull request #5 from pulumi/add-stack-updates-previews (sicarul, Aug 13, 2024)
2116f7b  fix environments description (lucascrespo88, Aug 13, 2024)
08a1387  Merge pull request #6 from pulumi/add_policy_pack_streams (lucascrespo88, Aug 13, 2024)
6fc7770  Add deployments and set default items per page to 100 (sicarul, Aug 13, 2024)
82c19a8  Merge pull request #7 from pulumi/add-deployments (sicarul, Aug 13, 2024)
1a729f7  Add team details (sicarul, Aug 13, 2024)
a542b2b  Add team access tokens (sicarul, Aug 13, 2024)
e3d53dd  Merge pull request #8 from pulumi/add-team-details (sicarul, Aug 13, 2024)
815e406  Add webhooks (sicarul, Aug 13, 2024)
16feaff  added org name to envs and OIDC Issuers and RUM usage streams added (lucascrespo88, Aug 13, 2024)
8147b48  fix commenting (lucascrespo88, Aug 13, 2024)
91f5a68  fix space (lucascrespo88, Aug 13, 2024)
c078a14  Merge pull request #9 from pulumi/add-webhooks (sicarul, Aug 13, 2024)
71409e1  Merge branch 'main' into add_issuers_streams (lucascrespo88, Aug 13, 2024)
1252510  Merge pull request #10 from pulumi/add_issuers_streams (lucascrespo88, Aug 13, 2024)
2407fd1  Add audit logs (sicarul, Aug 13, 2024)
e7a213e  Agent Pool Stream Added (lucascrespo88, Aug 13, 2024)
6cd0126  Merge pull request #11 from pulumi/add-audit-logs (sicarul, Aug 13, 2024)
60a95e0  Merge pull request #12 from pulumi/add_agent_pool_stream (lucascrespo88, Aug 13, 2024)
924c78b  Integrity fixes needed to target-duckdb to work (sicarul, Aug 13, 2024)
a5dad6a  Merge pull request #13 from pulumi/fixes-for-integrity (sicarul, Aug 13, 2024)
e02ca2d  Handle 504 gateway timeout error for stack previews endpoint (sicarul, Aug 13, 2024)
9b5c86e  Merge pull request #14 from pulumi/handle-504-on-stack-previews (sicarul, Aug 13, 2024)
7b61b21  add Stack Schedules and Stack Schedules deployment history Streams (lucascrespo88, Aug 14, 2024)
e3e6627  Merge pull request #15 from pulumi/add_schedule_streams (lucascrespo88, Aug 14, 2024)
b3d7845  Merge branch 'main' into main (sicarul, Aug 14, 2024)
4a003e8  Fix linting (sicarul, Aug 14, 2024)
e12f1c0  Merge pull request #16 from pulumi/fix-linting (sicarul, Aug 14, 2024)
4c3bbc4  Start date is now required (sicarul, Aug 14, 2024)
afa2a83  Let tests run (edgarrmondragon, Aug 14, 2024)
f5d43f7  Merge pull request #17 from MeltanoLabs/pr-185-suggestions (sicarul, Aug 15, 2024)
857d88a  fix resource hours field name (lucascrespo88, Aug 15, 2024)
3109ef0  Merge pull request #19 from pulumi/fix_resource_hours_field (sicarul, Aug 15, 2024)
2 changes: 1 addition & 1 deletion .flake8
@@ -1,5 +1,5 @@
 [flake8]
-ignore = DAR
+ignore = DAR,W503
 max-line-length = 88
 docstring-convention = google
 per-file-ignores =
12 changes: 6 additions & 6 deletions .github/workflows/test.yml
@@ -18,11 +18,11 @@ jobs:
       matrix:
         python-version:
         - "3.8"
-        - "3.9"
-        - "3.10"
-        - "3.11"
-        - "3.12"
-        - "3.13"
+        # - "3.9"
+        # - "3.10"
+        # - "3.11"
+        # - "3.12"
+        # - "3.13"

     steps:
       - name: Checkout code
@@ -58,6 +58,6 @@ jobs:
         env:
           TAP_PULUMI_CLOUD_TOKEN: ${{ secrets.TAP_PULUMI_CLOUD_TOKEN }}
           TAP_PULUMI_CLOUD_ORGANIZATIONS: ${{ secrets.TAP_PULUMI_CLOUD_ORGANIZATIONS }}
-          TAP_PULUMI_CLOUD_START_DATE: ${{ secrets.TAP_PULUMI_CLOUD_START_DATE }}
+          TAP_PULUMI_CLOUD_START_DATE: "2023-01-01T00:00:00Z"
         run: |
           nox
8 changes: 7 additions & 1 deletion meltano.yml
@@ -21,6 +21,7 @@ plugins:
       kind: password
       label: API Token
       description: API Token for Pulumi Cloud
+      sensitive: true
     - name: requests_cache.enabled
       kind: boolean
       label: Enable Requests Cache
@@ -29,9 +30,14 @@
       kind: object
       label: Requests Cache Config
       description: Configuration for requests cache
-  repository: https://github.com/edgarrmondragon/tap-pulumi-cloud
+    - name: start_date
+      kind: date_iso8601
+      value: 2024-01-01T00:00:00+00:00
+      label: Start Date
+      description: Start date
   config:
     organizations: [meltano]
+  repository: https://github.com/edgarrmondragon/tap-pulumi-cloud
 loaders:
 - name: target-jsonl
   variant: andyh1203
217 changes: 217 additions & 0 deletions tap_pulumi_cloud/audit_logs.py
@@ -0,0 +1,217 @@
"""Stream type classes for tap-pulumi-cloud."""

from __future__ import annotations

import typing as t
from datetime import datetime, timezone
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from requests import Response
    from singer_sdk.helpers.types import Context
from singer_sdk import metrics
from singer_sdk import typing as th
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.pagination import BaseAPIPaginator

from tap_pulumi_cloud.client import _OrgPartitionedStream

class AuditLogsPaginator(BaseAPIPaginator[t.Optional[str]]):
    """Paginator class for APIs returning a pagination token in the response body."""

    def __init__(
        self,
        jsonpath: str,
        since: int,
        *args: t.Any,
        **kwargs: t.Any,
    ) -> None:
        """Create a new paginator.

        Args:
            jsonpath: A JSONPath expression.
            since: Start date for the audit logs.
            args: Paginator positional arguments for base class.
            kwargs: Paginator keyword arguments for base class.
        """
        super().__init__(None, *args, **kwargs)
        self._jsonpath = jsonpath
        self._since = since

    def get_next(self, response: Response) -> str | None:
        """Get the next page token.

        Args:
            response: API response object.

        Returns:
            The next page token.
        """
        all_matches = extract_jsonpath(self._jsonpath, response.json())
        matched = next(all_matches, None)
        if matched is None or int(matched) < self._since:
            return None
        return matched
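The stop condition in `get_next` can be illustrated with a standalone sketch (a hypothetical helper, not code from the PR): pagination continues only while the API returns a continuation token, and that token, a Unix timestamp, has not yet paged back past the configured start date.

```python
# Hypothetical stand-in for AuditLogsPaginator.get_next, operating on a plain
# dict instead of a requests.Response / JSONPath expression.
def next_token(body, since, key="continuationToken"):
    token = body.get(key)
    # Stop when the API returns no token, or the token (epoch seconds)
    # is already older than the start of the sync window.
    if token is None or int(token) < since:
        return None
    return token

print(next_token({"continuationToken": "1700000000"}, since=1600000000))  # "1700000000"
print(next_token({"continuationToken": "1500000000"}, since=1600000000))  # None
```

Because the audit-log endpoint pages from newest to oldest, comparing the token against `since` is what bounds an incremental sync.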


class AuditLogs(_OrgPartitionedStream):
    """Stream Audit Logs."""

    name = "audit_logs"
    path = "/api/orgs/{org_name}/auditlogs"
    primary_keys: t.Sequence[str] = ["org_name", "timestamp", "event", "description"]
    records_jsonpath = "$.auditLogEvents[*]"
    replication_key = "timestamp"
    is_sorted = False

    schema = th.PropertiesList(
        th.Property(
            "org_name", th.StringType, description="The name of the organization."
        ),
        th.Property(
            "timestamp",
            th.DateTimeType,
            description="The timestamp of the audit log event.",
        ),
        th.Property(
            "source_ip",
            th.StringType,
            description="The source IP of the audit log event.",
        ),
        th.Property(
            "event", th.StringType, description="The event of the audit log event."
        ),
        th.Property(
            "description",
            th.StringType,
            description="The description of the audit log event.",
        ),
        th.Property(
            "user",
            th.ObjectType(
                th.Property("name", th.StringType, description="The name of the user."),
                th.Property(
                    "github_login",
                    th.StringType,
                    description="The GitHub login of the user.",
                ),
                th.Property(
                    "avatar_url",
                    th.StringType,
                    description="The avatar URL of the user.",
                ),
            ),
            description="The user of the audit log event.",
        ),
        th.Property(
            "token_id",
            th.StringType,
            description="The token id associated with this event.",
        ),
        th.Property(
            "token_name",
            th.StringType,
            description="The token name associated with this event.",
        ),
        th.Property(
            "req_org_admin",
            th.BooleanType,
            description="Required organization admin role.",
        ),
        th.Property(
            "req_stack_admin", th.BooleanType, description="Required stack admin role."
        ),
        th.Property(
            "auth_failure",
            th.BooleanType,
            description="Event was the result of an authentication check failure.",
        ),
    ).to_dict()

    def get_new_paginator(self, context: Context | None) -> BaseAPIPaginator:
        """Get a fresh paginator for this API endpoint.

        Returns:
            A paginator instance.
        """
        return AuditLogsPaginator(
            self.next_page_token_jsonpath,
            self.get_starting_timestamp(context).timestamp(),
        )

    def request_records(self, context: Context | None) -> t.Iterable[dict]:
        """Request records from REST endpoint(s), returning response records.

        If pagination is detected, pages will be recursed automatically.

        Args:
            context: Stream partition or context dictionary.

        Yields:
            An item for every record in the response.
        """
        paginator = self.get_new_paginator(context)
        decorated_request = self.request_decorator(self._request)
        pages = 0

        with metrics.http_request_counter(self.name, self.path) as request_counter:
            request_counter.context = context

            while not paginator.finished:
                prepared_request = self.prepare_request(
                    context,
                    next_page_token=paginator.current_value,
                )
                resp = decorated_request(prepared_request, context)
                request_counter.increment()
                self.update_sync_costs(prepared_request, resp, context)
                records = iter(self.parse_response(resp))
                try:
                    first_record = next(records)
                except StopIteration:
                    self.logger.info(
                        "Pagination stopped after %d pages because no records were "
                        "found in the last response",
                        pages,
                    )
                    break
                yield first_record
                yield from records
                pages += 1

                paginator.advance(resp)
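The shape of that request loop, fetch a page, bail out early if it is empty, otherwise yield its records and advance, can be sketched without any HTTP machinery (`drain_pages` and `fetch_page` are hypothetical names, not part of the tap):

```python
# Generic stand-in for the loop above: fetch_page(token) returns
# (records, next_token); iteration stops on an empty page or a None token.
def drain_pages(fetch_page, first_token=None):
    token = first_token
    while True:
        records, token = fetch_page(token)
        if not records:
            break  # mirrors the early exit when a response has no records
        yield from records
        if token is None:
            break  # mirrors paginator.finished

def fake_fetch(token):
    # Two fake pages, then a terminal None token.
    pages = {None: ([1, 2], "next"), "next": ([3], None)}
    return pages[token]

print(list(drain_pages(fake_fetch)))  # [1, 2, 3]
```

The empty-page check matters here because the audit-log API can keep returning continuation tokens; stopping on the first empty response avoids an infinite loop.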

    def get_url_params(
        self,
        context: dict | None,
        next_page_token: str | None,
    ) -> dict[str, t.Any]:
        """Return a dictionary of URL query parameters.

        Args:
            context: The stream sync context.
            next_page_token: A token for the next page of results.

        Returns:
            A dictionary of URL query parameters.
        """
        params = {"pageSize": 100}
        since = round(self.get_starting_timestamp(context).timestamp())
        if next_page_token:
            until = int(next_page_token)
        else:
            until = round(self.get_replication_key_signpost(context).timestamp())
        params["startTime"] = since
        params["endTime"] = until
        return params
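The query-window arithmetic above is easy to verify in isolation. A hedged sketch (`window_params` is a hypothetical standalone helper, with datetimes passed in where the stream would call `get_starting_timestamp` and `get_replication_key_signpost`): the first request spans the full `[startTime, endTime]` window, and each follow-up request reuses the continuation token as the new upper bound, walking backwards in time.

```python
from datetime import datetime, timezone

def window_params(start, signpost, next_page_token=None):
    # startTime is always the configured start date (epoch seconds).
    params = {"pageSize": 100, "startTime": round(start.timestamp())}
    if next_page_token:
        # Subsequent pages: the token is the new upper bound.
        params["endTime"] = int(next_page_token)
    else:
        # First page: the replication-key signpost (roughly "now").
        params["endTime"] = round(signpost.timestamp())
    return params

start = datetime(2024, 1, 1, tzinfo=timezone.utc)
now = datetime(2024, 2, 1, tzinfo=timezone.utc)
print(window_params(start, now))
print(window_params(start, now, next_page_token="1706000000"))
```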

    def post_process(
        self,
        row: dict,
        context: dict | None = None,
    ) -> dict | None:
        """Post-process a row of data."""
        row = super().post_process(row, context) or {}
        row["timestamp"] = datetime.fromtimestamp(row["timestamp"], tz=timezone.utc)
        return row
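That last step converts the API's epoch-seconds `timestamp` into a timezone-aware datetime so the replication key compares correctly. A minimal stand-in (`to_utc` is a hypothetical name for illustration):

```python
from datetime import datetime, timezone

def to_utc(row):
    # Epoch seconds -> aware UTC datetime, as in AuditLogs.post_process.
    row["timestamp"] = datetime.fromtimestamp(row["timestamp"], tz=timezone.utc)
    return row

row = to_utc({"timestamp": 0})
print(row["timestamp"].isoformat())  # 1970-01-01T00:00:00+00:00
```

Passing `tz=timezone.utc` matters: without it, `fromtimestamp` would return a naive datetime in the local timezone of whatever machine runs the tap.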
83 changes: 81 additions & 2 deletions tap_pulumi_cloud/client.py
@@ -2,11 +2,16 @@

 from __future__ import annotations

-from typing import Any
+import typing as t
+from http import HTTPStatus
+from typing import TYPE_CHECKING, Any

+if TYPE_CHECKING:
+    import requests
 import humps
 from singer_sdk import RESTStream
 from singer_sdk.authenticators import APIKeyAuthenticator
+from singer_sdk.exceptions import FatalAPIError, RetriableAPIError
 from singer_sdk.helpers._typing import TypeConformanceLevel


@@ -15,6 +20,7 @@ class PulumiCloudStream(RESTStream):

     url_base = "https://api.pulumi.com"
     next_page_token_jsonpath = "$.continuationToken"  # noqa: S105
+    tolerated_http_errors: t.Sequence[int] = []

     TYPE_CONFORMANCE_LEVEL = TypeConformanceLevel.ROOT_ONLY

@@ -60,7 +66,7 @@ def get_url_params(
         Returns:
             Mapping of URL query parameters.
         """
-        params: dict = {}
+        params: dict = {"pageSize": 100}
         if next_page_token:
             params["continuationToken"] = next_page_token
         return params
@@ -72,3 +78,76 @@ def post_process(
     ) -> dict | None:
         """Post-process a row of data."""
         return humps.decamelize(row)
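`humps.decamelize` rewrites the API's camelCase keys into the snake_case names used in the stream schemas. A rough re-implementation for flat dicts (the real pyhumps library also recurses into nested structures; `decamelize_keys` here is a hypothetical sketch):

```python
import re

def decamelize_keys(row):
    def snake(name):
        # Insert "_" before each interior capital, then lowercase.
        return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
    return {snake(k): v for k, v in row.items()}

print(decamelize_keys({"orgName": "meltano", "tokenId": "abc"}))
# {'org_name': 'meltano', 'token_id': 'abc'}
```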

+    def parse_response(self, response: requests.Response) -> t.Iterable[dict]:
+        """Parse the response and return an iterator of result records.
+
+        Args:
+            response: A raw :class:`requests.Response`
+
+        Yields:
+            One item for every item found in the response.
+        """
+        if response.status_code in self.tolerated_http_errors:
+            return []
+        return super().parse_response(response)
+
+    def validate_response(self, response: requests.Response) -> None:
+        """Validate HTTP response.
+
+        Checks for error status codes and whether they are fatal or retriable.
+
+        In case an error is deemed transient and can be safely retried, then this
+        method should raise an :class:`singer_sdk.exceptions.RetriableAPIError`.
+        By default this applies to 5xx error codes, along with values set in:
+        :attr:`~singer_sdk.RESTStream.extra_retry_statuses`
+
+        In case an error is unrecoverable raises a
+        :class:`singer_sdk.exceptions.FatalAPIError`. By default, this applies to
+        4xx errors, excluding values found in:
+        :attr:`~singer_sdk.RESTStream.extra_retry_statuses`
+
+        Tap developers are encouraged to override this method if their APIs use HTTP
+        status codes in non-conventional ways, or if they communicate errors
+        differently (e.g. in the response body).
+
+        .. image:: ../images/200.png
+
+        Args:
+            response: A :class:`requests.Response` object.
+
+        Raises:
+            FatalAPIError: If the request is not retriable.
+            RetriableAPIError: If the request is retriable.
+        """
+        if response.status_code in self.tolerated_http_errors:
+            msg = (
+                f"{response.status_code} Tolerated Status Code "
+                f"(Reason: {response.reason}) for path: {response.request.url}"
+            )
+            self.logger.info(msg)
+            return
+
+        if (
+            response.status_code in self.extra_retry_statuses
+            or response.status_code >= HTTPStatus.INTERNAL_SERVER_ERROR
+        ):
+            msg = self.response_error_message(response)
+            raise RetriableAPIError(msg, response)
+
+        if (
+            HTTPStatus.BAD_REQUEST
+            <= response.status_code
+            < HTTPStatus.INTERNAL_SERVER_ERROR
+        ):
+            msg = self.response_error_message(response)
+            raise FatalAPIError(msg)
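The triage order in `validate_response` can be captured in a standalone sketch (`classify` is a hypothetical stand-in; it returns labels where the stream logs, raises `RetriableAPIError`, or raises `FatalAPIError`): tolerated codes win first, then 5xx and extra retry statuses, then the remaining 4xx range. The `504` default below only mirrors the PR's motivating case of tolerating gateway timeouts on stack previews, and is an assumption of this sketch.

```python
from http import HTTPStatus

def classify(status, tolerated=(504,), extra_retry=()):
    if status in tolerated:
        return "tolerate"  # logged and skipped; parse_response yields nothing
    if status in extra_retry or status >= HTTPStatus.INTERNAL_SERVER_ERROR:
        return "retry"     # -> RetriableAPIError
    if HTTPStatus.BAD_REQUEST <= status < HTTPStatus.INTERNAL_SERVER_ERROR:
        return "fatal"     # -> FatalAPIError
    return "ok"

print(classify(504))                 # tolerate
print(classify(500, tolerated=()))   # retry
print(classify(404, tolerated=()))   # fatal
```

Checking `tolerated_http_errors` before the 5xx branch is the key design choice: a 504 would otherwise be retried forever by the 5xx rule.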


+class _OrgPartitionedStream(PulumiCloudStream):
+    """Base class for streams that are partitioned by organization."""
+
+    @property
+    def partitions(self) -> list[dict] | None:
+        """List of organizations to sync."""
+        return [{"org_name": org} for org in self.config["organizations"]]
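The `partitions` property fans a single configured tap out into one sync partition per organization, which is how every `{org_name}` path template above gets filled in. The same expression, isolated (`org_partitions` is a hypothetical name for illustration):

```python
def org_partitions(config):
    # One partition dict per configured organization.
    return [{"org_name": org} for org in config["organizations"]]

print(org_partitions({"organizations": ["meltano", "pulumi"]}))
# [{'org_name': 'meltano'}, {'org_name': 'pulumi'}]
```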