Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
linrunqi08 committed Dec 16, 2024
1 parent 093b01c commit 0f6c3af
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 51 deletions.
13 changes: 5 additions & 8 deletions pkg/helper/container_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,24 +191,21 @@ func SplitRegexFromMap(input map[string]string) (staticResult map[string]string,
return staticResult, regexResult, nil
}

func CreateDockerClient(opt ...docker.Opt) (*DockerClientWrapper, error) {
func CreateDockerClient(opt ...docker.Opt) (client *docker.Client, err error) {
opt = append(opt, docker.FromEnv)
rawClient, err := docker.NewClientWithOpts(opt...)
client, err = docker.NewClientWithOpts(opt...)
if err != nil {
return nil, err
}
// add dockerClient connectivity tests
pingCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
ping, err := rawClient.Ping(pingCtx)
ping, err := client.Ping(pingCtx)
if err != nil {
return nil, err
}
rawClient.NegotiateAPIVersionPing(ping)

return &DockerClientWrapper{
client: rawClient,
}, nil
client.NegotiateAPIVersionPing(ping)
return
}

func RegisterDockerEventListener(c chan events.Message) {
Expand Down
40 changes: 11 additions & 29 deletions pkg/helper/docker_center.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package helper
import (
"context"
"hash/fnv"
"io"
"path"
"regexp"
"runtime"
Expand All @@ -28,7 +27,6 @@ import (

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
docker "github.com/docker/docker/client"

"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/util"
Expand Down Expand Up @@ -459,6 +457,7 @@ type DockerCenter struct {
// sandbox meta would be saved to its bound container.
containerMap map[string]*DockerInfoDetail // all containers will in this map
client DockerCenterClientInterface
containerHelper *DockerCenterHelperWrapper
lastErrMu sync.Mutex
lastErr error
lock sync.RWMutex
Expand All @@ -476,38 +475,19 @@ type DockerCenterClientInterface interface {
ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error)
ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error)
Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error)
ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error)
ContainerProcessAlive(pid int) bool
}

type DockerClientWrapper struct {
client *docker.Client
}

func (r *DockerClientWrapper) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) {
return r.client.ContainerList(ctx, options)
}

func (r *DockerClientWrapper) ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error) {
return r.client.ImageInspectWithRaw(ctx, imageID)
}

