Skip to content

Commit

Permalink
Feat: beam shell (#817)
Browse files Browse the repository at this point in the history
Adds a shell command to let users connect to containers with their stub
config. It can be used with any decorator that inherits from the
`DeployableMixin`.

<img width="609" alt="Screenshot 2025-01-01 at 6 52 13 PM"
src="https://github.com/user-attachments/assets/0d8232fc-6215-40d7-bfdd-1f717e550aef"
/>
  • Loading branch information
luke-lombardi authored Jan 2, 2025
1 parent c7ad9cb commit 535d153
Show file tree
Hide file tree
Showing 24 changed files with 1,701 additions and 15 deletions.
3 changes: 3 additions & 0 deletions bin/gen_proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,6 @@ protoc -I ./pkg/abstractions/experimental/signal/ --python_betterproto_beta9_out

protoc -I ./pkg/abstractions/experimental/bot/ --go_out=./proto --go_opt=paths=source_relative --go-grpc_out=./proto --go-grpc_opt=paths=source_relative ./pkg/abstractions/experimental/bot/bot.proto
protoc -I ./pkg/abstractions/experimental/bot/ --python_betterproto_beta9_out=./sdk/src/beta9/clients/ ./pkg/abstractions/experimental/bot/bot.proto

protoc -I ./pkg/abstractions/shell/ --go_out=./proto --go_opt=paths=source_relative --go-grpc_out=./proto --go-grpc_opt=paths=source_relative ./pkg/abstractions/shell/shell.proto
protoc -I ./pkg/abstractions/shell/ --python_betterproto_beta9_out=./sdk/src/beta9/clients/ ./pkg/abstractions/shell/shell.proto
2 changes: 1 addition & 1 deletion docker/Dockerfile.runner
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ ENV DEBIAN_FRONTEND=noninteractive
RUN <<EOT
echo 'Acquire::ForceIPv4 "true";' | tee /etc/apt/apt.conf.d/1000-force-ipv4-transport
apt-get update
apt-get install -y software-properties-common curl git gcc python3-dev bzip2
apt-get install -y software-properties-common curl git gcc python3-dev bzip2 openssh-server
add-apt-repository ppa:deadsnakes/ppa
apt-get update
EOT
Expand Down
107 changes: 107 additions & 0 deletions pkg/abstractions/shell/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package shell

import (
"context"
"io"
"net/http"
"sync"

apiv1 "github.com/beam-cloud/beta9/pkg/api/v1"
"github.com/beam-cloud/beta9/pkg/auth"
"github.com/beam-cloud/beta9/pkg/network"
"github.com/beam-cloud/beta9/pkg/types"
"github.com/labstack/echo/v4"
)

type shellGroup struct {
routerGroup *echo.Group
ss *SSHShellService
}

func registerShellRoutes(g *echo.Group, ss *SSHShellService) *shellGroup {
group := &shellGroup{routerGroup: g, ss: ss}
g.CONNECT("/id/:stubId/:containerId", auth.WithAuth(group.ShellConnect))
return group
}

func (g *shellGroup) ShellConnect(ctx echo.Context) error {
cc, _ := ctx.(*auth.HttpAuthContext)

containerId := ctx.Param("containerId")
stubId := ctx.Param("stubId")

stub, err := g.ss.backendRepo.GetStubByExternalId(ctx.Request().Context(), stubId, types.QueryFilter{
Field: "workspace_id",
Value: cc.AuthInfo.Token.Workspace.ExternalId,
})
if err != nil {
return apiv1.HTTPInternalServerError("Failed to retrieve stub")
} else if stub == nil {
return apiv1.HTTPNotFound()
}

containerAddress, err := g.ss.containerRepo.GetContainerAddress(containerId)
if err != nil {
return ctx.String(http.StatusBadGateway, "Failed to connect to container")
}

// Channel to signal when either connection is closed
done := make(chan struct{})
var once sync.Once

go g.ss.keepAlive(ctx.Request().Context(), containerId, done)

// Send a 200 OK before hijacking
ctx.Response().WriteHeader(http.StatusOK)
ctx.Response().Flush()

// Hijack the connection
hijacker, ok := ctx.Response().Writer.(http.Hijacker)
if !ok {
return ctx.String(http.StatusInternalServerError, "Failed to create tunnel")
}

conn, _, err := hijacker.Hijack()
if err != nil {
return ctx.String(http.StatusInternalServerError, "Failed to create tunnel")
}
defer conn.Close()

// Dial ssh server in the container
containerConn, err := network.ConnectToHost(ctx.Request().Context(), containerAddress, containerDialTimeoutDurationS, g.ss.tailscale, g.ss.config.Tailscale)
if err != nil {
return ctx.String(http.StatusBadGateway, "Failed to connect to container")
}
defer containerConn.Close()

// Create a context that will be canceled when the client disconnects
clientCtx, clientCancel := context.WithCancel(ctx.Request().Context())
defer clientCancel()

defer func() {
containerConn.Close()
conn.Close()
}()

go func() {
buf := make([]byte, shellProxyBufferSizeKb)
_, _ = io.CopyBuffer(containerConn, conn, buf)
once.Do(func() { close(done) })
}()

go func() {
buf := make([]byte, shellProxyBufferSizeKb)
_, _ = io.CopyBuffer(conn, containerConn, buf)
once.Do(func() { close(done) })
}()

// Wait for either connection to close
select {
case <-done:
return nil
case <-clientCtx.Done():
return nil
case <-g.ss.ctx.Done():
return nil
}
}
Loading

0 comments on commit 535d153

Please sign in to comment.