Skip to content

Commit

Permalink
Merge branch 'master' into dependabot/go_modules/golang.org/x/net-0.17.0
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzhichang authored Dec 10, 2023
2 parents 4159c66 + 07f3646 commit dcafe7f
Show file tree
Hide file tree
Showing 29 changed files with 1,301 additions and 435 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ nacos_cache
*.p12
dist
vendor
bin/

19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
# Changelog

#### Version 3.0.8 (2023-08-xx)

Improvements:

- "too many simultaneous queries" from ClickHouse will no longer be a fatal error; instead sinker will keep retrying until the write operation completes
- Output the offset range per batch

New Features:

- Introduced a task level configuration property "fields", which accepts a valid json string and allows additional
fields to be appended to each input message.

#### Version 3.0.7 (2023-07-19)

Improvements:

- when running sinker in cluster mode, a failure to calculate the lag of a certain task should not block the entire rebalance work. Instead, the current task should not be assigned to any running sinker instance.
- sinker will not crash when handling incompatible map type.

#### Version 3.0.6 (2023-05-19)

Improvements:
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ pre:

.PHONY: build
build: pre
$(GOBUILD) -ldflags '$(SINKER_LDFLAGS)' -o . ./...
$(GOBUILD) -ldflags '$(SINKER_LDFLAGS)' -o bin/ ./...

.PHONY: debug
debug: pre
$(GOBUILD) -ldflags '$(SINKER_LDFLAGS)' -gcflags "all=-N -l" -o . ./...
$(GOBUILD) -ldflags '$(SINKER_LDFLAGS)' -gcflags "all=-N -l" -o bin/ ./...

.PHONY: benchtest
benchtest: pre
Expand Down
6 changes: 5 additions & 1 deletion cmd/clickhouse_sinker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ var (
runner *task.Sinker
)

// HttpPortBase is the base port handed to util.GetSpareTCPPort when
// cmdOps.HTTPPort is 0 — NOTE(review): presumably the spare-port search
// starts scanning from this value; confirm against util.GetSpareTCPPort.
const (
	HttpPortBase = 10000
)

