Skip to content

Commit

Permalink
[backport]: Fix the problem of increasing container meta information …
Browse files Browse the repository at this point in the history
…in Docker Center and add UT to this module (#1938) (#1969)

* refine docker center code

* fix comments

* fix comments
Conflicts:
	pkg/helper/docker_center.go
  • Loading branch information
linrunqi08 authored Dec 16, 2024
1 parent b014784 commit 48d786a
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 35 deletions.
1 change: 1 addition & 0 deletions pkg/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ require (
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/tchap/go-patricia v2.3.0+incompatible // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 // indirect
Expand Down
54 changes: 34 additions & 20 deletions pkg/helper/docker_center.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
docker "github.com/docker/docker/client"

"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/util"
Expand Down Expand Up @@ -457,7 +456,8 @@ type DockerCenter struct {
// For the CRI scenario, the container list only contains the real containers and excludes the sandbox containers. But the
// sandbox meta would be saved to its bound container.
containerMap map[string]*DockerInfoDetail // all containers will in this map
client *docker.Client
client DockerCenterClientInterface
containerHelper ContainerHelperInterface
lastErrMu sync.Mutex
lastErr error
lock sync.RWMutex
Expand All @@ -470,6 +470,24 @@ type DockerCenter struct {
initStaticContainerInfoSuccess bool
}

type DockerCenterClientInterface interface {
ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error)
ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error)
ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error)
Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error)
}

type ContainerHelperInterface interface {
ContainerProcessAlive(pid int) bool
}

type ContainerHelperWrapper struct {
}

func (r *ContainerHelperWrapper) ContainerProcessAlive(pid int) bool {
return ContainerProcessAlive(pid)
}

func getIPByHosts(hostFileName, hostname string) string {
lines, err := util.ReadLines(hostFileName)
if err != nil {
Expand Down Expand Up @@ -634,7 +652,9 @@ func getDockerCenterInstance() *DockerCenter {
logger.Init()
// load EnvTags first
LoadEnvTags()
dockerCenterInstance = &DockerCenter{}
dockerCenterInstance = &DockerCenter{
containerHelper: &ContainerHelperWrapper{},
}
dockerCenterInstance.imageCache = make(map[string]string)
dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail)
// containerFindingManager works in a producer-consumer model
Expand Down Expand Up @@ -1042,24 +1062,16 @@ func (dc *DockerCenter) fetchAll() error {
time.Sleep(time.Second * 5)
}
if err == nil {
if !ContainerProcessAlive(containerDetail.State.Pid) {
containerDetail.State.Status = ContainerStatusExited
finishedAt := containerDetail.State.FinishedAt
finishedAtTime, _ := time.Parse(time.RFC3339, finishedAt)
now := time.Now()
duration := now.Sub(finishedAtTime)
if duration >= ContainerInfoDeletedTimeout {
continue
}
if !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) {
continue
}
containerMap[container.ID] = dc.CreateInfoDetail(containerDetail, envConfigPrefix, false)
} else {
dc.setLastError(err, "inspect container error "+container.ID)
}
}
dc.updateContainers(containerMap)

return err
return nil
}

