Skip to content

Commit

Permalink
Merge pull request #41 from buildbarn/ch
Browse files Browse the repository at this point in the history
Improve BES gRPC server
  • Loading branch information
mortenmj authored Oct 18, 2024
2 parents d0ff213 + 1f2e28e commit 915c691
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 127 deletions.
2 changes: 1 addition & 1 deletion cmd/bb_portal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func main() {
if err := bb_grpc.NewServersFromConfigurationAndServe(
configuration.GrpcServers,
func(s go_grpc.ServiceRegistrar) {
build.RegisterPublishBuildEventServer(s.(*go_grpc.Server), bes.New(dbClient, blobArchiver))
build.RegisterPublishBuildEventServer(s.(*go_grpc.Server), bes.NewBuildEventServer(dbClient, blobArchiver))
},
siblingsGroup,
); err != nil {
Expand Down
6 changes: 5 additions & 1 deletion internal/api/grpc/bes/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "bes",
srcs = ["bes.go"],
srcs = [
"channel.go",
"handler.go",
"server.go",
],
importpath = "github.com/buildbarn/bb-portal/internal/api/grpc/bes",
visibility = ["//:__subpackages__"],
deps = [
Expand Down
125 changes: 0 additions & 125 deletions internal/api/grpc/bes/bes.go

This file was deleted.

72 changes: 72 additions & 0 deletions internal/api/grpc/bes/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package bes

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"

"google.golang.org/protobuf/encoding/protojson"

"github.com/buildbarn/bb-portal/pkg/events"
"github.com/buildbarn/bb-portal/pkg/processing"
"github.com/buildbarn/bb-portal/pkg/summary"
"github.com/buildbarn/bb-portal/third_party/bazel/gen/bes"
"google.golang.org/genproto/googleapis/devtools/build/v1"
)

// BuildEventChannel handles a single BuildEvent stream
type BuildEventChannel struct {
ctx context.Context
streamID *build.StreamId
summarizer *summary.Summarizer
workflow *processing.Workflow
}

// HandleBuildEvent processes a single BuildEvent
func (c *BuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error {
if event.GetBazelEvent() == nil {
return nil
}

var bazelEvent bes.BuildEvent
err := event.GetBazelEvent().UnmarshalTo(&bazelEvent)
if err != nil {
slog.ErrorContext(c.ctx, "UnmarshalTo failed", "err", err)
return err
}
buildEvent := events.NewBuildEvent(&bazelEvent, json.RawMessage(protojson.Format(&bazelEvent)))
if err = c.summarizer.ProcessEvent(&buildEvent); err != nil {
slog.ErrorContext(c.ctx, "ProcessEvent failed", "err", err)
return fmt.Errorf("could not process event (%s): , %w", buildEvent, err)
}
return nil
}

// Finalize wraps up processing of a stream of BuildEvent
func (c *BuildEventChannel) Finalize() error {
summaryReport, err := c.summarizer.FinishProcessing()
if err != nil {
slog.ErrorContext(c.ctx, "FinishProcessing failed", "err", err)
return err
}

// Hack for eventFile being required
summaryReport.EventFileURL = fmt.Sprintf(
"grpc://localhost:8082/google.devtools.build.v1/PublishLifecycleEvent?streamID=%s",
c.streamID.String(),
)

slog.InfoContext(c.ctx, "Saving invocation", "id", c.streamID.String())
startTime := time.Now()
invocation, err := c.workflow.SaveSummary(c.ctx, summaryReport)
if err != nil {
slog.ErrorContext(c.ctx, "SaveSummary failed", "err", err)
return err
}
endTime := time.Now()
elapsedTime := endTime.Sub(startTime)
slog.InfoContext(c.ctx, fmt.Sprintf("Saved invocation in %v", elapsedTime.String()), "id", invocation.InvocationID)
return nil
}
38 changes: 38 additions & 0 deletions internal/api/grpc/bes/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package bes

import (
"context"

"github.com/buildbarn/bb-portal/pkg/processing"
"github.com/buildbarn/bb-portal/pkg/summary"
"google.golang.org/genproto/googleapis/devtools/build/v1"
)

// BuildEventHandler orchestrates the handling of incoming Build Event streams.
// For each incoming stream, and BuildEventChannel is created, which handles that stream.
// BuildEventHandler is responsible for managing the things that are common to these event streams.
type BuildEventHandler struct {
workflow *processing.Workflow
}

// NewBuildEventHandler constructs a new BuildEventHandler
// TODO: Ensure we allow processing to complete before shutdown
// TODO: Cancel previous processing for an invocation if the client retries
// TODO: Write metrics
func NewBuildEventHandler(workflow *processing.Workflow) *BuildEventHandler {
return &BuildEventHandler{
workflow: workflow,
}
}

// CreateEventChannel creates a new BuildEventChannel
func (h *BuildEventHandler) CreateEventChannel(ctx context.Context, streamID *build.StreamId) *BuildEventChannel {
summarizer := summary.NewSummarizer()

return &BuildEventChannel{
ctx: ctx,
streamID: streamID,
summarizer: summarizer,
workflow: h.workflow,
}
}
97 changes: 97 additions & 0 deletions internal/api/grpc/bes/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package bes

import (
"context"
"io"
"log/slog"

build "google.golang.org/genproto/googleapis/devtools/build/v1"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/buildbarn/bb-portal/ent/gen/ent"
"github.com/buildbarn/bb-portal/pkg/processing"
)

// BuildEventServer implements the Build Event Service.
// It receives events and forwards them to a BuildEventChannel.
// TODO: Should this support forwarding events? Users might want to create their own
// tooling that reacts to build events, and it would be useful if this service could
// forward events to those.
type BuildEventServer struct {
handler *BuildEventHandler
}

// NewBuildEventServer creates a new BuildEventServer
func NewBuildEventServer(db *ent.Client, blobArchiver processing.BlobMultiArchiver) build.PublishBuildEventServer {
return &BuildEventServer{
handler: NewBuildEventHandler(processing.New(db, blobArchiver)),
}
}

// PublishLifecycleEvent handles life cycle events.
func (s BuildEventServer) PublishLifecycleEvent(ctx context.Context, request *build.PublishLifecycleEventRequest) (*emptypb.Empty, error) {
slog.InfoContext(ctx, "Received event", "event", protojson.Format(request.BuildEvent.GetEvent()))
return &emptypb.Empty{}, nil
}

// PublishBuildToolEventStream handles a build tool event stream.
func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildEvent_PublishBuildToolEventStreamServer) error {
slog.InfoContext(stream.Context(), "Stream started", "event", stream.Context())

ack := func(req *build.PublishBuildToolEventStreamRequest) {
if err := stream.Send(&build.PublishBuildToolEventStreamResponse{
StreamId: req.OrderedBuildEvent.StreamId,
SequenceNumber: req.OrderedBuildEvent.SequenceNumber,
}); err != nil {
slog.ErrorContext(stream.Context(), "Send failed", "err", err)
}
}

var streamID *build.StreamId
reqCh := make(chan *build.PublishBuildToolEventStreamRequest)
errCh := make(chan error)
var eventCh *BuildEventChannel

go func() {
for {
req, err := stream.Recv()
if err != nil {
errCh <- err
return
}
reqCh <- req
}
}()

for {
select {
case err := <-errCh:
if err == io.EOF {
slog.InfoContext(stream.Context(), "Stream finished", "event", stream.Context())
if eventCh == nil {
return nil
}

return eventCh.Finalize()
}

slog.ErrorContext(stream.Context(), "Recv failed", "err", err)
return err

case req := <-reqCh:
// First request
if streamID == nil {
streamID = req.OrderedBuildEvent.GetStreamId()
eventCh = s.handler.CreateEventChannel(stream.Context(), streamID)
}

if err := eventCh.HandleBuildEvent(req.OrderedBuildEvent.Event); err != nil {
slog.ErrorContext(stream.Context(), "HandleBuildEvent failed", "err", err)
return err
}

ack(req)
}
}
}

0 comments on commit 915c691

Please sign in to comment.