Skip to content

Commit 2515cf5

Browse files
authored
Fix CLI flag naming (#2070)
1 parent 397af3a commit 2515cf5

File tree

9 files changed

+181
-72
lines changed

9 files changed

+181
-72
lines changed

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -250,12 +250,12 @@ the [Processors documentation](https://conduit.io/docs/using/processors/getting-
250250
Conduit exposes a gRPC API and an HTTP API.
251251

252252
The gRPC API is by default running on port 8084. You can define a custom address
253-
using the CLI flag `-grpc.address`. To learn more about the gRPC API please have
253+
using the CLI flag `-api.grpc.address`. To learn more about the gRPC API please have
254254
a look at
255255
the [protobuf file](https://github.com/ConduitIO/conduit/blob/main/proto/api/v1/api.proto).
256256

257257
The HTTP API is by default running on port 8080. You can define a custom address
258-
using the CLI flag `-http.address`. It is generated
258+
using the CLI flag `-api.http.address`. It is generated
259259
using [gRPC gateway](https://github.com/grpc-ecosystem/grpc-gateway) and is thus
260260
providing the same functionality as the gRPC API. To learn more about the HTTP
261261
API please have a look at the [API documentation](https://www.conduit.io/api),

cmd/conduit/cecdysis/decorators.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func getGRPCAddress(cmd *cobra.Command) (string, error) {
8383

8484
path, err = cmd.Flags().GetString("config.path")
8585
if err != nil || path == "" {
86-
path = conduit.DefaultConfig().ConduitCfgPath
86+
path = conduit.DefaultConfig().ConduitCfg.Path
8787
}
8888

8989
var usrCfg conduit.Config

cmd/conduit/root/config/config.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ the set environment variables, and the flags used. This command will show the co
5050
}
5151
}
5252

53-
func printStruct(v reflect.Value, parentPath string) {
53+
func printStruct(ctx context.Context, v reflect.Value, parentPath string) {
5454
if v.Kind() == reflect.Ptr {
5555
v = v.Elem()
5656
}
@@ -68,22 +68,26 @@ func printStruct(v reflect.Value, parentPath string) {
6868

6969
if fieldValue.Kind() == reflect.Struct ||
7070
(fieldValue.Kind() == reflect.Ptr && !fieldValue.IsNil() && fieldValue.Elem().Kind() == reflect.Struct) {
71-
printStruct(fieldValue, fullPath)
71+
printStruct(ctx, fieldValue, fullPath)
7272
continue
7373
}
7474

7575
if longName != "" {
7676
value := fmt.Sprintf("%v", fieldValue.Interface())
7777
if value != "" {
78-
fmt.Printf("%s: %s\n", fullPath, value)
78+
cobraCmd := ecdysis.CobraCmdFromContext(ctx)
79+
_, err := fmt.Fprintf(cobraCmd.OutOrStdout(), "%s: %s\n", fullPath, value)
80+
if err != nil {
81+
fmt.Printf("failed writing config value to out: %v", err)
82+
}
7983
}
8084
}
8185
}
8286
}
8387

8488
func (c *ConfigCommand) Usage() string { return "config" }
8589

86-
func (c ConfigCommand) Execute(_ context.Context) error {
87-
printStruct(reflect.ValueOf(c.RunCmd.Cfg), "")
90+
func (c ConfigCommand) Execute(ctx context.Context) error {
91+
printStruct(ctx, reflect.ValueOf(c.RunCmd.Cfg), "")
8892
return nil
8993
}

cmd/conduit/root/config/config_test.go

+138-40
Original file line numberDiff line numberDiff line change
@@ -18,59 +18,157 @@ import (
1818
"bytes"
1919
"io"
2020
"os"
21-
"reflect"
21+
"slices"
2222
"strings"
2323
"testing"
2424

25-
"github.com/conduitio/conduit/pkg/conduit"
25+
"github.com/conduitio/conduit/cmd/conduit/root/run"
26+
"github.com/conduitio/ecdysis"
2627
"github.com/matryer/is"
2728
)
2829

29-
func TestPrintStructOutput(t *testing.T) {
30-
is := is.New(t)
31-
32-
cfg := conduit.DefaultConfig()
33-
34-
oldStdout := os.Stdout
35-
defer func() { os.Stdout = oldStdout }()
30+
func TestConfig_WithFlags(t *testing.T) {
31+
testCases := []struct {
32+
name string
33+
args []string
34+
wantLines []string
35+
}{
36+
{
37+
name: "with flags (api, config, connectors, db, log)",
38+
args: []string{
39+
"--api.enabled=false",
40+
"--api.grpc.address", "localhost:9999",
41+
"--api.http.address", "localhost:8888",
42+
"--config.path", "/etc/conduit/config.yaml",
43+
"--connectors.path", "/opt/conduit/connectors",
44+
"--db.badger.path", "/var/lib/conduit/data.db",
45+
"--db.postgres.connection-string", "postgres://user:pass@localhost:5432/conduit",
46+
"--db.postgres.table", "my_conduit_store",
47+
"--db.sqlite.path", "/var/lib/conduit/conduit.sqlite",
48+
"--db.sqlite.table", "my_sqlite_store",
49+
"--db.type", "postgres",
50+
"--log.format", "json",
51+
"--log.level", "debug",
52+
},
53+
wantLines: []string{
54+
"config.path: /etc/conduit/config.yaml",
55+
"db.type: postgres",
56+
"db.postgres.table: my_conduit_store",
57+
"db.sqlite.table: my_sqlite_store",
58+
"api.enabled: false",
59+
"api.http.address: localhost:8888",
60+
"api.grpc.address: localhost:9999",
61+
"log.level: debug",
62+
"log.format: json",
63+
"pipelines.exit-on-degraded: false",
64+
"pipelines.error-recovery.min-delay: 1s",
65+
"pipelines.error-recovery.max-delay: 10m0s",
66+
"pipelines.error-recovery.backoff-factor: 2",
67+
"pipelines.error-recovery.max-retries: -1",
68+
"pipelines.error-recovery.max-retries-window: 5m0s",
69+
"schema-registry.type: builtin",
70+
"preview.pipeline-arch-v2: false",
71+
},
72+
},
73+
{
74+
name: "with flags (pipelines, preview, processors, schema, dev)",
75+
args: []string{
76+
"--pipelines.error-recovery.backoff-factor", "5",
77+
"--pipelines.error-recovery.max-delay", "30m",
78+
"--pipelines.error-recovery.max-retries", "10",
79+
"--pipelines.error-recovery.max-retries-window", "15m",
80+
"--pipelines.error-recovery.min-delay", "5s",
81+
"--pipelines.exit-on-degraded=true",
82+
"--pipelines.path", "/var/lib/conduit/pipelines",
83+
"--preview.pipeline-arch-v2=true",
84+
"--processors.path", "/opt/conduit/processors",
85+
"--schema-registry.confluent.connection-string", "http://localhost:8081",
86+
"--schema-registry.type", "confluent",
87+
"--dev.blockprofile", "/tmp/block.prof",
88+
"--dev.cpuprofile", "/tmp/cpu.prof",
89+
"--dev.memprofile", "/tmp/mem.prof",
90+
},
91+
wantLines: []string{
92+
"db.type: badger",
93+
"db.postgres.table: conduit_kv_store",
94+
"db.sqlite.table: conduit_kv_store",
95+
"api.enabled: true",
96+
"api.http.address: :8080",
97+
"api.grpc.address: :8084",
98+
"log.level: info",
99+
"log.format: cli",
100+
"processors.path: /opt/conduit/processors",
101+
"pipelines.path: /var/lib/conduit/pipelines",
102+
"pipelines.exit-on-degraded: true",
103+
"pipelines.error-recovery.min-delay: 5s",
104+
"pipelines.error-recovery.max-delay: 30m0s",
105+
"pipelines.error-recovery.backoff-factor: 5",
106+
"pipelines.error-recovery.max-retries: 10",
107+
"pipelines.error-recovery.max-retries-window: 15m0s",
108+
"schema-registry.type: confluent",
109+
"schema-registry.confluent.connection-string: http://localhost:8081",
110+
"preview.pipeline-arch-v2: false",
111+
"dev.cpuprofile: /tmp/cpu.prof",
112+
"dev.memprofile: /tmp/mem.prof",
113+
"dev.blockprofile: /tmp/block.prof",
114+
},
115+
},
116+
{
117+
name: "default values (no flags)",
118+
args: []string{},
119+
wantLines: []string{
120+
"db.type: badger",
121+
"db.postgres.table: conduit_kv_store",
122+
"db.sqlite.table: conduit_kv_store",
123+
"api.enabled: true",
124+
"api.http.address: :8080",
125+
"api.grpc.address: :8084",
126+
"log.level: info",
127+
"log.format: cli",
128+
"pipelines.exit-on-degraded: false",
129+
"pipelines.error-recovery.min-delay: 1s",
130+
"pipelines.error-recovery.max-delay: 10m0s",
131+
"pipelines.error-recovery.backoff-factor: 2",
132+
"pipelines.error-recovery.max-retries: -1",
133+
"pipelines.error-recovery.max-retries-window: 5m0s",
134+
"schema-registry.type: builtin",
135+
"preview.pipeline-arch-v2: false",
136+
},
137+
},
138+
}
36139

37-
r, w, err := os.Pipe()
38-
is.NoErr(err)
140+
for _, tc := range testCases {
141+
t.Run(tc.name, func(t *testing.T) {
142+
is := is.New(t)
39143

40-
os.Stdout = w
41-
t.Cleanup(func() { os.Stdout = oldStdout })
144+
readFrom, writeTo, err := os.Pipe()
145+
is.NoErr(err)
42146

43-
printStruct(reflect.ValueOf(cfg), "")
147+
e := ecdysis.New()
148+
cmd := e.MustBuildCobraCommand(&ConfigCommand{RunCmd: &run.RunCommand{}})
149+
cmd.SetArgs(tc.args)
150+
cmd.SetOut(writeTo)
151+
cmd.SetErr(writeTo)
44152

45-
err = w.Close()
46-
is.NoErr(err)
153+
err = cmd.Execute()
154+
is.NoErr(err)
47155

48-
var buf bytes.Buffer
49-
_, err = io.Copy(&buf, r)
50-
is.NoErr(err)
156+
err = writeTo.Close()
157+
is.NoErr(err)
51158

52-
output := buf.String()
159+
var buf bytes.Buffer
160+
_, err = io.Copy(&buf, readFrom)
161+
is.NoErr(err)
53162

54-
expectedLines := []string{
55-
"db.type: badger",
56-
"db.postgres.table: conduit_kv_store",
57-
"db.sqlite.table: conduit_kv_store",
58-
"api.enabled: true",
59-
"api.http.address: :8080",
60-
"api.grpc.address: :8084",
61-
"log.level: info",
62-
"log.format: cli",
63-
"pipelines.exit-on-degraded: false",
64-
"pipelines.error-recovery.min-delay: 1s",
65-
"pipelines.error-recovery.max-delay: 10m0s",
66-
"pipelines.error-recovery.backoff-factor: 2",
67-
"pipelines.error-recovery.max-retries: -1",
68-
"pipelines.error-recovery.max-retries-window: 5m0s",
69-
"schema-registry.type: builtin",
70-
"preview.pipeline-arch-v2: false",
71-
}
163+
output := buf.String()
164+
is.True(output != "")
72165

73-
for _, line := range expectedLines {
74-
is.True(strings.Contains(output, line))
166+
outputLines := strings.Split(output, "\n")
167+
for _, line := range tc.wantLines {
168+
if !slices.Contains(outputLines, line) {
169+
t.Errorf("output does not contain expected line: %q", line)
170+
}
171+
}
172+
})
75173
}
76174
}

cmd/conduit/root/initialize/init.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type InitCommand struct {
4646

4747
func (c *InitCommand) Flags() []ecdysis.Flag {
4848
flags := ecdysis.BuildFlags(&c.flags)
49-
flags.SetDefault("path", filepath.Dir(c.Cfg.ConduitCfgPath))
49+
flags.SetDefault("path", filepath.Dir(c.Cfg.ConduitCfg.Path))
5050
return flags
5151
}
5252

cmd/conduit/root/run/run.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,12 @@ func (c *RunCommand) Execute(_ context.Context) error {
5454
}
5555

5656
func (c *RunCommand) Config() ecdysis.Config {
57-
path := filepath.Dir(c.flags.ConduitCfgPath)
57+
path := filepath.Dir(c.flags.ConduitCfg.Path)
5858

5959
return ecdysis.Config{
6060
EnvPrefix: "CONDUIT",
6161
Parsed: &c.Cfg,
62-
Path: c.flags.ConduitCfgPath,
62+
Path: c.flags.ConduitCfg.Path,
6363
DefaultValues: conduit.DefaultConfigWithBasePath(path),
6464
}
6565
}
@@ -75,7 +75,7 @@ func (c *RunCommand) Flags() []ecdysis.Flag {
7575
}
7676

7777
c.Cfg = conduit.DefaultConfigWithBasePath(currentPath)
78-
flags.SetDefault("config.path", c.Cfg.ConduitCfgPath)
78+
flags.SetDefault("config.path", c.Cfg.ConduitCfg.Path)
7979
flags.SetDefault("db.type", c.Cfg.DB.Type)
8080
flags.SetDefault("db.badger.path", c.Cfg.DB.Badger.Path)
8181
flags.SetDefault("db.postgres.connection-string", c.Cfg.DB.Postgres.ConnectionString)

cmd/conduit/root/run/run_test.go

+12-9
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
func TestRunCommandFlags(t *testing.T) {
2626
is := is.New(t)
2727

28-
expectedFlags := []struct {
28+
wantFlags := []struct {
2929
longName string
3030
shortName string
3131
usage string
@@ -66,17 +66,20 @@ func TestRunCommandFlags(t *testing.T) {
6666
persistentFlags := c.PersistentFlags()
6767
cmdFlags := c.Flags()
6868

69-
for _, f := range expectedFlags {
69+
for _, wantFlag := range wantFlags {
7070
var cf *pflag.Flag
7171

72-
if f.persistent {
73-
cf = persistentFlags.Lookup(f.longName)
72+
if wantFlag.persistent {
73+
cf = persistentFlags.Lookup(wantFlag.longName)
7474
} else {
75-
cf = cmdFlags.Lookup(f.longName)
75+
cf = cmdFlags.Lookup(wantFlag.longName)
7676
}
77-
is.True(cf != nil)
78-
is.Equal(f.longName, cf.Name)
79-
is.Equal(f.shortName, cf.Shorthand)
80-
is.Equal(cf.Usage, f.usage)
77+
if cf == nil {
78+
t.Logf("flag %q expected, but not found", wantFlag.longName)
79+
t.FailNow()
80+
}
81+
is.Equal(wantFlag.longName, cf.Name)
82+
is.Equal(wantFlag.shortName, cf.Shorthand)
83+
is.Equal(cf.Usage, wantFlag.usage)
8184
}
8285
}

pkg/conduit/config.go

+13-11
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ const (
4141

4242
// Config holds all configurable values for Conduit.
4343
type Config struct {
44-
ConduitCfgPath string `long:"config.path" usage:"global conduit configuration file" default:"./conduit.yaml"`
44+
ConduitCfg struct {
45+
Path string `long:"config.path" usage:"global conduit configuration file" default:"./conduit.yaml"`
46+
} `mapstructure:"config"`
4547

4648
DB struct {
4749
// When Driver is specified it takes precedence over other DB related
@@ -90,19 +92,19 @@ type Config struct {
9092

9193
Pipelines struct {
9294
Path string `long:"pipelines.path" usage:"path to pipelines' directory"`
93-
ExitOnDegraded bool `long:"pipelines.exit-on-degraded" usage:"exit Conduit if a pipeline is degraded"`
95+
ExitOnDegraded bool `long:"pipelines.exit-on-degraded" mapstructure:"exit-on-degraded" usage:"exit Conduit if a pipeline is degraded"`
9496
ErrorRecovery struct {
9597
// MinDelay is the minimum delay before restart: Default: 1 second
96-
MinDelay time.Duration `long:"pipelines.error-recovery.min-delay" usage:"minimum delay before restart"`
98+
MinDelay time.Duration `long:"pipelines.error-recovery.min-delay" mapstructure:"min-delay" usage:"minimum delay before restart"`
9799
// MaxDelay is the maximum delay before restart: Default: 10 minutes
98-
MaxDelay time.Duration `long:"pipelines.error-recovery.max-delay" usage:"maximum delay before restart"`
100+
MaxDelay time.Duration `long:"pipelines.error-recovery.max-delay" mapstructure:"max-delay" usage:"maximum delay before restart"`
99101
// BackoffFactor is the factor by which the delay is multiplied after each restart: Default: 2
100-
BackoffFactor int `long:"pipelines.error-recovery.backoff-factor" usage:"backoff factor applied to the last delay"`
102+
BackoffFactor int `long:"pipelines.error-recovery.backoff-factor" mapstructure:"backoff-factor" usage:"backoff factor applied to the last delay"`
101103
// MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: -1 (infinite)
102-
MaxRetries int64 `long:"pipelines.error-recovery.max-retries" usage:"maximum number of retries"`
104+
MaxRetries int64 `long:"pipelines.error-recovery.max-retries" mapstructure:"max-retries" usage:"maximum number of retries"`
103105
// MaxRetriesWindow is the duration window in which the max retries are counted: Default: 5 minutes
104-
MaxRetriesWindow time.Duration `long:"pipelines.error-recovery.max-retries-window" usage:"amount of time running without any errors after which a pipeline is considered healthy"`
105-
}
106+
MaxRetriesWindow time.Duration `long:"pipelines.error-recovery.max-retries-window" mapstructure:"max-retries-window" usage:"amount of time running without any errors after which a pipeline is considered healthy"`
107+
} `mapstructure:"error-recovery"`
106108
}
107109

108110
ConnectorPlugins map[string]sdk.Connector
@@ -111,9 +113,9 @@ type Config struct {
111113
Type string `long:"schema-registry.type" usage:"schema registry type; accepts builtin,confluent"`
112114

113115
Confluent struct {
114-
ConnectionString string `long:"schema-registry.confluent.connection-string" usage:"confluent schema registry connection string"`
116+
ConnectionString string `long:"schema-registry.confluent.connection-string" mapstructure:"connection-string" usage:"confluent schema registry connection string"`
115117
}
116-
}
118+
} `mapstructure:"schema-registry"`
117119

118120
Preview struct {
119121
// PipelineArchV2 enables the new pipeline architecture.
@@ -139,7 +141,7 @@ func DefaultConfig() Config {
139141
func DefaultConfigWithBasePath(basePath string) Config {
140142
var cfg Config
141143

142-
cfg.ConduitCfgPath = filepath.Join(basePath, "conduit.yaml")
144+
cfg.ConduitCfg.Path = filepath.Join(basePath, "conduit.yaml")
143145

144146
cfg.DB.Type = DBTypeBadger
145147
cfg.DB.Badger.Path = filepath.Join(basePath, "conduit.db")

0 commit comments

Comments
 (0)