Skip to content

Commit

Permalink
Draft state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
josvazg committed Jan 16, 2025
1 parent 4f482c1 commit 95bae68
Show file tree
Hide file tree
Showing 7 changed files with 529 additions and 65 deletions.
10 changes: 9 additions & 1 deletion config/crd/bases/atlas.mongodb.com_atlasnetworkpeerings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -218,15 +218,23 @@ spec:
description: ContainerID records the ID of the container created by
atlas for this peering
type: string
error:
description: Error refers to the last error seen in the network peering
setup
type: string
id:
description: ID recrods the identified of thr peer created by Atlas
description: ID recrods the identified of the peer created by Atlas
type: string
observedGeneration:
description: |-
ObservedGeneration indicates the generation of the resource specification that the Atlas Operator is aware of.
The Atlas Operator updates this field to the 'metadata.generation' as soon as it starts reconciliation of the resource.
format: int64
type: integer
status:
description: Status describes the last status seen for the network
peering setup
type: string
required:
- conditions
type: object
Expand Down
38 changes: 37 additions & 1 deletion internal/translation/networkpeering/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,33 @@ import (
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer"
)

type AWSStatus struct {
ConnectionID string
VpcID string
}

type NetworkPeer struct {
akov2.AtlasNetworkPeeringConfig
ID string
ID string
Status string
ErrorMessage string
AWSStatus AWSStatus
}

func (np *NetworkPeer) Failed() bool {
return np.ErrorMessage != ""
}

// AWS status: INITIATING PENDING_ACCEPTANCE FINALIZING AVAILABLE

func (np *NetworkPeer) Provisioned() bool {
return (np.Provider == string(provider.ProviderAWS) && np.Status == "AVAILABLE") ||
(np.Provider == string(provider.ProviderAzure) && np.Status == "azure ok status") ||
(np.Provider == string(provider.ProviderGCP) && np.Status == "gcp ok status")
}

func (np *NetworkPeer) Summary() string {
return fmt.Sprintf("status=%q error=%q", np.Status, np.ErrorMessage)
}

func NewNetworkPeer(id string, cfg *akov2.AtlasNetworkPeeringConfig) *NetworkPeer {
Expand Down Expand Up @@ -92,6 +116,13 @@ func fromAtlasConnection(conn *admin.BaseNetworkPeeringConnectionSettings) (*Net
VpcID: conn.GetVpcId(),
},
},
// Status and error messages
Status: conn.GetStatusName(),
ErrorMessage: conn.GetErrorStateName(),
AWSStatus: AWSStatus{
ConnectionID: conn.GetConnectionId(),
VpcID: conn.GetVpcId(),
},
}, nil
case provider.ProviderGCP:
return &NetworkPeer{
Expand All @@ -104,6 +135,8 @@ func fromAtlasConnection(conn *admin.BaseNetworkPeeringConnectionSettings) (*Net
NetworkName: conn.GetNetworkName(),
},
},
// Status and error messages
ErrorMessage: conn.GetErrorMessage(),
}, nil
case provider.ProviderAzure:
return &NetworkPeer{
Expand All @@ -118,6 +151,9 @@ func fromAtlasConnection(conn *admin.BaseNetworkPeeringConnectionSettings) (*Net
VNetName: conn.GetVnetName(),
},
},
// Status and error messages
Status: conn.GetStatus(),
ErrorMessage: conn.GetErrorState(),
}, nil
default:
return nil, fmt.Errorf("unsupported provider %q", conn.GetProviderName())
Expand Down
79 changes: 27 additions & 52 deletions internal/translation/networkpeering/networkpeering.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,32 @@ package networkpeering

import (
"context"
"errors"
"fmt"
"net/http"

"go.mongodb.org/atlas-sdk/v20231115008/admin"

"github.com/mongodb/mongodb-atlas-kubernetes/v2/api/v1/provider"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/paging"
)

const (
pageSize = 100
)

var (
// ErrNotFound means an resource is missing
ErrNotFound = errors.New("not found")
)

