diff --git a/test/config/config.go b/test/config/config.go index ccbb41d7d1..e013e1991b 100644 --- a/test/config/config.go +++ b/test/config/config.go @@ -46,6 +46,7 @@ type Config struct { AccessKeyID string `mapstructure:"access_key_id" yaml:"access_key_id"` AccessKeySecret string `mapstructure:"access_key_secret" yaml:"access_key_secret"` Endpoint string `mapstructure:"endpoint" yaml:"endpoint"` + QueryEndpoint string `mapstructure:"query_endpoint" yaml:"query_endpoint"` Aliuid string `mapstructure:"aliuid" yaml:"aliuid"` Region string `mapstructure:"region" yaml:"region"` RetryTimeout time.Duration `mapstructure:"retry_timeout" yaml:"retry_timeout"` @@ -93,6 +94,7 @@ func ParseConfig() { TestConfig.AccessKeyID = os.Getenv("ACCESS_KEY_ID") TestConfig.AccessKeySecret = os.Getenv("ACCESS_KEY_SECRET") TestConfig.Endpoint = os.Getenv("ENDPOINT") + TestConfig.QueryEndpoint = os.Getenv("QUERY_ENDPOINT") TestConfig.Aliuid = os.Getenv("ALIUID") TestConfig.Region = os.Getenv("REGION") timeout, err := strconv.ParseInt(os.Getenv("RETRY_TIMEOUT"), 10, 64) diff --git a/test/engine/control/config.go b/test/engine/control/config.go index f670f5cc91..40bd9a481e 100644 --- a/test/engine/control/config.go +++ b/test/engine/control/config.go @@ -125,6 +125,39 @@ func RemoveHTTPConfig(ctx context.Context, configName string) (context.Context, return ctx, nil } +func AddRemoteConfig(ctx context.Context, configName, c string) (context.Context, error) { + if subscriber.TestSubscriber.Name() != "sls" { + return ctx, fmt.Errorf("only support sls subscriber") + } + slsSubscriber := subscriber.TestSubscriber.(*subscriber.SLSSubscriber) + if err := slsSubscriber.UpdateConfig(configName, c); err != nil { + return ctx, err + } + return ctx, nil +} + +func ApplyRemoteConfig(ctx context.Context, configName, machineGroup string) (context.Context, error) { + if subscriber.TestSubscriber.Name() != "sls" { + return ctx, fmt.Errorf("only support sls subscriber") + } + slsSubscriber := subscriber.TestSubscriber.(*subscriber.SLSSubscriber) + if err := slsSubscriber.ApplyConfig(configName, machineGroup); err != nil { + return ctx, err + } + return ctx, nil +} + +func RemoveRemoteConfig(ctx context.Context, configName, machineGroup string) (context.Context, error) { + if subscriber.TestSubscriber.Name() != "sls" { + return ctx, fmt.Errorf("only support sls subscriber") + } + slsSubscriber := subscriber.TestSubscriber.(*subscriber.SLSSubscriber) + if err := slsSubscriber.RemoveConfig(configName, machineGroup); err != nil { + return ctx, err + } + return ctx, nil +} + func completeConfigWithFlusher(c string) string { if strings.Contains(c, "flushers") { return c diff --git a/test/engine/control/kubernetes.go b/test/engine/control/kubernetes.go index 4a7272aba2..3d15b3f9ff 100644 --- a/test/engine/control/kubernetes.go +++ b/test/engine/control/kubernetes.go @@ -22,7 +22,7 @@ import ( "github.com/alibaba/ilogtail/test/engine/setup/controller" ) -func AddLabel(ctx context.Context, labelStr string) (context.Context, error) { +func AddLabel(ctx context.Context, labelStr, deploymentName string) (context.Context, error) { var labels map[string]string if err := json.Unmarshal([]byte(labelStr), &labels); err != nil { return ctx, err @@ -31,7 +31,7 @@ func AddLabel(ctx context.Context, labelStr string) (context.Context, error) { K8sLabel: labels, } if k8sEnv, ok := setup.Env.(*setup.K8sEnv); ok { - if err := k8sEnv.AddFilter(filter); err != nil { + if err := k8sEnv.AddFilter(deploymentName, filter); err != nil { return ctx, err } } else { @@ -40,7 +40,7 @@ func AddLabel(ctx context.Context, labelStr string) (context.Context, error) { return ctx, nil } -func RemoveLabel(ctx context.Context, labelStr string) (context.Context, error) { +func RemoveLabel(ctx context.Context, labelStr, deploymentName string) (context.Context, error) { var labels map[string]string if err := json.Unmarshal([]byte(labelStr), &labels); err != nil { return ctx, err @@ -49,7 +49,7 @@ func RemoveLabel(ctx context.Context, labelStr string) (context.Context, error) K8sLabel: labels, } if k8sEnv, ok := setup.Env.(*setup.K8sEnv); ok { - if err := k8sEnv.RemoveFilter(filter); err != nil { + if err := k8sEnv.RemoveFilter(deploymentName, filter); err != nil { return ctx, nil } } else { @@ -58,6 +58,17 @@ func RemoveLabel(ctx context.Context, labelStr string) (context.Context, error) return ctx, nil } +func Scale(ctx context.Context, deploymentName, namespace string, replicas int) (context.Context, error) { + if k8sEnv, ok := setup.Env.(*setup.K8sEnv); ok { + if err := k8sEnv.Scale(deploymentName, namespace, replicas); err != nil { + return ctx, err + } + } else { + return ctx, fmt.Errorf("try to scale, but env is not k8s env") + } + return ctx, nil +} + func ApplyYaml(ctx context.Context, yaml string) (context.Context, error) { if k8sEnv, ok := setup.Env.(*setup.K8sEnv); ok { if err := k8sEnv.Apply(yaml); err != nil { diff --git a/test/engine/setup/controller/kubernetes.go b/test/engine/setup/controller/kubernetes.go index 8343ebcd51..3f5b897d8f 100644 --- a/test/engine/setup/controller/kubernetes.go +++ b/test/engine/setup/controller/kubernetes.go @@ -153,6 +153,20 @@ func (c *DeploymentController) RemoveFilter(deploymentName string, filter Contai return c.waitDeploymentAvailable(deploymentName, filter.K8sNamespace) } +func (c *DeploymentController) Scale(deploymentName, deploymentNamespace string, replicas int) error { + deployment, err := c.k8sClient.AppsV1().Deployments(deploymentNamespace).Get(context.TODO(), deploymentName, metav1.GetOptions{}) + if err != nil { + return err + } + replicaInt32 := int32(replicas) + deployment.Spec.Replicas = &replicaInt32 + _, err = c.k8sClient.AppsV1().Deployments(deploymentNamespace).Update(context.TODO(), deployment, metav1.UpdateOptions{}) + if err != nil { + return err + } + return c.waitDeploymentAvailable(deploymentName, deploymentNamespace) +} + func (c *DeploymentController) waitDeploymentAvailable(deploymentName, deploymentNamespace string) error { timeoutCtx, cancel := context.WithTimeout(context.TODO(), config.TestConfig.RetryTimeout) defer cancel() diff --git a/test/engine/setup/k8s.go b/test/engine/setup/k8s.go index 6c1463444b..a070f4ca55 100644 --- a/test/engine/setup/k8s.go +++ b/test/engine/setup/k8s.go @@ -115,12 +115,16 @@ func (k *K8sEnv) ExecOnSource(ctx context.Context, command string) (string, erro return k.execInPod(k.config, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, []string{"sh", "-c", command}) } -func (k *K8sEnv) AddFilter(filter controller.ContainerFilter) error { - return k.deploymentController.AddFilter("e2e-generator", filter) +func (k *K8sEnv) AddFilter(deploymentName string, filter controller.ContainerFilter) error { + return k.deploymentController.AddFilter(deploymentName, filter) } -func (k *K8sEnv) RemoveFilter(filter controller.ContainerFilter) error { - return k.deploymentController.RemoveFilter("e2e-generator", filter) +func (k *K8sEnv) RemoveFilter(deploymentName string, filter controller.ContainerFilter) error { + return k.deploymentController.RemoveFilter(deploymentName, filter) +} + +func (k *K8sEnv) Scale(deploymentName string, namespace string, replicas int) error { + return k.deploymentController.Scale(deploymentName, namespace, replicas) } func (k *K8sEnv) Apply(filePath string) error { diff --git a/test/engine/setup/subscriber/sls.go b/test/engine/setup/subscriber/sls.go index c2f074e8c7..00e456c70a 100644 --- a/test/engine/setup/subscriber/sls.go +++ b/test/engine/setup/subscriber/sls.go @@ -9,6 +9,7 @@ import ( openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client" sls "github.com/alibabacloud-go/sls-20201230/v5/client" "github.com/alibabacloud-go/tea/tea" + "gopkg.in/yaml.v3" "github.com/alibaba/ilogtail/pkg/doc" "github.com/alibaba/ilogtail/pkg/protocol" @@ -86,6 +87,122 @@ func (s *SLSSubscriber) Stop() error { return nil } +func (s *SLSSubscriber) UpdateConfig(configName, configYaml string) error { + if !strings.Contains(configYaml, "flushers") { + configYaml += s.FlusherConfig() + } + // Get old config first + response, err := s.client.GetLogtailPipelineConfig(tea.String(s.Project), tea.String(configName)) + if err != nil { + return err + } + if *response.StatusCode != 200 { + return fmt.Errorf("get config %s failed, status code %d, message %s", configName, *response.StatusCode, response.Body.GoString()) + } + config := response.Body + // Merge config + newConfig := make(map[string]interface{}) + err = yaml.Unmarshal([]byte(configYaml), newConfig) + if err != nil { + return err + } + if config == nil { + return fmt.Errorf("config %s not found", configName) + } + // Update config + for k, v := range newConfig { + switch k { + case "inputs": + newInput := make([]map[string]interface{}, 0) + if vArray, ok := v.([]interface{}); ok { + for _, vMap := range vArray { + if vMap, ok := vMap.(map[string]interface{}); ok { + newInput = append(newInput, vMap) + } else { + return fmt.Errorf("invalid input type") + } + } + } else { + return fmt.Errorf("invalid input type") + } + config.Inputs = newInput + case "processors": + newProcessor := make([]map[string]interface{}, 0) + if vArray, ok := v.([]interface{}); ok { + for _, vMap := range vArray { + if vMap, ok := vMap.(map[string]interface{}); ok { + newProcessor = append(newProcessor, vMap) + } else { + return fmt.Errorf("invalid processor type") + } + } + } else { + return fmt.Errorf("invalid processor type") + } + config.Processors = newProcessor + case "flushers": + newFlusher := make([]map[string]interface{}, 0) + if vArray, ok := v.([]interface{}); ok { + for _, vMap := range vArray { + if vMap, ok := vMap.(map[string]interface{}); ok { + newFlusher = append(newFlusher, vMap) + } else { + return fmt.Errorf("invalid flusher type") + } + } + } else { + return fmt.Errorf("invalid flusher type") + } + config.Flushers = newFlusher + case "global": + if vMap, ok := v.(map[string]interface{}); ok { + config.Global = vMap + } else { + return fmt.Errorf("invalid global type") + } + } + } + request := &sls.UpdateLogtailPipelineConfigRequest{ + ConfigName: tea.String(configName), + Inputs: config.Inputs, + Processors: config.Processors, + Flushers: config.Flushers, + Aggregators: config.Aggregators, + Global: config.Global, + } + fmt.Println("update config", configName, "with", request.GoString()) + updateResponse, err := s.client.UpdateLogtailPipelineConfig(tea.String(s.Project), tea.String(configName), request) + if err != nil { + return err + } + if *updateResponse.StatusCode != 200 { + return fmt.Errorf("update config %s failed, status code %d, message %s", configName, *updateResponse.StatusCode, updateResponse.GoString()) + } + return nil +} + +func (s *SLSSubscriber) ApplyConfig(configName, machineGroup string) error { + response, err := s.client.ApplyConfigToMachineGroup(tea.String(s.Project), tea.String(machineGroup), tea.String(configName)) + if err != nil { + return err + } + if *response.StatusCode != 200 { + return fmt.Errorf("apply config %s to machine group %s failed, status code %d, message %s", configName, machineGroup, *response.StatusCode, response.GoString()) + } + return nil +} + +func (s *SLSSubscriber) RemoveConfig(configName, machineGroup string) error { + response, err := s.client.RemoveConfigFromMachineGroup(tea.String(s.Project), tea.String(machineGroup), tea.String(configName)) + if err != nil { + return err + } + if *response.StatusCode != 200 { + return fmt.Errorf("remove config %s from machine group %s failed, status code %d, message %s", configName, machineGroup, *response.StatusCode, response.GoString()) + } + return nil +} + func (s *SLSSubscriber) getCompleteQuery(query string) string { if query == "" { return "*" @@ -164,7 +281,7 @@ func init() { if v, ok := spec["query_endpoint"]; ok { l.QueryEndpoint = v.(string) } else { - l.QueryEndpoint = config.TestConfig.Endpoint + l.QueryEndpoint = config.TestConfig.QueryEndpoint } if v, ok := spec["telemetry_type"]; ok { l.TelemetryType = v.(string) diff --git a/test/engine/steps.go b/test/engine/steps.go index ed921b8b08..69dea13406 100644 --- a/test/engine/steps.go +++ b/test/engine/steps.go @@ -27,6 +27,7 @@ func ScenarioInitializer(ctx *godog.ScenarioContext) { ctx.Given(`^loongcollector container mount \{(.*)\} to \{(.*)\}`, setup.MountVolume) ctx.Given(`^loongcollector expose port \{(.*)\} to \{(.*)\}`, setup.ExposePort) ctx.Given(`^\{(.*)\} local config as below`, control.AddLocalConfig) + ctx.Given(`^\{(.*)\} remote config as below`, control.AddRemoteConfig) ctx.Given(`^\{(.*)\} http config as below`, control.AddHTTPConfig) ctx.Given(`^remove http config \{(.*)\}`, control.RemoveHTTPConfig) ctx.Given(`^subcribe data from \{(\S+)\} with config`, subscriber.InitSubscriber) @@ -43,8 +44,9 @@ func ScenarioInitializer(ctx *godog.ScenarioContext) { // When // ------------------------------------------ - ctx.When(`^add k8s label \{(.*)\}`, control.AddLabel) - ctx.When(`^remove k8s label \{(.*)\}`, control.RemoveLabel) + ctx.When(`^add k8s label \{(.*)\} to \{(.*)\}`, control.AddLabel) + ctx.When(`^remove k8s label \{(.*)\} to \{(.*)\}`, control.RemoveLabel) + ctx.When(`^scale deployment \{(.*)\} in namespace \{(.*)\} to \{(\d+)\}`, control.Scale) ctx.When(`^start docker-compose \{(\S+)\}`, setup.StartDockerComposeEnv) ctx.When(`^switch working on deployment \{(.*)\}`, setup.SwitchCurrentWorkingDeployment) ctx.When(`^query through \{(.*)\}`, control.SetQuery) @@ -52,6 +54,8 @@ func ScenarioInitializer(ctx *godog.ScenarioContext) { ctx.When(`^delete yaml \{(.*)\} from k8s`, control.DeleteYaml) ctx.When(`^restart agent`, control.RestartAgent) ctx.When(`^force restart agent`, control.ForceRestartAgent) + ctx.When(`^apply remote config \{(.*)\} to machine group \{(.*)\}`, control.ApplyRemoteConfig) + ctx.When(`^remove remote config \{(.*)\} from machine group \{(.*)\}`, control.RemoveRemoteConfig) // generate ctx.When(`^begin trigger`, trigger.BeginTrigger)