diff --git a/pkg/go.mod b/pkg/go.mod index b8f1c4d4d5..9e143d6b28 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -105,6 +105,7 @@ require ( github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980 // indirect + github.com/stretchr/objx v0.5.0 // indirect github.com/tchap/go-patricia v2.3.0+incompatible // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 // indirect diff --git a/pkg/helper/docker_center.go b/pkg/helper/docker_center.go index 925dbba8d6..3ad16b1666 100644 --- a/pkg/helper/docker_center.go +++ b/pkg/helper/docker_center.go @@ -27,7 +27,6 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/events" - docker "github.com/docker/docker/client" "github.com/alibaba/ilogtail/pkg/logger" "github.com/alibaba/ilogtail/pkg/util" @@ -457,7 +456,8 @@ type DockerCenter struct { // For the CRI scenario, the container list only contains the real containers and excludes the sandbox containers. But the // sandbox meta would be saved to its bound container. containerMap map[string]*DockerInfoDetail // all containers will in this map - client *docker.Client + client DockerCenterClientInterface + containerHelper ContainerHelperInterface lastErrMu sync.Mutex lastErr error lock sync.RWMutex @@ -470,6 +470,24 @@ type DockerCenter struct { initStaticContainerInfoSuccess bool } +type DockerCenterClientInterface interface { + ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) + ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error) + ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) + Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) +} + +type ContainerHelperInterface interface { + ContainerProcessAlive(pid int) bool +} + +type ContainerHelperWrapper struct { +} + +func (r *ContainerHelperWrapper) ContainerProcessAlive(pid int) bool { + return ContainerProcessAlive(pid) +} + func getIPByHosts(hostFileName, hostname string) string { lines, err := util.ReadLines(hostFileName) if err != nil { @@ -634,7 +652,9 @@ func getDockerCenterInstance() *DockerCenter { logger.Init() // load EnvTags first LoadEnvTags() - dockerCenterInstance = &DockerCenter{} + dockerCenterInstance = &DockerCenter{ + containerHelper: &ContainerHelperWrapper{}, + } dockerCenterInstance.imageCache = make(map[string]string) dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail) // containerFindingManager works in a producer-consumer model @@ -1042,15 +1062,8 @@ func (dc *DockerCenter) fetchAll() error { time.Sleep(time.Second * 5) } if err == nil { - if !ContainerProcessAlive(containerDetail.State.Pid) { - containerDetail.State.Status = ContainerStatusExited - finishedAt := containerDetail.State.FinishedAt - finishedAtTime, _ := time.Parse(time.RFC3339, finishedAt) - now := time.Now() - duration := now.Sub(finishedAtTime) - if duration >= ContainerInfoDeletedTimeout { - continue - } + if !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) { + continue } containerMap[container.ID] = dc.CreateInfoDetail(containerDetail, envConfigPrefix, false) } else { @@ -1058,8 +1071,7 @@ func (dc *DockerCenter) fetchAll() error { } } dc.updateContainers(containerMap) - - return err + return nil } func (dc *DockerCenter) fetchOne(containerID string, tryFindSandbox bool) error { @@ -1070,7 +1082,7 @@ func (dc *DockerCenter) fetchOne(containerID string, tryFindSandbox bool) error dc.setLastError(err, "inspect container error "+containerID) return err } - if containerDetail.State.Status == ContainerStatusRunning && !ContainerProcessAlive(containerDetail.State.Pid) { + if containerDetail.State.Status == ContainerStatusRunning && !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) { containerDetail.State.Status = ContainerStatusExited } // docker 场景下 @@ -1084,7 +1096,7 @@ func (dc *DockerCenter) fetchOne(containerID string, tryFindSandbox bool) error if err != nil { dc.setLastError(err, "inspect sandbox container error "+id) } else { - if containerDetail.State.Status == ContainerStatusRunning && !ContainerProcessAlive(containerDetail.State.Pid) { + if containerDetail.State.Status == ContainerStatusRunning && !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) { containerDetail.State.Status = ContainerStatusExited } dc.updateContainer(id, dc.CreateInfoDetail(containerDetail, envConfigPrefix, false)) @@ -1162,10 +1174,12 @@ func dockerCenterRecover() { func (dc *DockerCenter) initClient() error { var err error - // DockerCenterTimeout should only be used by context.WithTimeout() in specific methods - if dc.client, err = CreateDockerClient(); err != nil { - dc.setLastError(err, "init docker client from env error") - return err + // do not CreateDockerClient multi times + if dc.client == nil { + if dc.client, err = CreateDockerClient(); err != nil { + dc.setLastError(err, "init docker client from env error") + return err + } } return nil } diff --git a/pkg/helper/docker_center_test.go b/pkg/helper/docker_center_test.go index 78678eb37d..1ad2b1881d 100644 --- a/pkg/helper/docker_center_test.go +++ b/pkg/helper/docker_center_test.go @@ -15,13 +15,18 @@ package helper import ( + "context" + "errors" "os" "sync" "testing" + "time" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/events" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -338,3 +343,236 @@ func TestK8SInfo_IsMatch(t *testing.T) { }) } } + +type DockerClientMock struct { + mock.Mock +} + +type ContainerHelperMock struct { + mock.Mock +} + +// Events 实现了 DockerClient 的 Events 方法 +func (m *DockerClientMock) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) { + args := m.Called(ctx, options) + return args.Get(0).(chan events.Message), args.Get(1).(chan error) +} + +func (m *DockerClientMock) ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error) { + args := m.Called(ctx, imageID) + return args.Get(0).(types.ImageInspect), args.Get(1).([]byte), args.Error(2) +} + +func (m *DockerClientMock) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) { + args := m.Called(ctx, containerID) + return args.Get(0).(types.ContainerJSON), args.Error(1) +} +func (m *DockerClientMock) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) { + args := m.Called(ctx, options) + return args.Get(0).([]types.Container), args.Error(1) +} + +func (m *ContainerHelperMock) ContainerProcessAlive(pid int) bool { + args := m.Called(pid) + return args.Get(0).(bool) +} + +func TestDockerCenterEvents(t *testing.T) { + dockerCenterInstance = &DockerCenter{} + dockerCenterInstance.imageCache = make(map[string]string) + dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail) + + mockClient := DockerClientMock{} + dockerCenterInstance.client = &mockClient + + containerHelper := ContainerHelperMock{} + + // 创建一个模拟的事件通道 + eventChan := make(chan events.Message, 1) + errChan := make(chan error, 1) + + mockClient.On("Events", mock.Anything, mock.Anything).Return(eventChan, errChan) + + go dockerCenterInstance.eventListener() + + containerHelper.On("ContainerProcessAlive", mock.Anything).Return(false).Once() + mockClient.On("ContainerInspect", mock.Anything, "event1").Return(types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: "event1", + Name: "name1", + State: &types.ContainerState{}, + }, + Config: &container.Config{ + Env: make([]string, 0), + }, + }, nil).Once() + + eventChan <- events.Message{ID: "event1", Status: "rename"} + + time.Sleep(5 * time.Second) + containerLen := len(dockerCenterInstance.containerMap) + assert.Equal(t, 1, containerLen) + + containerHelper.On("ContainerProcessAlive", mock.Anything).Return(true).Once() + mockClient.On("ContainerInspect", mock.Anything, "event1").Return(types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: "event1", + Name: "start", + State: &types.ContainerState{}, + }, + Config: &container.Config{ + Env: make([]string, 0), + }, + }, nil).Once() + eventChan <- events.Message{ID: "event1", Status: "start"} + + time.Sleep(5 * time.Second) + // 设置期望 + close(eventChan) + + containerLen = len(dockerCenterInstance.containerMap) + assert.Equal(t, 1, containerLen) +} + +func TestDockerCenterFetchAll(t *testing.T) { + dockerCenterInstance = &DockerCenter{} + dockerCenterInstance.imageCache = make(map[string]string) + dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail) + + mockClient := DockerClientMock{} + dockerCenterInstance.client = &mockClient + + containerHelper := ContainerHelperMock{} + + mockContainerListResult := []types.Container{ + {ID: "id1"}, + {ID: "id2"}, + {ID: "id3"}, + } + + containerHelper.On("ContainerProcessAlive", mock.Anything).Return(true) + + mockClient.On("ContainerList", mock.Anything, mock.Anything).Return(mockContainerListResult, nil).Once() + + mockClient.On("ContainerInspect", mock.Anything, "id1").Return(types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: "id1", + Name: "event_name1", + State: &types.ContainerState{}, + }, + Config: &container.Config{ + Env: make([]string, 0), + }, + }, nil).Once() + mockClient.On("ContainerInspect", mock.Anything, "id2").Return(types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: "id2", + Name: "event_name2", + State: &types.ContainerState{}, + }, + Config: &container.Config{ + Env: make([]string, 0), + }, + }, nil).Once() + // one failed inspect + mockClient.On("ContainerInspect", mock.Anything, "id3").Return(types.ContainerJSON{}, errors.New("id3 not exist")).Times(3) + + err := dockerCenterInstance.fetchAll() + assert.Nil(t, err) + + containerLen := len(dockerCenterInstance.containerMap) + assert.Equal(t, 2, containerLen) + + mockContainerListResult2 := []types.Container{ + {ID: "id4"}, + {ID: "id5"}, + } + + mockClient.On("ContainerList", mock.Anything, mock.Anything).Return(mockContainerListResult2, nil).Once() + + mockClient.On("ContainerInspect", mock.Anything, "id4").Return(types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: "id4", + Name: "event_name4", + State: &types.ContainerState{}, + }, + Config: &container.Config{ + Env: make([]string, 0), + }, + }, nil).Once() + + mockClient.On("ContainerInspect", mock.Anything, "id5").Return(types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: "id5", + Name: "event_name5", + State: &types.ContainerState{}, + }, + Config: &container.Config{ + Env: make([]string, 0), + }, + }, nil).Once() + + err = dockerCenterInstance.fetchAll() + assert.Nil(t, err) + + containerLen = len(dockerCenterInstance.containerMap) + assert.Equal(t, 4, containerLen) +} + +func TestDockerCenterFetchAllAndOne(t *testing.T) { + dockerCenterInstance = &DockerCenter{} + dockerCenterInstance.imageCache = make(map[string]string) + dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail) + + mockClient := DockerClientMock{} + dockerCenterInstance.client = &mockClient + + containerHelper := ContainerHelperMock{} + + mockContainerListResult := []types.Container{ + {ID: "id1"}, + {ID: "id2"}, + } + + mockClient.On("ContainerList", mock.Anything, mock.Anything).Return(mockContainerListResult, nil) + + mockClient.On("ContainerInspect", mock.Anything, "id1").Return(types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: "id1", + Name: "event_name1", + State: &types.ContainerState{}, + }, + Config: &container.Config{ + Env: make([]string, 0), + }, + }, nil) + mockClient.On("ContainerInspect", mock.Anything, "id2").Return(types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: "id2", + Name: "event_name2", + State: &types.ContainerState{}, + }, + Config: &container.Config{ + Env: make([]string, 0), + }, + }, nil) + + containerHelper.On("ContainerProcessAlive", mock.Anything).Return(true).Times(2) + + err := dockerCenterInstance.fetchAll() + assert.Nil(t, err) + + dockerCenterInstance.markRemove("id1") + dockerCenterInstance.markRemove("id2") + + containerHelper.On("ContainerProcessAlive", mock.Anything).Return(false).Times(2) + err = dockerCenterInstance.fetchAll() + assert.Nil(t, err) + + containerLen := len(dockerCenterInstance.containerMap) + assert.Equal(t, 2, containerLen) + + for _, container := range dockerCenterInstance.containerMap { + assert.Equal(t, true, container.deleteFlag) + } +} diff --git a/pkg/helper/docker_cri_adapter.go b/pkg/helper/docker_cri_adapter.go index 702c3d9d2f..5f2b040b34 100644 --- a/pkg/helper/docker_cri_adapter.go +++ b/pkg/helper/docker_cri_adapter.go @@ -30,7 +30,6 @@ import ( "time" "github.com/containerd/containerd" - containerdcriserver "github.com/containerd/containerd/pkg/cri/server" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" @@ -98,7 +97,8 @@ func IsCRIStatusValid(criRuntimeEndpoint string) bool { logger.Debug(context.Background(), "Dial", addr, "failed", err) return false } - + // must close,otherwise connections will leak and case mem increase + defer conn.Close() client := cri.NewRuntimeServiceClient(conn) // check cri status _, err = client.Status(ctx, &cri.StatusRequest{}) @@ -179,7 +179,6 @@ func newRuntimeServiceClient() (cri.RuntimeServiceClient, error) { if err != nil { return nil, err } - return cri.NewRuntimeServiceClient(conn), nil } @@ -266,17 +265,14 @@ func (cw *CRIRuntimeWrapper) createContainerInfo(containerID string) (detail *Do if state == cri.ContainerState_CONTAINER_RUNNING && ContainerProcessAlive(int(ci.Pid)) { stateStatus = ContainerStatusRunning } - finishedTime := status.GetStatus().GetFinishedAt() - finishedAt := time.Unix(0, finishedTime).Format(time.RFC3339Nano) dockerContainer := types.ContainerJSON{ ContainerJSONBase: &types.ContainerJSONBase{ ID: containerID, Created: time.Unix(0, status.GetStatus().CreatedAt).Format(time.RFC3339Nano), LogPath: status.GetStatus().GetLogPath(), State: &types.ContainerState{ - Status: stateStatus, - Pid: int(ci.Pid), - FinishedAt: finishedAt, + Status: stateStatus, + Pid: int(ci.Pid), }, HostConfig: &container.HostConfig{ VolumeDriver: ci.Snapshotter, @@ -387,13 +383,7 @@ func (cw *CRIRuntimeWrapper) fetchAll() error { continue } if dockerContainer.Status() != ContainerStatusRunning { - finishedAt := dockerContainer.FinishedAt() - finishedAtTime, _ := time.Parse(time.RFC3339, finishedAt) - now := time.Now() - duration := now.Sub(finishedAtTime) - if duration >= ContainerInfoDeletedTimeout { - continue - } + continue } cw.containers[c.GetId()] = &innerContainerInfo{ State: c.State, diff --git a/plugins/input/docker/stdout/input_docker_stdout.go b/plugins/input/docker/stdout/input_docker_stdout.go index 0dc24d556a..a5efc8b7eb 100644 --- a/plugins/input/docker/stdout/input_docker_stdout.go +++ b/plugins/input/docker/stdout/input_docker_stdout.go @@ -44,6 +44,10 @@ func logDriverSupported(container types.ContainerJSON) bool { } } +func logPathEmpty(container types.ContainerJSON) bool { + return len(container.LogPath) == 0 +} + type DockerFileSyner struct { dockerFileReader *helper.LogFileReader dockerFileProcessor *DockerStdoutProcessor @@ -305,6 +309,9 @@ func (sds *ServiceDockerStdout) FlushAll(c pipeline.Collector, firstStart bool) if !logDriverSupported(info.ContainerInfo) { continue } + if logPathEmpty(info.ContainerInfo) { + continue + } if _, ok := sds.synerMap[id]; !ok || firstStart { syner := NewDockerFileSyner(sds, info, sds.checkpointMap) logger.Info(sds.context.GetRuntimeContext(), "docker stdout", "added", "source host path", info.ContainerInfo.LogPath,