func (r *DockerClientWrapper) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) {
return r.client.ContainerInspect(ctx, containerID)
type DockerCenterHelperInterface interface {
ContainerProcessAlive(pid int) bool
}

func (r *DockerClientWrapper) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) {
return r.client.Events(ctx, options)
type DockerCenterHelperWrapper struct {
}

func (r *DockerClientWrapper) ContainerProcessAlive(pid int) bool {
func (r *DockerCenterHelperWrapper) ContainerProcessAlive(pid int) bool {
return ContainerProcessAlive(pid)
}

func (r *DockerClientWrapper) ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
return r.client.ContainerLogs(ctx, container, options)
}

func getIPByHosts(hostFileName, hostname string) string {
lines, err := util.ReadLines(hostFileName)
if err != nil {
Expand Down Expand Up @@ -672,7 +652,9 @@ func getDockerCenterInstance() *DockerCenter {
logger.InitLogger()
// load EnvTags first
LoadEnvTags()
dockerCenterInstance = &DockerCenter{}
dockerCenterInstance = &DockerCenter{
containerHelper: &DockerCenterHelperWrapper{},
}
dockerCenterInstance.imageCache = make(map[string]string)
dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail)
// containerFindingManager works in a producer-consumer model
Expand Down Expand Up @@ -1080,7 +1062,7 @@ func (dc *DockerCenter) fetchAll() error {
time.Sleep(time.Second * 5)
}
if err == nil {
if !dc.client.ContainerProcessAlive(containerDetail.State.Pid) {
if !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) {
continue
}
containerMap[container.ID] = dc.CreateInfoDetail(containerDetail, envConfigPrefix, false)
Expand All @@ -1100,7 +1082,7 @@ func (dc *DockerCenter) fetchOne(containerID string, tryFindSandbox bool) error
dc.setLastError(err, "inspect container error "+containerID)
return err
}
if containerDetail.State.Status == ContainerStatusRunning && !ContainerProcessAlive(containerDetail.State.Pid) {
if containerDetail.State.Status == ContainerStatusRunning && !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) {
containerDetail.State.Status = ContainerStatusExited
}
// docker 场景下
Expand All @@ -1114,7 +1096,7 @@ func (dc *DockerCenter) fetchOne(containerID string, tryFindSandbox bool) error
if err != nil {
dc.setLastError(err, "inspect sandbox container error "+id)
} else {
if containerDetail.State.Status == ContainerStatusRunning && !dc.client.ContainerProcessAlive(containerDetail.State.Pid) {
if containerDetail.State.Status == ContainerStatusRunning && !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) {
containerDetail.State.Status = ContainerStatusExited
}
dc.updateContainer(id, dc.CreateInfoDetail(containerDetail, envConfigPrefix, false))
Expand Down
28 changes: 16 additions & 12 deletions pkg/helper/docker_center_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package helper
import (
"context"
"errors"
"io"
"os"
"sync"
"testing"
Expand Down Expand Up @@ -349,6 +348,10 @@ type DockerClientMock struct {
mock.Mock
}

type ContainerHelperMock struct {
mock.Mock
}

// Events 实现了 DockerClient 的 Events 方法
func (m *DockerClientMock) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) {
args := m.Called(ctx, options)
Expand All @@ -369,16 +372,11 @@ func (m *DockerClientMock) ContainerList(ctx context.Context, options types.Cont
return args.Get(0).([]types.Container), args.Error(1)
}

func (m *DockerClientMock) ContainerProcessAlive(pid int) bool {
func (m *ContainerHelperMock) ContainerProcessAlive(pid int) bool {
args := m.Called(pid)
return args.Get(0).(bool)
}

func (m *DockerClientMock) ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
args := m.Called(ctx, container, options)
return args.Get(0).(io.ReadCloser), args.Error(1)
}

func TestDockerCenterEvents(t *testing.T) {
dockerCenterInstance = &DockerCenter{}
dockerCenterInstance.imageCache = make(map[string]string)
Expand All @@ -387,6 +385,8 @@ func TestDockerCenterEvents(t *testing.T) {
mockClient := DockerClientMock{}
dockerCenterInstance.client = &mockClient

containerHelper := ContainerHelperMock{}

// 创建一个模拟的事件通道
eventChan := make(chan events.Message, 1)
errChan := make(chan error, 1)
Expand All @@ -395,7 +395,7 @@ func TestDockerCenterEvents(t *testing.T) {

go dockerCenterInstance.eventListener()

mockClient.On("ContainerProcessAlive", mock.Anything).Return(false).Once()
containerHelper.On("ContainerProcessAlive", mock.Anything).Return(false).Once()
mockClient.On("ContainerInspect", mock.Anything, "event1").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "event1",
Expand All @@ -413,7 +413,7 @@ func TestDockerCenterEvents(t *testing.T) {
containerLen := len(dockerCenterInstance.containerMap)
assert.Equal(t, 1, containerLen)

mockClient.On("ContainerProcessAlive", mock.Anything).Return(true).Once()
containerHelper.On("ContainerProcessAlive", mock.Anything).Return(true).Once()
mockClient.On("ContainerInspect", mock.Anything, "event1").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "event1",
Expand Down Expand Up @@ -442,13 +442,15 @@ func TestDockerCenterFetchAll(t *testing.T) {
mockClient := DockerClientMock{}
dockerCenterInstance.client = &mockClient

containerHelper := ContainerHelperMock{}

mockContainerListResult := []types.Container{
{ID: "id1"},
{ID: "id2"},
{ID: "id3"},
}

mockClient.On("ContainerProcessAlive", mock.Anything).Return(true)
containerHelper.On("ContainerProcessAlive", mock.Anything).Return(true)

mockClient.On("ContainerList", mock.Anything, mock.Anything).Return(mockContainerListResult, nil).Once()

Expand Down Expand Up @@ -525,6 +527,8 @@ func TestDockerCenterFetchAllAndOne(t *testing.T) {
mockClient := DockerClientMock{}
dockerCenterInstance.client = &mockClient

containerHelper := ContainerHelperMock{}

mockContainerListResult := []types.Container{
{ID: "id1"},
{ID: "id2"},
Expand Down Expand Up @@ -553,15 +557,15 @@ func TestDockerCenterFetchAllAndOne(t *testing.T) {
},
}, nil)

mockClient.On("ContainerProcessAlive", mock.Anything).Return(true).Times(2)
containerHelper.On("ContainerProcessAlive", mock.Anything).Return(true).Times(2)

err := dockerCenterInstance.fetchAll()
assert.Nil(t, err)

dockerCenterInstance.markRemove("id1")
dockerCenterInstance.markRemove("id2")

mockClient.On("ContainerProcessAlive", mock.Anything).Return(false).Times(2)
containerHelper.On("ContainerProcessAlive", mock.Anything).Return(false).Times(2)
err = dockerCenterInstance.fetchAll()
assert.Nil(t, err)

Expand Down
5 changes: 3 additions & 2 deletions plugins/input/docker/rawstdout/input_docker_stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/docker/docker/api/types"
docker "github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"

"github.com/alibaba/ilogtail/pkg/helper"
Expand Down Expand Up @@ -83,7 +84,7 @@ type stdoutSyner struct {
ExternalK8sLabelTag map[string]string

info *helper.DockerInfoDetail
client helper.DockerCenterClientInterface
client *docker.Client
startCheckPoint string
lock sync.Mutex
stdoutCheckPoint *StdoutCheckPoint
Expand Down Expand Up @@ -360,7 +361,7 @@ type ServiceDockerStdout struct {
K8sFilter *helper.K8SFilter

synerMap map[string]*stdoutSyner
client helper.DockerCenterClientInterface
client *docker.Client
shutdown chan struct{}
waitGroup sync.WaitGroup
context pipeline.Context
Expand Down

0 comments on commit 0f6c3af

Please sign in to comment.