Skip to content

Commit

Permalink
Merge pull request #794 from kakao/grpc_new_client
Browse files Browse the repository at this point in the history
fix(rpc): use google.golang.org/grpc.NewClient instead of DialContext
  • Loading branch information
ijsong authored Jun 10, 2024
2 parents e28a257 + 851f1c9 commit ea59039
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 53 deletions.
6 changes: 5 additions & 1 deletion internal/storagenode/client/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,15 @@ func TestManager_UnreachableServer(t *testing.T) {
),
)
assert.NoError(t, err)
t.Cleanup(func() {
err = mgr.Close()
assert.NoError(t, err)
})

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
_, err = mgr.GetOrConnect(ctx, 1, "127.0.0.1:0")
assert.Error(t, err)
assert.NoError(t, err)
}

func TestManager_Client(t *testing.T) {
Expand Down
103 changes: 61 additions & 42 deletions pkg/rpc/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"net"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/grpc"

"github.com/kakao/varlog/pkg/types"
)
Expand All @@ -36,49 +35,69 @@ func testNewServer(t *testing.T) (addr string, closer func()) {
}

func TestManager(t *testing.T) {
mgr, err := NewManager[types.StorageNodeID]()
assert.NoError(t, err)

defer func() {
assert.NoError(t, mgr.Close())

// close closed manager
assert.NoError(t, mgr.Close())
}()

addr1, closer1 := testNewServer(t)
defer closer1()

// new
conn1, err := mgr.GetOrConnect(context.Background(), 1, addr1)
assert.NoError(t, err)

// cached
conn2, err := mgr.GetOrConnect(context.Background(), 1, addr1)
assert.NoError(t, err)
assert.Equal(t, conn1, conn2)
tcs := []struct {
name string
testf func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string)
}{
{
name: "NewConn",
testf: func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string) {
_, err := mgr.GetOrConnect(context.Background(), 1, serverAddr)
require.NoError(t, err)
},
},
{
name: "GetConn",
testf: func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string) {
conn1, err := mgr.GetOrConnect(context.Background(), 1, serverAddr)
require.NoError(t, err)

conn2, err := mgr.GetOrConnect(context.Background(), 1, serverAddr)
require.NoError(t, err)

require.Equal(t, conn1, conn2)
},
},
{
name: "UnexpectedAddr",
testf: func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string) {
_, err := mgr.GetOrConnect(context.Background(), 1, serverAddr)
require.NoError(t, err)

_, err = mgr.GetOrConnect(context.Background(), 1, serverAddr+"0")
require.Error(t, err)
},
},
{
name: "UnreachableServer",
testf: func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string) {
_, err := mgr.GetOrConnect(context.Background(), 1, "bad-address")
require.NoError(t, err)
},
},
{
name: "CloseUnknownID",
testf: func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string) {
err := mgr.CloseClient(2)
require.NoError(t, err)
},
},
}

// unexpected addr1
_, err = mgr.GetOrConnect(context.Background(), 1, addr1+"0")
assert.Error(t, err)
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
mgr, err := NewManager[types.StorageNodeID]()
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, mgr.Close())
})

// failed connection
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
_, err = mgr.GetOrConnect(ctx, 2, "bad-address", grpc.WithBlock())
assert.Error(t, err)

// close unknown id
err = mgr.CloseClient(2)
assert.NoError(t, err)
addr1, closer1 := testNewServer(t)
t.Cleanup(closer1)

addr2, closer2 := testNewServer(t)
defer closer2()
_, err = mgr.GetOrConnect(context.Background(), 2, addr2)
assert.NoError(t, err)

err = mgr.CloseClient(2)
assert.NoError(t, err)
tc.testf(t, mgr, addr1)
})
}
}

func TestManagerBadConfig(t *testing.T) {
Expand Down
23 changes: 15 additions & 8 deletions pkg/rpc/rpc_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,41 @@ package rpc

import (
"context"
"slices"
"sync"

"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

var (
defaultDialOption = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
)
var defaultDialOption = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}

type Conn struct {
Conn *grpc.ClientConn
once sync.Once
}

// NewConn creates a new gRPC connection to the specified address using
// google.golang.org/grpc.NewClient. Note that this method does not perform any
// I/O, meaning the connection is not immediately established. If the server is
// unavailable, this method will not return an error. The connection is
// established automatically when an RPC is made using the returned Conn.
// DialOptions such as WithBlock, WithTimeout, and WithReturnConnectionError
// are not considered.
// Ensure to close the returned Conn after usage to avoid resource leaks.
func NewConn(ctx context.Context, address string, opts ...grpc.DialOption) (*Conn, error) {
conn, err := grpc.DialContext(ctx, address, append(defaultDialOption, opts...)...)
conn, err := grpc.NewClient(address, slices.Concat(defaultDialOption, opts)...)
if err != nil {
return nil, errors.Wrapf(err, "rpc: %s", address)
}
return &Conn{Conn: conn}, nil
}

func NewBlockingConn(ctx context.Context, address string) (*Conn, error) {
return NewConn(ctx, address, grpc.WithBlock(), grpc.WithReturnConnectionError())
}

// Close terminates the gRPC connection. This method ensures that the
// connection is closed only once, even if called multiple times.
// It is important to call Close after the connection is no longer needed to
// release resources properly.
func (c *Conn) Close() (err error) {
c.once.Do(func() {
if c.Conn != nil {
Expand Down
3 changes: 1 addition & 2 deletions tests/it/mrconnector/mr_connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestMRConnector(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()
// TODO (jun): Use NewConn
conn, err := rpc.NewBlockingConn(ctx, ep)
conn, err := rpc.NewConn(ctx, ep)
if err == nil {
So(conn.Close(), ShouldBeNil)
}
Expand Down Expand Up @@ -181,7 +181,6 @@ func TestMRConnector(t *testing.T) {
}
}
return true

}), ShouldBeTrue)
}

Expand Down

0 comments on commit ea59039

Please sign in to comment.