Skip to content

Commit

Permalink
⭐️ refactor providers code (#3242)
Browse files Browse the repository at this point in the history
* refactor providers code
* make sure AddRuntime uses a runtime builder function
* fix k8s tests
* fix runtime
* migrate arista provider
* migrate atlassian provider
* migrate aws provider
* migrate azure provider
* migrate core provider
* migrate equinix provider
* migrate gcp provider
* migrate github provider
* migrate gitlab provider
* migrate google-workspace provider
* migrate ipmi provider
* migrate ms365 provider
* migrate network provider
* migrate oci provider
* migrate okta provider
* migrate opcua provider
* migrate os provider
* migrate slack provider
* migrate terraform provider
* migrate vcd provider
* migrate vsphere provider
* call Disconnect on runtime close
* fix broken test
* fix tests
* fix deadlock
* properly call disconnect
* add tests for service
* fix terraform provider connection ids
* fix more provider connection ids

---------

Signed-off-by: Ivan Milchev <[email protected]>
  • Loading branch information
imilchev authored Feb 11, 2024
1 parent e66bb43 commit c32aa49
Show file tree
Hide file tree
Showing 107 changed files with 1,652 additions and 3,139 deletions.
8 changes: 8 additions & 0 deletions providers-sdk/v1/plugin/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (m *GRPCClient) Connect(req *ConnectReq, callback ProviderCallback) (*Conne
return m.client.Connect(context.Background(), req)
}

func (m *GRPCClient) Disconnect(req *DisconnectReq) (*DisconnectRes, error) {
return m.client.Disconnect(context.Background(), req)
}

func (m *GRPCClient) MockConnect(req *ConnectReq, callback ProviderCallback) (*ConnectRes, error) {
m.connect(req, callback)
return m.client.MockConnect(context.Background(), req)
Expand Down Expand Up @@ -100,6 +104,10 @@ func (m *GRPCServer) Connect(ctx context.Context, req *ConnectReq) (*ConnectRes,
return m.Impl.Connect(req, a)
}

func (m *GRPCServer) Disconnect(ctx context.Context, req *DisconnectReq) (*DisconnectRes, error) {
return m.Impl.Disconnect(req)
}

func (m *GRPCServer) MockConnect(ctx context.Context, req *ConnectReq) (*ConnectRes, error) {
conn, err := m.broker.Dial(req.CallbackServer)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions providers-sdk/v1/plugin/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ var PluginMap = map[string]plugin.Plugin{
"provider": &ProviderPluginImpl{},
}

type Closer interface {
Close()
}

type ProviderCallback interface {
Collect(req *DataRes) error
GetRecording(req *DataReq) (*ResourceData, error)
Expand All @@ -33,6 +37,7 @@ type ProviderPlugin interface {
Heartbeat(req *HeartbeatReq) (*HeartbeatRes, error)
ParseCLI(req *ParseCLIReq) (*ParseCLIRes, error)
Connect(req *ConnectReq, callback ProviderCallback) (*ConnectRes, error)
Disconnect(req *DisconnectReq) (*DisconnectRes, error)
MockConnect(req *ConnectReq, callback ProviderCallback) (*ConnectRes, error)
Shutdown(req *ShutdownReq) (*ShutdownRes, error)
GetData(req *DataReq) (*DataRes, error)
Expand Down
302 changes: 212 additions & 90 deletions providers-sdk/v1/plugin/plugin.pb.go

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions providers-sdk/v1/plugin/plugin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,19 @@ message HeartbeatRes {

}

message DisconnectReq {
uint32 connection = 1;
}

message DisconnectRes {

}

service ProviderPlugin {
rpc Heartbeat(HeartbeatReq) returns (HeartbeatRes);
rpc ParseCLI(ParseCLIReq) returns (ParseCLIRes);
rpc Connect(ConnectReq) returns (ConnectRes);
rpc Disconnect(DisconnectReq) returns (DisconnectRes);
rpc MockConnect(ConnectReq) returns (ConnectRes);
rpc Shutdown(ShutdownReq) returns (ShutdownRes);
rpc GetData(DataReq) returns (DataRes);
Expand Down
37 changes: 37 additions & 0 deletions providers-sdk/v1/plugin/plugin_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions providers-sdk/v1/plugin/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,22 @@ type Runtime struct {
Callback ProviderCallback
HasRecording bool
CreateResource CreateNamedResource
NewResource NewResource
GetData GetData
SetData SetData
Upstream *upstream.UpstreamClient
}

type Connection interface{}
type Connection interface {
ID() uint32
}

type CreateNamedResource func(runtime *Runtime, name string, args map[string]*llx.RawData) (Resource, error)
type (
CreateNamedResource func(runtime *Runtime, name string, args map[string]*llx.RawData) (Resource, error)
NewResource func(runtime *Runtime, name string, args map[string]*llx.RawData) (Resource, error)
GetData func(resource Resource, field string, args map[string]*llx.RawData) *DataRes
SetData func(resource Resource, field string, val *llx.RawData) error
)

type Resource interface {
MqlID() string
Expand Down
165 changes: 160 additions & 5 deletions providers-sdk/v1/plugin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,178 @@ package plugin
import (
"errors"
"os"
"strconv"
"strings"
sync "sync"
"time"

llx "go.mondoo.com/cnquery/v10/llx"
)

type Service struct {
runtimes map[uint32]*Runtime
lastConnectionID uint32
runtimesLock sync.Mutex

lastHeartbeat int64
lock sync.Mutex
heartbeatLock sync.Mutex
}

func NewService() *Service {
return &Service{
runtimes: make(map[uint32]*Runtime),
}
}

var heartbeatRes HeartbeatRes

func (s *Service) AddRuntime(createRuntime func(connId uint32) (*Runtime, error)) (*Runtime, error) {
s.runtimesLock.Lock()
defer s.runtimesLock.Unlock()

s.lastConnectionID++
runtime, err := createRuntime(s.lastConnectionID)
if err != nil {
// If the runtime creation fails, revert the lastConnectionID
s.lastConnectionID--
return nil, err
}
s.runtimes[s.lastConnectionID] = runtime
return runtime, nil
}

func (s *Service) GetRuntime(id uint32) (*Runtime, error) {
s.runtimesLock.Lock()
defer s.runtimesLock.Unlock()
if runtime, ok := s.runtimes[id]; ok {
return runtime, nil
}
return nil, errors.New("connection " + strconv.FormatUint(uint64(id), 10) + " not found")
}

func (s *Service) Disconnect(req *DisconnectReq) (*DisconnectRes, error) {
s.runtimesLock.Lock()
defer s.runtimesLock.Unlock()
s.doDisconnect(req.Connection)
return &DisconnectRes{}, nil
}

// doDisconnect is a helper function to disconnect a runtime by its ID. It MUST be called
// with a lock on s.runtimesLock.
func (s *Service) doDisconnect(id uint32) {
if runtime, ok := s.runtimes[id]; ok {
// If the runtime implements the Closer interface, we need to call the
// Close function
if closer, ok := runtime.Connection.(Closer); ok {
closer.Close()
}
delete(s.runtimes, id)
}
}

func (s *Service) GetData(req *DataReq) (*DataRes, error) {
runtime, err := s.GetRuntime(req.Connection)
if err != nil {
return nil, err
}

args := PrimitiveArgsToRawDataArgs(req.Args, runtime)

if req.ResourceId == "" && req.Field == "" {
res, err := runtime.NewResource(runtime, req.Resource, args)
if err != nil {
return nil, err
}

rd := llx.ResourceData(res, res.MqlName()).Result()
return &DataRes{
Data: rd.Data,
}, nil
}

resource, ok := runtime.Resources.Get(req.Resource + "\x00" + req.ResourceId)
if !ok {
// Note: Since resources are internally always created, there are only very
// few cases where we arrive here:
// 1. The caller is wrong. Possibly a mixup with IDs
// 2. The resource was loaded from a recording, but the field is not
// in the recording. Thus the resource was never created inside the
// plugin. We will attempt to create the resource and see if the field
// can be computed.
if !runtime.HasRecording {
return nil, errors.New("resource '" + req.Resource + "' (id: " + req.ResourceId + ") doesn't exist")
}

args, err := runtime.ResourceFromRecording(req.Resource, req.ResourceId)
if err != nil {
return nil, errors.New("attempted to load resource '" + req.Resource + "' (id: " + req.ResourceId + ") from recording failed: " + err.Error())
}

resource, err = runtime.CreateResource(runtime, req.Resource, args)
if err != nil {
return nil, errors.New("attempted to create resource '" + req.Resource + "' (id: " + req.ResourceId + ") from recording failed: " + err.Error())
}
}

return runtime.GetData(resource, req.Field, args), nil
}

func (s *Service) StoreData(req *StoreReq) (*StoreRes, error) {
runtime, err := s.GetRuntime(req.Connection)
if err != nil {
return nil, err
}

var errs []string
for i := range req.Resources {
info := req.Resources[i]

args, err := ProtoArgsToRawDataArgs(info.Fields)
if err != nil {
errs = append(errs, "failed to add cached "+info.Name+" (id: "+info.Id+"), failed to parse arguments")
continue
}

resource, ok := runtime.Resources.Get(info.Name + "\x00" + info.Id)
if !ok {
resource, err = runtime.CreateResource(runtime, info.Name, args)
if err != nil {
errs = append(errs, "failed to add cached "+info.Name+" (id: "+info.Id+"), creation failed: "+err.Error())
continue
}

runtime.Resources.Set(info.Name+"\x00"+info.Id, resource)
}

for k, v := range args {
if err := runtime.SetData(resource, k, v); err != nil {
errs = append(errs, "failed to add cached "+info.Name+" (id: "+info.Id+"), field error: "+err.Error())
}
}
}

if len(errs) != 0 {
return nil, errors.New(strings.Join(errs, ", "))
}
return &StoreRes{}, nil
}

func (s *Service) Heartbeat(req *HeartbeatReq) (*HeartbeatRes, error) {
if req.Interval == 0 {
return nil, errors.New("heartbeat failed, requested interval is 0")
}

now := time.Now().UnixNano()
s.lock.Lock()
s.heartbeatLock.Lock()
s.lastHeartbeat = now
s.lock.Unlock()
s.heartbeatLock.Unlock()

go func() {
time.Sleep(time.Duration(req.Interval))

s.lock.Lock()
s.heartbeatLock.Lock()
isDead := s.lastHeartbeat == now
s.lock.Unlock()
s.heartbeatLock.Unlock()

if isDead {
os.Exit(1)
Expand All @@ -41,3 +186,13 @@ func (s *Service) Heartbeat(req *HeartbeatReq) (*HeartbeatRes, error) {

return &heartbeatRes, nil
}

func (s *Service) Shutdown(req *ShutdownReq) (*ShutdownRes, error) {
s.runtimesLock.Lock()
defer s.runtimesLock.Unlock()

for id := range s.runtimes {
s.doDisconnect(id)
}
return &ShutdownRes{}, nil
}
Loading

0 comments on commit c32aa49

Please sign in to comment.