Skip to content

Commit

Permalink
Incorporate chnages based on new state store
Browse files Browse the repository at this point in the history
Signed-off-by: Swarvanu Sengupta <[email protected]>
  • Loading branch information
avcoat committed Jun 28, 2019
1 parent 3eef687 commit 92030e5
Showing 1 changed file with 32 additions and 62 deletions.
94 changes: 32 additions & 62 deletions etcd.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package EtcdStateStore

import (
"fmt"
etcd "go.etcd.io/etcd/client"

"context"
"fmt"
faasflow "github.com/s8sg/faas-flow"
"strconv"
etcd "go.etcd.io/etcd/client"
)

type EtcdStateStore struct {
Expand Down Expand Up @@ -45,81 +43,53 @@ func (etcdStore *EtcdStateStore) Init() error {
return nil
}

// Create
func (etcdStore *EtcdStateStore) Create(vertexs []string) error {
// Update Compare and Update a valuer
func (etcdStore *EtcdStateStore) Update(key string, oldValue string, newValue string) error {
key = fmt.Sprintf("%s/%s", etcdStore.etcdKeyPath, key)

for _, vertex := range vertexs {
key := fmt.Sprintf("%s/%s", etcdStore.etcdKeyPath, vertex)
_, err := etcdStore.kv.Set(context.Background(), key, "0", nil)
if err != nil {
return fmt.Errorf("failed to create vertex %s, error %v", vertex, err)
}
resp, err := etcdStore.kv.Get(context.Background(), key, nil)
if err != nil {
return fmt.Errorf("failed to get key %s, error %v", key, err)
}
return nil
}
if resp == nil {
return fmt.Errorf("failed to get key %s, returned nil", key)
}
modifyIndex := resp.Node.ModifiedIndex

// IncrementCounter
func (etcdStore *EtcdStateStore) IncrementCounter(vertex string) (int, error) {
var serr error
count := 0
key := fmt.Sprintf("%s/%s", etcdStore.etcdKeyPath, vertex)
for i := 0; i < etcdStore.RetryCount; i++ {
resp, err := etcdStore.kv.Get(context.Background(), key, nil)
if err != nil {
return 0, fmt.Errorf("failed to get vertex %s, error %v", vertex, err)
}
if resp == nil {
return 0, fmt.Errorf("failed to get vertex %s", vertex)
}
modifyIndex := resp.Node.ModifiedIndex

counter, err := strconv.Atoi(resp.Node.Value)
if err != nil {
return 0, fmt.Errorf("failed to update counter for %s, error %v", vertex, err)
}

count = counter + 1
counterStr := fmt.Sprintf("%d", count)

_, err = etcdStore.kv.Set(context.Background(), key, counterStr, &etcd.SetOptions{PrevIndex: modifyIndex})
if err == nil {
return count, nil
}
serr = err
if resp.Node.Value != oldValue {
return fmt.Errorf("Old value doesn't match for key %s", key, err)
}
return 0, fmt.Errorf("failed to update counter after max retry for %s, error %v", vertex, serr)
}

// SetState set state of pipeline
func (etcdStore *EtcdStateStore) SetState(state bool) error {
key := fmt.Sprintf("%s/state", etcdStore.etcdKeyPath)
stateStr := "false"
if state {
stateStr = "true"
_, err = etcdStore.kv.Set(context.Background(), key, newValue, &etcd.SetOptions{PrevIndex: modifyIndex})
if err != nil {
return fmt.Errorf("failed to update value for key %s, error %v", key, err)
}
return nil
}

// Set Sets a value (override existing, or create one)
func (etcdStore *EtcdStateStore) Set(key string, value string) error {
key = fmt.Sprintf("%s/%s", etcdStore.etcdKeyPath, key)

_, err := etcdStore.kv.Set(context.Background(), key, stateStr, nil)
_, err := etcdStore.kv.Set(context.Background(), key, value, nil)
if err != nil {
return fmt.Errorf("failed to set state, error %v", err)
return fmt.Errorf("failed to set value for key %s, error %v", key, err)
}
return nil
}

// GetState set state of the pipeline
func (etcdStore *EtcdStateStore) GetState() (bool, error) {
key := fmt.Sprintf("%s/state", etcdStore.etcdKeyPath)
// Get Gets a value
func (etcdStore *EtcdStateStore) Get(key string) (string, error) {
key = fmt.Sprintf("%s/%s", etcdStore.etcdKeyPath, key)

resp, err := etcdStore.kv.Get(context.Background(), key, nil)
if err != nil {
return false, fmt.Errorf("failed to get state, error %v", err)
return "", fmt.Errorf("failed to get key %s, error %v", key, err)
}
if resp == nil {
return false, fmt.Errorf("failed to get state")
}
state := false
if resp.Node.Value == "true" {
state = true
return "", fmt.Errorf("failed to get key %s, returned nil", key)
}
return state, nil
return resp.Node.Value, nil
}

// Cleanup (Called only once in a request)
Expand Down

0 comments on commit 92030e5

Please sign in to comment.