func (dc *DockerCenter) fetchOne(containerID string, tryFindSandbox bool) error {
Expand All @@ -1070,7 +1082,7 @@ func (dc *DockerCenter) fetchOne(containerID string, tryFindSandbox bool) error
dc.setLastError(err, "inspect container error "+containerID)
return err
}
if containerDetail.State.Status == ContainerStatusRunning && !ContainerProcessAlive(containerDetail.State.Pid) {
if containerDetail.State.Status == ContainerStatusRunning && !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) {
containerDetail.State.Status = ContainerStatusExited
}
// docker 场景下
Expand All @@ -1084,7 +1096,7 @@ func (dc *DockerCenter) fetchOne(containerID string, tryFindSandbox bool) error
if err != nil {
dc.setLastError(err, "inspect sandbox container error "+id)
} else {
if containerDetail.State.Status == ContainerStatusRunning && !ContainerProcessAlive(containerDetail.State.Pid) {
if containerDetail.State.Status == ContainerStatusRunning && !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) {
containerDetail.State.Status = ContainerStatusExited
}
dc.updateContainer(id, dc.CreateInfoDetail(containerDetail, envConfigPrefix, false))
Expand Down Expand Up @@ -1162,10 +1174,12 @@ func dockerCenterRecover() {

func (dc *DockerCenter) initClient() error {
var err error
// DockerCenterTimeout should only be used by context.WithTimeout() in specific methods
if dc.client, err = CreateDockerClient(); err != nil {
dc.setLastError(err, "init docker client from env error")
return err
// do not CreateDockerClient multi times
if dc.client == nil {
if dc.client, err = CreateDockerClient(); err != nil {
dc.setLastError(err, "init docker client from env error")
return err
}
}
return nil
}
Expand Down
238 changes: 238 additions & 0 deletions pkg/helper/docker_center_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
package helper

import (
"context"
"errors"
"os"
"sync"
"testing"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/events"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -338,3 +343,236 @@ func TestK8SInfo_IsMatch(t *testing.T) {
})
}
}

type DockerClientMock struct {
mock.Mock
}

type ContainerHelperMock struct {
mock.Mock
}

// Events 实现了 DockerClient 的 Events 方法
func (m *DockerClientMock) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) {
args := m.Called(ctx, options)
return args.Get(0).(chan events.Message), args.Get(1).(chan error)
}

func (m *DockerClientMock) ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error) {
args := m.Called(ctx, imageID)
return args.Get(0).(types.ImageInspect), args.Get(1).([]byte), args.Error(2)
}

func (m *DockerClientMock) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) {
args := m.Called(ctx, containerID)
return args.Get(0).(types.ContainerJSON), args.Error(1)
}
func (m *DockerClientMock) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) {
args := m.Called(ctx, options)
return args.Get(0).([]types.Container), args.Error(1)
}

func (m *ContainerHelperMock) ContainerProcessAlive(pid int) bool {
args := m.Called(pid)
return args.Get(0).(bool)
}

func TestDockerCenterEvents(t *testing.T) {
dockerCenterInstance = &DockerCenter{}
dockerCenterInstance.imageCache = make(map[string]string)
dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail)

mockClient := DockerClientMock{}
dockerCenterInstance.client = &mockClient

containerHelper := ContainerHelperMock{}

// 创建一个模拟的事件通道
eventChan := make(chan events.Message, 1)
errChan := make(chan error, 1)

mockClient.On("Events", mock.Anything, mock.Anything).Return(eventChan, errChan)

go dockerCenterInstance.eventListener()

containerHelper.On("ContainerProcessAlive", mock.Anything).Return(false).Once()
mockClient.On("ContainerInspect", mock.Anything, "event1").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "event1",
Name: "name1",
State: &types.ContainerState{},
},
Config: &container.Config{
Env: make([]string, 0),
},
}, nil).Once()

eventChan <- events.Message{ID: "event1", Status: "rename"}

time.Sleep(5 * time.Second)
containerLen := len(dockerCenterInstance.containerMap)
assert.Equal(t, 1, containerLen)

containerHelper.On("ContainerProcessAlive", mock.Anything).Return(true).Once()
mockClient.On("ContainerInspect", mock.Anything, "event1").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "event1",
Name: "start",
State: &types.ContainerState{},
},
Config: &container.Config{
Env: make([]string, 0),
},
}, nil).Once()
eventChan <- events.Message{ID: "event1", Status: "start"}

time.Sleep(5 * time.Second)
// 设置期望
close(eventChan)

containerLen = len(dockerCenterInstance.containerMap)
assert.Equal(t, 1, containerLen)
}

