diff --git a/pkg/go.mod b/pkg/go.mod index 5e9313b879..3cac003033 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -115,6 +115,7 @@ require ( github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980 // indirect + github.com/stretchr/objx v0.5.0 // indirect github.com/tchap/go-patricia v2.3.0+incompatible // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 // indirect diff --git a/pkg/helper/container_export.go b/pkg/helper/container_export.go index a63c89b447..45b89631bf 100644 --- a/pkg/helper/container_export.go +++ b/pkg/helper/container_export.go @@ -191,21 +191,24 @@ func SplitRegexFromMap(input map[string]string) (staticResult map[string]string, return staticResult, regexResult, nil } -func CreateDockerClient(opt ...docker.Opt) (client *docker.Client, err error) { +func CreateDockerClient(opt ...docker.Opt) (*DockerClientWrapper, error) { opt = append(opt, docker.FromEnv) - client, err = docker.NewClientWithOpts(opt...) + rawClient, err := docker.NewClientWithOpts(opt...) if err != nil { return nil, err } // add dockerClient connectivity tests pingCtx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - ping, err := client.Ping(pingCtx) + ping, err := rawClient.Ping(pingCtx) if err != nil { return nil, err } - client.NegotiateAPIVersionPing(ping) - return + rawClient.NegotiateAPIVersionPing(ping) + + return &DockerClientWrapper{ + client: rawClient, + }, nil } func RegisterDockerEventListener(c chan events.Message) { diff --git a/pkg/helper/docker_center.go b/pkg/helper/docker_center.go index 9d481160c8..f756f04ba4 100644 --- a/pkg/helper/docker_center.go +++ b/pkg/helper/docker_center.go @@ -18,6 +18,7 @@ import ( "context" "errors" "hash/fnv" + "io" "path" "regexp" "runtime" @@ -458,7 +459,7 @@ type DockerCenter struct { // For the CRI scenario, the container list only contains the real containers and excludes the sandbox containers. But the // sandbox meta would be saved to its bound container. containerMap map[string]*DockerInfoDetail // all containers will in this map - client *docker.Client + client DockerCenterClientInterface lastErrMu sync.Mutex lastErr error lock sync.RWMutex @@ -471,6 +472,43 @@ type DockerCenter struct { initStaticContainerInfoSuccess bool } +type DockerCenterClientInterface interface { + ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) + ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error) + ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) + Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) + ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) + ContainerProcessAlive(pid int) bool +} + +type DockerClientWrapper struct { + client *docker.Client +} + +func (r *DockerClientWrapper) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) { + return r.client.ContainerList(ctx, options) +} + +func (r *DockerClientWrapper) ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error) { + return r.client.ImageInspectWithRaw(ctx, imageID) +} + +func (r *DockerClientWrapper) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) { + return r.client.ContainerInspect(ctx, containerID) +} + +func (r *DockerClientWrapper) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) { + return r.client.Events(ctx, options) +} + +func (r *DockerClientWrapper) ContainerProcessAlive(pid int) bool { + return ContainerProcessAlive(pid) +} + +func (r *DockerClientWrapper) ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) { + return r.client.ContainerLogs(ctx, container, options) +} + func getIPByHosts(hostFileName, hostname string) string { lines, err := util.ReadLines(hostFileName) if err != nil { @@ -1023,35 +1061,6 @@ func (dc *DockerCenter) updateContainer(id string, container *DockerInfoDetail) dc.refreshLastUpdateMapTime() } -func (dc *DockerCenter) inspectOneContainer(containerID string) (types.ContainerJSON, error) { - var err error - var containerDetail types.ContainerJSON - for idx := 0; idx < 3; idx++ { - if containerDetail, err = dc.client.ContainerInspect(context.Background(), containerID); err == nil { - break - } - time.Sleep(time.Second * 5) - } - if err != nil { - dc.setLastError(err, "inspect container error "+containerID) - return types.ContainerJSON{}, err - } - if !ContainerProcessAlive(containerDetail.State.Pid) { - containerDetail.State.Status = ContainerStatusExited - finishedAt := containerDetail.State.FinishedAt - finishedAtTime, _ := time.Parse(time.RFC3339, finishedAt) - now := time.Now() - duration := now.Sub(finishedAtTime) - if duration >= ContainerInfoDeletedTimeout { - errMsg := "inspect time out container " + containerID - err = errors.New(errMsg) - dc.setLastError(err, errMsg) - return types.ContainerJSON{}, err - } - } - return containerDetail, nil -} - func (dc *DockerCenter) fetchAll() error { dc.containerStateLock.Lock() defer dc.containerStateLock.Unlock() @@ -1065,21 +1074,37 @@ func (dc *DockerCenter) fetchAll() error { for _, container := range containers { var containerDetail types.ContainerJSON - containerDetail, err = dc.inspectOneContainer(container.ID) + for idx := 0; idx < 3; idx++ { + if containerDetail, err = dc.client.ContainerInspect(context.Background(), container.ID); err == nil { + break + } + time.Sleep(time.Second * 5) + } if err == nil { + if !dc.client.ContainerProcessAlive(containerDetail.State.Pid) { + continue + } containerMap[container.ID] = dc.CreateInfoDetail(containerDetail, envConfigPrefix, false) + } else { + dc.setLastError(err, "inspect container error "+container.ID) } } dc.updateContainers(containerMap) - return nil } func (dc *DockerCenter) fetchOne(containerID string, tryFindSandbox bool) error { dc.containerStateLock.Lock() defer dc.containerStateLock.Unlock() - containerDetail, err := dc.inspectOneContainer(containerID) + containerDetail, err := dc.client.ContainerInspect(context.Background(), containerID) if err != nil { + dc.setLastError(err, "inspect container error "+containerID) + return err + } + if !dc.client.ContainerProcessAlive(containerDetail.State.Pid) { + errMsg := "inspect time out container " + containerID + err = errors.New(errMsg) + dc.setLastError(err, errMsg) return err } // docker 场景下 @@ -1093,7 +1118,7 @@ func (dc *DockerCenter) fetchOne(containerID string, tryFindSandbox bool) error if err != nil { dc.setLastError(err, "inspect sandbox container error "+id) } else { - if containerDetail.State.Status == ContainerStatusRunning && !ContainerProcessAlive(containerDetail.State.Pid) { + if containerDetail.State.Status == ContainerStatusRunning && !dc.client.ContainerProcessAlive(containerDetail.State.Pid) { containerDetail.State.Status = ContainerStatusExited } dc.updateContainer(id, dc.CreateInfoDetail(containerDetail, envConfigPrefix, false)) @@ -1171,10 +1196,12 @@ func dockerCenterRecover() { func (dc *DockerCenter) initClient() error { var err error - // DockerCenterTimeout should only be used by context.WithTimeout() in specific methods - if dc.client, err = CreateDockerClient(); err != nil { - dc.setLastError(err, "init docker client from env error") - return err + // do not CreateDockerClient multi times + if dc.client == nil { + if dc.client, err = CreateDockerClient(); err != nil { + dc.setLastError(err, "init docker client from env error") + return err + } } return nil } diff --git a/pkg/helper/docker_center_test.go b/pkg/helper/docker_center_test.go index 78678eb37d..40eb546b8b 100644 --- a/pkg/helper/docker_center_test.go +++ b/pkg/helper/docker_center_test.go @@ -15,13 +15,19 @@ package helper import ( + "context" + "errors" + "io" "os" "sync" "testing" + "time" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/events" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -338,3 +344,231 @@ func TestK8SInfo_IsMatch(t *testing.T) { }) } } + +type DockerClientMock struct { + mock.Mock +} + +// Events 实现了 DockerClient 的 Events 方法 +func (m *DockerClientMock) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) { + args := m.Called(ctx, options) + return args.Get(0).(chan events.Message), args.Get(1).(chan error) +} + +func (m *DockerClientMock) ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error) { + args := m.Called(ctx, imageID) + return args.Get(0).(types.ImageInspect), args.Get(1).([]byte), args.Error(2) +} + +func (m *DockerClientMock) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) { + args := m.Called(ctx, containerID) + return args.Get(0).(types.ContainerJSON), args.Error(1) +} +func (m *DockerClientMock) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) { + args := m.Called(ctx, options) + return args.Get(0).([]types.Container), args.Error(1) +} + +func (m *DockerClientMock) ContainerProcessAlive(pid int) bool { + args := m.Called(pid) + return args.Get(0).(bool) +} + +func (m *DockerClientMock) ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) { + args := m.Called(ctx, container, options) + return args.Get(0).(io.ReadCloser), args.Error(1) +} + +func TestDockerCenterEvents(t *testing.T) { + dockerCenterInstance = &DockerCenter{} + dockerCenterInstance.imageCache = make(map[string]string) + dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail) + + mockClient := DockerClientMock{} + dockerCenterInstance.client = &mockClient + + // 创建一个模拟的事件通道 + eventChan := make(chan events.Message, 1) + errChan := make(chan error, 1) + + mockClient.On("Events", mock.Anything, mock.Anything).Return(eventChan, errChan) + + go dockerCenterInstance.eventListener() + + mockClient.On("ContainerProcessAlive", mock.Anything).Return(false).Once() + mockClient.On("ContainerInspect", mock.Anything, "event1").Return(types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: "event1", + Name: "name1", + State: &types.ContainerState{}, + }, + Config: &container.Config{ + Env: make([]string, 0), + }, + }, nil).Once() + + eventChan <- events.Message{ID: "event1", Status: "rename"} + + time.Sleep(5 * time.Second) + containerLen := len(dockerCenterInstance.containerMap) + assert.Equal(t, 0, containerLen) + + mockClient.On("ContainerProcessAlive", mock.Anything).Return(true).Once() + mockClient.On("ContainerInspect", mock.Anything, "event1").Return(types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: "event1", + Name: "start", + State: &types.ContainerState{}, + }, + Config: &container.Config{ + Env: make([]string, 0), + }, + }, nil).Once() + eventChan <- events.Message{ID: "event1", Status: "start"} + + time.Sleep(5 * time.Second) + // 设置期望 + close(eventChan) + + containerLen = len(dockerCenterInstance.containerMap) + assert.Equal(t, 1, containerLen) +} + +func TestDockerCenterFetchAll(t *testing.T) { + dockerCenterInstance = &DockerCenter{} + dockerCenterInstance.imageCache = make(map[string]string) + dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail) + + mockClient := DockerClientMock{} + dockerCenterInstance.client = &mockClient + + mockContainerListResult := []types.Container{ + {ID: "id1"}, + {ID: "id2"}, + {ID: "id3"}, + } + + mockClient.On("ContainerProcessAlive", mock.Anything).Return(true) + + mockClient.On("ContainerList", mock.Anything, mock.Anything).Return(mockContainerListResult, nil).Once() + + mockClient.On("ContainerInspect", mock.Anything, "id1").Return(types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: "id1", + Name: "event_name1", + State: &types.ContainerState{}, + }, + Config: &container.Config{ + Env: make([]string, 0), + }, + }, nil).Once() + mockClient.On("ContainerInspect", mock.Anything, "id2").Return(types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: "id2", + Name: "event_name2", + State: &types.ContainerState{}, + }, + Config: &container.Config{ + Env: make([]string, 0), + }, + }, nil).Once() + // one failed inspect + mockClient.On("ContainerInspect", mock.Anything, "id3").Return(types.ContainerJSON{}, errors.New("id3 not exist")).Times(3) + + err := dockerCenterInstance.fetchAll() + assert.Nil(t, err) + + containerLen := len(dockerCenterInstance.containerMap) + assert.Equal(t, 2, containerLen) + + mockContainerListResult2 := []types.Container{ + {ID: "id4"}, + {ID: "id5"}, + } + + mockClient.On("ContainerList", mock.Anything, mock.Anything).Return(mockContainerListResult2, nil).Once() + + mockClient.On("ContainerInspect", mock.Anything, "id4").Return(types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: "id4", + Name: "event_name4", + State: &types.ContainerState{}, + }, + Config: &container.Config{ + Env: make([]string, 0), + }, + }, nil).Once() + + mockClient.On("ContainerInspect", mock.Anything, "id5").Return(types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: "id5", + Name: "event_name5", + State: &types.ContainerState{}, + }, + Config: &container.Config{ + Env: make([]string, 0), + }, + }, nil).Once() + + err = dockerCenterInstance.fetchAll() + assert.Nil(t, err) + + containerLen = len(dockerCenterInstance.containerMap) + assert.Equal(t, 4, containerLen) +} + +func TestDockerCenterFetchAllAndOne(t *testing.T) { + dockerCenterInstance = &DockerCenter{} + dockerCenterInstance.imageCache = make(map[string]string) + dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail) + + mockClient := DockerClientMock{} + dockerCenterInstance.client = &mockClient + + mockContainerListResult := []types.Container{ + {ID: "id1"}, + {ID: "id2"}, + } + + mockClient.On("ContainerList", mock.Anything, mock.Anything).Return(mockContainerListResult, nil) + + mockClient.On("ContainerInspect", mock.Anything, "id1").Return(types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: "id1", + Name: "event_name1", + State: &types.ContainerState{}, + }, + Config: &container.Config{ + Env: make([]string, 0), + }, + }, nil) + mockClient.On("ContainerInspect", mock.Anything, "id2").Return(types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: "id2", + Name: "event_name2", + State: &types.ContainerState{}, + }, + Config: &container.Config{ + Env: make([]string, 0), + }, + }, nil) + + mockClient.On("ContainerProcessAlive", mock.Anything).Return(true).Times(2) + + err := dockerCenterInstance.fetchAll() + assert.Nil(t, err) + + dockerCenterInstance.markRemove("id1") + dockerCenterInstance.markRemove("id2") + + mockClient.On("ContainerProcessAlive", mock.Anything).Return(false).Times(2) + err = dockerCenterInstance.fetchAll() + assert.Nil(t, err) + + containerLen := len(dockerCenterInstance.containerMap) + assert.Equal(t, 2, containerLen) + + for _, container := range dockerCenterInstance.containerMap { + assert.Equal(t, true, container.deleteFlag) + } +} diff --git a/pkg/helper/docker_cri_adapter.go b/pkg/helper/docker_cri_adapter.go index dcaef6c644..01e2c327de 100644 --- a/pkg/helper/docker_cri_adapter.go +++ b/pkg/helper/docker_cri_adapter.go @@ -26,7 +26,6 @@ import ( "time" "github.com/containerd/containerd" - containerdcriserver "github.com/containerd/containerd/pkg/cri/server" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" @@ -92,7 +91,8 @@ func IsCRIStatusValid(criRuntimeEndpoint string) bool { logger.Debug(context.Background(), "Dial", addr, "failed", err) return false } - + // must close,otherwise connections will leak and case mem increase + defer conn.Close() client := cri.NewRuntimeServiceClient(conn) // check cri status _, err = client.Status(ctx, &cri.StatusRequest{}) @@ -156,7 +156,6 @@ func newRuntimeServiceClient() (cri.RuntimeServiceClient, error) { if err != nil { return nil, err } - return cri.NewRuntimeServiceClient(conn), nil } @@ -243,17 +242,14 @@ func (cw *CRIRuntimeWrapper) createContainerInfo(containerID string) (detail *Do if state == cri.ContainerState_CONTAINER_RUNNING && ContainerProcessAlive(int(ci.Pid)) { stateStatus = ContainerStatusRunning } - finishedTime := status.GetStatus().GetFinishedAt() - finishedAt := time.Unix(0, finishedTime).Format(time.RFC3339Nano) dockerContainer := types.ContainerJSON{ ContainerJSONBase: &types.ContainerJSONBase{ ID: containerID, Created: time.Unix(0, status.GetStatus().CreatedAt).Format(time.RFC3339Nano), LogPath: status.GetStatus().GetLogPath(), State: &types.ContainerState{ - Status: stateStatus, - Pid: int(ci.Pid), - FinishedAt: finishedAt, + Status: stateStatus, + Pid: int(ci.Pid), }, HostConfig: &container.HostConfig{ VolumeDriver: ci.Snapshotter, @@ -364,13 +360,7 @@ func (cw *CRIRuntimeWrapper) fetchAll() error { continue } if dockerContainer.Status() != ContainerStatusRunning { - finishedAt := dockerContainer.FinishedAt() - finishedAtTime, _ := time.Parse(time.RFC3339, finishedAt) - now := time.Now() - duration := now.Sub(finishedAtTime) - if duration >= ContainerInfoDeletedTimeout { - continue - } + continue } cw.containers[c.GetId()] = &innerContainerInfo{ State: c.State, diff --git a/plugins/input/docker/rawstdout/input_docker_stdout.go b/plugins/input/docker/rawstdout/input_docker_stdout.go index 92c2a3040b..a4c539db25 100644 --- a/plugins/input/docker/rawstdout/input_docker_stdout.go +++ b/plugins/input/docker/rawstdout/input_docker_stdout.go @@ -24,7 +24,6 @@ import ( "time" "github.com/docker/docker/api/types" - docker "github.com/docker/docker/client" "github.com/docker/docker/pkg/stdcopy" "github.com/alibaba/ilogtail/pkg/helper" @@ -84,7 +83,7 @@ type stdoutSyner struct { ExternalK8sLabelTag map[string]string info *helper.DockerInfoDetail - client *docker.Client + client helper.DockerCenterClientInterface startCheckPoint string lock sync.Mutex stdoutCheckPoint *StdoutCheckPoint @@ -361,7 +360,7 @@ type ServiceDockerStdout struct { K8sFilter *helper.K8SFilter synerMap map[string]*stdoutSyner - client *docker.Client + client helper.DockerCenterClientInterface shutdown chan struct{} waitGroup sync.WaitGroup context pipeline.Context