From 92030e512bc4c4a1288f8b23785cb56eb9439139 Mon Sep 17 00:00:00 2001 From: Swarvanu Sengupta Date: Fri, 28 Jun 2019 13:26:27 +0900 Subject: [PATCH] Incorporate chnages based on new state store Signed-off-by: Swarvanu Sengupta --- etcd.go | 94 ++++++++++++++++++++------------------------------------- 1 file changed, 32 insertions(+), 62 deletions(-) diff --git a/etcd.go b/etcd.go index e114149..5c863c3 100644 --- a/etcd.go +++ b/etcd.go @@ -1,12 +1,10 @@ package EtcdStateStore import ( - "fmt" - etcd "go.etcd.io/etcd/client" - "context" + "fmt" faasflow "github.com/s8sg/faas-flow" - "strconv" + etcd "go.etcd.io/etcd/client" ) type EtcdStateStore struct { @@ -45,81 +43,53 @@ func (etcdStore *EtcdStateStore) Init() error { return nil } -// Create -func (etcdStore *EtcdStateStore) Create(vertexs []string) error { +// Update Compare and Update a valuer +func (etcdStore *EtcdStateStore) Update(key string, oldValue string, newValue string) error { + key = fmt.Sprintf("%s/%s", etcdStore.etcdKeyPath, key) - for _, vertex := range vertexs { - key := fmt.Sprintf("%s/%s", etcdStore.etcdKeyPath, vertex) - _, err := etcdStore.kv.Set(context.Background(), key, "0", nil) - if err != nil { - return fmt.Errorf("failed to create vertex %s, error %v", vertex, err) - } + resp, err := etcdStore.kv.Get(context.Background(), key, nil) + if err != nil { + return fmt.Errorf("failed to get key %s, error %v", key, err) } - return nil -} + if resp == nil { + return fmt.Errorf("failed to get key %s, returned nil", key) + } + modifyIndex := resp.Node.ModifiedIndex -// IncrementCounter -func (etcdStore *EtcdStateStore) IncrementCounter(vertex string) (int, error) { - var serr error - count := 0 - key := fmt.Sprintf("%s/%s", etcdStore.etcdKeyPath, vertex) - for i := 0; i < etcdStore.RetryCount; i++ { - resp, err := etcdStore.kv.Get(context.Background(), key, nil) - if err != nil { - return 0, fmt.Errorf("failed to get vertex %s, error %v", vertex, err) - } - if resp == nil { - return 0, fmt.Errorf("failed to get vertex %s", vertex) - } - modifyIndex := resp.Node.ModifiedIndex - - counter, err := strconv.Atoi(resp.Node.Value) - if err != nil { - return 0, fmt.Errorf("failed to update counter for %s, error %v", vertex, err) - } - - count = counter + 1 - counterStr := fmt.Sprintf("%d", count) - - _, err = etcdStore.kv.Set(context.Background(), key, counterStr, &etcd.SetOptions{PrevIndex: modifyIndex}) - if err == nil { - return count, nil - } - serr = err + if resp.Node.Value != oldValue { + return fmt.Errorf("Old value doesn't match for key %s", key, err) } - return 0, fmt.Errorf("failed to update counter after max retry for %s, error %v", vertex, serr) -} -// SetState set state of pipeline -func (etcdStore *EtcdStateStore) SetState(state bool) error { - key := fmt.Sprintf("%s/state", etcdStore.etcdKeyPath) - stateStr := "false" - if state { - stateStr = "true" + _, err = etcdStore.kv.Set(context.Background(), key, newValue, &etcd.SetOptions{PrevIndex: modifyIndex}) + if err != nil { + return fmt.Errorf("failed to update value for key %s, error %v", key, err) } + return nil +} + +// Set Sets a value (override existing, or create one) +func (etcdStore *EtcdStateStore) Set(key string, value string) error { + key = fmt.Sprintf("%s/%s", etcdStore.etcdKeyPath, key) - _, err := etcdStore.kv.Set(context.Background(), key, stateStr, nil) + _, err := etcdStore.kv.Set(context.Background(), key, value, nil) if err != nil { - return fmt.Errorf("failed to set state, error %v", err) + return fmt.Errorf("failed to set value for key %s, error %v", key, err) } return nil } -// GetState set state of the pipeline -func (etcdStore *EtcdStateStore) GetState() (bool, error) { - key := fmt.Sprintf("%s/state", etcdStore.etcdKeyPath) +// Get Gets a value +func (etcdStore *EtcdStateStore) Get(key string) (string, error) { + key = fmt.Sprintf("%s/%s", etcdStore.etcdKeyPath, key) + resp, err := etcdStore.kv.Get(context.Background(), key, nil) if err != nil { - return false, fmt.Errorf("failed to get state, error %v", err) + return "", fmt.Errorf("failed to get key %s, error %v", key, err) } if resp == nil { - return false, fmt.Errorf("failed to get state") - } - state := false - if resp.Node.Value == "true" { - state = true + return "", fmt.Errorf("failed to get key %s, returned nil", key) } - return state, nil + return resp.Node.Value, nil } // Cleanup (Called only once in a request)