From bc54195ad12fb55bc342e2c32dfd40d61655df64 Mon Sep 17 00:00:00 2001 From: lklimek <842586+lklimek@users.noreply.github.com> Date: Tue, 12 Mar 2024 16:48:34 +0100 Subject: [PATCH] test(abciclient): add parallel grpc execution test (#758) * test(abciclient): add grpc client test * chore: typo --- abci/client/client.go | 8 +-- abci/client/grpc_client_test.go | 114 ++++++++++++++++++++++++++++++ abci/client/routed_client_test.go | 1 - 3 files changed, 118 insertions(+), 5 deletions(-) create mode 100644 abci/client/grpc_client_test.go diff --git a/abci/client/client.go b/abci/client/client.go index 39f4a22c2..7f829e04f 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -69,9 +69,9 @@ func makeReqRes(ctx context.Context, req *types.Request) *requestAndResponse { } // markDone marks the ReqRes object as done. -func (r *requestAndResponse) markDone() { - r.mtx.Lock() - defer r.mtx.Unlock() +func (reqResp *requestAndResponse) markDone() { + reqResp.mtx.Lock() + defer reqResp.mtx.Unlock() - close(r.signal) + close(reqResp.signal) } diff --git a/abci/client/grpc_client_test.go b/abci/client/grpc_client_test.go new file mode 100644 index 000000000..6cf9c2b9a --- /dev/null +++ b/abci/client/grpc_client_test.go @@ -0,0 +1,114 @@ +package abciclient_test + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/fortytw2/leaktest" + + abciclient "github.com/dashpay/tenderdash/abci/client" + abciserver "github.com/dashpay/tenderdash/abci/server" + "github.com/dashpay/tenderdash/abci/types" + "github.com/dashpay/tenderdash/libs/log" + "github.com/dashpay/tenderdash/libs/service" +) + +// TestGRPCClientServerParallel tests that gRPC client and server can handle multiple parallel requests +func TestGRPCClientServerParallel(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + logger := log.NewNopLogger() + app := &mockApplication{t: t} + + socket := t.TempDir() + "/grpc_test" + client, _, err := makeGRPCClientServer(ctx, t, logger, app, socket) + if err != nil { + t.Fatal(err) + } + + // we'll use that mutex to ensure threads don't finish before we check status + app.mtx.Lock() + + const threads = 5 + // started will be marked as done as soon as app.Info() handler executes on the server + app.started.Add(threads) + // done will be used to wait for all threads to finish + var done sync.WaitGroup + done.Add(threads) + + for i := 0; i < threads; i++ { + thread := uint64(i) + go func() { + _, _ = client.Info(ctx, &types.RequestInfo{BlockVersion: thread}) + done.Done() + }() + } + + // wait for threads to execute + // note it doesn't mean threads are really done, as they are waiting on the mtx + // so if all `started` are marked as done, it means all threads have started + // in parallel + app.started.Wait() + + // unlock the mutex so that threads can finish their execution + app.mtx.Unlock() + + // wait for all threads to really finish + done.Wait() +} + +func makeGRPCClientServer( + ctx context.Context, + t *testing.T, + logger log.Logger, + app types.Application, + name string, +) (abciclient.Client, service.Service, error) { + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + t.Cleanup(leaktest.Check(t)) + + // Start the listener + socket := fmt.Sprintf("unix://%s.sock", name) + + server := abciserver.NewGRPCServer(logger.With("module", "abci-server"), socket, app) + + if err := server.Start(ctx); err != nil { + cancel() + return nil, nil, err + } + + client := abciclient.NewGRPCClient(logger.With("module", "abci-client"), socket, true) + + if err := client.Start(ctx); err != nil { + cancel() + return nil, nil, err + } + return client, server, nil +} + +// mockApplication that will decrease mockApplication.started when called Info, and then wait until +// mtx is unlocked before it finishes +type mockApplication struct { + types.BaseApplication + mtx sync.Mutex + // we'll use that to ensure all threads have started + started sync.WaitGroup + + t *testing.T +} + +func (m *mockApplication) Info(_ctx context.Context, req *types.RequestInfo) (res *types.ResponseInfo, err error) { + m.t.Logf("Info %d called", req.BlockVersion) + // mark wg as done to signal that we have executed + m.started.Done() + // we will wait here until all threads mark wg as done + m.mtx.Lock() + defer m.mtx.Unlock() + m.t.Logf("Info %d finished", req.BlockVersion) + return &types.ResponseInfo{}, nil +} diff --git a/abci/client/routed_client_test.go b/abci/client/routed_client_test.go index b796bf6ec..54e765b0f 100644 --- a/abci/client/routed_client_test.go +++ b/abci/client/routed_client_test.go @@ -128,7 +128,6 @@ func TestRoutedClientGrpc(t *testing.T) { logger := log.NewTestingLogger(t) - // app := types.NewBaseApplication() app := mocks.NewApplication(t) defer app.AssertExpectations(t) app.On("Echo", mock.Anything, mock.Anything).Return(