Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: support add remote config and scale deployment #1999

Merged
merged 2 commits into from
Jan 1, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions test/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Config struct {
AccessKeyID string `mapstructure:"access_key_id" yaml:"access_key_id"`
AccessKeySecret string `mapstructure:"access_key_secret" yaml:"access_key_secret"`
Endpoint string `mapstructure:"endpoint" yaml:"endpoint"`
QueryEndpoint string `mapstructure:"query_endpoint" yaml:"query_endpoint"`
Aliuid string `mapstructure:"aliuid" yaml:"aliuid"`
Region string `mapstructure:"region" yaml:"region"`
RetryTimeout time.Duration `mapstructure:"retry_timeout" yaml:"retry_timeout"`
Expand Down Expand Up @@ -93,6 +94,7 @@ func ParseConfig() {
TestConfig.AccessKeyID = os.Getenv("ACCESS_KEY_ID")
TestConfig.AccessKeySecret = os.Getenv("ACCESS_KEY_SECRET")
TestConfig.Endpoint = os.Getenv("ENDPOINT")
TestConfig.QueryEndpoint = os.Getenv("QUERY_ENDPOINT")
TestConfig.Aliuid = os.Getenv("ALIUID")
TestConfig.Region = os.Getenv("REGION")
timeout, err := strconv.ParseInt(os.Getenv("RETRY_TIMEOUT"), 10, 64)
Expand Down
11 changes: 11 additions & 0 deletions test/engine/control/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ func RemoveHTTPConfig(ctx context.Context, configName string) (context.Context,
return ctx, nil
}

func AddRemoteConfig(ctx context.Context, configName, c string) (context.Context, error) {
if subscriber.TestSubscriber.Name() != "sls" {
return ctx, fmt.Errorf("only support sls subscriber")
}
slsSubscriber := subscriber.TestSubscriber.(*subscriber.SLSSubscriber)
if err := slsSubscriber.UpdateConfig(configName, c); err != nil {
return ctx, err
}
return ctx, nil
}

func completeConfigWithFlusher(c string) string {
if strings.Contains(c, "flushers") {
return c
Expand Down
19 changes: 15 additions & 4 deletions test/engine/control/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/alibaba/ilogtail/test/engine/setup/controller"
)

func AddLabel(ctx context.Context, labelStr string) (context.Context, error) {
func AddLabel(ctx context.Context, labelStr, deploymentName string) (context.Context, error) {
var labels map[string]string
if err := json.Unmarshal([]byte(labelStr), &labels); err != nil {
return ctx, err
Expand All @@ -31,7 +31,7 @@ func AddLabel(ctx context.Context, labelStr string) (context.Context, error) {
K8sLabel: labels,
}
if k8sEnv, ok := setup.Env.(*setup.K8sEnv); ok {
if err := k8sEnv.AddFilter(filter); err != nil {
if err := k8sEnv.AddFilter(deploymentName, filter); err != nil {
return ctx, err
}
} else {
Expand All @@ -40,7 +40,7 @@ func AddLabel(ctx context.Context, labelStr string) (context.Context, error) {
return ctx, nil
}

func RemoveLabel(ctx context.Context, labelStr string) (context.Context, error) {
func RemoveLabel(ctx context.Context, labelStr, deploymentName string) (context.Context, error) {
var labels map[string]string
if err := json.Unmarshal([]byte(labelStr), &labels); err != nil {
return ctx, err
Expand All @@ -49,7 +49,7 @@ func RemoveLabel(ctx context.Context, labelStr string) (context.Context, error)
K8sLabel: labels,
}
if k8sEnv, ok := setup.Env.(*setup.K8sEnv); ok {
if err := k8sEnv.RemoveFilter(filter); err != nil {
if err := k8sEnv.RemoveFilter(deploymentName, filter); err != nil {
return ctx, nil
}
} else {
Expand All @@ -58,6 +58,17 @@ func RemoveLabel(ctx context.Context, labelStr string) (context.Context, error)
return ctx, nil
}

func Scale(ctx context.Context, deploymentName, namespace string, replicas int) (context.Context, error) {
if k8sEnv, ok := setup.Env.(*setup.K8sEnv); ok {
if err := k8sEnv.Scale(deploymentName, namespace, replicas); err != nil {
return ctx, err
}
} else {
return ctx, fmt.Errorf("try to scale, but env is not k8s env")
}
return ctx, nil
}

func ApplyYaml(ctx context.Context, yaml string) (context.Context, error) {
if k8sEnv, ok := setup.Env.(*setup.K8sEnv); ok {
if err := k8sEnv.Apply(yaml); err != nil {
Expand Down
14 changes: 14 additions & 0 deletions test/engine/setup/controller/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,20 @@ func (c *DeploymentController) RemoveFilter(deploymentName string, filter Contai
return c.waitDeploymentAvailable(deploymentName, filter.K8sNamespace)
}

func (c *DeploymentController) Scale(deploymentName, deploymentNamespace string, replicas int) error {
deployment, err := c.k8sClient.AppsV1().Deployments(deploymentNamespace).Get(context.TODO(), deploymentName, metav1.GetOptions{})
if err != nil {
return err
}
replicaInt32 := int32(replicas)
deployment.Spec.Replicas = &replicaInt32
_, err = c.k8sClient.AppsV1().Deployments(deploymentNamespace).Update(context.TODO(), deployment, metav1.UpdateOptions{})
if err != nil {
return err
}
return c.waitDeploymentAvailable(deploymentName, deploymentNamespace)
}

func (c *DeploymentController) waitDeploymentAvailable(deploymentName, deploymentNamespace string) error {
timeoutCtx, cancel := context.WithTimeout(context.TODO(), config.TestConfig.RetryTimeout)
defer cancel()
Expand Down
12 changes: 8 additions & 4 deletions test/engine/setup/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,16 @@ func (k *K8sEnv) ExecOnSource(ctx context.Context, command string) (string, erro
return k.execInPod(k.config, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, []string{"sh", "-c", command})
}

func (k *K8sEnv) AddFilter(filter controller.ContainerFilter) error {
return k.deploymentController.AddFilter("e2e-generator", filter)
func (k *K8sEnv) AddFilter(deploymentName string, filter controller.ContainerFilter) error {
return k.deploymentController.AddFilter(deploymentName, filter)
}

func (k *K8sEnv) RemoveFilter(filter controller.ContainerFilter) error {
return k.deploymentController.RemoveFilter("e2e-generator", filter)
func (k *K8sEnv) RemoveFilter(deploymentName string, filter controller.ContainerFilter) error {
return k.deploymentController.RemoveFilter(deploymentName, filter)
}

func (k *K8sEnv) Scale(deploymentName string, namespace string, replicas int) error {
return k.deploymentController.Scale(deploymentName, namespace, replicas)
}

func (k *K8sEnv) Apply(filePath string) error {
Expand Down
97 changes: 96 additions & 1 deletion test/engine/setup/subscriber/sls.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client"
sls "github.com/alibabacloud-go/sls-20201230/v5/client"
"github.com/alibabacloud-go/tea/tea"
"gopkg.in/yaml.v3"

"github.com/alibaba/ilogtail/pkg/doc"
"github.com/alibaba/ilogtail/pkg/protocol"
Expand Down Expand Up @@ -86,6 +87,100 @@ func (s *SLSSubscriber) Stop() error {
return nil
}

func (s *SLSSubscriber) UpdateConfig(configName, c string) error {
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
if !strings.Contains(c, "flushers") {
c += s.FlusherConfig()
}
// Get old config first
response, err := s.client.GetLogtailPipelineConfig(tea.String(s.Project), tea.String(configName))
if err != nil {
return err
}
if *response.StatusCode != 200 {
return fmt.Errorf("get config %s failed, status code %d, message %s", configName, *response.StatusCode, response.Body.GoString())
}
config := response.Body
// Merge config
newConfig := make(map[string]interface{})
err = yaml.Unmarshal([]byte(c), newConfig)
if err != nil {
return err
}
if config == nil {
return fmt.Errorf("config %s not found", configName)
}
// Update config
for k, v := range newConfig {
switch k {
case "inputs":
newInput := make([]map[string]interface{}, 0)
if vArray, ok := v.([]interface{}); ok {
for _, vMap := range vArray {
if vMap, ok := vMap.(map[string]interface{}); ok {
newInput = append(newInput, vMap)
} else {
return fmt.Errorf("invalid input type")
}
}
} else {
return fmt.Errorf("invalid input type")
}
config.Inputs = newInput
case "processors":
newProcessor := make([]map[string]interface{}, 0)
if vArray, ok := v.([]interface{}); ok {
for _, vMap := range vArray {
if vMap, ok := vMap.(map[string]interface{}); ok {
newProcessor = append(newProcessor, vMap)
} else {
return fmt.Errorf("invalid processor type")
}
}
} else {
return fmt.Errorf("invalid processor type")
}
config.Processors = newProcessor
case "flushers":
newFlusher := make([]map[string]interface{}, 0)
if vArray, ok := v.([]interface{}); ok {
for _, vMap := range vArray {
if vMap, ok := vMap.(map[string]interface{}); ok {
newFlusher = append(newFlusher, vMap)
} else {
return fmt.Errorf("invalid flusher type")
}
}
} else {
return fmt.Errorf("invalid flusher type")
}
config.Flushers = newFlusher
case "global":
if vMap, ok := v.(map[string]interface{}); ok {
config.Global = vMap
} else {
return fmt.Errorf("invalid global type")
}
}
}
request := &sls.UpdateLogtailPipelineConfigRequest{
ConfigName: tea.String(configName),
Inputs: config.Inputs,
Processors: config.Processors,
Flushers: config.Flushers,
Aggregators: config.Aggregators,
Global: config.Global,
}
fmt.Println("update config", configName, "with", request.GoString())
updateResponse, err := s.client.UpdateLogtailPipelineConfig(tea.String(s.Project), tea.String(configName), request)
if err != nil {
return err
}
if *updateResponse.StatusCode != 200 {
return fmt.Errorf("update config %s failed, status code %d, message %s", configName, *updateResponse.StatusCode, updateResponse.GoString())
}
return nil
}

func (s *SLSSubscriber) getCompleteQuery(query string) string {
if query == "" {
return "*"
Expand Down Expand Up @@ -164,7 +259,7 @@ func init() {
if v, ok := spec["query_endpoint"]; ok {
l.QueryEndpoint = v.(string)
} else {
l.QueryEndpoint = config.TestConfig.Endpoint
l.QueryEndpoint = config.TestConfig.QueryEndpoint
}
if v, ok := spec["telemetry_type"]; ok {
l.TelemetryType = v.(string)
Expand Down
6 changes: 4 additions & 2 deletions test/engine/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func ScenarioInitializer(ctx *godog.ScenarioContext) {
ctx.Given(`^loongcollector container mount \{(.*)\} to \{(.*)\}`, setup.MountVolume)
ctx.Given(`^loongcollector expose port \{(.*)\} to \{(.*)\}`, setup.ExposePort)
ctx.Given(`^\{(.*)\} local config as below`, control.AddLocalConfig)
ctx.Given(`^\{(.*)\} remote config as below`, control.AddRemoteConfig)
ctx.Given(`^\{(.*)\} http config as below`, control.AddHTTPConfig)
ctx.Given(`^remove http config \{(.*)\}`, control.RemoveHTTPConfig)
ctx.Given(`^subcribe data from \{(\S+)\} with config`, subscriber.InitSubscriber)
Expand All @@ -43,8 +44,9 @@ func ScenarioInitializer(ctx *godog.ScenarioContext) {

// When
// ------------------------------------------
ctx.When(`^add k8s label \{(.*)\}`, control.AddLabel)
ctx.When(`^remove k8s label \{(.*)\}`, control.RemoveLabel)
ctx.When(`^add k8s label \{(.*)\} to \{(.*)\}`, control.AddLabel)
ctx.When(`^remove k8s label \{(.*)\} to \{(.*)\}`, control.RemoveLabel)
ctx.When(`^scale deployment \{(.*)\} in namespace \{(.*)\} to \{(\d+)\}`, control.Scale)
ctx.When(`^start docker-compose \{(\S+)\}`, setup.StartDockerComposeEnv)
ctx.When(`^switch working on deployment \{(.*)\}`, setup.SwitchCurrentWorkingDeployment)
ctx.When(`^query through \{(.*)\}`, control.SetQuery)
Expand Down
Loading