func TestDockerCenterFetchAll(t *testing.T) {
dockerCenterInstance = &DockerCenter{}
dockerCenterInstance.imageCache = make(map[string]string)
dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail)

mockClient := DockerClientMock{}
dockerCenterInstance.client = &mockClient

containerHelper := ContainerHelperMock{}

mockContainerListResult := []types.Container{
{ID: "id1"},
{ID: "id2"},
{ID: "id3"},
}

containerHelper.On("ContainerProcessAlive", mock.Anything).Return(true)

mockClient.On("ContainerList", mock.Anything, mock.Anything).Return(mockContainerListResult, nil).Once()

mockClient.On("ContainerInspect", mock.Anything, "id1").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "id1",
Name: "event_name1",
State: &types.ContainerState{},
},
Config: &container.Config{
Env: make([]string, 0),
},
}, nil).Once()
mockClient.On("ContainerInspect", mock.Anything, "id2").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "id2",
Name: "event_name2",
State: &types.ContainerState{},
},
Config: &container.Config{
Env: make([]string, 0),
},
}, nil).Once()
// one failed inspect
mockClient.On("ContainerInspect", mock.Anything, "id3").Return(types.ContainerJSON{}, errors.New("id3 not exist")).Times(3)

err := dockerCenterInstance.fetchAll()
assert.Nil(t, err)

containerLen := len(dockerCenterInstance.containerMap)
assert.Equal(t, 2, containerLen)

mockContainerListResult2 := []types.Container{
{ID: "id4"},
{ID: "id5"},
}

mockClient.On("ContainerList", mock.Anything, mock.Anything).Return(mockContainerListResult2, nil).Once()

mockClient.On("ContainerInspect", mock.Anything, "id4").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "id4",
Name: "event_name4",
State: &types.ContainerState{},
},
Config: &container.Config{
Env: make([]string, 0),
},
}, nil).Once()

mockClient.On("ContainerInspect", mock.Anything, "id5").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "id5",
Name: "event_name5",
State: &types.ContainerState{},
},
Config: &container.Config{
Env: make([]string, 0),
},
}, nil).Once()

err = dockerCenterInstance.fetchAll()
assert.Nil(t, err)

containerLen = len(dockerCenterInstance.containerMap)
assert.Equal(t, 4, containerLen)
}

func TestDockerCenterFetchAllAndOne(t *testing.T) {
dockerCenterInstance = &DockerCenter{}
dockerCenterInstance.imageCache = make(map[string]string)
dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail)

mockClient := DockerClientMock{}
dockerCenterInstance.client = &mockClient

containerHelper := ContainerHelperMock{}

mockContainerListResult := []types.Container{
{ID: "id1"},
{ID: "id2"},
}

mockClient.On("ContainerList", mock.Anything, mock.Anything).Return(mockContainerListResult, nil)

mockClient.On("ContainerInspect", mock.Anything, "id1").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "id1",
Name: "event_name1",
State: &types.ContainerState{},
},
Config: &container.Config{
Env: make([]string, 0),
},
}, nil)
mockClient.On("ContainerInspect", mock.Anything, "id2").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "id2",
Name: "event_name2",
State: &types.ContainerState{},
},
Config: &container.Config{
Env: make([]string, 0),
},
}, nil)

containerHelper.On("ContainerProcessAlive", mock.Anything).Return(true).Times(2)

err := dockerCenterInstance.fetchAll()
assert.Nil(t, err)

dockerCenterInstance.markRemove("id1")
dockerCenterInstance.markRemove("id2")

containerHelper.On("ContainerProcessAlive", mock.Anything).Return(false).Times(2)
err = dockerCenterInstance.fetchAll()
assert.Nil(t, err)

containerLen := len(dockerCenterInstance.containerMap)
assert.Equal(t, 2, containerLen)

for _, container := range dockerCenterInstance.containerMap {
assert.Equal(t, true, container.deleteFlag)
}
}
Loading

0 comments on commit 48d786a

Please sign in to comment.