From 95bae684befe1538de5a5bb867ffde2fa763fb20 Mon Sep 17 00:00:00 2001 From: "jose.vazquez" Date: Wed, 15 Jan 2025 15:32:45 +0100 Subject: [PATCH] Draft state machine --- ...tlas.mongodb.com_atlasnetworkpeerings.yaml | 10 +- .../translation/networkpeering/conversion.go | 38 +- .../networkpeering/networkpeering.go | 79 ++-- .../networkpeering/__debug_bin1495666336 | 0 .../networkpeering/networkpeering_test.go | 18 +- test/contract/networkpeering/story_test.go | 363 ++++++++++++++++++ test/helper/cloud/aws/vpc.go | 86 +++++ 7 files changed, 529 insertions(+), 65 deletions(-) create mode 100644 test/contract/networkpeering/__debug_bin1495666336 create mode 100644 test/contract/networkpeering/story_test.go diff --git a/config/crd/bases/atlas.mongodb.com_atlasnetworkpeerings.yaml b/config/crd/bases/atlas.mongodb.com_atlasnetworkpeerings.yaml index 9e7ce2528d..b1627df95e 100644 --- a/config/crd/bases/atlas.mongodb.com_atlasnetworkpeerings.yaml +++ b/config/crd/bases/atlas.mongodb.com_atlasnetworkpeerings.yaml @@ -218,8 +218,12 @@ spec: description: ContainerID records the ID of the container created by atlas for this peering type: string + error: + description: Error refers to the last error seen in the network peering + setup + type: string id: - description: ID recrods the identified of thr peer created by Atlas + description: ID recrods the identified of the peer created by Atlas type: string observedGeneration: description: |- @@ -227,6 +231,10 @@ spec: The Atlas Operator updates this field to the 'metadata.generation' as soon as it starts reconciliation of the resource. format: int64 type: integer + status: + description: Status describes the last status seen for the network + peering setup + type: string required: - conditions type: object diff --git a/internal/translation/networkpeering/conversion.go b/internal/translation/networkpeering/conversion.go index 03c9dba1da..3323ba6179 100644 --- a/internal/translation/networkpeering/conversion.go +++ b/internal/translation/networkpeering/conversion.go @@ -10,9 +10,33 @@ import ( "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer" ) +type AWSStatus struct { + ConnectionID string + VpcID string +} + type NetworkPeer struct { akov2.AtlasNetworkPeeringConfig - ID string + ID string + Status string + ErrorMessage string + AWSStatus AWSStatus +} + +func (np *NetworkPeer) Failed() bool { + return np.ErrorMessage != "" +} + +// AWS status: INITIATING PENDING_ACCEPTANCE FINALIZING AVAILABLE + +func (np *NetworkPeer) Provisioned() bool { + return (np.Provider == string(provider.ProviderAWS) && np.Status == "AVAILABLE") || + (np.Provider == string(provider.ProviderAzure) && np.Status == "azure ok status") || + (np.Provider == string(provider.ProviderGCP) && np.Status == "gcp ok status") +} + +func (np *NetworkPeer) Summary() string { + return fmt.Sprintf("status=%q error=%q", np.Status, np.ErrorMessage) } func NewNetworkPeer(id string, cfg *akov2.AtlasNetworkPeeringConfig) *NetworkPeer { @@ -92,6 +116,13 @@ func fromAtlasConnection(conn *admin.BaseNetworkPeeringConnectionSettings) (*Net VpcID: conn.GetVpcId(), }, }, + // Status and error messages + Status: conn.GetStatusName(), + ErrorMessage: conn.GetErrorStateName(), + AWSStatus: AWSStatus{ + ConnectionID: conn.GetConnectionId(), + VpcID: conn.GetVpcId(), + }, }, nil case provider.ProviderGCP: return &NetworkPeer{ @@ -104,6 +135,8 @@ func fromAtlasConnection(conn *admin.BaseNetworkPeeringConnectionSettings) (*Net NetworkName: conn.GetNetworkName(), }, }, + // Status and error messages + ErrorMessage: conn.GetErrorMessage(), }, nil case provider.ProviderAzure: return &NetworkPeer{ @@ -118,6 +151,9 @@ func fromAtlasConnection(conn *admin.BaseNetworkPeeringConnectionSettings) (*Net VNetName: conn.GetVnetName(), }, }, + // Status and error messages + Status: conn.GetStatus(), + ErrorMessage: conn.GetErrorState(), }, nil default: return nil, fmt.Errorf("unsupported provider %q", conn.GetProviderName()) diff --git a/internal/translation/networkpeering/networkpeering.go b/internal/translation/networkpeering/networkpeering.go index b9b2f4a205..51b6a20377 100644 --- a/internal/translation/networkpeering/networkpeering.go +++ b/internal/translation/networkpeering/networkpeering.go @@ -2,31 +2,32 @@ package networkpeering import ( "context" + "errors" "fmt" - "net/http" "go.mongodb.org/atlas-sdk/v20231115008/admin" - - "github.com/mongodb/mongodb-atlas-kubernetes/v2/api/v1/provider" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/paging" ) const ( pageSize = 100 ) +var ( + // ErrNotFound means an resource is missing + ErrNotFound = errors.New("not found") +) + type PeerConnectionsService interface { CreatePeer(ctx context.Context, projectID string, conn *NetworkPeer) (*NetworkPeer, error) - ListPeers(ctx context.Context, projectID string) ([]NetworkPeer, error) GetPeer(ctx context.Context, projectID, containerID string) (*NetworkPeer, error) + UpdatePeer(ctx context.Context, projectID string, conn *NetworkPeer) error DeletePeer(ctx context.Context, projectID, containerID string) error } type PeeringContainerService interface { CreateContainer(ctx context.Context, projectID string, container *ProviderContainer) (*ProviderContainer, error) GetContainer(ctx context.Context, projectID, containerID string) (*ProviderContainer, error) - ListContainers(ctx context.Context, projectID, providerName string) ([]ProviderContainer, error) + UpdateContainer(ctx context.Context, projectID string, container *ProviderContainer) error DeleteContainer(ctx context.Context, projectID, containerID string) error } @@ -62,6 +63,9 @@ func (np *networkPeeringService) CreatePeer(ctx context.Context, projectID strin func (np *networkPeeringService) GetPeer(ctx context.Context, projectID, containerID string) (*NetworkPeer, error) { atlasConn, _, err := np.peeringAPI.GetPeeringConnection(ctx, projectID, containerID).Execute() if err != nil { + if admin.IsErrorCode(err, "PEER_NOT_FOUND") { + return nil, ErrNotFound + } return nil, fmt.Errorf("failed to get network peer for container id %v: %w", containerID, err) } peer, err := fromAtlasConnection(atlasConn) @@ -71,32 +75,16 @@ func (np *networkPeeringService) GetPeer(ctx context.Context, projectID, contain return peer, nil } -func (np *networkPeeringService) ListPeers(ctx context.Context, projectID string) ([]NetworkPeer, error) { - var peersList []NetworkPeer - providers := []provider.ProviderName{provider.ProviderAWS, provider.ProviderAzure, provider.ProviderGCP} - for _, providerName := range providers { - peers, err := np.listPeersForProvider(ctx, projectID, providerName) - if err != nil { - return nil, fmt.Errorf("failed to list network peers for %s: %w", string(providerName), err) - } - peersList = append(peersList, peers...) +func (np *networkPeeringService) UpdatePeer(ctx context.Context, projectID string, conn *NetworkPeer) error { + atlasConnRequest, err := toAtlasConnection(conn) + if err != nil { + return fmt.Errorf("failed to convert updated peer to Atlas: %w", err) } - return peersList, nil -} - -func (np *networkPeeringService) listPeersForProvider(ctx context.Context, projectID string, providerName provider.ProviderName) ([]NetworkPeer, error) { - results, err := paging.ListAll(ctx, func(ctx context.Context, pageNum int) (paging.Response[admin.BaseNetworkPeeringConnectionSettings], *http.Response, error) { - p := &admin.ListPeeringConnectionsApiParams{ - GroupId: projectID, - ProviderName: admin.PtrString(string(providerName)), - } - return np.peeringAPI.ListPeeringConnectionsWithParams(ctx, p).PageNum(pageNum).Execute() - }) + _, _, err = np.peeringAPI.UpdatePeeringConnection(ctx, projectID, conn.ID, atlasConnRequest).Execute() if err != nil { - return nil, fmt.Errorf("failed to list network peers: %w", err) + return fmt.Errorf("failed to create network peer %v: %w", conn, err) } - - return fromAtlasConnectionList(results) + return nil } func (np *networkPeeringService) DeletePeer(ctx context.Context, projectID, peerID string) error { @@ -118,38 +106,25 @@ func (np *networkPeeringService) CreateContainer(ctx context.Context, projectID func (np *networkPeeringService) GetContainer(ctx context.Context, projectID, containerID string) (*ProviderContainer, error) { container, _, err := np.peeringAPI.GetPeeringContainer(ctx, projectID, containerID).Execute() if err != nil { - return nil, fmt.Errorf("failed to get container for gcp status %s: %w", containerID, err) + return nil, fmt.Errorf("failed to get container %s: %w", containerID, err) } return fromAtlasContainer(container), nil } -func (np *networkPeeringService) ListContainers(ctx context.Context, projectID, providerName string) ([]ProviderContainer, error) { - results := []ProviderContainer{} - pageNum := 1 - listOpts := &admin.ListPeeringContainerByCloudProviderApiParams{ - GroupId: projectID, - ProviderName: pointer.SetOrNil(providerName, ""), - PageNum: &pageNum, - ItemsPerPage: pointer.MakePtr(pageSize), - IncludeCount: pointer.MakePtr(false), - } - for { - page, _, err := np.peeringAPI.ListPeeringContainerByCloudProviderWithParams(ctx, listOpts).Execute() - if err != nil { - return nil, fmt.Errorf("failed to list containers: %w", err) - } - results = append(results, fromAtlasContainerList(page.GetResults())...) - if len(page.GetResults()) < pageSize { - return results, nil - } - pageNum += 1 +func (np *networkPeeringService) UpdateContainer(ctx context.Context, projectID string, container *ProviderContainer) error { + containerUpdate := toAtlasContainer(container) + containerUpdate.Id = nil + _, _, err := np.peeringAPI.UpdatePeeringContainer(ctx, projectID, container.ID, containerUpdate).Execute() + if err != nil { + return fmt.Errorf("failed to get container %s: %w", container.ID, err) } + return nil } func (np *networkPeeringService) DeleteContainer(ctx context.Context, projectID, containerID string) error { _, _, err := np.peeringAPI.DeletePeeringContainer(ctx, projectID, containerID).Execute() if admin.IsErrorCode(err, "CLOUD_PROVIDER_CONTAINER_NOT_FOUND") { - return nil + return ErrNotFound } if err != nil { return fmt.Errorf("failed to delete container: %w", err) diff --git a/test/contract/networkpeering/__debug_bin1495666336 b/test/contract/networkpeering/__debug_bin1495666336 new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/contract/networkpeering/networkpeering_test.go b/test/contract/networkpeering/networkpeering_test.go index 74d2e919f1..7a845d042d 100644 --- a/test/contract/networkpeering/networkpeering_test.go +++ b/test/contract/networkpeering/networkpeering_test.go @@ -59,13 +59,6 @@ func TestPeerContainerServiceCRUD(t *testing.T) { createdContainer = newContainer }) - t.Run(fmt.Sprintf("list %s containers", tc.provider), func(t *testing.T) { - containers, err := cs.ListContainers(ctx, testProjectID, tc.container.Provider) - require.NoError(t, err) - assert.NotEmpty(t, containers) - assert.GreaterOrEqual(t, len(containers), 1) - }) - t.Run(fmt.Sprintf("get %s container", tc.provider), func(t *testing.T) { container, err := cs.GetContainer(ctx, testProjectID, createdContainer.ID) require.NoError(t, err) @@ -75,6 +68,11 @@ func TestPeerContainerServiceCRUD(t *testing.T) { assert.Equal(t, tc.container.AtlasCIDRBlock, container.AtlasCIDRBlock) }) + t.Run(fmt.Sprintf("update %s container", tc.provider), func(t *testing.T) { + err := cs.UpdateContainer(ctx, testProjectID, tc.container) + require.NoError(t, err) + }) + t.Run(fmt.Sprintf("delete %s container", tc.provider), func(t *testing.T) { time.Sleep(time.Second) // Atlas may reject removal if it happened within a second of creation assert.NoErrorf(t, ignoreRemoved(cs.DeleteContainer(ctx, testProjectID, createdContainer.ID)), @@ -164,11 +162,9 @@ func TestPeerServiceCRUD(t *testing.T) { assert.Equal(t, createdPeer, peer) }) - t.Run(fmt.Sprintf("list %s peer connections", tc.provider), func(t *testing.T) { - peers, err := ps.ListPeers(ctx, testProjectID) + t.Run(fmt.Sprintf("update %s peer connections", tc.provider), func(t *testing.T) { + err := ps.UpdatePeer(ctx, testProjectID, createdPeer) require.NoError(t, err) - assert.NotEmpty(t, peers) - assert.GreaterOrEqual(t, len(peers), 1) }) t.Run(fmt.Sprintf("delete %s peer connection", tc.provider), func(t *testing.T) { diff --git a/test/contract/networkpeering/story_test.go b/test/contract/networkpeering/story_test.go new file mode 100644 index 0000000000..dfc6195f44 --- /dev/null +++ b/test/contract/networkpeering/story_test.go @@ -0,0 +1,363 @@ +package networkpeering + +import ( + "context" + "fmt" + "log" + "os" + "reflect" + "strings" + "testing" + "time" + + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlas" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/networkpeering" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/cloud/aws" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/e2e/utils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.mongodb.org/atlas-sdk/v20231115008/admin" + + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/api/v1/provider" +) + +const ( + testProject = "manual-test-project" +) + +type netPeerRequest struct { + nps networkpeering.NetworkPeeringService + projectID string + peer *networkpeering.NetworkPeer + container *networkpeering.ProviderContainer + deleted bool +} + +func TestNetworkPeeringStory(t *testing.T) { + ctx := context.Background() + ac := mustCreateVersionedAtlasClient(ctx) + projects, _, err := ac.ProjectsApi.ListProjects(ctx).Execute() + require.NoError(t, err) + projectID := "" + for _, project := range *projects.Results { + if project.Name == testProject { + projectID = project.GetId() + } + } + require.NotEqual(t, "", projectID) + fmt.Printf("Project: %q -> %s\n", testProject, projectID) + + provider := "AWS" + vpcCIDR := "10.11.0.0/21" + containerCIDR := "10.10.0.0/21" + regionName := "US_EAST_1" + containerID := detectContainerID(t, ctx, ac, projectID, containerCIDR) + log.Printf("containerID (for %s) -> %s", containerCIDR, containerID) + peerID := detectPeerID(t, ctx, ac, projectID, vpcCIDR) + log.Printf("peerID (for %s) -> %s", vpcCIDR, peerID) + + awsRegionName := aws.RegionCode(regionName) + awsVPCid := detectAWSVPCId(t, testVPCName, vpcCIDR, awsRegionName) + defer aws.DeleteVPC(awsVPCid, awsRegionName) + + samplePeer := &networkpeering.NetworkPeer{ + AtlasNetworkPeeringConfig: akov2.AtlasNetworkPeeringConfig{ + Provider: provider, + ContainerID: containerID, + AWSConfiguration: &akov2.AWSNetworkPeeringConfiguration{ + AccepterRegionName: awsRegionName, // This must be AWS format eg. us-east-1 + AWSAccountID: os.Getenv("AWS_ACCOUNT_ID"), + RouteTableCIDRBlock: vpcCIDR, + VpcID: awsVPCid, + }, + }, + ID: peerID, + } + container := &networkpeering.ProviderContainer{ + AtlasProviderContainerConfig: akov2.AtlasProviderContainerConfig{ + ContainerRegion: regionName, + AtlasCIDRBlock: containerCIDR, + }, + ID: containerID, + Provider: provider, + } + req := netPeerRequest{ + nps: networkpeering.NewNetworkPeeringService(ac.NetworkPeeringApi), + projectID: projectID, + peer: samplePeer, + container: container, + deleted: false, + } + + timeout := 6 * time.Minute + start := time.Now() + acceptedConn := false + for time.Since(start) < timeout { + require.NoError(t, reconcileNetPeering(ctx, &req)) + log.Printf("PASS: network peering container: %v", req.peer) + log.Printf("PASS: network peering summary: %s", req.peer.Summary()) + log.Printf("PASS: connection id: %q", req.peer.AWSStatus.ConnectionID) + + if req.peer.AWSStatus.ConnectionID != "" && !acceptedConn { + log.Printf("Accepting VPC connection %s", req.peer.AWSStatus.ConnectionID) + err, disconnect := aws.AcceptVPCConnection( + req.peer.AWSStatus.ConnectionID, req.peer.AWSConfiguration.AccepterRegionName) + require.NoError(t, err) + defer disconnect() + log.Printf("Accepted VPC connection %s", req.peer.AWSStatus.ConnectionID) + acceptedConn = true + } + if req.peer.Provisioned() { + break + } + time.Sleep(time.Second) + } + assert.True(t, req.peer.Provisioned()) + + log.Printf("Deleting peer connection %s now...", req.peer.ID) + req.deleted = true + timeout = time.Minute + start = time.Now() + for time.Since(start) < timeout { + err := reconcileNetPeering(ctx, &req) + // ignore container deletion issue, shoudl eventually delete + if !admin.IsErrorCode(err, "CONTAINERS_IN_USE") { + require.NoError(t, err) + } + log.Printf("PASS: network peering container: %v", req.peer) + log.Printf("PASS: network peering summary: %s", req.peer.Summary()) + log.Printf("PASS: connection id: %q", req.peer.AWSStatus.ConnectionID) + + if req.peer.ID == "" && req.peer.ContainerID == "" { + break + } + time.Sleep(time.Second) + } + assert.Empty(t, req.peer.ContainerID) + assert.Empty(t, req.peer.ID) + log.Printf("Story Test completed for AWS: Network Peering Connection created and deleted succesfully") +} + +func reconcileNetPeering(ctx context.Context, req *netPeerRequest) error { + var atlasPeer *networkpeering.NetworkPeer + if req.peer.ID != "" { + peer, err := req.nps.GetPeer(ctx, req.projectID, req.peer.ID) + if err != nil && err != networkpeering.ErrNotFound { + return fmt.Errorf("failed to get peer from project %s and container %s: %w", req.projectID, peer.ContainerID, err) + } + atlasPeer = peer + } + inAtlas := atlasPeer != nil + switch { + case !req.deleted && !inAtlas: + return createNetPeering(ctx, req) + case !req.deleted && inAtlas: + return syncNetPeering(ctx, req, atlasPeer) + case req.deleted && !inAtlas: + return unmanageNetPeer(ctx, req) + case req.deleted && inAtlas: + return deleteNetPeer(ctx, req) + } + return nil +} + +func createNetPeering(ctx context.Context, req *netPeerRequest) error { + if err := ensureContainer(ctx, req); err != nil { + return fmt.Errorf("failed to ensure network container: %w", err) + } + newPeer, err := req.nps.CreatePeer(ctx, req.projectID, req.peer) + if err != nil { + return fmt.Errorf("failed to create peer connection: %w", err) + } + log.Printf("created network peering connection ID=%q: %v", newPeer.ID, newPeer) + req.peer.ID = newPeer.ID + setStatus(req.peer, newPeer) + return nil +} + +func syncNetPeering(ctx context.Context, req *netPeerRequest, atlasPeer *networkpeering.NetworkPeer) error { + switch { + case atlasPeer.Failed(): // clear the network peero connection to try again if failed + if err := req.nps.DeletePeer(ctx, req.projectID, atlasPeer.ID); err != nil { + return fmt.Errorf("failed to clear failed peer connection %s: %w", atlasPeer.ID, err) + } + return nil + case !atlasPeer.Provisioned(): + log.Printf("peer connection in pending state: %s", atlasPeer.Status) + } + + setStatus(req.peer, atlasPeer) + if compareConfigs(req.peer, atlasPeer) { + log.Printf("spec in sync with Atlas: idle state") + return nil + } + log.Printf("Atlas peer connection differs from spec, re-configuring...") + if err := req.nps.UpdatePeer(ctx, req.projectID, req.peer); err != nil { + return fmt.Errorf("failed to update peer connection %s: %w", req.peer.ID, err) + } + return nil +} + +func unmanageNetPeer(ctx context.Context, req *netPeerRequest) error { + req.peer.ID = "" + if err := ensureContainer(ctx, req); err != nil { + return fmt.Errorf("failed to clear container %s: %w", req.peer.ContainerID, err) + } + return nil +} + +func deleteNetPeer(ctx context.Context, req *netPeerRequest) error { + if req.peer.ID != "" { + if err := req.nps.DeletePeer(ctx, req.projectID, req.peer.ID); err != nil { + return fmt.Errorf("failed to delete peer connection %s: %v", req.peer.ID, err) + } + } + return unmanageNetPeer(ctx, req) +} + +func ensureContainer(ctx context.Context, req *netPeerRequest) error { + var atlasContainer *networkpeering.ProviderContainer + if req.peer.ContainerID != "" { + container, err := req.nps.GetContainer(ctx, req.projectID, req.peer.ContainerID) + if err != nil { + return fmt.Errorf("failed to get container from project %s and container %s: %w", + req.projectID, req.peer.ContainerID, err) + } + atlasContainer = container + } + inAtlas := atlasContainer != nil + switch { + case !req.deleted && !inAtlas: + return createNetContainer(ctx, req) + case !req.deleted && inAtlas: + return syncNetContainer(ctx, req) + case req.deleted && !inAtlas: + return unmanageContainer(req) + case req.deleted && inAtlas: + return deleteContainer(ctx, req) + default: + return fmt.Errorf("unsupported state deleted=%v inAtlas=%v", req.deleted, inAtlas) + } +} + +func createNetContainer(ctx context.Context, req *netPeerRequest) error { + createdContainer, err := req.nps.CreateContainer(ctx, req.projectID, req.container) + log.Printf("created container: %v", createdContainer) + req.peer.ContainerID = createdContainer.ID // record in peer + return err +} + +func syncNetContainer(ctx context.Context, req *netPeerRequest) error { + atlasContainer, err := req.nps.GetContainer(ctx, req.projectID, req.peer.ContainerID) + if err != nil { + return fmt.Errorf("failed to get container %v from atlas for comparison: %w", req.peer.ContainerID, err) + } + if reflect.DeepEqual(req.container, atlasContainer) { + log.Printf("container %s created as expected", atlasContainer.ID) + return nil + } + if err := req.nps.UpdateContainer(ctx, req.projectID, req.container); err != nil { + return fmt.Errorf("failed to update container %s: %w", req.container.ID, err) + } + return nil +} + +func unmanageContainer(req *netPeerRequest) error { + req.peer.ContainerID = "" + return nil +} + +func deleteContainer(ctx context.Context, req *netPeerRequest) error { + err := req.nps.DeleteContainer(ctx, req.projectID, req.peer.ContainerID) + if err == nil || err == networkpeering.ErrNotFound { + req.peer.ContainerID = "" + return unmanageContainer(req) + } + return fmt.Errorf("failed to delete container %s: %w", req.peer.ContainerID, err) +} + +func compareConfigs(a, b *networkpeering.NetworkPeer) bool { + switch a.Provider { + case string(provider.ProviderAWS): + // API sends the AWS accepter region when it matches the container region + cfg := b.AWSConfiguration + if b.AWSConfiguration.AccepterRegionName == "" { + cfg = b.AWSConfiguration.DeepCopy() + cfg.AccepterRegionName = a.AWSConfiguration.AccepterRegionName + } + return reflect.DeepEqual(a.AWSConfiguration, cfg) + default: + panic("unsupported") + } +} + +func setStatus(dst, src *networkpeering.NetworkPeer) { + dst.Status = src.Status + dst.ErrorMessage = src.ErrorMessage + switch src.Provider { + case string(provider.ProviderAWS): + dst.AWSStatus.ConnectionID = src.AWSStatus.ConnectionID + dst.AWSStatus.VpcID = src.AWSStatus.VpcID + default: + panic("unsupported") + } +} + +func mustCreateVersionedAtlasClient(ctx context.Context) *admin.APIClient { + client, err := newVersionedClient(ctx) + if err != nil { + panic(fmt.Sprintf("Failed to create an Atlas versioned client: %v", err)) + } + return client +} + +func newVersionedClient(ctx context.Context) (*admin.APIClient, error) { + domain := os.Getenv("MCLI_OPS_MANAGER_URL") + pubKey := os.Getenv("MCLI_PUBLIC_API_KEY") + prvKey := os.Getenv("MCLI_PRIVATE_API_KEY") + client, err := atlas.NewClient(domain, pubKey, prvKey) + if err != nil { + return nil, fmt.Errorf("failed to setup Atlas Client: %w", err) + } + _, _, err = client.ProjectsApi.ListProjects(ctx).Execute() + if err != nil { + return nil, fmt.Errorf("non working Atlas Client: %w", err) + } + return client, err +} + +func detectContainerID(t *testing.T, ctx context.Context, ac *admin.APIClient, projectID, containerCIDR string) string { + containers, _, err := ac.NetworkPeeringApi.ListPeeringContainers(ctx, projectID).Execute() + require.NoError(t, err) + for _, container := range containers.GetResults() { + if container.GetAtlasCidrBlock() == containerCIDR { + return container.GetId() + } + } + return "" +} + +func detectPeerID(t *testing.T, ctx context.Context, ac *admin.APIClient, projectID, containerCIDR string) string { + conns, _, err := ac.NetworkPeeringApi.ListPeeringConnections(ctx, projectID).Execute() + require.NoError(t, err) + for _, conn := range conns.GetResults() { + if conn.GetRouteTableCidrBlock() == containerCIDR { + return conn.GetId() + } + } + return "" +} + +func detectAWSVPCId(t *testing.T, vpcName, vpcCIDR, awsRegionName string) string { + vpcs, err := aws.ListVPCs(awsRegionName, aws.Filter{Name: "cidr", Values: []string{vpcCIDR}}) + require.NoError(t, err) + for _, vpc := range vpcs { + if strings.Contains(vpc.Name, vpcName) { + return vpc.ID + } + } + vpcID, err := aws.CreateVPC(utils.RandomName(vpcName), vpcCIDR, awsRegionName) + require.NoError(t, err) + return vpcID +} diff --git a/test/helper/cloud/aws/vpc.go b/test/helper/cloud/aws/vpc.go index 6cb2b254e6..19f268cdfc 100644 --- a/test/helper/cloud/aws/vpc.go +++ b/test/helper/cloud/aws/vpc.go @@ -2,11 +2,67 @@ package aws import ( "fmt" + "log" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer" ) +type Filter struct { + Name string + Values []string +} + +type VPCSummary struct { + ID string + Name string + CIDRBlock string +} + +func ListVPCs(region string, filters ...Filter) ([]VPCSummary, error) { + awsSession, err := newSession(region) + if err != nil { + return nil, fmt.Errorf("failed to create an AWS session: %w", err) + } + ec2Client := ec2.New(awsSession) + vpcsDump, err := ec2Client.DescribeVpcs(&ec2.DescribeVpcsInput{Filters: converFilters(filters)}) + if err != nil { + return nil, fmt.Errorf("failed to list AWS VPCs: %w", err) + } + vpcs := []VPCSummary{} + for _, vpc := range vpcsDump.Vpcs { + name := "" + for _, tag := range vpc.Tags { + if pointer.GetOrDefault(tag.Key, "") == "Name" { + name = pointer.GetOrDefault(tag.Value, "") + break + } + } + vpcs = append(vpcs, VPCSummary{ + ID: pointer.GetOrDefault(vpc.VpcId, ""), + Name: name, + CIDRBlock: pointer.GetOrDefault(vpc.CidrBlock, ""), + }) + } + return vpcs, nil +} + +func converFilters(filters []Filter) []*ec2.Filter { + ec2Filters := []*ec2.Filter{} + for _, filter := range filters { + valuePtrs := []*string{} + for _, value := range filter.Values { + valuePtrs = append(valuePtrs, &value) + } + ec2Filters = append(ec2Filters, &ec2.Filter{ + Name: &filter.Name, + Values: valuePtrs, + }) + } + return ec2Filters +} + func CreateVPC(name, cidr, region string) (string, error) { awsSession, err := newSession(region) if err != nil { @@ -52,3 +108,33 @@ func DeleteVPC(vpcID, region string) error { }) return err } + +type disconnectFunc func() + +func AcceptVPCConnection(connectionID, region string) (error, disconnectFunc) { + awsSession, err := newSession(region) + if err != nil { + return fmt.Errorf("failed to get AWS session: %w", err), nil + } + ec2Client := ec2.New(awsSession) + _, err = ec2Client.AcceptVpcPeeringConnection( + &ec2.AcceptVpcPeeringConnectionInput{ + VpcPeeringConnectionId: aws.String(connectionID), + }, + ) + if err != nil { + log.Fatalf("failed to accept AWS VPC connection: %v", err) + } + disconnectFn := func() { + _, err = ec2Client.DeleteVpcPeeringConnection( + &ec2.DeleteVpcPeeringConnectionInput{ + VpcPeeringConnectionId: aws.String(connectionID), + }, + ) + if err != nil { + log.Fatalf("failed to disconnect AWS VPC connection: %v", err) + } + } + + return nil, disconnectFn +}