type PeerConnectionsService interface {
CreatePeer(ctx context.Context, projectID string, conn *NetworkPeer) (*NetworkPeer, error)
ListPeers(ctx context.Context, projectID string) ([]NetworkPeer, error)
GetPeer(ctx context.Context, projectID, containerID string) (*NetworkPeer, error)
UpdatePeer(ctx context.Context, projectID string, conn *NetworkPeer) error
DeletePeer(ctx context.Context, projectID, containerID string) error
}

type PeeringContainerService interface {
CreateContainer(ctx context.Context, projectID string, container *ProviderContainer) (*ProviderContainer, error)
GetContainer(ctx context.Context, projectID, containerID string) (*ProviderContainer, error)
ListContainers(ctx context.Context, projectID, providerName string) ([]ProviderContainer, error)
UpdateContainer(ctx context.Context, projectID string, container *ProviderContainer) error
DeleteContainer(ctx context.Context, projectID, containerID string) error
}

Expand Down Expand Up @@ -62,6 +63,9 @@ func (np *networkPeeringService) CreatePeer(ctx context.Context, projectID strin
func (np *networkPeeringService) GetPeer(ctx context.Context, projectID, containerID string) (*NetworkPeer, error) {
atlasConn, _, err := np.peeringAPI.GetPeeringConnection(ctx, projectID, containerID).Execute()
if err != nil {
if admin.IsErrorCode(err, "PEER_NOT_FOUND") {
return nil, ErrNotFound
}
return nil, fmt.Errorf("failed to get network peer for container id %v: %w", containerID, err)
}
peer, err := fromAtlasConnection(atlasConn)
Expand All @@ -71,32 +75,16 @@ func (np *networkPeeringService) GetPeer(ctx context.Context, projectID, contain
return peer, nil
}

func (np *networkPeeringService) ListPeers(ctx context.Context, projectID string) ([]NetworkPeer, error) {
var peersList []NetworkPeer
providers := []provider.ProviderName{provider.ProviderAWS, provider.ProviderAzure, provider.ProviderGCP}
for _, providerName := range providers {
peers, err := np.listPeersForProvider(ctx, projectID, providerName)
if err != nil {
return nil, fmt.Errorf("failed to list network peers for %s: %w", string(providerName), err)
}
peersList = append(peersList, peers...)
func (np *networkPeeringService) UpdatePeer(ctx context.Context, projectID string, conn *NetworkPeer) error {
atlasConnRequest, err := toAtlasConnection(conn)
if err != nil {
return fmt.Errorf("failed to convert updated peer to Atlas: %w", err)
}
return peersList, nil
}

func (np *networkPeeringService) listPeersForProvider(ctx context.Context, projectID string, providerName provider.ProviderName) ([]NetworkPeer, error) {
results, err := paging.ListAll(ctx, func(ctx context.Context, pageNum int) (paging.Response[admin.BaseNetworkPeeringConnectionSettings], *http.Response, error) {
p := &admin.ListPeeringConnectionsApiParams{
GroupId: projectID,
ProviderName: admin.PtrString(string(providerName)),
}
return np.peeringAPI.ListPeeringConnectionsWithParams(ctx, p).PageNum(pageNum).Execute()
})
_, _, err = np.peeringAPI.UpdatePeeringConnection(ctx, projectID, conn.ID, atlasConnRequest).Execute()
if err != nil {
return nil, fmt.Errorf("failed to list network peers: %w", err)
return fmt.Errorf("failed to create network peer %v: %w", conn, err)
}

return fromAtlasConnectionList(results)
return nil
}

func (np *networkPeeringService) DeletePeer(ctx context.Context, projectID, peerID string) error {
Expand All @@ -118,38 +106,25 @@ func (np *networkPeeringService) CreateContainer(ctx context.Context, projectID
func (np *networkPeeringService) GetContainer(ctx context.Context, projectID, containerID string) (*ProviderContainer, error) {
container, _, err := np.peeringAPI.GetPeeringContainer(ctx, projectID, containerID).Execute()
if err != nil {
return nil, fmt.Errorf("failed to get container for gcp status %s: %w", containerID, err)
return nil, fmt.Errorf("failed to get container %s: %w", containerID, err)
}
return fromAtlasContainer(container), nil
}

func (np *networkPeeringService) ListContainers(ctx context.Context, projectID, providerName string) ([]ProviderContainer, error) {
results := []ProviderContainer{}
pageNum := 1
listOpts := &admin.ListPeeringContainerByCloudProviderApiParams{
GroupId: projectID,
ProviderName: pointer.SetOrNil(providerName, ""),
PageNum: &pageNum,
ItemsPerPage: pointer.MakePtr(pageSize),
IncludeCount: pointer.MakePtr(false),
}
for {
page, _, err := np.peeringAPI.ListPeeringContainerByCloudProviderWithParams(ctx, listOpts).Execute()
if err != nil {
return nil, fmt.Errorf("failed to list containers: %w", err)
}
results = append(results, fromAtlasContainerList(page.GetResults())...)
if len(page.GetResults()) < pageSize {
return results, nil
}
pageNum += 1
func (np *networkPeeringService) UpdateContainer(ctx context.Context, projectID string, container *ProviderContainer) error {
containerUpdate := toAtlasContainer(container)
containerUpdate.Id = nil
_, _, err := np.peeringAPI.UpdatePeeringContainer(ctx, projectID, container.ID, containerUpdate).Execute()
if err != nil {
return fmt.Errorf("failed to get container %s: %w", container.ID, err)
}
return nil
}

func (np *networkPeeringService) DeleteContainer(ctx context.Context, projectID, containerID string) error {
_, _, err := np.peeringAPI.DeletePeeringContainer(ctx, projectID, containerID).Execute()
if admin.IsErrorCode(err, "CLOUD_PROVIDER_CONTAINER_NOT_FOUND") {
return nil
return ErrNotFound
}
if err != nil {
return fmt.Errorf("failed to delete container: %w", err)
Expand Down
Empty file.
18 changes: 7 additions & 11 deletions test/contract/networkpeering/networkpeering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,6 @@ func TestPeerContainerServiceCRUD(t *testing.T) {
createdContainer = newContainer
})

t.Run(fmt.Sprintf("list %s containers", tc.provider), func(t *testing.T) {
containers, err := cs.ListContainers(ctx, testProjectID, tc.container.Provider)
require.NoError(t, err)
assert.NotEmpty(t, containers)
assert.GreaterOrEqual(t, len(containers), 1)
})

t.Run(fmt.Sprintf("get %s container", tc.provider), func(t *testing.T) {
container, err := cs.GetContainer(ctx, testProjectID, createdContainer.ID)
require.NoError(t, err)
Expand All @@ -75,6 +68,11 @@ func TestPeerContainerServiceCRUD(t *testing.T) {
assert.Equal(t, tc.container.AtlasCIDRBlock, container.AtlasCIDRBlock)
})

t.Run(fmt.Sprintf("update %s container", tc.provider), func(t *testing.T) {
err := cs.UpdateContainer(ctx, testProjectID, tc.container)
require.NoError(t, err)
})

t.Run(fmt.Sprintf("delete %s container", tc.provider), func(t *testing.T) {
time.Sleep(time.Second) // Atlas may reject removal if it happened within a second of creation
assert.NoErrorf(t, ignoreRemoved(cs.DeleteContainer(ctx, testProjectID, createdContainer.ID)),
Expand Down Expand Up @@ -164,11 +162,9 @@ func TestPeerServiceCRUD(t *testing.T) {
assert.Equal(t, createdPeer, peer)
})

t.Run(fmt.Sprintf("list %s peer connections", tc.provider), func(t *testing.T) {
peers, err := ps.ListPeers(ctx, testProjectID)
t.Run(fmt.Sprintf("update %s peer connections", tc.provider), func(t *testing.T) {
err := ps.UpdatePeer(ctx, testProjectID, createdPeer)
require.NoError(t, err)
assert.NotEmpty(t, peers)
assert.GreaterOrEqual(t, len(peers), 1)
})

t.Run(fmt.Sprintf("delete %s peer connection", tc.provider), func(t *testing.T) {
Expand Down
Loading

0 comments on commit 95bae68

Please sign in to comment.