Skip to content

Commit

Permalink
Fix the problem of increasing container meta information in Docker Ce…
Browse files Browse the repository at this point in the history
…nter and add UT to this module (#1938)

* refine docker center code

* fix comments

* fix comments
  • Loading branch information
linrunqi08 authored Dec 16, 2024
1 parent 9fe3c04 commit 1a59190
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 34 deletions.
1 change: 1 addition & 0 deletions pkg/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ require (
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/tchap/go-patricia v2.3.0+incompatible // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 // indirect
Expand Down
52 changes: 33 additions & 19 deletions pkg/helper/docker_center.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
docker "github.com/docker/docker/client"

"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/util"
Expand Down Expand Up @@ -457,7 +456,8 @@ type DockerCenter struct {
// For the CRI scenario, the container list only contains the real containers and excludes the sandbox containers. But the
// sandbox meta would be saved to its bound container.
containerMap map[string]*DockerInfoDetail // all containers will in this map
client *docker.Client
client DockerCenterClientInterface
containerHelper ContainerHelperInterface
lastErrMu sync.Mutex
lastErr error
lock sync.RWMutex
Expand All @@ -470,6 +470,24 @@ type DockerCenter struct {
initStaticContainerInfoSuccess bool
}

type DockerCenterClientInterface interface {
ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error)
ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error)
ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error)
Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error)
}

type ContainerHelperInterface interface {
ContainerProcessAlive(pid int) bool
}

type ContainerHelperWrapper struct {
}

func (r *ContainerHelperWrapper) ContainerProcessAlive(pid int) bool {
return ContainerProcessAlive(pid)
}

func getIPByHosts(hostFileName, hostname string) string {
lines, err := util.ReadLines(hostFileName)
if err != nil {
Expand Down Expand Up @@ -634,7 +652,9 @@ func getDockerCenterInstance() *DockerCenter {
logger.InitLogger()
// load EnvTags first
LoadEnvTags()
dockerCenterInstance = &DockerCenter{}
dockerCenterInstance = &DockerCenter{
containerHelper: &ContainerHelperWrapper{},
}
dockerCenterInstance.imageCache = make(map[string]string)
dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail)
// containerFindingManager works in a producer-consumer model
Expand Down Expand Up @@ -1042,23 +1062,15 @@ func (dc *DockerCenter) fetchAll() error {
time.Sleep(time.Second * 5)
}
if err == nil {
if !ContainerProcessAlive(containerDetail.State.Pid) {
containerDetail.State.Status = ContainerStatusExited
finishedAt := containerDetail.State.FinishedAt
finishedAtTime, _ := time.Parse(time.RFC3339, finishedAt)
now := time.Now()
duration := now.Sub(finishedAtTime)
if duration >= ContainerInfoDeletedTimeout {
continue
}
if !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) {
continue
}
containerMap[container.ID] = dc.CreateInfoDetail(containerDetail, envConfigPrefix, false)
} else {
dc.setLastError(err, "inspect container error "+container.ID)
}
}
dc.updateContainers(containerMap)

return nil
}

Expand All @@ -1070,7 +1082,7 @@ func (dc *DockerCenter) fetchOne(containerID string, tryFindSandbox bool) error
dc.setLastError(err, "inspect container error "+containerID)
return err
}
if containerDetail.State.Status == ContainerStatusRunning && !ContainerProcessAlive(containerDetail.State.Pid) {
if containerDetail.State.Status == ContainerStatusRunning && !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) {
containerDetail.State.Status = ContainerStatusExited
}
// docker 场景下
Expand All @@ -1084,7 +1096,7 @@ func (dc *DockerCenter) fetchOne(containerID string, tryFindSandbox bool) error
if err != nil {
dc.setLastError(err, "inspect sandbox container error "+id)
} else {
if containerDetail.State.Status == ContainerStatusRunning && !ContainerProcessAlive(containerDetail.State.Pid) {
if containerDetail.State.Status == ContainerStatusRunning && !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) {
containerDetail.State.Status = ContainerStatusExited
}
dc.updateContainer(id, dc.CreateInfoDetail(containerDetail, envConfigPrefix, false))
Expand Down Expand Up @@ -1162,10 +1174,12 @@ func dockerCenterRecover() {

func (dc *DockerCenter) initClient() error {
var err error
// DockerCenterTimeout should only be used by context.WithTimeout() in specific methods
if dc.client, err = CreateDockerClient(); err != nil {
dc.setLastError(err, "init docker client from env error")
return err
// do not CreateDockerClient multi times
if dc.client == nil {
if dc.client, err = CreateDockerClient(); err != nil {
dc.setLastError(err, "init docker client from env error")
return err
}
}
return nil
}
Expand Down
238 changes: 238 additions & 0 deletions pkg/helper/docker_center_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
package helper

import (
"context"
"errors"
"os"
"sync"
"testing"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/events"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -338,3 +343,236 @@ func TestK8SInfo_IsMatch(t *testing.T) {
})
}
}

type DockerClientMock struct {
mock.Mock
}

type ContainerHelperMock struct {
mock.Mock
}

// Events 实现了 DockerClient 的 Events 方法
func (m *DockerClientMock) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) {
args := m.Called(ctx, options)
return args.Get(0).(chan events.Message), args.Get(1).(chan error)
}

func (m *DockerClientMock) ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error) {
args := m.Called(ctx, imageID)
return args.Get(0).(types.ImageInspect), args.Get(1).([]byte), args.Error(2)
}

func (m *DockerClientMock) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) {
args := m.Called(ctx, containerID)
return args.Get(0).(types.ContainerJSON), args.Error(1)
}
func (m *DockerClientMock) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) {
args := m.Called(ctx, options)
return args.Get(0).([]types.Container), args.Error(1)
}

