Skip to content

Commit

Permalink
fix(rpc): use google.golang.org/grpc.NewClient instead of DialContext
Browse files Browse the repository at this point in the history
Replace grpc.DialContext with grpc.NewClient in the rpc package to align with
updated gRPC practices. The NewClient method creates a gRPC channel without
performing I/O operations immediately, improving the connection management
logic.

- Update NewConn to use grpc.NewClient instead of grpc.DialContext.
- Add detailed comments to explain the behavior of NewClient.
- Modify TestManager to ensure each test case has an independent Manager
  instance.
- Adjust UnreachableServer test case to reflect non-blocking behavior of
  NewClient.

This change ensures better resource management and aligns with the latest gRPC
client connection practices.
  • Loading branch information
ijsong committed Jun 10, 2024
1 parent e28a257 commit 851f1c9
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 53 deletions.
6 changes: 5 additions & 1 deletion internal/storagenode/client/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,15 @@ func TestManager_UnreachableServer(t *testing.T) {
),
)
assert.NoError(t, err)
t.Cleanup(func() {
err = mgr.Close()
assert.NoError(t, err)
})

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
_, err = mgr.GetOrConnect(ctx, 1, "127.0.0.1:0")
assert.Error(t, err)
assert.NoError(t, err)
}

func TestManager_Client(t *testing.T) {
Expand Down
103 changes: 61 additions & 42 deletions pkg/rpc/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"net"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/grpc"

"github.com/kakao/varlog/pkg/types"
)
Expand All @@ -36,49 +35,69 @@ func testNewServer(t *testing.T) (addr string, closer func()) {
}

func TestManager(t *testing.T) {
mgr, err := NewManager[types.StorageNodeID]()
assert.NoError(t, err)

defer func() {
assert.NoError(t, mgr.Close())

// close closed manager
assert.NoError(t, mgr.Close())
}()

addr1, closer1 := testNewServer(t)
defer closer1()

// new
conn1, err := mgr.GetOrConnect(context.Background(), 1, addr1)
assert.NoError(t, err)

// cached
conn2, err := mgr.GetOrConnect(context.Background(), 1, addr1)
assert.NoError(t, err)
assert.Equal(t, conn1, conn2)
tcs := []struct {
name string
testf func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string)
}{
{
name: "NewConn",
testf: func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string) {
_, err := mgr.GetOrConnect(context.Background(), 1, serverAddr)
require.NoError(t, err)
},
},
{
name: "GetConn",
testf: func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string) {
conn1, err := mgr.GetOrConnect(context.Background(), 1, serverAddr)
require.NoError(t, err)

conn2, err := mgr.GetOrConnect(context.Background(), 1, serverAddr)
require.NoError(t, err)

require.Equal(t, conn1, conn2)
},
},
{
name: "UnexpectedAddr",
testf: func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string) {
_, err := mgr.GetOrConnect(context.Background(), 1, serverAddr)
require.NoError(t, err)

_, err = mgr.GetOrConnect(context.Background(), 1, serverAddr+"0")
require.Error(t, err)
},
},
{
name: "UnreachableServer",
testf: func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string) {
_, err := mgr.GetOrConnect(context.Background(), 1, "bad-address")
require.NoError(t, err)
},
},
{
name: "CloseUnknownID",
testf: func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string) {
err := mgr.CloseClient(2)
require.NoError(t, err)
},
},
}

// unexpected addr1
_, err = mgr.GetOrConnect(context.Background(), 1, addr1+"0")
assert.Error(t, err)
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
mgr, err := NewManager[types.StorageNodeID]()
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, mgr.Close())
})

// failed connection
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
_, err = mgr.GetOrConnect(ctx, 2, "bad-address", grpc.WithBlock())
assert.Error(t, err)

// close unknown id
err = mgr.CloseClient(2)
assert.NoError(t, err)
addr1, closer1 := testNewServer(t)
t.Cleanup(closer1)

addr2, closer2 := testNewServer(t)
defer closer2()
_, err = mgr.GetOrConnect(context.Background(), 2, addr2)
assert.NoError(t, err)

err = mgr.CloseClient(2)
assert.NoError(t, err)
tc.testf(t, mgr, addr1)
})
}
}

func TestManagerBadConfig(t *testing.T) {
Expand Down
23 changes: 15 additions & 8 deletions pkg/rpc/rpc_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,41 @@ package rpc

import (
"context"
"slices"
"sync"

"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

var (
defaultDialOption = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
)
var defaultDialOption = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}

type Conn struct {
Conn *grpc.ClientConn
once sync.Once
}

// NewConn creates a new gRPC connection to the specified address using
// google.golang.org/grpc.NewClient. Note that this method does not perform any
// I/O, meaning the connection is not immediately established. If the server is
// unavailable, this method will not return an error. The connection is
// established automatically when an RPC is made using the returned Conn.
// DialOptions such as WithBlock, WithTimeout, and WithReturnConnectionError
// are not considered.
// Ensure to close the returned Conn after usage to avoid resource leaks.
func NewConn(ctx context.Context, address string, opts ...grpc.DialOption) (*Conn, error) {
conn, err := grpc.DialContext(ctx, address, append(defaultDialOption, opts...)...)
conn, err := grpc.NewClient(address, slices.Concat(defaultDialOption, opts)...)
if err != nil {
return nil, errors.Wrapf(err, "rpc: %s", address)
}
return &Conn{Conn: conn}, nil
}

func NewBlockingConn(ctx context.Context, address string) (*Conn, error) {
return NewConn(ctx, address, grpc.WithBlock(), grpc.WithReturnConnectionError())
}

// Close terminates the gRPC connection. This method ensures that the
// connection is closed only once, even if called multiple times.
// It is important to call Close after the connection is no longer needed to
// release resources properly.
func (c *Conn) Close() (err error) {
c.once.Do(func() {
if c.Conn != nil {
Expand Down
3 changes: 1 addition & 2 deletions tests/it/mrconnector/mr_connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestMRConnector(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()
// TODO (jun): Use NewConn
conn, err := rpc.NewBlockingConn(ctx, ep)
conn, err := rpc.NewConn(ctx, ep)
if err == nil {
So(conn.Close(), ShouldBeNil)
}
Expand Down Expand Up @@ -181,7 +181,6 @@ func TestMRConnector(t *testing.T) {
}
}
return true

}), ShouldBeTrue)
}

Expand Down

0 comments on commit 851f1c9

Please sign in to comment.