Skip to content

Commit

Permalink
fix: infinite retry container discovery (#1454)
Browse files Browse the repository at this point in the history
* fix: infinite retry container discovery

* fix

* Merge remote-tracking branch 'alibaba/main'

* fix

* fix

* fix
  • Loading branch information
Abingcbc authored and linrunqi08 committed Jun 18, 2024
1 parent a08da25 commit 8643395
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 72 deletions.
65 changes: 45 additions & 20 deletions pkg/helper/container_discover_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ type ContainerDiscoverManager struct {
fetchOneLock sync.Mutex
}

func NewContainerDiscoverManager(enableDockerDiscover, enableCRIDiscover, enableStaticDiscover bool) *ContainerDiscoverManager {
func NewContainerDiscoverManager() *ContainerDiscoverManager {
return &ContainerDiscoverManager{
enableDockerDiscover: enableDockerDiscover,
enableCRIDiscover: enableCRIDiscover,
enableStaticDiscover: enableStaticDiscover,
enableDockerDiscover: false,
enableCRIDiscover: false,
enableStaticDiscover: false,
}
}

Expand Down Expand Up @@ -125,7 +125,7 @@ func (c *ContainerDiscoverManager) fetchCRI() error {
return criRuntimeWrapper.fetchAll()
}

func (c *ContainerDiscoverManager) SyncContainers() {
func (c *ContainerDiscoverManager) StartSyncContainers() {
if c.enableCRIDiscover {
logger.Debug(context.Background(), "discover manager start sync containers goroutine", "cri")
go criRuntimeWrapper.loopSyncContainers()
Expand Down Expand Up @@ -159,8 +159,42 @@ func (c *ContainerDiscoverManager) LogAlarm(err error, msg string) {
}
}

func (c *ContainerDiscoverManager) Init(initTryTimes int) {
func (c *ContainerDiscoverManager) Init() bool {
defer dockerCenterRecover()

// discover which runtime is valid
if IsCRIRuntimeValid(containerdUnixSocket) {
var err error
criRuntimeWrapper, err = NewCRIRuntimeWrapper(dockerCenterInstance)
if err != nil {
logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "[CRIRuntime] creare cri-runtime client error: %v", err)
criRuntimeWrapper = nil
} else {
logger.Infof(context.Background(), "[CRIRuntime] create cri-runtime client successfully")
}
}
if ok, err := util.PathExists(DefaultLogtailMountPath); err == nil {
if !ok {
logger.Info(context.Background(), "no docker mount path", "set empty")
DefaultLogtailMountPath = ""
}
} else {
logger.Warning(context.Background(), "check docker mount path error", err.Error())
}
c.enableCRIDiscover = criRuntimeWrapper != nil
c.enableDockerDiscover = dockerCenterInstance.initClient() == nil
c.enableStaticDiscover = isStaticContainerInfoEnabled()
discoverdRuntime := false
if len(os.Getenv("USE_CONTAINERD")) > 0 {
discoverdRuntime = c.enableCRIDiscover
} else {
discoverdRuntime = c.enableCRIDiscover || c.enableDockerDiscover || c.enableStaticDiscover
}
if !discoverdRuntime {
return false
}

// try to connect to runtime
logger.Info(context.Background(), "input", "param", "docker discover", c.enableDockerDiscover, "cri discover", c.enableCRIDiscover, "static discover", c.enableStaticDiscover)
listenLoopIntervalSec := 0
// Get env in the same order as in C Logtail
Expand Down Expand Up @@ -222,31 +256,22 @@ func (c *ContainerDiscoverManager) Init(initTryTimes int) {

var err error
if c.enableDockerDiscover {
for i := 0; i < initTryTimes; i++ {
if err = c.fetchDocker(); err == nil {
break
}
}
if err != nil {
if err = c.fetchDocker(); err != nil {
c.enableDockerDiscover = false
logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "fetch docker containers error in %d times, close docker discover", initTryTimes)
logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "fetch docker containers error, close docker discover, will retry")
}
}
if c.enableCRIDiscover {
for i := 0; i < initTryTimes; i++ {
if err = c.fetchCRI(); err == nil {
break
}
}
if err != nil {
if err = c.fetchCRI(); err != nil {
c.enableCRIDiscover = false
logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "fetch cri containers error in %d times, close cri discover", initTryTimes)
logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "fetch cri containers error, close cri discover, will retry")
}
}
if c.enableStaticDiscover {
c.fetchStatic()
}
logger.Info(context.Background(), "final", "param", "docker discover", c.enableDockerDiscover, "cri discover", c.enableCRIDiscover, "static discover", c.enableStaticDiscover)
return c.enableCRIDiscover || c.enableDockerDiscover || c.enableStaticDiscover
}

func (c *ContainerDiscoverManager) TimerFetch() {
Expand Down
43 changes: 14 additions & 29 deletions pkg/helper/docker_center.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,39 +632,24 @@ func getDockerCenterInstance() *DockerCenter {
dockerCenterInstance = &DockerCenter{}
dockerCenterInstance.imageCache = make(map[string]string)
dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail)
if IsCRIRuntimeValid(containerdUnixSocket) {
retryTimes := 10
for i := 0; i < retryTimes; i++ {
var err error
criRuntimeWrapper, err = NewCRIRuntimeWrapper(dockerCenterInstance)
if err != nil {
logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "[CRIRuntime] creare cri-runtime client error: %v", err)
criRuntimeWrapper = nil
} else {
logger.Infof(context.Background(), "[CRIRuntime] create cri-runtime client successfully")
// containerFindingManager works in a producer-consumer model
// so even manager is not initialized, it will not affect consumers like service_stdout
go func() {
retryCount := 0
containerFindingManager = NewContainerDiscoverManager()
for {
if containerFindingManager.Init() {
break
}
time.Sleep(time.Second * 1)
if i == retryTimes-1 {
logger.Error(context.Background(), "DOCKER_CENTER_ALARM", "[CRIRuntime] create cri-runtime client failed")
if retryCount%10 == 0 {
logger.Error(context.Background(), "DOCKER_CENTER_ALARM", "docker center init failed", "retry count", retryCount)
}
retryCount++
time.Sleep(time.Second * 1)
}
}
if ok, err := util.PathExists(DefaultLogtailMountPath); err == nil {
if !ok {
logger.Info(context.Background(), "no docker mount path", "set empty")
DefaultLogtailMountPath = ""
}
} else {
logger.Warning(context.Background(), "check docker mount path error", err.Error())
}
var enableCriFinding = criRuntimeWrapper != nil
var enableDocker = dockerCenterInstance.initClient() == nil
var enableStatic = isStaticContainerInfoEnabled()
containerFindingManager = NewContainerDiscoverManager(enableDocker, enableCriFinding, enableStatic)
containerFindingManager.Init(3)
containerFindingManager.TimerFetch()
containerFindingManager.SyncContainers()
containerFindingManager.TimerFetch()
containerFindingManager.StartSyncContainers()
}()
})
return dockerCenterInstance
}
Expand Down
29 changes: 6 additions & 23 deletions pkg/helper/docker_cri_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@ type CRIRuntimeWrapper struct {
}

func IsCRIRuntimeValid(criRuntimeEndpoint string) bool {
if len(os.Getenv("USE_CONTAINERD")) > 0 {
return true
}

// Verify containerd.sock cri valid.
if fi, err := os.Stat(criRuntimeEndpoint); err == nil && !fi.IsDir() {
if IsCRIStatusValid(criRuntimeEndpoint) {
Expand All @@ -105,30 +101,17 @@ func IsCRIStatusValid(criRuntimeEndpoint string) bool {

client := cri.NewRuntimeServiceClient(conn)
// check cri status
for tryCount := 0; tryCount < 5; tryCount++ {
_, err = client.Status(ctx, &cri.StatusRequest{})
if err == nil {
break
}
if strings.Contains(err.Error(), "code = Unimplemented") {
logger.Debug(context.Background(), "Status failed", err)
return false
}
time.Sleep(time.Millisecond * 100)
}
_, err = client.Status(ctx, &cri.StatusRequest{})
if err != nil {
logger.Debug(context.Background(), "Status failed", err)
return false
}
// check running containers
for tryCount := 0; tryCount < 5; tryCount++ {
var containersResp *cri.ListContainersResponse
containersResp, err = client.ListContainers(ctx, &cri.ListContainersRequest{Filter: nil})
if err == nil {
logger.Debug(context.Background(), "ListContainers result", containersResp.Containers)
return containersResp.Containers != nil
}
time.Sleep(time.Millisecond * 100)
var containersResp *cri.ListContainersResponse
containersResp, err = client.ListContainers(ctx, &cri.ListContainersRequest{Filter: nil})
if err == nil {
logger.Debug(context.Background(), "ListContainers result", containersResp.Containers)
return containersResp.Containers != nil
}
logger.Debug(context.Background(), "ListContainers failed", err)
return false
Expand Down

0 comments on commit 8643395

Please sign in to comment.