Skip to content

Commit

Permalink
update redis wrapper (oauth2-proxy#864)
Browse files Browse the repository at this point in the history
  • Loading branch information
rinfx authored Mar 18, 2024
1 parent ed55b65 commit 25c2f6e
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 36 deletions.
2 changes: 1 addition & 1 deletion plugins/wasm-go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.19
require (
github.com/google/uuid v1.3.0
github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520
github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240226064518-b3dc4646a35a
github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240318034951-d5306e367c43
github.com/stretchr/testify v1.8.4
github.com/tidwall/gjson v1.14.3
github.com/tidwall/resp v0.1.1
Expand Down
2 changes: 2 additions & 0 deletions plugins/wasm-go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 h1:IHDghbG
github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520/go.mod h1:Nz8ORLaFiLWotg6GeKlJMhv8cci8mM43uEnLA5t8iew=
github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240226064518-b3dc4646a35a h1:luYRvxLTE1xYxrXYj7nmjd1U0HHh8pUPiKfdZ0MhCGE=
github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240226064518-b3dc4646a35a/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo=
github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240318034951-d5306e367c43 h1:dCw7F/9ciw4NZN7w68wQRaygZ2zGOWMTIEoRvP1tlWs=
github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240318034951-d5306e367c43/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo=
github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo=
github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
81 changes: 46 additions & 35 deletions plugins/wasm-go/pkg/wrapper/redis_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@ package wrapper

import (
"bytes"
"encoding/base64"
"fmt"
"io"

"github.com/google/uuid"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
"github.com/tidwall/resp"
)

type RedisResponseCallback func(status int, response resp.Value)
type RedisResponseCallback func(response resp.Value)

type RedisClient interface {
Init(username, password string, timeout int64) error
// with this function, you can call redis as if you are using redis-cli
Command(cmds []interface{}, callback RedisResponseCallback) error
// BatchCommands(cmds [][]interface{}, callback func(status int, response []resp.Value)) error
Eval(script string, params []interface{}, callback RedisResponseCallback) error
Eval(script string, numkeys int, keys, args []interface{}, callback RedisResponseCallback) error

// Key
Del(key string, callback RedisResponseCallback) error
Expand Down Expand Up @@ -112,42 +114,53 @@ func RedisInit(cluster Cluster, username, password string, timeout uint32) error
return proxywasm.RedisInit(cluster.ClusterName(), username, password, timeout)
}

func RedisCall(cluster Cluster, respQuery string, callback RedisResponseCallback) error {
func RedisCall(cluster Cluster, respQuery []byte, callback RedisResponseCallback) error {
requestID := uuid.New().String()
_, err := proxywasm.DispatchRedisCall(
cluster.ClusterName(),
respQuery,
func(status, responseSize int) {
// proxywasm.LogCriticalf("[rinfx log] responseSize is: %d", responseSize)
func(status int, responseSize int) {
response, err := proxywasm.GetRedisCallResponse(0, responseSize)
if err != nil {
proxywasm.LogCriticalf("failed to get redis response body: %v", err)
var responseValue resp.Value
if status != 0 {
proxywasm.LogCriticalf("Error occured while calling redis, it seems cannot connect to the redis cluster. request-id: %s", requestID)
responseValue = resp.ErrorValue(fmt.Errorf("cannot connect to redis cluster"))
} else {
if err != nil {
proxywasm.LogCriticalf("failed to get redis response body, request-id: %s, error: %v", requestID, err)
responseValue = resp.ErrorValue(fmt.Errorf("cannot get redis response"))
} else {
rd := resp.NewReader(bytes.NewReader(response))
value, _, err := rd.ReadValue()
if err != nil && err != io.EOF {
proxywasm.LogCriticalf("failed to read redis response body, request-id: %s, error: %v", requestID, err)
responseValue = resp.ErrorValue(fmt.Errorf("cannot read redis response"))
} else {
responseValue = value
proxywasm.LogDebugf("redis call end, request-id: %s, respQuery: %s, respValue: %s",
requestID, base64.StdEncoding.EncodeToString([]byte(respQuery)), base64.StdEncoding.EncodeToString(response))
}
}
}
rd := resp.NewReader(bytes.NewReader(response))
v, _, err := rd.ReadValue()
if err != nil && err != io.EOF {
proxywasm.LogCriticalf("failed to read redis response body: %v", err)
}
// log.Infof("value: %s", v.String())
// callback(status, v.String())
callback(status, v)
callback(responseValue)
})
if err != nil {
proxywasm.LogCriticalf("redis call failed: %v", err)
proxywasm.LogCriticalf("redis call failed, request-id: %s, error: %v", requestID, err)
} else {
proxywasm.LogDebugf("redis call start, request-id: %s, respQuery: %s", requestID, base64.StdEncoding.EncodeToString([]byte(respQuery)))
}
return err
}

func respString(args []interface{}) string {
func respString(args []interface{}) []byte {
var buf bytes.Buffer
wr := resp.NewWriter(&buf)
arr := make([]resp.Value, 0)
for _, arg := range args {
// arr = append(arr, resp.AnyValue(arg))
arr = append(arr, resp.StringValue(fmt.Sprint(arg)))
}
wr.WriteArray(arr)
// proxywasm.LogCriticalf("respString:\n%s", buf.String())
return buf.String()
return buf.Bytes()
}

func (c RedisClusterClient[C]) Init(username, password string, timeout int64) error {
Expand All @@ -159,16 +172,17 @@ func (c RedisClusterClient[C]) Init(username, password string, timeout int64) er
}

func (c RedisClusterClient[C]) Command(cmds []interface{}, callback RedisResponseCallback) error {
RedisCall(c.cluster, respString(cmds), callback)
return nil
return RedisCall(c.cluster, respString(cmds), callback)
}

func (c RedisClusterClient[C]) Eval(script string, params []interface{}, callback RedisResponseCallback) error {
args := make([]interface{}, 0)
args = append(args, "eval")
args = append(args, script)
args = append(args, params...)
return RedisCall(c.cluster, respString(args), callback)
func (c RedisClusterClient[C]) Eval(script string, numkeys int, keys, args []interface{}, callback RedisResponseCallback) error {
params := make([]interface{}, 0)
params = append(params, "eval")
params = append(params, script)
params = append(params, numkeys)
params = append(params, keys...)
params = append(params, args...)
return RedisCall(c.cluster, respString(params), callback)
}

// Key
Expand Down Expand Up @@ -214,8 +228,7 @@ func (c RedisClusterClient[C]) Set(key string, value interface{}, callback Redis
args = append(args, "set")
args = append(args, key)
args = append(args, value)
RedisCall(c.cluster, respString(args), callback)
return nil
return RedisCall(c.cluster, respString(args), callback)
}

func (c RedisClusterClient[C]) SetEx(key string, value interface{}, ttl int, callback RedisResponseCallback) error {
Expand All @@ -224,8 +237,7 @@ func (c RedisClusterClient[C]) SetEx(key string, value interface{}, ttl int, cal
args = append(args, key)
args = append(args, ttl)
args = append(args, value)
RedisCall(c.cluster, respString(args), callback)
return nil
return RedisCall(c.cluster, respString(args), callback)
}

func (c RedisClusterClient[C]) MGet(keys []string, callback RedisResponseCallback) error {
Expand All @@ -234,8 +246,7 @@ func (c RedisClusterClient[C]) MGet(keys []string, callback RedisResponseCallbac
for _, k := range keys {
args = append(args, k)
}
RedisCall(c.cluster, respString(args), callback)
return nil
return RedisCall(c.cluster, respString(args), callback)
}

func (c RedisClusterClient[C]) MSet(kvMap map[string]interface{}, callback RedisResponseCallback) error {
Expand Down

0 comments on commit 25c2f6e

Please sign in to comment.