From 4b1783002c5f528e7970cdc321d324a294eac5e7 Mon Sep 17 00:00:00 2001 From: WoodenMaiden Date: Thu, 11 Apr 2024 15:29:34 +0200 Subject: [PATCH 1/6] feat: redis connection --- .env.example | 2 +- database/redis.go | 15 +++++++++++++++ docker-compose.yaml | 3 +-- go.mod | 4 ++++ go.sum | 6 ++++++ main.go | 8 +++++++- 6 files changed, 34 insertions(+), 4 deletions(-) create mode 100644 database/redis.go diff --git a/.env.example b/.env.example index fc8bbf5..4657d13 100644 --- a/.env.example +++ b/.env.example @@ -1,5 +1,5 @@ # ROOT_FS_STORAGE_DSN= -# VM_STORAGE_DSN= +VM_STATE_URL=redis://localhost:6379 FUNCTION_STATE_STORAGE_DSN='host=localhost user=postgres password=postgres dbname=postgres port=5432 sslmode=disable TimeZone=Asia/Shanghai' JWT_SECRET="I am so secret! Hopefully someone doesn't commit me.." diff --git a/database/redis.go b/database/redis.go new file mode 100644 index 0000000..36480b4 --- /dev/null +++ b/database/redis.go @@ -0,0 +1,15 @@ +package database + +import ( + "github.com/redis/go-redis/v9" +) + +func InitRedis(url string) *redis.Client { + opts, err := redis.ParseURL(url) + + if err != nil { + panic(err) + } + + return redis.NewClient(opts) +} diff --git a/docker-compose.yaml b/docker-compose.yaml index 9b8dbe6..4205e57 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -2,8 +2,7 @@ services: VMState: image: bitnami/redis:latest environment: - - REDIS_PASSWORD=password - - REDIS_DATABASE=functionState + - ALLOW_EMPTY_PASSWORD=yes ports: - 6379:6379 volumes: diff --git a/go.mod b/go.mod index defe81c..d056a68 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,9 @@ require ( require ( github.com/dustin/go-humanize v1.0.1 // indirect + github.com/caarlos0/env/v10 v10.0.0 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect github.com/jackc/pgx/v5 v5.5.5 // indirect @@ -21,6 +24,7 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/minio/md5-simd v1.1.2 // indirect github.com/minio/sha256-simd v1.0.1 // indirect + github.com/redis/go-redis/v9 v9.5.1 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/rs/xid v1.5.0 // indirect golang.org/x/sync v0.6.0 // indirect diff --git a/go.sum b/go.sum index 47c2330..1747d7e 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/bytedance/sonic v1.11.3 h1:jRN+yEjakWh8aK5FzrciUHG8OFXK+4/KrAX/ysEtHA github.com/bytedance/sonic v1.11.3/go.mod h1:iZcSUejdk5aukTND/Eu/ivjQuEL0Cu9/rf50Hi0u/g4= github.com/caarlos0/env/v10 v10.0.0 h1:yIHUBZGsyqCnpTkbjk8asUlx6RFhhEs+h7TOBdgdzXA= github.com/caarlos0/env/v10 v10.0.0/go.mod h1:ZfulV76NvVPw3tm591U4SwL3Xx9ldzBP9aGxzeN7G18= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d h1:77cEq6EriyTZ0g/qfRdp61a3Uu/AWrgIq2s0ClJV1g0= @@ -17,6 +19,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= @@ -88,6 +92,8 @@ github.com/pelletier/go-toml/v2 v2.2.0 h1:QLgLl2yMN7N+ruc31VynXs1vhMZa7CeHHejIeB github.com/pelletier/go-toml/v2 v2.2.0/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8= +github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= diff --git a/main.go b/main.go index 34bd5b9..6b168be 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,7 @@ package main import ( "log" + "context" "github.com/do4-2022/grobuzin/database" "github.com/do4-2022/grobuzin/routes" @@ -16,7 +17,7 @@ import ( type Config struct { // rootFsStorageDSN string `env:"ROOT_FS_STORAGE_DSN,notEmpty"` - // VMStorageDSN string `env:"VM_STORAGE_DSN,notEmpty"` + VMStateURL string `env:"VM_STATE_URL,notEmpty"` FuntionStateStorageDSN string `env:"FUNCTION_STATE_STORAGE_DSN,notEmpty" envDefault:"host=localhost user=postgres password=postgres dbname=postgres port=5432 sslmode=disable TimeZone=Asia/Shanghai"` JWTSecret string `env:"JWT_SECRET,notEmpty"` MinioEndpoint string `env:"MINIO_ENDPOINT,notEmpty"` @@ -31,8 +32,13 @@ func main() { log.Fatalf("%+v\n", err) } + ctx := context.Background() + db := database.Init(cfg.FuntionStateStorageDSN) r := routes.GetRoutes(db, cfg.JWTSecret, getMinioClient(cfg)) + redis := database.InitRedis(cfg.VMStateURL) + redis.Ping(ctx) // TODO define scheduler struct + err := r.Run() From 5dd648054a8133288860f8507dd01b890b17f5c4 Mon Sep 17 00:00:00 2001 From: WoodenMaiden Date: Thu, 11 Apr 2024 16:03:48 +0200 Subject: [PATCH 2/6] feat: basic structure Signed-off-by: WoodenMaiden --- main.go | 12 +++++-- scheduler/.keep | 0 scheduler/lambdoService.go | 74 ++++++++++++++++++++++++++++++++++++++ scheduler/operations.go | 9 +++++ scheduler/scheduler.go | 13 +++++++ 5 files changed, 106 insertions(+), 2 deletions(-) delete mode 100644 scheduler/.keep create mode 100644 scheduler/lambdoService.go create mode 100644 scheduler/operations.go create mode 100644 scheduler/scheduler.go diff --git a/main.go b/main.go index 6b168be..101057d 100644 --- a/main.go +++ b/main.go @@ -2,10 +2,11 @@ package main import ( "log" - "context" + // "context" "github.com/do4-2022/grobuzin/database" "github.com/do4-2022/grobuzin/routes" + // "github.com/do4-2022/grobuzin/scheduler" "github.com/caarlos0/env/v10" @@ -32,7 +33,14 @@ func main() { log.Fatalf("%+v\n", err) } - ctx := context.Background() + /*ctx := context.Background() + redis := database.InitRedis(cfg.VMStateURL) + + s := &scheduler.Scheduler{ + RedisHandle: redis, + Context: &ctx, + }*/ + //Now inject the scheduler into the routes that need it! db := database.Init(cfg.FuntionStateStorageDSN) r := routes.GetRoutes(db, cfg.JWTSecret, getMinioClient(cfg)) diff --git a/scheduler/.keep b/scheduler/.keep deleted file mode 100644 index e69de29..0000000 diff --git a/scheduler/lambdoService.go b/scheduler/lambdoService.go new file mode 100644 index 0000000..fd64b97 --- /dev/null +++ b/scheduler/lambdoService.go @@ -0,0 +1,74 @@ +package scheduler + +import ( + "bytes" + "encoding/json" + "io" + "net/http" +) + +type LambdoService struct { + URL string +} + +type LambdoCodeFileItem struct { + Filename string `json:"filename"` + Content string `json:"content"` +} + +type LambdoRunRequest struct { + Language string `json:"language"` + Version string `json:"version"` + Input string `json:"input"` + Code []LambdoCodeFileItem `json:"code"` +} + +type LambdoRunResponse struct { + Status int `json:"status"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + Port uint16 `json:"port"` //// + Address string `json:"address"` //// Should be added by Simon +} + +func (service *LambdoService) RunFunction(code string) (data LambdoRunResponse, err error) { + var res *http.Response + defer func() { + if res != nil { + res.Body.Close() + } + }() + + body, err := json.Marshal(&LambdoRunRequest{ + Language: "NODE", // TODO + Version: "1.0.0", + Input: "", + Code: []LambdoCodeFileItem{ + { + Filename: "index.js", + Content: code, + }, + }, + }) + + if err != nil { + return + } + + res, err = http.Post(service.URL, "application/json", bytes.NewReader(body)) + + if err != nil { + return + } + + bytes, err := io.ReadAll(res.Body) + + if err != nil { + return + } + + data = LambdoRunResponse{} + err = json.Unmarshal(bytes, &data) + + return +} \ No newline at end of file diff --git a/scheduler/operations.go b/scheduler/operations.go new file mode 100644 index 0000000..c4c20c2 --- /dev/null +++ b/scheduler/operations.go @@ -0,0 +1,9 @@ +package scheduler + +import ( + "github.com/google/uuid" +) + +func (s *Scheduler) SpawnVM(vmID uuid.UUID) { + +} \ No newline at end of file diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go new file mode 100644 index 0000000..7520f14 --- /dev/null +++ b/scheduler/scheduler.go @@ -0,0 +1,13 @@ +package scheduler + +import ( + "context" + + "github.com/redis/go-redis/v9" +) + +type Scheduler struct { + RedisHandle *redis.Client + Context *context.Context +} + From f3fa7cabcccb2487c46f2e567b9fc8e11bce7937 Mon Sep 17 00:00:00 2001 From: WoodenMaiden Date: Tue, 16 Apr 2024 16:28:55 +0200 Subject: [PATCH 3/6] feat: spawn and watch VMs --- main.go | 6 ++++-- scheduler/operations.go | 9 -------- scheduler/scheduler.go | 46 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 49 insertions(+), 12 deletions(-) delete mode 100644 scheduler/operations.go diff --git a/main.go b/main.go index 101057d..4e90aba 100644 --- a/main.go +++ b/main.go @@ -2,7 +2,7 @@ package main import ( "log" - // "context" + "context" "github.com/do4-2022/grobuzin/database" "github.com/do4-2022/grobuzin/routes" @@ -33,7 +33,9 @@ func main() { log.Fatalf("%+v\n", err) } - /*ctx := context.Background() + ctx := context.Background() + + /* redis := database.InitRedis(cfg.VMStateURL) s := &scheduler.Scheduler{ diff --git a/scheduler/operations.go b/scheduler/operations.go deleted file mode 100644 index c4c20c2..0000000 --- a/scheduler/operations.go +++ /dev/null @@ -1,9 +0,0 @@ -package scheduler - -import ( - "github.com/google/uuid" -) - -func (s *Scheduler) SpawnVM(vmID uuid.UUID) { - -} \ No newline at end of file diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 7520f14..7c4cb03 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -2,12 +2,56 @@ package scheduler import ( "context" + "fmt" + "github.com/google/uuid" "github.com/redis/go-redis/v9" ) +type FunctionLocation struct { + Address string `redis:"address"` + Port uint16 `redis:"port"` +} + + type Scheduler struct { - RedisHandle *redis.Client + Redis *redis.Client Context *context.Context + Lambdo *LambdoService +} + + +func (s *Scheduler) SpawnVM(functionId uuid.UUID) (LambdoRunResponse, err error) { + res, err := s.Lambdo.RunFunction("lorm ipsum dolor sit amet") // TODO change this when available + + if (err != nil) { + return + } + + locationID := fmt.Sprintf(functionId.String(), ":", uuid.New().String()) + + err = s.Redis.HSet(*s.Context, locationID, &FunctionLocation{ + Address: res.Address, + Port: res.Port, + }).Err() + + return } +func (s *Scheduler) GetFunctionLocations(functionId uuid.UUID) (locations []FunctionLocation, err error) { + locationQuery := fmt.Sprintf(functionId.String(), ":*") + + IDs := s.Redis.Keys(*s.Context, locationQuery).Val() + + for _, ID := range IDs { + var location FunctionLocation + + if s.Redis.HGetAll(*s.Context, ID).Scan(&location) != nil { + return + } + + locations = append(locations, location) + } + + return +} \ No newline at end of file From 8c73b390bd7082e7471f6227083f3e9d9b1b14e8 Mon Sep 17 00:00:00 2001 From: WoodenMaiden Date: Mon, 22 Apr 2024 17:14:40 +0200 Subject: [PATCH 4/6] feat: use cursor instead of KEY --- scheduler/scheduler.go | 67 +++++++++++++++++++++++++++++++++++------- 1 file changed, 56 insertions(+), 11 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 7c4cb03..951e87e 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -1,6 +1,7 @@ package scheduler import ( + "errors" "context" "fmt" @@ -8,9 +9,20 @@ import ( "github.com/redis/go-redis/v9" ) +type StatusCode int + +const ( + Creating StatusCode = iota + Ready + Running + Unknown +) + type FunctionLocation struct { - Address string `redis:"address"` - Port uint16 `redis:"port"` + Address string `redis:"address"` + Port uint16 `redis:"port"` + Status StatusCode `redis:"status"` + LastUsed string `redis:"lastUsed"` } @@ -20,9 +32,35 @@ type Scheduler struct { Lambdo *LambdoService } +// Uses SCAN to look for the first instance marked as ready (https://redis.io/docs/latest/commands/scan/) +func (s *Scheduler) LookForReadyInstance(functionId uuid.UUID, cursor uint64) (id string, returnedCursor uint64, err error) { + locationMatch := fmt.Sprintf(functionId.String(), ":*") + + keys, returnedCursor, err := s.Redis.Scan(*s.Context, cursor, locationMatch, 10).Result() + + if err != nil { + return + } + + for _, id := range keys { + code, err := s.Redis.HGet(*s.Context, id, "status").Int(); + + if err != nil && StatusCode(code) == Ready { + // we found a ready instance so we return it + return id, 0, nil + } + } + + if returnedCursor != 0 { + // the current sweep did not find anything, we try again + return s.LookForReadyInstance(functionId, returnedCursor) + } + + return "", 0, errors.New("could not find an available function") // we did not find anything thus, id is empty +} func (s *Scheduler) SpawnVM(functionId uuid.UUID) (LambdoRunResponse, err error) { - res, err := s.Lambdo.RunFunction("lorm ipsum dolor sit amet") // TODO change this when available + res, err := s.Lambdo.RunFunction("lorem ipsum dolor sit amet") // TODO change this when available if (err != nil) { return @@ -33,24 +71,31 @@ func (s *Scheduler) SpawnVM(functionId uuid.UUID) (LambdoRunResponse, err error) err = s.Redis.HSet(*s.Context, locationID, &FunctionLocation{ Address: res.Address, Port: res.Port, + Status: Creating, + LastUsed: "never", }).Err() return } -func (s *Scheduler) GetFunctionLocations(functionId uuid.UUID) (locations []FunctionLocation, err error) { +func (s *Scheduler) GetFunctionLocations(functionId uuid.UUID) (locations []FunctionLocation) { locationQuery := fmt.Sprintf(functionId.String(), ":*") - IDs := s.Redis.Keys(*s.Context, locationQuery).Val() - for _, ID := range IDs { - var location FunctionLocation + // because while does not exists in these lands + for ok, cursor := true, uint64(0) ; ok; ok = (cursor != 0) { + var keys []string + keys, cursor = s.Redis.Scan(*s.Context, cursor, locationQuery, 10).Val() - if s.Redis.HGetAll(*s.Context, ID).Scan(&location) != nil { - return - } + for _, ID := range keys { + var location FunctionLocation + + if s.Redis.HGetAll(*s.Context, ID).Scan(&location) != nil { + return + } - locations = append(locations, location) + locations = append(locations, location) + } } return From 4854deae5d0738445767efa9ffed3b478db705b9 Mon Sep 17 00:00:00 2001 From: WoodenMaiden Date: Thu, 25 Apr 2024 14:25:27 +0200 Subject: [PATCH 5/6] feat: change calls according to new definitions of lambdo --- go.mod | 3 +- scheduler/lambdoService.go | 60 +++++++++++++++++++------------------- scheduler/scheduler.go | 4 +-- 3 files changed, 33 insertions(+), 34 deletions(-) diff --git a/go.mod b/go.mod index d056a68..4a06a8b 100644 --- a/go.mod +++ b/go.mod @@ -12,10 +12,9 @@ require ( ) require ( - github.com/dustin/go-humanize v1.0.1 // indirect - github.com/caarlos0/env/v10 v10.0.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect github.com/jackc/pgx/v5 v5.5.5 // indirect diff --git a/scheduler/lambdoService.go b/scheduler/lambdoService.go index fd64b97..4b89173 100644 --- a/scheduler/lambdoService.go +++ b/scheduler/lambdoService.go @@ -1,6 +1,10 @@ package scheduler import ( + "fmt" + + "github.com/google/uuid" + "bytes" "encoding/json" "io" @@ -8,30 +12,21 @@ import ( ) type LambdoService struct { - URL string + URL string + MinioURL string } -type LambdoCodeFileItem struct { - Filename string `json:"filename"` - Content string `json:"content"` +type LambdoSpawnRequest struct { + RootfsURL string `json:"rootfs"` } -type LambdoRunRequest struct { - Language string `json:"language"` - Version string `json:"version"` - Input string `json:"input"` - Code []LambdoCodeFileItem `json:"code"` +type LambdoSpawnResponse struct { + ID string `json:"ID"` + Port uint16 `json:"port"` + Address string `json:"address"` } -type LambdoRunResponse struct { - Status int `json:"status"` - Stdout string `json:"stdout"` - Stderr string `json:"stderr"` - Port uint16 `json:"port"` //// - Address string `json:"address"` //// Should be added by Simon -} - -func (service *LambdoService) RunFunction(code string) (data LambdoRunResponse, err error) { +func (service *LambdoService) SpawnVM(function_id uuid.UUID) (data LambdoSpawnResponse, err error) { var res *http.Response defer func() { if res != nil { @@ -39,23 +34,19 @@ func (service *LambdoService) RunFunction(code string) (data LambdoRunResponse, } }() - body, err := json.Marshal(&LambdoRunRequest{ - Language: "NODE", // TODO - Version: "1.0.0", - Input: "", - Code: []LambdoCodeFileItem{ - { - Filename: "index.js", - Content: code, - }, - }, + body, err := json.Marshal(&LambdoSpawnRequest{ + RootfsURL: fmt.Sprintf("%s/%s", service.MinioURL, function_id), }) if err != nil { return } - res, err = http.Post(service.URL, "application/json", bytes.NewReader(body)) + res, err = http.Post( + fmt.Sprintf(service.URL, "/spawn"), + "application/json", + bytes.NewReader(body), + ) if err != nil { return @@ -67,8 +58,17 @@ func (service *LambdoService) RunFunction(code string) (data LambdoRunResponse, return } - data = LambdoRunResponse{} err = json.Unmarshal(bytes, &data) + return +} + +func (service *LambdoService) DeleteVM(VMID string) (err error) { + req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf(service.URL, "/destroy/", VMID ), nil) + if err != nil { + return + } + _, err = http.DefaultClient.Do(req) + return } \ No newline at end of file diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 951e87e..634758e 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -60,13 +60,13 @@ func (s *Scheduler) LookForReadyInstance(functionId uuid.UUID, cursor uint64) (i } func (s *Scheduler) SpawnVM(functionId uuid.UUID) (LambdoRunResponse, err error) { - res, err := s.Lambdo.RunFunction("lorem ipsum dolor sit amet") // TODO change this when available + res, err := s.Lambdo.SpawnVM(functionId) if (err != nil) { return } - locationID := fmt.Sprintf(functionId.String(), ":", uuid.New().String()) + locationID := fmt.Sprintf(functionId.String(), ":", res.ID) err = s.Redis.HSet(*s.Context, locationID, &FunctionLocation{ Address: res.Address, From 3977be1df66a2f8be70ddf1915d775d12838cde9 Mon Sep 17 00:00:00 2001 From: WoodenMaiden Date: Sun, 5 May 2024 18:45:08 +0200 Subject: [PATCH 6/6] feat: run instance pruning every 6h --- main.go | 24 ++++++++++++-------- scheduler/scheduler.go | 50 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 60 insertions(+), 14 deletions(-) diff --git a/main.go b/main.go index 4e90aba..9f89999 100644 --- a/main.go +++ b/main.go @@ -1,12 +1,13 @@ package main import ( - "log" "context" + "log" + "time" "github.com/do4-2022/grobuzin/database" "github.com/do4-2022/grobuzin/routes" - // "github.com/do4-2022/grobuzin/scheduler" + "github.com/do4-2022/grobuzin/scheduler" "github.com/caarlos0/env/v10" @@ -34,21 +35,26 @@ func main() { } ctx := context.Background() - - /* redis := database.InitRedis(cfg.VMStateURL) s := &scheduler.Scheduler{ - RedisHandle: redis, + Redis: redis, Context: &ctx, - }*/ + } //Now inject the scheduler into the routes that need it! + go func() { + // every 24 hours we check for + for { + time.Sleep(time.Hour * 6); + + log.Println("Ran instance pruning at", time.Now().UTC()) + s.FindAndDestroyUnsused(24); + } + }() + db := database.Init(cfg.FuntionStateStorageDSN) r := routes.GetRoutes(db, cfg.JWTSecret, getMinioClient(cfg)) - redis := database.InitRedis(cfg.VMStateURL) - redis.Ping(ctx) // TODO define scheduler struct - err := r.Run() diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 634758e..c41bb0c 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -1,9 +1,10 @@ package scheduler import ( - "errors" "context" + "errors" "fmt" + "time" "github.com/google/uuid" "github.com/redis/go-redis/v9" @@ -18,6 +19,9 @@ const ( Unknown ) +// This struct represents an instance of a function, especially it's address and status +// Each key in the redis is namespaced as it follows: +// : type FunctionLocation struct { Address string `redis:"address"` Port uint16 `redis:"port"` @@ -34,6 +38,7 @@ type Scheduler struct { // Uses SCAN to look for the first instance marked as ready (https://redis.io/docs/latest/commands/scan/) func (s *Scheduler) LookForReadyInstance(functionId uuid.UUID, cursor uint64) (id string, returnedCursor uint64, err error) { + // match rule to get all instances of a function locationMatch := fmt.Sprintf(functionId.String(), ":*") keys, returnedCursor, err := s.Redis.Scan(*s.Context, cursor, locationMatch, 10).Result() @@ -80,13 +85,16 @@ func (s *Scheduler) SpawnVM(functionId uuid.UUID) (LambdoRunResponse, err error) func (s *Scheduler) GetFunctionLocations(functionId uuid.UUID) (locations []FunctionLocation) { locationQuery := fmt.Sprintf(functionId.String(), ":*") + firstsweep, cursor := true, uint64(0) + + // because do-while does not exists in these lands + for !firstsweep && cursor != 0 { + firstsweep = false - - // because while does not exists in these lands - for ok, cursor := true, uint64(0) ; ok; ok = (cursor != 0) { var keys []string keys, cursor = s.Redis.Scan(*s.Context, cursor, locationQuery, 10).Val() + // for each key we got, we retrieve all of it's information for _, ID := range keys { var location FunctionLocation @@ -99,4 +107,36 @@ func (s *Scheduler) GetFunctionLocations(functionId uuid.UUID) (locations []Func } return -} \ No newline at end of file +} + +// goes through the whole redis instance and remove that have not been used within the last {hoursTimeout} Hours +func (s *Scheduler) FindAndDestroyUnsused(hoursTimeout float64) { + now := time.Now() + keys, cursor := s.Redis.Scan(*s.Context, 0, "*", 10).Val() + + for cursor != 0 { + for _, ID := range keys { + val, err := s.Redis.HGet(*s.Context, ID, "lastUsed").Result() + + if err != nil { + continue + } + + // if it was never used we delete this + if val == "never" { + if s.Lambdo.DeleteVM(ID) != nil { + s.Redis.Del(*s.Context, ID) + } + } else { + // if it hasn't been used for {hoursTimeout} we delete it + lastUsed, err := time.Parse(time.UnixDate, val) + if err != nil || now.Sub(lastUsed).Abs().Hours() >= hoursTimeout { + if s.Lambdo.DeleteVM(ID) != nil { + s.Redis.Del(*s.Context, ID) + } + } + } + } + keys, cursor = s.Redis.Scan(*s.Context, cursor, "*", 10).Val() + } +}