Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: vm scheduler #10

Merged
merged 6 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# ROOT_FS_STORAGE_DSN=
# VM_STORAGE_DSN=
VM_STATE_URL=redis://localhost:6379
FUNCTION_STATE_STORAGE_DSN='host=localhost user=postgres password=postgres dbname=postgres port=5432 sslmode=disable TimeZone=Asia/Shanghai'
JWT_SECRET="I am so secret! Hopefully someone doesn't commit me.."

Expand Down
15 changes: 15 additions & 0 deletions database/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package database

import (
"github.com/redis/go-redis/v9"
)

func InitRedis(url string) *redis.Client {
opts, err := redis.ParseURL(url)

if err != nil {
panic(err)
}

return redis.NewClient(opts)
}
3 changes: 1 addition & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ services:
VMState:
image: bitnami/redis:latest
environment:
- REDIS_PASSWORD=password
- REDIS_DATABASE=functionState
- ALLOW_EMPTY_PASSWORD=yes
ports:
- 6379:6379
volumes:
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ require (
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
Expand All @@ -21,6 +23,7 @@ require (
github.com/kr/text v0.2.0 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/redis/go-redis/v9 v9.5.1 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/rs/xid v1.5.0 // indirect
golang.org/x/sync v0.6.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/bytedance/sonic v1.11.3 h1:jRN+yEjakWh8aK5FzrciUHG8OFXK+4/KrAX/ysEtHA
github.com/bytedance/sonic v1.11.3/go.mod h1:iZcSUejdk5aukTND/Eu/ivjQuEL0Cu9/rf50Hi0u/g4=
github.com/caarlos0/env/v10 v10.0.0 h1:yIHUBZGsyqCnpTkbjk8asUlx6RFhhEs+h7TOBdgdzXA=
github.com/caarlos0/env/v10 v10.0.0/go.mod h1:ZfulV76NvVPw3tm591U4SwL3Xx9ldzBP9aGxzeN7G18=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d h1:77cEq6EriyTZ0g/qfRdp61a3Uu/AWrgIq2s0ClJV1g0=
Expand All @@ -17,6 +19,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
Expand Down Expand Up @@ -88,6 +92,8 @@ github.com/pelletier/go-toml/v2 v2.2.0 h1:QLgLl2yMN7N+ruc31VynXs1vhMZa7CeHHejIeB
github.com/pelletier/go-toml/v2 v2.2.0/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8=
github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
Expand Down
24 changes: 23 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package main

import (
"context"
"log"
"time"

"github.com/do4-2022/grobuzin/database"
"github.com/do4-2022/grobuzin/routes"
"github.com/do4-2022/grobuzin/scheduler"

"github.com/caarlos0/env/v10"

Expand All @@ -16,7 +19,7 @@ import (

type Config struct {
// rootFsStorageDSN string `env:"ROOT_FS_STORAGE_DSN,notEmpty"`
// VMStorageDSN string `env:"VM_STORAGE_DSN,notEmpty"`
VMStateURL string `env:"VM_STATE_URL,notEmpty"`
FuntionStateStorageDSN string `env:"FUNCTION_STATE_STORAGE_DSN,notEmpty" envDefault:"host=localhost user=postgres password=postgres dbname=postgres port=5432 sslmode=disable TimeZone=Asia/Shanghai"`
JWTSecret string `env:"JWT_SECRET,notEmpty"`
MinioEndpoint string `env:"MINIO_ENDPOINT,notEmpty"`
Expand All @@ -31,6 +34,25 @@ func main() {
log.Fatalf("%+v\n", err)
}

ctx := context.Background()
redis := database.InitRedis(cfg.VMStateURL)

s := &scheduler.Scheduler{
Redis: redis,
Context: &ctx,
}
//Now inject the scheduler into the routes that need it!

go func() {
// every 24 hours we check for
for {
time.Sleep(time.Hour * 6);

log.Println("Ran instance pruning at", time.Now().UTC())
s.FindAndDestroyUnsused(24);
}
}()

db := database.Init(cfg.FuntionStateStorageDSN)
r := routes.GetRoutes(db, cfg.JWTSecret, getMinioClient(cfg))

Expand Down
Empty file removed scheduler/.keep
Empty file.
74 changes: 74 additions & 0 deletions scheduler/lambdoService.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package scheduler

import (
"fmt"

"github.com/google/uuid"

"bytes"
"encoding/json"
"io"
"net/http"
)

type LambdoService struct {
URL string
MinioURL string
}

type LambdoSpawnRequest struct {
RootfsURL string `json:"rootfs"`
}

type LambdoSpawnResponse struct {
ID string `json:"ID"`
Port uint16 `json:"port"`
Address string `json:"address"`
}

func (service *LambdoService) SpawnVM(function_id uuid.UUID) (data LambdoSpawnResponse, err error) {
var res *http.Response
defer func() {
if res != nil {
res.Body.Close()
}
}()

body, err := json.Marshal(&LambdoSpawnRequest{
RootfsURL: fmt.Sprintf("%s/%s", service.MinioURL, function_id),
})

if err != nil {
return
}
alexis-langlet marked this conversation as resolved.
Show resolved Hide resolved

res, err = http.Post(
fmt.Sprintf(service.URL, "/spawn"),
"application/json",
bytes.NewReader(body),
)

if err != nil {
return
}
alexis-langlet marked this conversation as resolved.
Show resolved Hide resolved

bytes, err := io.ReadAll(res.Body)

if err != nil {
return
}
alexis-langlet marked this conversation as resolved.
Show resolved Hide resolved

err = json.Unmarshal(bytes, &data)

return
}

func (service *LambdoService) DeleteVM(VMID string) (err error) {
req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf(service.URL, "/destroy/", VMID ), nil)
if err != nil {
return
}
_, err = http.DefaultClient.Do(req)

return
}
142 changes: 142 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package scheduler

import (
"context"
"errors"
"fmt"
"time"

"github.com/google/uuid"
"github.com/redis/go-redis/v9"
)

type StatusCode int

const (
Creating StatusCode = iota
Ready
Running
Unknown
)

// This struct represents an instance of a function, especially it's address and status
// Each key in the redis is namespaced as it follows:
// <id of the function>:<id of the instance>
type FunctionLocation struct {
Address string `redis:"address"`
Port uint16 `redis:"port"`
Status StatusCode `redis:"status"`
LastUsed string `redis:"lastUsed"`
}


type Scheduler struct {
Redis *redis.Client
Context *context.Context
Lambdo *LambdoService
}

// Uses SCAN to look for the first instance marked as ready (https://redis.io/docs/latest/commands/scan/)
func (s *Scheduler) LookForReadyInstance(functionId uuid.UUID, cursor uint64) (id string, returnedCursor uint64, err error) {
// match rule to get all instances of a function
locationMatch := fmt.Sprintf(functionId.String(), ":*")

keys, returnedCursor, err := s.Redis.Scan(*s.Context, cursor, locationMatch, 10).Result()

if err != nil {
return
}

for _, id := range keys {
code, err := s.Redis.HGet(*s.Context, id, "status").Int();

if err != nil && StatusCode(code) == Ready {
// we found a ready instance so we return it
return id, 0, nil
}
}

if returnedCursor != 0 {
// the current sweep did not find anything, we try again
return s.LookForReadyInstance(functionId, returnedCursor)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why this is recursive?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically in Redis, SCAN allows you to get keys matching a certain pattern just like KEYS would.

While KEYS locks the DB and scans everything in one shot, SCAN gets x keys and returns a cursor, you can then use the returned cursor to your next SCAN call to continue scanning until the returned cursor is 0.

This is why it is recursive, I can refactor it into interative if you want.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay I understand
You don't need to put that in an iterative way but if could just add some comments in your code to say what you just explained it would be really nice

}

return "", 0, errors.New("could not find an available function") // we did not find anything thus, id is empty
}

func (s *Scheduler) SpawnVM(functionId uuid.UUID) (LambdoRunResponse, err error) {
res, err := s.Lambdo.SpawnVM(functionId)

if (err != nil) {
return
}

locationID := fmt.Sprintf(functionId.String(), ":", res.ID)

err = s.Redis.HSet(*s.Context, locationID, &FunctionLocation{
Address: res.Address,
Port: res.Port,
Status: Creating,
LastUsed: "never",
}).Err()

return
}

func (s *Scheduler) GetFunctionLocations(functionId uuid.UUID) (locations []FunctionLocation) {
locationQuery := fmt.Sprintf(functionId.String(), ":*")
firstsweep, cursor := true, uint64(0)

// because do-while does not exists in these lands
for !firstsweep && cursor != 0 {
firstsweep = false

var keys []string
keys, cursor = s.Redis.Scan(*s.Context, cursor, locationQuery, 10).Val()

// for each key we got, we retrieve all of it's information
for _, ID := range keys {
var location FunctionLocation

if s.Redis.HGetAll(*s.Context, ID).Scan(&location) != nil {
return
}

locations = append(locations, location)
}
}

return
}

// goes through the whole redis instance and remove that have not been used within the last {hoursTimeout} Hours
func (s *Scheduler) FindAndDestroyUnsused(hoursTimeout float64) {
now := time.Now()
keys, cursor := s.Redis.Scan(*s.Context, 0, "*", 10).Val()

for cursor != 0 {
for _, ID := range keys {
val, err := s.Redis.HGet(*s.Context, ID, "lastUsed").Result()

if err != nil {
continue
}

// if it was never used we delete this
if val == "never" {
if s.Lambdo.DeleteVM(ID) != nil {
s.Redis.Del(*s.Context, ID)
}
} else {
// if it hasn't been used for {hoursTimeout} we delete it
lastUsed, err := time.Parse(time.UnixDate, val)
if err != nil || now.Sub(lastUsed).Abs().Hours() >= hoursTimeout {
if s.Lambdo.DeleteVM(ID) != nil {
s.Redis.Del(*s.Context, ID)
}
}
}
}
keys, cursor = s.Redis.Scan(*s.Context, cursor, "*", 10).Val()
}
}
Loading