func (m *ContainerHelperMock) ContainerProcessAlive(pid int) bool {
args := m.Called(pid)
return args.Get(0).(bool)
}

func TestDockerCenterEvents(t *testing.T) {
dockerCenterInstance = &DockerCenter{}
dockerCenterInstance.imageCache = make(map[string]string)
dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail)

mockClient := DockerClientMock{}
dockerCenterInstance.client = &mockClient

containerHelper := ContainerHelperMock{}

// 创建一个模拟的事件通道
eventChan := make(chan events.Message, 1)
errChan := make(chan error, 1)

mockClient.On("Events", mock.Anything, mock.Anything).Return(eventChan, errChan)

go dockerCenterInstance.eventListener()

containerHelper.On("ContainerProcessAlive", mock.Anything).Return(false).Once()
mockClient.On("ContainerInspect", mock.Anything, "event1").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "event1",
Name: "name1",
State: &types.ContainerState{},
},
Config: &container.Config{
Env: make([]string, 0),
},
}, nil).Once()

eventChan <- events.Message{ID: "event1", Status: "rename"}

time.Sleep(5 * time.Second)
containerLen := len(dockerCenterInstance.containerMap)
assert.Equal(t, 1, containerLen)

containerHelper.On("ContainerProcessAlive", mock.Anything).Return(true).Once()
mockClient.On("ContainerInspect", mock.Anything, "event1").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "event1",
Name: "start",
State: &types.ContainerState{},
},
Config: &container.Config{
Env: make([]string, 0),
},
}, nil).Once()
eventChan <- events.Message{ID: "event1", Status: "start"}

time.Sleep(5 * time.Second)
// 设置期望
close(eventChan)

containerLen = len(dockerCenterInstance.containerMap)
assert.Equal(t, 1, containerLen)
}

func TestDockerCenterFetchAll(t *testing.T) {
dockerCenterInstance = &DockerCenter{}
dockerCenterInstance.imageCache = make(map[string]string)
dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail)

mockClient := DockerClientMock{}
dockerCenterInstance.client = &mockClient

containerHelper := ContainerHelperMock{}

mockContainerListResult := []types.Container{
{ID: "id1"},
{ID: "id2"},
{ID: "id3"},
}

containerHelper.On("ContainerProcessAlive", mock.Anything).Return(true)

mockClient.On("ContainerList", mock.Anything, mock.Anything).Return(mockContainerListResult, nil).Once()

mockClient.On("ContainerInspect", mock.Anything, "id1").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "id1",
Name: "event_name1",
State: &types.ContainerState{},
},
Config: &container.Config{
Env: make([]string, 0),
},
}, nil).Once()
mockClient.On("ContainerInspect", mock.Anything, "id2").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "id2",
Name: "event_name2",
State: &types.ContainerState{},
},
Config: &container.Config{
Env: make([]string, 0),
},
}, nil).Once()
// one failed inspect
mockClient.On("ContainerInspect", mock.Anything, "id3").Return(types.ContainerJSON{}, errors.New("id3 not exist")).Times(3)

err := dockerCenterInstance.fetchAll()
assert.Nil(t, err)

containerLen := len(dockerCenterInstance.containerMap)
assert.Equal(t, 2, containerLen)

mockContainerListResult2 := []types.Container{
{ID: "id4"},
{ID: "id5"},
}

mockClient.On("ContainerList", mock.Anything, mock.Anything).Return(mockContainerListResult2, nil).Once()

mockClient.On("ContainerInspect", mock.Anything, "id4").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "id4",
Name: "event_name4",
State: &types.ContainerState{},
},
Config: &container.Config{
Env: make([]string, 0),
},
}, nil).Once()

mockClient.On("ContainerInspect", mock.Anything, "id5").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "id5",
Name: "event_name5",
State: &types.ContainerState{},
},
Config: &container.Config{
Env: make([]string, 0),
},
}, nil).Once()

err = dockerCenterInstance.fetchAll()
assert.Nil(t, err)

containerLen = len(dockerCenterInstance.containerMap)
assert.Equal(t, 4, containerLen)
}

func TestDockerCenterFetchAllAndOne(t *testing.T) {
dockerCenterInstance = &DockerCenter{}
dockerCenterInstance.imageCache = make(map[string]string)
dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail)

mockClient := DockerClientMock{}
dockerCenterInstance.client = &mockClient

containerHelper := ContainerHelperMock{}

mockContainerListResult := []types.Container{
{ID: "id1"},
{ID: "id2"},
}

mockClient.On("ContainerList", mock.Anything, mock.Anything).Return(mockContainerListResult, nil)

mockClient.On("ContainerInspect", mock.Anything, "id1").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "id1",
Name: "event_name1",
State: &types.ContainerState{},
},
Config: &container.Config{
Env: make([]string, 0),
},
}, nil)
mockClient.On("ContainerInspect", mock.Anything, "id2").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "id2",
Name: "event_name2",
State: &types.ContainerState{},
},
Config: &container.Config{
Env: make([]string, 0),
},
}, nil)

containerHelper.On("ContainerProcessAlive", mock.Anything).Return(true).Times(2)

err := dockerCenterInstance.fetchAll()
assert.Nil(t, err)

dockerCenterInstance.markRemove("id1")
dockerCenterInstance.markRemove("id2")

containerHelper.On("ContainerProcessAlive", mock.Anything).Return(false).Times(2)
err = dockerCenterInstance.fetchAll()
assert.Nil(t, err)

containerLen := len(dockerCenterInstance.containerMap)
assert.Equal(t, 2, containerLen)

for _, container := range dockerCenterInstance.containerMap {
assert.Equal(t, true, container.deleteFlag)
}
}
Loading

0 comments on commit 1a59190

Please sign in to comment.