Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Final fixes (for initial release) #19

Merged
merged 5 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# ROOT_FS_STORAGE_DSN=
LAMBDO_URL="http://localhost:3000"
VM_STATE_URL=redis://localhost:6379
FUNCTION_STATE_STORAGE_DSN='host=localhost user=postgres password=postgres dbname=postgres port=5432 sslmode=disable TimeZone=Asia/Shanghai'
JWT_SECRET="I am so secret! Hopefully someone doesn't commit me.."
Expand All @@ -9,3 +8,6 @@ BUILDER_ENDPOINT="http://localhost:8080" # grobuzin's builder service endpoint
MINIO_ENDPOINT="localhost:9000"
MINIO_ACCESS_KEY="access_key"
MINIO_SECRET_KEY="secret_key"

LAMBDO_HOST="localhost"
LAMBDO_PORT="3000"
10 changes: 5 additions & 5 deletions database/functionState.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ const (
// Each key in the redis is namespaced as it follows:
// <id of the function>:<id of the instance>
type FunctionState struct {
ID string `redis:"address"`
Address string `redis:"address"`
Port uint16 `redis:"port"`
Status FnStatusCode `redis:"status"`
LastUsed string `redis:"lastUsed"`
ID string `redis:"id"`
Address string `redis:"address"`
Port uint16 `redis:"port"`
Status int `redis:"status"`
LastUsed string `redis:"lastUsed"`
}
13 changes: 11 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (

type Config struct {
// rootFsStorageDSN string `env:"ROOT_FS_STORAGE_DSN,notEmpty"`
LambdoURL string `env:"LAMBDO_URL,notEmpty"`
LambdoHost string `env:"LAMBDO_HOST,notEmpty"`
LambdoPort int `env:"LAMBDO_PORT,notEmpty"`
VMStateURL string `env:"VM_STATE_URL,notEmpty"`
FuntionStateStorageDSN string `env:"FUNCTION_STATE_STORAGE_DSN,notEmpty" envDefault:"host=localhost user=postgres password=postgres dbname=postgres port=5432 sslmode=disable TimeZone=Asia/Shanghai"`
JWTSecret string `env:"JWT_SECRET,notEmpty"`
Expand All @@ -41,12 +42,20 @@ func main() {
ctx := context.Background()
redis := database.InitRedis(cfg.VMStateURL)

bucketPrefix := "http://"

if cfg.MinioSecure {
bucketPrefix = "https://"
}

s := &scheduler.Scheduler{
Redis: redis,
Context: &ctx,
Lambdo: &scheduler.LambdoService{
URL: cfg.LambdoURL,
Host: cfg.LambdoHost,
Port: cfg.LambdoPort,
BucketURL: fmt.Sprint(
bucketPrefix,
cfg.MinioEndpoint,
"/",
objectStorage.BucketName,
Expand Down
17 changes: 10 additions & 7 deletions routes/function/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,16 +199,19 @@ func (c *Controller) RunFunction(ctx *gin.Context) {
res, err := c.Scheduler.SpawnVM(fn)

if err != nil {
ctx.AbortWithStatusJSON(500, gin.H{"error": err.Error()})
log.Println(err.Error())
ctx.AbortWithStatusJSON(500, gin.H{"error": "Could not cold start the function"})
return
}

// retrieving freshly created function state
fnState, err = c.Scheduler.GetStateByID(
fmt.Sprintf(fnID.String(), ":", res.ID),
fmt.Sprint(fnID.String(), ":", res.ID),
)

if err != nil {
log.Println(err.Error())
ctx.AbortWithStatusJSON(500, gin.H{"error": "Could not cold start the function"})
return
}
} else if err != nil { // else if the error is not a record not found, we return an error
Expand All @@ -217,9 +220,9 @@ func (c *Controller) RunFunction(ctx *gin.Context) {
return
}

stateID := fmt.Sprintf(fnID.String(), ":", fnState.ID)
stateID := fmt.Sprint(fnID.String(), ":", fnState.ID)

if fnState.Status != database.FnReady {
if fnState.Status != int(database.FnReady) {
log.Println("Waiting for function", fn.ID, "to be ready")
time.Sleep(100 * time.Millisecond)

Expand All @@ -235,14 +238,14 @@ func (c *Controller) RunFunction(ctx *gin.Context) {
return
}

if fnState.Status == database.FnReady {
if fnState.Status == int(database.FnReady) {
log.Println("Function", fnID, "is ready")
break
};
}

// if even after 5 attempts the function is not ready, we return an error
if fnState.Status != database.FnReady {
if fnState.Status != int(database.FnReady) {
ctx.AbortWithStatusJSON(503, gin.H{"error": "Function is not ready"})
return
}
Expand All @@ -258,7 +261,7 @@ func (c *Controller) RunFunction(ctx *gin.Context) {
}

_, err = http.Post(
fmt.Sprint("http://", string(fnState.Address), ":", fnState.Port, "/execute"),
fmt.Sprint(string(fnState.Address), ":", fnState.Port, "/execute"),
"application/json",
ctx.Request.Body,
)
Expand Down
11 changes: 8 additions & 3 deletions scheduler/lambdoService.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ const (
)

type LambdoService struct {
URL string
Host string
Port int
BucketURL string
}

Expand All @@ -44,6 +45,10 @@ type LambdoSpawnResponse struct {
Ports [][2]uint16 `json:"port_mapping"`
}

func (service *LambdoService) url() string {
return fmt.Sprint("http://", service.Host, ":", service.Port)
}

func (service *LambdoService) SpawnVM(function database.Function) (data LambdoSpawnResponse, err error) {
var res *http.Response
defer func() {
Expand All @@ -69,7 +74,7 @@ func (service *LambdoService) SpawnVM(function database.Function) (data LambdoSp
}

res, err = http.Post(
fmt.Sprint(service.URL, "/spawn"),
fmt.Sprint(service.url(), "/spawn"),
"application/json",
bytes.NewReader(body),
)
Expand All @@ -90,7 +95,7 @@ func (service *LambdoService) SpawnVM(function database.Function) (data LambdoSp
}

func (service *LambdoService) DeleteVM(VMID string) (err error) {
req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf(service.URL, "/destroy/", VMID ), nil)
req, err := http.NewRequest(http.MethodDelete, fmt.Sprint(service.url(), "/destroy/", VMID ), nil)
if err != nil {
return
}
Expand Down
12 changes: 7 additions & 5 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"time"

"github.com/do4-2022/grobuzin/database"

"github.com/google/uuid"
Expand Down Expand Up @@ -34,14 +35,14 @@ func (s *Scheduler) SetStatus(id string, status database.FnStatusCode) error {
*s.Context,
id,
"status",
status,
int(status),
).Err()
}

// Uses SCAN to look for the first instance marked as ready (https://redis.io/docs/latest/commands/scan/)
func (s *Scheduler) LookForReadyInstance(functionId uuid.UUID, cursor uint64) (fnState database.FunctionState, returnedCursor uint64, err error) {
// match rule to get all instances of a function
stateMatch := fmt.Sprintf(functionId.String(), ":*")
stateMatch := fmt.Sprint(functionId.String(), ":*")

keys, returnedCursor, err := s.Redis.Scan(*s.Context, cursor, stateMatch, 10).Result()

Expand All @@ -67,19 +68,20 @@ func (s *Scheduler) LookForReadyInstance(functionId uuid.UUID, cursor uint64) (f
}

func (s *Scheduler) SpawnVM(function database.Function) (fnState database.FunctionState, err error) {
url := fmt.Sprint("http://", s.Lambdo.Host)
res, err := s.Lambdo.SpawnVM(function)

if (err != nil) {
return
}

stateID := fmt.Sprintf(function.ID.String(), ":", res.ID)
stateID := fmt.Sprint(function.ID.String(), ":", res.ID)

fnState = database.FunctionState{
ID: res.ID,
Address: s.Lambdo.URL,
Address: url,
Port: res.Ports[0][0],
Status: database.FnReady,
Status: int(database.FnReady),
LastUsed: "never",
}

Expand Down
Loading