From 851f1c9bcdd6e3d0e68612784090e67cb1dac918 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Sat, 25 May 2024 18:04:54 +0900 Subject: [PATCH] fix(rpc): use google.golang.org/grpc.NewClient instead of DialContext Replace grpc.DialContext with grpc.NewClient in the rpc package to align with updated gRPC practices. The NewClient method creates a gRPC channel without performing I/O operations immediately, improving the connection management logic. - Update NewConn to use grpc.NewClient instead of grpc.DialContext. - Add detailed comments to explain the behavior of NewClient. - Modify TestManager to ensure each test case has an independent Manager instance. - Adjust UnreachableServer test case to reflect non-blocking behavior of NewClient. This change ensures better resource management and aligns with the latest gRPC client connection practices. --- internal/storagenode/client/manager_test.go | 6 +- pkg/rpc/manager_test.go | 103 ++++++++++++-------- pkg/rpc/rpc_conn.go | 23 +++-- tests/it/mrconnector/mr_connector_test.go | 3 +- 4 files changed, 82 insertions(+), 53 deletions(-) diff --git a/internal/storagenode/client/manager_test.go b/internal/storagenode/client/manager_test.go index af0171b54..ce2717663 100644 --- a/internal/storagenode/client/manager_test.go +++ b/internal/storagenode/client/manager_test.go @@ -37,11 +37,15 @@ func TestManager_UnreachableServer(t *testing.T) { ), ) assert.NoError(t, err) + t.Cleanup(func() { + err = mgr.Close() + assert.NoError(t, err) + }) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel() _, err = mgr.GetOrConnect(ctx, 1, "127.0.0.1:0") - assert.Error(t, err) + assert.NoError(t, err) } func TestManager_Client(t *testing.T) { diff --git a/pkg/rpc/manager_test.go b/pkg/rpc/manager_test.go index 2ade0781e..e9808c131 100644 --- a/pkg/rpc/manager_test.go +++ b/pkg/rpc/manager_test.go @@ -5,11 +5,10 @@ import ( "net" "sync" "testing" - "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/goleak" - "google.golang.org/grpc" "github.com/kakao/varlog/pkg/types" ) @@ -36,49 +35,69 @@ func testNewServer(t *testing.T) (addr string, closer func()) { } func TestManager(t *testing.T) { - mgr, err := NewManager[types.StorageNodeID]() - assert.NoError(t, err) - - defer func() { - assert.NoError(t, mgr.Close()) - - // close closed manager - assert.NoError(t, mgr.Close()) - }() - - addr1, closer1 := testNewServer(t) - defer closer1() - - // new - conn1, err := mgr.GetOrConnect(context.Background(), 1, addr1) - assert.NoError(t, err) - - // cached - conn2, err := mgr.GetOrConnect(context.Background(), 1, addr1) - assert.NoError(t, err) - assert.Equal(t, conn1, conn2) + tcs := []struct { + name string + testf func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string) + }{ + { + name: "NewConn", + testf: func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string) { + _, err := mgr.GetOrConnect(context.Background(), 1, serverAddr) + require.NoError(t, err) + }, + }, + { + name: "GetConn", + testf: func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string) { + conn1, err := mgr.GetOrConnect(context.Background(), 1, serverAddr) + require.NoError(t, err) + + conn2, err := mgr.GetOrConnect(context.Background(), 1, serverAddr) + require.NoError(t, err) + + require.Equal(t, conn1, conn2) + }, + }, + { + name: "UnexpectedAddr", + testf: func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string) { + _, err := mgr.GetOrConnect(context.Background(), 1, serverAddr) + require.NoError(t, err) + + _, err = mgr.GetOrConnect(context.Background(), 1, serverAddr+"0") + require.Error(t, err) + }, + }, + { + name: "UnreachableServer", + testf: func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string) { + _, err := mgr.GetOrConnect(context.Background(), 1, "bad-address") + require.NoError(t, err) + }, + }, + { + name: "CloseUnknownID", + testf: func(t *testing.T, mgr *Manager[types.StorageNodeID], serverAddr string) { + err := mgr.CloseClient(2) + require.NoError(t, err) + }, + }, + } - // unexpected addr1 - _, err = mgr.GetOrConnect(context.Background(), 1, addr1+"0") - assert.Error(t, err) + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + mgr, err := NewManager[types.StorageNodeID]() + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, mgr.Close()) + }) - // failed connection - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) - defer cancel() - _, err = mgr.GetOrConnect(ctx, 2, "bad-address", grpc.WithBlock()) - assert.Error(t, err) - - // close unknown id - err = mgr.CloseClient(2) - assert.NoError(t, err) + addr1, closer1 := testNewServer(t) + t.Cleanup(closer1) - addr2, closer2 := testNewServer(t) - defer closer2() - _, err = mgr.GetOrConnect(context.Background(), 2, addr2) - assert.NoError(t, err) - - err = mgr.CloseClient(2) - assert.NoError(t, err) + tc.testf(t, mgr, addr1) + }) + } } func TestManagerBadConfig(t *testing.T) { diff --git a/pkg/rpc/rpc_conn.go b/pkg/rpc/rpc_conn.go index 59608b02b..5ae55add1 100644 --- a/pkg/rpc/rpc_conn.go +++ b/pkg/rpc/rpc_conn.go @@ -2,6 +2,7 @@ package rpc import ( "context" + "slices" "sync" "github.com/pkg/errors" @@ -9,27 +10,33 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -var ( - defaultDialOption = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} -) +var defaultDialOption = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} type Conn struct { Conn *grpc.ClientConn once sync.Once } +// NewConn creates a new gRPC connection to the specified address using +// google.golang.org/grpc.NewClient. Note that this method does not perform any +// I/O, meaning the connection is not immediately established. If the server is +// unavailable, this method will not return an error. The connection is +// established automatically when an RPC is made using the returned Conn. +// DialOptions such as WithBlock, WithTimeout, and WithReturnConnectionError +// are not considered. +// Ensure to close the returned Conn after usage to avoid resource leaks. func NewConn(ctx context.Context, address string, opts ...grpc.DialOption) (*Conn, error) { - conn, err := grpc.DialContext(ctx, address, append(defaultDialOption, opts...)...) + conn, err := grpc.NewClient(address, slices.Concat(defaultDialOption, opts)...) if err != nil { return nil, errors.Wrapf(err, "rpc: %s", address) } return &Conn{Conn: conn}, nil } -func NewBlockingConn(ctx context.Context, address string) (*Conn, error) { - return NewConn(ctx, address, grpc.WithBlock(), grpc.WithReturnConnectionError()) -} - +// Close terminates the gRPC connection. This method ensures that the +// connection is closed only once, even if called multiple times. +// It is important to call Close after the connection is no longer needed to +// release resources properly. func (c *Conn) Close() (err error) { c.once.Do(func() { if c.Conn != nil { diff --git a/tests/it/mrconnector/mr_connector_test.go b/tests/it/mrconnector/mr_connector_test.go index 50e497788..b30ac0ec6 100644 --- a/tests/it/mrconnector/mr_connector_test.go +++ b/tests/it/mrconnector/mr_connector_test.go @@ -111,7 +111,7 @@ func TestMRConnector(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() // TODO (jun): Use NewConn - conn, err := rpc.NewBlockingConn(ctx, ep) + conn, err := rpc.NewConn(ctx, ep) if err == nil { So(conn.Close(), ShouldBeNil) } @@ -181,7 +181,6 @@ func TestMRConnector(t *testing.T) { } } return true - }), ShouldBeTrue) }