Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add session recording access format to audit event #47307

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2434,10 +2434,11 @@ func (c *Client) DeleteAllNodes(ctx context.Context, namespace string) error {
}

// StreamSessionEvents streams audit events from a given session recording.
func (c *Client) StreamSessionEvents(ctx context.Context, sessionID string, startIndex int64) (chan events.AuditEvent, chan error) {
func (c *Client) StreamSessionEvents(ctx context.Context, sessionID string, startIndex int64, format string) (chan events.AuditEvent, chan error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about putting the format inside the context instead of an explicit parameter?

I don't love making format a required argument because the stream session events API doesn't know anything about formats - that all happens downstream. Adding a required argument for something that's only used for some event metadata feels wrong.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree with this (especially after seeing so many places requiring this parameter, which would be empty by default). I'll update it and make another PR (there will be fewer places to change).

request := &proto.StreamSessionEventsRequest{
SessionID: sessionID,
StartIndex: int32(startIndex),
Format: format,
}

ch := make(chan events.AuditEvent)
Expand Down
1,146 changes: 599 additions & 547 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions api/proto/teleport/legacy/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1435,6 +1435,8 @@ message StreamSessionEventsRequest {
// StartIndex is the index of the event to resume the stream after.
// A StartIndex of 0 creates a new stream.
int32 StartIndex = 2;
// Format is the format the session recording was accessed.
string Format = 3;
}

// NodeLogin specifies an SSH node and OS login.
Expand Down
2 changes: 2 additions & 0 deletions api/proto/teleport/legacy/types/events/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5599,6 +5599,8 @@ message SessionRecordingAccess {
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// Format is the format the session recording was accessed.
string Format = 4 [(gogoproto.jsontag) = "format"];
}

// KubeClusterMetadata contains common kubernetes cluster information.
Expand Down
2,108 changes: 1,077 additions & 1,031 deletions api/types/events/events.pb.go

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func streamSession(
sessionID string,
) (string, []apievents.AuditEvent) {
t.Helper()
evtCh, errCh := streamer.StreamSessionEvents(ctx, session.ID(sessionID), 0)
evtCh, errCh := streamer.StreamSessionEvents(ctx, session.ID(sessionID), 0, "" /* format */)
capturedStream := &bytes.Buffer{}
evts := make([]apievents.AuditEvent, 0)
readLoop:
Expand Down Expand Up @@ -1220,7 +1220,7 @@ func testLeafProxySessionRecording(t *testing.T, suite *integrationTestSuite) {
require.EventuallyWithT(t, func(t *assert.CollectT) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
eventsCh, errCh := authSrv.StreamSessionEvents(ctx, session.ID(sessionID), 0)
eventsCh, errCh := authSrv.StreamSessionEvents(ctx, session.ID(sessionID), 0, "" /* format */)
for {
select {
case err := <-errCh:
Expand Down Expand Up @@ -4970,7 +4970,7 @@ func testAuditOff(t *testing.T, suite *integrationTestSuite) {

// however, attempts to read the actual sessions should fail because it was
// not actually recorded
eventsCh, errCh := site.StreamSessionEvents(ctx, session.ID(tracker.GetSessionID()), 0)
eventsCh, errCh := site.StreamSessionEvents(ctx, session.ID(tracker.GetSessionID()), 0, "" /* format */)
err = nil
readLoop:
for {
Expand Down Expand Up @@ -7394,7 +7394,7 @@ outer:
time.Sleep(time.Second * 5)

receivedSession := make([]apievents.AuditEvent, 0)
sessionPlayback, e := api.StreamSessionEvents(ctx, sessionID, 0)
sessionPlayback, e := api.StreamSessionEvents(ctx, sessionID, 0, "" /* format */)

inner:
for {
Expand Down
4 changes: 2 additions & 2 deletions integrations/event-handler/teleport_events_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type TeleportSearchEventsClient interface {
// SearchEvents searches for events in the audit log and returns them using their protobuf representation.
SearchEvents(ctx context.Context, fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, order types.EventOrder, startKey string) ([]events.AuditEvent, string, error)
// StreamSessionEvents returns session events stream for a given session ID using their protobuf representation.
StreamSessionEvents(ctx context.Context, sessionID string, startIndex int64) (chan events.AuditEvent, chan error)
StreamSessionEvents(ctx context.Context, sessionID string, startIndex int64, format string) (chan events.AuditEvent, chan error)
// SearchUnstructuredEvents searches for events in the audit log and returns them using an unstructured representation (structpb.Struct).
SearchUnstructuredEvents(ctx context.Context, fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, order types.EventOrder, startKey string) ([]*auditlogpb.EventUnstructured, string, error)
// StreamUnstructuredSessionEvents returns session events stream for a given session ID using an unstructured representation (structpb.Struct).
Expand Down Expand Up @@ -401,7 +401,7 @@ func (t *TeleportEventsWatcher) Events(ctx context.Context) (chan *TeleportEvent

// StreamSessionEvents returns session event stream, that's the simple delegate to an API function
func (t *TeleportEventsWatcher) StreamUnstructuredSessionEvents(ctx context.Context, id string, index int64) (chan *auditlogpb.EventUnstructured, chan error) {
return t.client.StreamUnstructuredSessionEvents(ctx, id, index)
return t.client.StreamUnstructuredSessionEvents(ctx, id, index, "" /* format */)
}

// UpsertLock upserts user lock
Expand Down
5 changes: 3 additions & 2 deletions lib/auth/auth_with_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -6086,7 +6086,7 @@ func (a *ServerWithRoles) ReplaceRemoteLocks(ctx context.Context, clusterName st
// StreamSessionEvents streams all events from a given session recording. An error is returned on the first
// channel if one is encountered. Otherwise the event channel is closed when the stream ends.
// The event channel is not closed on error to prevent race conditions in downstream select statements.
func (a *ServerWithRoles) StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64) (chan apievents.AuditEvent, chan error) {
func (a *ServerWithRoles) StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64, format string) (chan apievents.AuditEvent, chan error) {
createErrorChannel := func(err error) (chan apievents.AuditEvent, chan error) {
e := make(chan error, 1)
e <- trace.Wrap(err)
Expand Down Expand Up @@ -6114,12 +6114,13 @@ func (a *ServerWithRoles) StreamSessionEvents(ctx context.Context, sessionID ses
},
SessionID: sessionID.String(),
UserMetadata: a.context.Identity.GetIdentity().GetUserMetadata(),
Format: format,
}); err != nil {
return createErrorChannel(err)
}
}

return a.alog.StreamSessionEvents(ctx, sessionID, startIndex)
return a.alog.StreamSessionEvents(ctx, sessionID, startIndex, format)
}

// CreateApp creates a new application resource.
Expand Down
12 changes: 7 additions & 5 deletions lib/auth/auth_with_roles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2146,7 +2146,7 @@ func TestStreamSessionEventsRBAC(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, errC := clt.StreamSessionEvents(ctx, "foo", 0)
_, errC := clt.StreamSessionEvents(ctx, "foo", 0, "" /* format */)
select {
case err := <-errC:
require.ErrorAs(t, err, new(*trace.AccessDeniedError))
Expand All @@ -2172,7 +2172,7 @@ func TestStreamSessionEvents_User(t *testing.T) {
require.NoError(t, err)

// ignore the response as we don't want the events or the error (the session will not exist)
_, _ = clt.StreamSessionEvents(ctx, "44c6cea8-362f-11ea-83aa-125400432324", 0)
_, _ = clt.StreamSessionEvents(ctx, "44c6cea8-362f-11ea-83aa-125400432324", 0, teleport.JSON)

// we need to wait for a short period to ensure the event is returned
time.Sleep(500 * time.Millisecond)
Expand All @@ -2188,6 +2188,7 @@ func TestStreamSessionEvents_User(t *testing.T) {

event := searchEvents[0].(*apievents.SessionRecordingAccess)
require.Equal(t, username, event.User)
require.Equal(t, teleport.JSON, event.Format)
}

// TestStreamSessionEvents_Builtin ensures that when a builtin role streams a session's events, it does not emit
Expand All @@ -2204,7 +2205,7 @@ func TestStreamSessionEvents_Builtin(t *testing.T) {
require.NoError(t, err)

// ignore the response as we don't want the events or the error (the session will not exist)
_, _ = clt.StreamSessionEvents(ctx, "44c6cea8-362f-11ea-83aa-125400432324", 0)
_, _ = clt.StreamSessionEvents(ctx, "44c6cea8-362f-11ea-83aa-125400432324", 0, "" /* format */)

// we need to wait for a short period to ensure the event is returned
time.Sleep(500 * time.Millisecond)
Expand Down Expand Up @@ -2240,7 +2241,7 @@ func TestStreamSessionEvents(t *testing.T) {
t.Cleanup(cancel)

// ignore the response as we don't want the events or the error (the session will not exist)
clt.StreamSessionEvents(ctx, session.ID("44c6cea8-362f-11ea-83aa-125400432324"), 0)
clt.StreamSessionEvents(ctx, session.ID("44c6cea8-362f-11ea-83aa-125400432324"), 0, teleport.JSON)

// we need to wait for a short period to ensure the event is returned
time.Sleep(500 * time.Millisecond)
Expand All @@ -2255,6 +2256,7 @@ func TestStreamSessionEvents(t *testing.T) {

event := searchEvents[0].(*apievents.SessionRecordingAccess)
require.Equal(t, username, event.User)
require.Equal(t, teleport.JSON, event.Format)
}

// TestAPILockedOut tests Auth API when there are locks involved.
Expand Down Expand Up @@ -6172,7 +6174,7 @@ func TestLocalServiceRolesHavePermissionsForUploaderService(t *testing.T) {
t.Cleanup(func() { s.alog = originalLog })
s.alog = events.NewDiscardAuditLog()

eventC, errC := s.StreamSessionEvents(ctx, "foo", 0)
eventC, errC := s.StreamSessionEvents(ctx, "foo", 0, "" /* format */)
select {
case err := <-errC:
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions lib/auth/authclient/clt.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ func (c *Client) CompareAndSwapUser(ctx context.Context, new, expected types.Use
// StreamSessionEvents streams all events from a given session recording. An error is returned on the first
// channel if one is encountered. Otherwise the event channel is closed when the stream ends.
// The event channel is not closed on error to prevent race conditions in downstream select statements.
func (c *Client) StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64) (chan apievents.AuditEvent, chan error) {
return c.APIClient.StreamSessionEvents(ctx, string(sessionID), startIndex)
func (c *Client) StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64, format string) (chan apievents.AuditEvent, chan error) {
return c.APIClient.StreamSessionEvents(ctx, string(sessionID), startIndex, format)
}

// SearchEvents allows searching for audit events with pagination support.
Expand Down
4 changes: 2 additions & 2 deletions lib/auth/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3385,7 +3385,7 @@ func (g *GRPCServer) StreamSessionEvents(req *authpb.StreamSessionEventsRequest,
return trace.Wrap(err)
}

c, e := auth.ServerWithRoles.StreamSessionEvents(stream.Context(), session.ID(req.SessionID), int64(req.StartIndex))
c, e := auth.ServerWithRoles.StreamSessionEvents(stream.Context(), session.ID(req.SessionID), int64(req.StartIndex), req.Format)
for {
select {
case event, more := <-c:
Expand Down Expand Up @@ -5573,7 +5573,7 @@ func (g *GRPCServer) StreamUnstructuredSessionEvents(req *auditlogpb.StreamUnstr
return trace.Wrap(err)
}

c, e := auth.ServerWithRoles.StreamSessionEvents(stream.Context(), session.ID(req.SessionId), int64(req.StartIndex))
c, e := auth.ServerWithRoles.StreamSessionEvents(stream.Context(), session.ID(req.SessionId), int64(req.StartIndex), "" /* format */)

for {
select {
Expand Down
1 change: 1 addition & 0 deletions lib/client/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (p *playFromFileStreamer) StreamSessionEvents(
ctx context.Context,
sessionID session.ID,
startIndex int64,
_ string,
) (chan apievents.AuditEvent, chan error) {
evts := make(chan apievents.AuditEvent)
errs := make(chan error, 1)
Expand Down
2 changes: 1 addition & 1 deletion lib/events/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,7 +1011,7 @@ type SessionStreamer interface {
// statements. Both returned channels must be driven until the event channel
// is exhausted or the error channel reports an error, or until the context
// is canceled.
StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64) (chan apievents.AuditEvent, chan error)
StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64, format string) (chan apievents.AuditEvent, chan error)
}

type SearchEventsRequest struct {
Expand Down
2 changes: 1 addition & 1 deletion lib/events/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func (l *AuditLog) GetEventExportChunks(ctx context.Context, req *auditlogpb.Get
}

// StreamSessionEvents implements [SessionStreamer].
func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64) (chan apievents.AuditEvent, chan error) {
func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64, _ string) (chan apievents.AuditEvent, chan error) {
l.log.DebugContext(ctx, "StreamSessionEvents", "session_id", string(sessionID))
e := make(chan error, 1)
c := make(chan apievents.AuditEvent)
Expand Down
2 changes: 1 addition & 1 deletion lib/events/auditlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestConcurrentStreaming(t *testing.T) {
errors := make(chan error, streams)
for i := 0; i < streams; i++ {
go func() {
eventsC, errC := alog.StreamSessionEvents(ctx, sid, 0)
eventsC, errC := alog.StreamSessionEvents(ctx, sid, 0, "" /* format */)
for {
select {
case err := <-errC:
Expand Down
2 changes: 1 addition & 1 deletion lib/events/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (u *UploadCompleter) ensureSessionEndEvent(ctx context.Context, uploadData
var lastEvent events.AuditEvent
ctx, cancel := context.WithCancel(ctx)
defer cancel()
evts, errors := u.cfg.AuditLog.StreamSessionEvents(ctx, uploadData.SessionID, 0)
evts, errors := u.cfg.AuditLog.StreamSessionEvents(ctx, uploadData.SessionID, 0, "" /* format */)

loop:
for {
Expand Down
2 changes: 1 addition & 1 deletion lib/events/discard.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (d *DiscardAuditLog) EmitAuditEvent(ctx context.Context, event apievents.Au
return nil
}

func (d *DiscardAuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64) (chan apievents.AuditEvent, chan error) {
func (d *DiscardAuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64, _ string) (chan apievents.AuditEvent, chan error) {
c, e := make(chan apievents.AuditEvent), make(chan error, 1)
close(c)
return c, e
Expand Down
2 changes: 1 addition & 1 deletion lib/events/eventstest/mock_auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type MockAuditLog struct {
SessionEvents []apievents.AuditEvent
}

func (m *MockAuditLog) StreamSessionEvents(ctx context.Context, sid session.ID, startIndex int64) (chan apievents.AuditEvent, chan error) {
func (m *MockAuditLog) StreamSessionEvents(ctx context.Context, sid session.ID, startIndex int64, _ string) (chan apievents.AuditEvent, chan error) {
errors := make(chan error, 1)
events := make(chan apievents.AuditEvent)

Expand Down
2 changes: 1 addition & 1 deletion lib/events/eventstest/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type fakeStreamer struct {
interval time.Duration
}

func (f fakeStreamer) StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64) (chan apievents.AuditEvent, chan error) {
func (f fakeStreamer) StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64, _ string) (chan apievents.AuditEvent, chan error) {
errors := make(chan error, 1)
events := make(chan apievents.AuditEvent)

Expand Down
3 changes: 2 additions & 1 deletion lib/player/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type Streamer interface {
ctx context.Context,
sessionID session.ID,
startIndex int64,
format string,
) (chan events.AuditEvent, chan error)
}

Expand Down Expand Up @@ -182,7 +183,7 @@ func (p *Player) stream() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

eventsC, errC := p.streamer.StreamSessionEvents(ctx, p.sessionID, 0)
eventsC, errC := p.streamer.StreamSessionEvents(ctx, p.sessionID, 0, teleport.PTY)
lastDelay := int64(0)
for {
select {
Expand Down
4 changes: 2 additions & 2 deletions lib/player/player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ type simpleStreamer struct {
delay int64 // milliseconds
}

func (s *simpleStreamer) StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64) (chan apievents.AuditEvent, chan error) {
func (s *simpleStreamer) StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64, _ string) (chan apievents.AuditEvent, chan error) {
errors := make(chan error, 1)
evts := make(chan apievents.AuditEvent)

Expand Down Expand Up @@ -366,7 +366,7 @@ type databaseStreamer struct {
idx int64
}

func (d *databaseStreamer) StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64) (chan apievents.AuditEvent, chan error) {
func (d *databaseStreamer) StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64, _ string) (chan apievents.AuditEvent, chan error) {
errors := make(chan error, 1)
evts := make(chan apievents.AuditEvent)

Expand Down
2 changes: 1 addition & 1 deletion tool/tsh/common/play.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func exportSession(cf *CLIConf) error {
}
defer clusterClient.Close()

eventC, errC := clusterClient.AuthClient.StreamSessionEvents(cf.Context, *sid, 0)
eventC, errC := clusterClient.AuthClient.StreamSessionEvents(cf.Context, *sid, 0, format)

var exporter sessionExporter
switch format {
Expand Down
3 changes: 2 additions & 1 deletion tool/tsh/common/recording_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ func writeMovie(ctx context.Context, ss events.SessionStreamer, sid session.ID,
var width, height int32
currentFilename := makeAVIFileName(prefix, fileCount)

evts, errs := ss.StreamSessionEvents(ctx, sid, 0)
// TODO(gabrielcorado): Define a recording format for this.
evts, errs := ss.StreamSessionEvents(ctx, sid, 0, "" /* format */)
fastPathReceived := false
loop:
for {
Expand Down
Loading