func initCmdOptions() {
// 1. Set options to default value.
cmdOps = util.CmdOptions{
Expand Down Expand Up @@ -177,7 +181,7 @@ func main() {
// cmdOps.HTTPPort=0: let OS choose the listen port, and record the exact metrics URL to log.
httpPort := cmdOps.HTTPPort
if httpPort == 0 {
httpPort = util.GetSpareTCPPort(httpPort)
httpPort = util.GetSpareTCPPort(HttpPortBase)
}

httpHost := cmdOps.HTTPHost
Expand Down
91 changes: 63 additions & 28 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ limitations under the License.
package config

import (
"net"
"os"
"regexp"
"strings"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/hjson/hjson-go/v4"
"go.uber.org/zap"

Expand All @@ -44,9 +46,10 @@ type Config struct {

// KafkaConfig configuration parameters
type KafkaConfig struct {
Brokers string
Security map[string]string
TLS struct {
Brokers string
ResetSaslRealm bool
Security map[string]string
TLS struct {
Enable bool
CaCertFiles string // CA cert.pem with which Kafka brokers certs be signed. Leave empty for certificates trusted by the OS
ClientCertFile string // Required for client authentication. It's client cert.pem.
Expand Down Expand Up @@ -92,6 +95,7 @@ type ClickHouseConfig struct {
Port int
Username string
Password string
Protocol string //native, http

// Whether enable TLS encryption with clickhouse-server
Secure bool
Expand Down Expand Up @@ -139,6 +143,8 @@ type TaskConfig struct {
WhiteList string // the regexp of white list
BlackList string // the regexp of black list
}
// additional fields to be appended to each input message, should be a valid json string
Fields string `json:"fields,omitempty"`
// PrometheusSchema expects each message is a Prometheus metric(timestamp, value, metric name and a list of labels).
PrometheusSchema bool
// fields match PromLabelsBlackList are not considered as labels. Requires PrometheusSchema be true.
Expand Down Expand Up @@ -246,6 +252,11 @@ func (cfg *Config) Normallize(constructGroup bool, httpAddr string, cred util.Cr
err = errors.Newf("kafka SASL mechanism %s is unsupported", cfg.Kafka.Sasl.Mechanism)
return
}

if cfg.Kafka.ResetSaslRealm {
port := getKfkPort(cfg.Kafka.Brokers)
os.Setenv("DOMAIN_REALM", net.JoinHostPort("hadoop."+strings.ToLower(cfg.Kafka.Sasl.GSSAPI.Realm), port))
}
}
if cfg.Clickhouse.RetryTimes < 0 {
cfg.Clickhouse.RetryTimes = 0
Expand All @@ -254,6 +265,18 @@ func (cfg *Config) Normallize(constructGroup bool, httpAddr string, cred util.Cr
cfg.Clickhouse.MaxOpenConns = defaultMaxOpenConns
}

if cfg.Clickhouse.Protocol == "" {
cfg.Clickhouse.Protocol = clickhouse.Native.String()
}

if cfg.Clickhouse.Port == 0 {
if cfg.Clickhouse.Protocol == clickhouse.HTTP.String() {
cfg.Clickhouse.Port = 8123
} else {
cfg.Clickhouse.Port = 9000
}
}

if cfg.Task != nil {
cfg.Tasks = append(cfg.Tasks, cfg.Task)
cfg.Task = nil
Expand Down Expand Up @@ -391,45 +414,44 @@ func (cfg *Config) convertKfkSecurity() {
}

if strings.Contains(protocol, "SSL") {
cfg.Kafka.TLS.Enable = true
cfg.Kafka.TLS.EndpIdentAlgo = cfg.Kafka.Security["ssl.endpoint.identification.algorithm"]
cfg.Kafka.TLS.TrustStoreLocation = cfg.Kafka.Security["ssl.truststore.location"]
cfg.Kafka.TLS.TrustStorePassword = cfg.Kafka.Security["ssl.truststore.password"]
cfg.Kafka.TLS.KeystoreLocation = cfg.Kafka.Security["ssl.keystore.location"]
cfg.Kafka.TLS.KeystorePassword = cfg.Kafka.Security["ssl.keystore.password"]
util.TrySetValue(&cfg.Kafka.TLS.Enable, true)
util.TrySetValue(&cfg.Kafka.TLS.EndpIdentAlgo, cfg.Kafka.Security["ssl.endpoint.identification.algorithm"])
util.TrySetValue(&cfg.Kafka.TLS.TrustStoreLocation, cfg.Kafka.Security["ssl.truststore.location"])
util.TrySetValue(&cfg.Kafka.TLS.TrustStorePassword, cfg.Kafka.Security["ssl.truststore.password"])
util.TrySetValue(&cfg.Kafka.TLS.KeystoreLocation, cfg.Kafka.Security["ssl.keystore.location"])
util.TrySetValue(&cfg.Kafka.TLS.KeystorePassword, cfg.Kafka.Security["ssl.keystore.password"])
}

if strings.Contains(protocol, "SASL") {
cfg.Kafka.Sasl.Enable = true
cfg.Kafka.Sasl.Mechanism = cfg.Kafka.Security["sasl.mechanism"]
util.TrySetValue(&cfg.Kafka.Sasl.Enable, true)
util.TrySetValue(&cfg.Kafka.Sasl.Mechanism, cfg.Kafka.Security["sasl.mechanism"])
if config, ok := cfg.Kafka.Security["sasl.jaas.config"]; ok {
configMap := readConfig(config)
if strings.Contains(cfg.Kafka.Sasl.Mechanism, "GSSAPI") {
// GSSAPI
if configMap["useKeyTab"] != "true" {
// Username and password
cfg.Kafka.Sasl.GSSAPI.AuthType = 1
cfg.Kafka.Sasl.GSSAPI.Username = configMap["username"]
cfg.Kafka.Sasl.GSSAPI.Password = configMap["password"]
//Username and password
util.TrySetValue(&cfg.Kafka.Sasl.GSSAPI.AuthType, 1)
util.TrySetValue(&cfg.Kafka.Sasl.GSSAPI.Username, configMap["username"])
util.TrySetValue(&cfg.Kafka.Sasl.GSSAPI.Password, configMap["password"])
} else {
// Keytab
cfg.Kafka.Sasl.GSSAPI.AuthType = 2
cfg.Kafka.Sasl.GSSAPI.KeyTabPath = configMap["keyTab"]
//Keytab
util.TrySetValue(&cfg.Kafka.Sasl.GSSAPI.AuthType, 2)
util.TrySetValue(&cfg.Kafka.Sasl.GSSAPI.KeyTabPath, configMap["keyTab"])
if principal, ok := configMap["principal"]; ok {
username := strings.Split(principal, "@")[0]
realm := strings.Split(principal, "@")[1]
cfg.Kafka.Sasl.GSSAPI.Username = username
cfg.Kafka.Sasl.GSSAPI.Realm = realm
}
cfg.Kafka.Sasl.GSSAPI.ServiceName = cfg.Kafka.Security["sasl.kerberos.service.name"]
if cfg.Kafka.Sasl.GSSAPI.KerberosConfigPath == "" {
cfg.Kafka.Sasl.GSSAPI.KerberosConfigPath = defaultKerberosConfigPath
prins := strings.Split(principal, "@")
util.TrySetValue(&cfg.Kafka.Sasl.GSSAPI.Username, prins[0])
if len(prins) > 1 {
util.TrySetValue(&cfg.Kafka.Sasl.GSSAPI.Realm, prins[1])
}
}
util.TrySetValue(&cfg.Kafka.Sasl.GSSAPI.ServiceName, cfg.Kafka.Security["sasl.kerberos.service.name"])
util.TrySetValue(&cfg.Kafka.Sasl.GSSAPI.KerberosConfigPath, defaultKerberosConfigPath)
}
} else {
// PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512
cfg.Kafka.Sasl.Username = configMap["username"]
cfg.Kafka.Sasl.Password = configMap["password"]
util.TrySetValue(&cfg.Kafka.Sasl.Username, configMap["username"])
util.TrySetValue(&cfg.Kafka.Sasl.Password, configMap["password"])
}
}
}
Expand Down Expand Up @@ -461,3 +483,16 @@ func readConfig(config string) map[string]string {
}
return configMap
}

// getKfkPort returns the port of the first broker in the comma-separated
// "host:port" list that parses successfully, or "" when none does.
func getKfkPort(brokers string) string {
	hosts := strings.Split(brokers, ",")
	var port string
	for _, host := range hosts {
		// Take the port only when the address actually parsed. The previous
		// `err != nil` check copied the empty port from failed parses and
		// never picked up a valid one, so the function always returned "".
		_, p, err := net.SplitHostPort(host)
		if err == nil {
			port = p
			break
		}
	}
	return port
}
7 changes: 6 additions & 1 deletion config_manager/lags.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"github.com/housepower/clickhouse_sinker/config"
"github.com/housepower/clickhouse_sinker/input"
"github.com/housepower/clickhouse_sinker/statistics"
"github.com/housepower/clickhouse_sinker/util"
"github.com/thanos-io/thanos/pkg/errors"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -39,7 +41,10 @@ func GetTaskStateAndLags(cfg *config.Config) (stateLags map[string]StateLag, err
var state string
var totalLags int64
if state, totalLags, err = getStateAndLag(theAdm, taskCfg.Topic, taskCfg.ConsumerGroup); err != nil {
return
// skip this task for now, wait next assign cycle
util.Logger.Error("retrieve lag failed", zap.String("task", taskCfg.Name), zap.Error(err))
statistics.ConsumeLags.WithLabelValues(taskCfg.ConsumerGroup, taskCfg.Topic, taskCfg.Name).Set(float64(-1))
continue
}
stateLags[taskCfg.Name] = StateLag{State: state, Lag: totalLags}
statistics.ConsumeLags.WithLabelValues(taskCfg.ConsumerGroup, taskCfg.Topic, taskCfg.Name).Set(float64(totalLags))
Expand Down
6 changes: 4 additions & 2 deletions config_manager/nacos.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,10 @@ func (ncm *NacosConfManager) assign() (err error) {

var validTasks []string
for _, taskCfg := range newCfg.Tasks {
if _, ok := stateLags[taskCfg.Name]; ok {
validTasks = append(validTasks, taskCfg.Name)
// make sure all tasks get properly assigned
validTasks = append(validTasks, taskCfg.Name)
if _, ok := stateLags[taskCfg.Name]; !ok {
stateLags[taskCfg.Name] = StateLag{State: "NA", Lag: 0}
}
}
sort.Slice(validTasks, func(i, j int) bool {
Expand Down
10 changes: 9 additions & 1 deletion docs/configuration/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
// retryTimes when error occurs in inserting datas
"retryTimes": 0,
// max open connections with each clickhouse node. default to 1.
"maxOpenConns": 1
"maxOpenConns": 1,
// native or http. If both secure and http are configured, https is supported. Defaults to native.
"protocol": "native"
},

// Kafka config
Expand All @@ -50,6 +52,8 @@
"sasl.mechanism":"GSSAPI",
"sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true debug=true keyTab=\"/etc/security/mmmtest.keytab\" principal=\"[email protected]\";"
},
// whether to reset the domain realm. If this option is true, the domain realm will be replaced by "hadoop.{toLower(GSSAPI.Realm)}:{port}". This feature applies when clickhouse_sinker connects to HUAWEI MRS kerberos kafka.
"resetSaslRealm": false,

// SSL
"tls": {
Expand Down Expand Up @@ -148,6 +152,10 @@
"blackList": "@"
},

// additional fields to be appended to each input message, should be a valid json string
// e.g. fields: "{\"Enable\":true,\"MaxDims\":0,\"Earliest\":false,\"Parser\":\"fastjson\"}"
"fields": "",

// PrometheusSchema expects each message is a Prometheus metric(timestamp, value, metric name and a list of labels).
"prometheusSchema": true,
// the regexp of labels black list, fields match promLabelsBlackList are not considered as part of labels column in series table
Expand Down
Loading

0 comments on commit dcafe7f

Please sign in to comment.