-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Proof of concept of a common plugin base #47972
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
package launcher | ||
|
||
import ( | ||
"context" | ||
"golang.org/x/sync/errgroup" | ||
"time" | ||
|
||
"github.com/gravitational/trace" | ||
"github.com/jonboulle/clockwork" | ||
|
||
"github.com/gravitational/teleport/api/client/proto" | ||
"github.com/gravitational/teleport/integrations/access/common/teleport" | ||
"github.com/gravitational/teleport/lib/utils" | ||
) | ||
|
||
const ( | ||
// minServerVersion is the minimal teleport version the plugin supports. | ||
minServerVersion = "6.1.0-beta.1" | ||
// InitTimeout is used to bound execution time of health check and teleport version check. | ||
initTimeout = time.Second * 10 | ||
) | ||
|
||
// Launcher is responsible for obtaining a teleport client and launching one or multiple services. | ||
type Launcher struct { | ||
PluginName string | ||
teleportClient teleport.Client | ||
Conf Config | ||
Clock clockwork.Clock | ||
Services []Service | ||
} | ||
|
||
// New creates a new Launcher and initialize its main job | ||
func New(conf Config, pluginName string) *Launcher { | ||
baseApp := Launcher{ | ||
PluginName: pluginName, | ||
Conf: conf, | ||
} | ||
return &baseApp | ||
} | ||
|
||
// Run initializes the launcher, gets a teleport client, check connectivity, and start all the configured services. | ||
func (a *Launcher) Run(ctx context.Context) error { | ||
// TODO: cancel context on signal | ||
err := a.init(ctx) | ||
if err != nil { | ||
return trace.Wrap(err, "failed initialization") | ||
} | ||
|
||
group, ctx := errgroup.WithContext(ctx) | ||
for _, service := range a.Services { | ||
group.Go(func() error { | ||
return service.Run(ctx, a.teleportClient, a.PluginName) | ||
}) | ||
} | ||
|
||
return group.Wait() | ||
Comment on lines
+51
to
+56
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the expected behavior if any one of the services fails, causing this to return? Do we expect systemd/kubernetes to restart the plugin? |
||
} | ||
|
||
func (a *Launcher) init(ctx context.Context) error { | ||
ctx, cancel := context.WithTimeout(ctx, initTimeout) | ||
defer cancel() | ||
|
||
err := a.initTeleport(ctx) | ||
if err != nil { | ||
return trace.Wrap(err) | ||
} | ||
|
||
a.PluginName = a.Conf.PluginName() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does this need to be updated? |
||
|
||
// Check which service the plugin supports and run their preflight checks | ||
return nil | ||
} | ||
|
||
func (a *Launcher) checkTeleportVersion(ctx context.Context) (proto.PingResponse, error) { | ||
pong, err := a.teleportClient.Ping(ctx) | ||
if err != nil { | ||
if trace.IsNotImplemented(err) { | ||
return pong, trace.Wrap(err, "server version must be at least %s", minServerVersion) | ||
} | ||
return pong, trace.Wrap(err, "Unable to get Teleport server version") | ||
} | ||
err = utils.CheckMinVersion(pong.ServerVersion, minServerVersion) | ||
return pong, trace.Wrap(err) | ||
} | ||
Comment on lines
+74
to
+84
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this still relevant? The minimum version this would be released in is v15 which seems highly unlikely to be running against a v6 control plane. |
||
|
||
// initTeleport creates a Teleport client and validates Teleport connectivity. | ||
func (a *Launcher) initTeleport(ctx context.Context) (err error) { | ||
clt, err := a.Conf.GetTeleportClient(ctx) | ||
if err != nil { | ||
return trace.Wrap(err) | ||
} | ||
|
||
a.teleportClient = clt | ||
_, err = a.checkTeleportVersion(ctx) | ||
if err != nil { | ||
return trace.Wrap(err) | ||
} | ||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
package launcher | ||
|
||
import ( | ||
"context" | ||
"github.com/gravitational/teleport/api/client" | ||
accessmonitoringrulesv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/accessmonitoringrules/v1" | ||
"github.com/gravitational/teleport/api/types/accesslist" | ||
"github.com/gravitational/teleport/integrations/access/common/teleport" | ||
"github.com/gravitational/teleport/integrations/lib" | ||
"github.com/gravitational/teleport/integrations/lib/logger" | ||
"github.com/gravitational/trace" | ||
) | ||
|
||
type Config interface { | ||
Services() []Service | ||
GetTeleportClient(ctx context.Context) (teleport.Client, error) | ||
PluginName() string | ||
} | ||
|
||
type Service interface { | ||
Run(ctx context.Context, clt teleport.Client, name string /* TODO add status sink */) error | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does the name here convey? Why does it need to be given to the service? |
||
CheckHealth(ctx context.Context) error | ||
} | ||
|
||
// TODO: merge with integrations/lib/config.go | ||
|
||
// BaseFileConfig is a configuration you can embed to read the standard teleport fields in your | ||
// plugin file configs. | ||
type BaseFileConfig struct { | ||
Teleport lib.TeleportConfig `toml:"teleport"` | ||
Log logger.Config `toml:"log"` | ||
} | ||
|
||
// GetTeleportClient returns a Teleport plugin client for the given config. | ||
func (c BaseFileConfig) GetTeleportClient(ctx context.Context) (teleport.Client, error) { | ||
clt, err := c.Teleport.NewClient(ctx) | ||
if err != nil { | ||
return nil, trace.Wrap(err) | ||
} | ||
|
||
return wrapAPIClient(clt), nil | ||
} | ||
|
||
// TODO: check if we can get rid of this? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 - Can teleport.Client be updated to use the matching signature so this can be eliminated? |
||
|
||
// wrapAPIClient will wrap the API client such that it conforms to the Teleport plugin client interface. | ||
func wrapAPIClient(clt *client.Client) teleport.Client { | ||
return &wrappedClient{ | ||
Client: clt, | ||
} | ||
} | ||
|
||
type wrappedClient struct { | ||
*client.Client | ||
} | ||
|
||
func (w *wrappedClient) ListAccessLists(ctx context.Context, pageSize int, pageToken string) ([]*accesslist.AccessList, string, error) { | ||
return w.Client.AccessListClient().ListAccessLists(ctx, pageSize, pageToken) | ||
} | ||
|
||
// ListAccessMonitoringRulesWithFilter lists current access monitoring rules. | ||
func (w *wrappedClient) ListAccessMonitoringRulesWithFilter(ctx context.Context, pageSize int, pageToken string, subjects []string, notificationName string) ([]*accessmonitoringrulesv1.AccessMonitoringRule, string, error) { | ||
return w.Client.AccessMonitoringRulesClient().ListAccessMonitoringRulesWithFilter(ctx, pageSize, pageToken, subjects, notificationName) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package accesslist | ||
|
||
import ( | ||
"context" | ||
"github.com/gravitational/teleport/api/types/accesslist" | ||
"github.com/gravitational/teleport/integrations/lib/services/common" | ||
) | ||
|
||
type Notifier interface { | ||
// CheckHealth checks if the bot can connect to its messaging service | ||
CheckHealth(ctx context.Context) error | ||
// FetchRecipient fetches recipient data from the messaging service API. It can also be used to check and initialize | ||
// a communication channel (e.g. MsTeams needs to install the app for the user before being able to send | ||
// notifications) | ||
FetchRecipient(ctx context.Context, recipient string) (*common.Recipient, error) | ||
// SendReviewReminders will send a review reminder that an access list needs to be reviewed. | ||
SendReviewReminders(ctx context.Context, recipient common.Recipient, accessLists []*accesslist.AccessList) error | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we warn if there are no services configured?