From 0f6c3afbf4e27d5adcbbf998348d7dc7b8e6f711 Mon Sep 17 00:00:00 2001 From: linrunqi08 Date: Mon, 16 Dec 2024 06:08:11 +0000 Subject: [PATCH] fix comments --- pkg/helper/container_export.go | 13 +++--- pkg/helper/docker_center.go | 40 +++++-------------- pkg/helper/docker_center_test.go | 28 +++++++------ .../docker/rawstdout/input_docker_stdout.go | 5 ++- 4 files changed, 35 insertions(+), 51 deletions(-) diff --git a/pkg/helper/container_export.go b/pkg/helper/container_export.go index 45b89631bf..a63c89b447 100644 --- a/pkg/helper/container_export.go +++ b/pkg/helper/container_export.go @@ -191,24 +191,21 @@ func SplitRegexFromMap(input map[string]string) (staticResult map[string]string, return staticResult, regexResult, nil } -func CreateDockerClient(opt ...docker.Opt) (*DockerClientWrapper, error) { +func CreateDockerClient(opt ...docker.Opt) (client *docker.Client, err error) { opt = append(opt, docker.FromEnv) - rawClient, err := docker.NewClientWithOpts(opt...) + client, err = docker.NewClientWithOpts(opt...) if err != nil { return nil, err } // add dockerClient connectivity tests pingCtx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - ping, err := rawClient.Ping(pingCtx) + ping, err := client.Ping(pingCtx) if err != nil { return nil, err } - rawClient.NegotiateAPIVersionPing(ping) - - return &DockerClientWrapper{ - client: rawClient, - }, nil + client.NegotiateAPIVersionPing(ping) + return } func RegisterDockerEventListener(c chan events.Message) { diff --git a/pkg/helper/docker_center.go b/pkg/helper/docker_center.go index 87f0f9356e..275fdd18a0 100644 --- a/pkg/helper/docker_center.go +++ b/pkg/helper/docker_center.go @@ -17,7 +17,6 @@ package helper import ( "context" "hash/fnv" - "io" "path" "regexp" "runtime" @@ -28,7 +27,6 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/events" - docker "github.com/docker/docker/client" "github.com/alibaba/ilogtail/pkg/logger" "github.com/alibaba/ilogtail/pkg/util" @@ -459,6 +457,7 @@ type DockerCenter struct { // sandbox meta would be saved to its bound container. containerMap map[string]*DockerInfoDetail // all containers will in this map client DockerCenterClientInterface + containerHelper *DockerCenterHelperWrapper lastErrMu sync.Mutex lastErr error lock sync.RWMutex @@ -476,38 +475,19 @@ type DockerCenterClientInterface interface { ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) - ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) - ContainerProcessAlive(pid int) bool -} - -type DockerClientWrapper struct { - client *docker.Client } -func (r *DockerClientWrapper) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) { - return r.client.ContainerList(ctx, options) -} - -func (r *DockerClientWrapper) ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error) { - return r.client.ImageInspectWithRaw(ctx, imageID) -} - -func (r *DockerClientWrapper) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) { - return r.client.ContainerInspect(ctx, containerID) +type DockerCenterHelperInterface interface { + ContainerProcessAlive(pid int) bool } -func (r *DockerClientWrapper) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) { - return r.client.Events(ctx, options) +type DockerCenterHelperWrapper struct { } -func (r *DockerClientWrapper) ContainerProcessAlive(pid int) bool { +func (r *DockerCenterHelperWrapper) ContainerProcessAlive(pid int) bool { return ContainerProcessAlive(pid) } -func (r *DockerClientWrapper) ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) { - return r.client.ContainerLogs(ctx, container, options) -} - func getIPByHosts(hostFileName, hostname string) string { lines, err := util.ReadLines(hostFileName) if err != nil { @@ -672,7 +652,9 @@ func getDockerCenterInstance() *DockerCenter { logger.InitLogger() // load EnvTags first LoadEnvTags() - dockerCenterInstance = &DockerCenter{} + dockerCenterInstance = &DockerCenter{ + containerHelper: &DockerCenterHelperWrapper{}, + } dockerCenterInstance.imageCache = make(map[string]string) dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail) // containerFindingManager works in a producer-consumer model @@ -1080,7 +1062,7 @@ func (dc *DockerCenter) fetchAll() error { time.Sleep(time.Second * 5) } if err == nil { - if !dc.client.ContainerProcessAlive(containerDetail.State.Pid) { + if !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) { continue } containerMap[container.ID] = dc.CreateInfoDetail(containerDetail, envConfigPrefix, false) @@ -1100,7 +1082,7 @@ func (dc *DockerCenter) fetchOne(containerID string, tryFindSandbox bool) error dc.setLastError(err, "inspect container error "+containerID) return err } - if containerDetail.State.Status == ContainerStatusRunning && !ContainerProcessAlive(containerDetail.State.Pid) { + if containerDetail.State.Status == ContainerStatusRunning && !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) { containerDetail.State.Status = ContainerStatusExited } // docker 场景下 @@ -1114,7 +1096,7 @@ func (dc *DockerCenter) fetchOne(containerID string, tryFindSandbox bool) error if err != nil { dc.setLastError(err, "inspect sandbox container error "+id) } else { - if containerDetail.State.Status == ContainerStatusRunning && !dc.client.ContainerProcessAlive(containerDetail.State.Pid) { + if containerDetail.State.Status == ContainerStatusRunning && !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) { containerDetail.State.Status = ContainerStatusExited } dc.updateContainer(id, dc.CreateInfoDetail(containerDetail, envConfigPrefix, false)) diff --git a/pkg/helper/docker_center_test.go b/pkg/helper/docker_center_test.go index 7d293bed11..1ad2b1881d 100644 --- a/pkg/helper/docker_center_test.go +++ b/pkg/helper/docker_center_test.go @@ -17,7 +17,6 @@ package helper import ( "context" "errors" - "io" "os" "sync" "testing" @@ -349,6 +348,10 @@ type DockerClientMock struct { mock.Mock } +type ContainerHelperMock struct { + mock.Mock +} + // Events 实现了 DockerClient 的 Events 方法 func (m *DockerClientMock) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) { args := m.Called(ctx, options) @@ -369,16 +372,11 @@ func (m *DockerClientMock) ContainerList(ctx context.Context, options types.Cont return args.Get(0).([]types.Container), args.Error(1) } -func (m *DockerClientMock) ContainerProcessAlive(pid int) bool { +func (m *ContainerHelperMock) ContainerProcessAlive(pid int) bool { args := m.Called(pid) return args.Get(0).(bool) } -func (m *DockerClientMock) ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) { - args := m.Called(ctx, container, options) - return args.Get(0).(io.ReadCloser), args.Error(1) -} - func TestDockerCenterEvents(t *testing.T) { dockerCenterInstance = &DockerCenter{} dockerCenterInstance.imageCache = make(map[string]string) @@ -387,6 +385,8 @@ func TestDockerCenterEvents(t *testing.T) { mockClient := DockerClientMock{} dockerCenterInstance.client = &mockClient + containerHelper := ContainerHelperMock{} + // 创建一个模拟的事件通道 eventChan := make(chan events.Message, 1) errChan := make(chan error, 1) @@ -395,7 +395,7 @@ func TestDockerCenterEvents(t *testing.T) { go dockerCenterInstance.eventListener() - mockClient.On("ContainerProcessAlive", mock.Anything).Return(false).Once() + containerHelper.On("ContainerProcessAlive", mock.Anything).Return(false).Once() mockClient.On("ContainerInspect", mock.Anything, "event1").Return(types.ContainerJSON{ ContainerJSONBase: &types.ContainerJSONBase{ ID: "event1", @@ -413,7 +413,7 @@ func TestDockerCenterEvents(t *testing.T) { containerLen := len(dockerCenterInstance.containerMap) assert.Equal(t, 1, containerLen) - mockClient.On("ContainerProcessAlive", mock.Anything).Return(true).Once() + containerHelper.On("ContainerProcessAlive", mock.Anything).Return(true).Once() mockClient.On("ContainerInspect", mock.Anything, "event1").Return(types.ContainerJSON{ ContainerJSONBase: &types.ContainerJSONBase{ ID: "event1", @@ -442,13 +442,15 @@ func TestDockerCenterFetchAll(t *testing.T) { mockClient := DockerClientMock{} dockerCenterInstance.client = &mockClient + containerHelper := ContainerHelperMock{} + mockContainerListResult := []types.Container{ {ID: "id1"}, {ID: "id2"}, {ID: "id3"}, } - mockClient.On("ContainerProcessAlive", mock.Anything).Return(true) + containerHelper.On("ContainerProcessAlive", mock.Anything).Return(true) mockClient.On("ContainerList", mock.Anything, mock.Anything).Return(mockContainerListResult, nil).Once() @@ -525,6 +527,8 @@ func TestDockerCenterFetchAllAndOne(t *testing.T) { mockClient := DockerClientMock{} dockerCenterInstance.client = &mockClient + containerHelper := ContainerHelperMock{} + mockContainerListResult := []types.Container{ {ID: "id1"}, {ID: "id2"}, @@ -553,7 +557,7 @@ func TestDockerCenterFetchAllAndOne(t *testing.T) { }, }, nil) - mockClient.On("ContainerProcessAlive", mock.Anything).Return(true).Times(2) + containerHelper.On("ContainerProcessAlive", mock.Anything).Return(true).Times(2) err := dockerCenterInstance.fetchAll() assert.Nil(t, err) @@ -561,7 +565,7 @@ func TestDockerCenterFetchAllAndOne(t *testing.T) { dockerCenterInstance.markRemove("id1") dockerCenterInstance.markRemove("id2") - mockClient.On("ContainerProcessAlive", mock.Anything).Return(false).Times(2) + containerHelper.On("ContainerProcessAlive", mock.Anything).Return(false).Times(2) err = dockerCenterInstance.fetchAll() assert.Nil(t, err) diff --git a/plugins/input/docker/rawstdout/input_docker_stdout.go b/plugins/input/docker/rawstdout/input_docker_stdout.go index a4c539db25..92c2a3040b 100644 --- a/plugins/input/docker/rawstdout/input_docker_stdout.go +++ b/plugins/input/docker/rawstdout/input_docker_stdout.go @@ -24,6 +24,7 @@ import ( "time" "github.com/docker/docker/api/types" + docker "github.com/docker/docker/client" "github.com/docker/docker/pkg/stdcopy" "github.com/alibaba/ilogtail/pkg/helper" @@ -83,7 +84,7 @@ type stdoutSyner struct { ExternalK8sLabelTag map[string]string info *helper.DockerInfoDetail - client helper.DockerCenterClientInterface + client *docker.Client startCheckPoint string lock sync.Mutex stdoutCheckPoint *StdoutCheckPoint @@ -360,7 +361,7 @@ type ServiceDockerStdout struct { K8sFilter *helper.K8SFilter synerMap map[string]*stdoutSyner - client helper.DockerCenterClientInterface + client *docker.Client shutdown chan struct{} waitGroup sync.WaitGroup context pipeline.Context