From 99b7b748d9ce367090a8f96da5dca430259c5bee Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Fri, 5 Jan 2024 11:41:55 +0800 Subject: [PATCH] [close #381] Support Kafka sink (#380) * wip Signed-off-by: Ping Yu * add go:generate Signed-off-by: Ping Yu * add tools/Makefile Signed-off-by: Ping Yu * fix build error Signed-off-by: Ping Yu * built Signed-off-by: Ping Yu * fix Signed-off-by: Ping Yu * fix ut Signed-off-by: Ping Yu * clean up Signed-off-by: Ping Yu * fix ut Signed-off-by: Ping Yu * add ut Signed-off-by: Ping Yu * fix check Signed-off-by: Ping Yu * polish Signed-off-by: Ping Yu --------- Signed-off-by: Ping Yu --- cdc/.gitignore | 4 +- cdc/cdc/model/kv.go | 9 +- cdc/cdc/model/sink.go | 36 ++ cdc/cdc/model/sink_gen.go | 59 +++ cdc/cdc/model/sink_gen_test.go | 3 + cdc/cdc/sink/codec/codec_test.go | 154 ++++++ cdc/cdc/sink/codec/interface.go | 164 ++++++ cdc/cdc/sink/codec/interface_test.go | 54 ++ cdc/cdc/sink/codec/json.go | 551 +++++++++++++++++++++ cdc/cdc/sink/codec/json_test.go | 294 +++++++++++ cdc/cdc/sink/mq.go | 370 ++++++++++++++ cdc/cdc/sink/mq_test.go | 212 ++++++++ cdc/cdc/sink/producer/kafka/config.go | 285 +++++++++++ cdc/cdc/sink/producer/kafka/config_test.go | 162 ++++++ cdc/cdc/sink/producer/kafka/kafka.go | 470 ++++++++++++++++++ cdc/cdc/sink/producer/kafka/kafka_test.go | 445 +++++++++++++++++ cdc/cdc/sink/producer/mq_producer.go | 35 ++ cdc/cdc/sink/sink.go | 53 +- cdc/cdc/sink/sink_test.go | 28 +- cdc/errors.toml | 4 +- cdc/pkg/errors/errors.go | 2 +- 21 files changed, 3374 insertions(+), 20 deletions(-) create mode 100644 cdc/cdc/model/sink.go create mode 100644 cdc/cdc/model/sink_gen.go create mode 100644 cdc/cdc/model/sink_gen_test.go create mode 100644 cdc/cdc/sink/codec/codec_test.go create mode 100644 cdc/cdc/sink/codec/interface.go create mode 100644 cdc/cdc/sink/codec/interface_test.go create mode 100644 cdc/cdc/sink/codec/json.go create mode 100644 cdc/cdc/sink/codec/json_test.go create mode 100644 cdc/cdc/sink/mq.go create mode 100644 cdc/cdc/sink/mq_test.go create mode 100644 cdc/cdc/sink/producer/kafka/config.go create mode 100644 cdc/cdc/sink/producer/kafka/config_test.go create mode 100644 cdc/cdc/sink/producer/kafka/kafka.go create mode 100644 cdc/cdc/sink/producer/kafka/kafka_test.go create mode 100644 cdc/cdc/sink/producer/mq_producer.go diff --git a/cdc/.gitignore b/cdc/.gitignore index 04626eb1..fe692800 100644 --- a/cdc/.gitignore +++ b/cdc/.gitignore @@ -42,8 +42,8 @@ cmd/cdc/cdc tiflash-config-preprocessed.toml # Files generated when running docker-compose -deployments/ticdc/docker-compose/data -deployments/ticdc/docker-compose/logs +deployments/tikv-cdc/docker-compose/data +deployments/tikv-cdc/docker-compose/logs # Binary file when running intergration test integration/integration diff --git a/cdc/cdc/model/kv.go b/cdc/cdc/model/kv.go index 4497bbbc..2937aa86 100644 --- a/cdc/cdc/model/kv.go +++ b/cdc/cdc/model/kv.go @@ -25,11 +25,12 @@ import ( type OpType int // OpType for kv +// Use explicit values to avoid compatibility issues. const ( - OpTypeUnknow OpType = iota - OpTypePut - OpTypeDelete - OpTypeResolved + OpTypeUnknown OpType = 0 + OpTypePut OpType = 1 + OpTypeDelete OpType = 2 + OpTypeResolved OpType = 3 ) // RegionFeedEvent from the kv layer. diff --git a/cdc/cdc/model/sink.go b/cdc/cdc/model/sink.go new file mode 100644 index 00000000..0568bb4f --- /dev/null +++ b/cdc/cdc/model/sink.go @@ -0,0 +1,36 @@ +// Copyright 2020 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package model + +//go:generate msgp + +// MqMessageType is the type of message +type MqMessageType int + +// Use explicit values to avoid compatibility issues. +const ( + // MqMessageTypeUnknown is unknown type of message key + MqMessageTypeUnknown MqMessageType = 0 + + // MqMessageTypeRow is row type of message key + // MqMessageTypeRow MqMessageType = 1 + + // MqMessageTypeDDL is ddl type of message key + // MqMessageTypeDDL MqMessageType = 2 + + // MqMessageTypeResolved is resolved type of message key + MqMessageTypeResolved MqMessageType = 3 + // MqMessageTypeKv is RawKV entry type of message key + MqMessageTypeKv MqMessageType = 4 +) diff --git a/cdc/cdc/model/sink_gen.go b/cdc/cdc/model/sink_gen.go new file mode 100644 index 00000000..ab27f72d --- /dev/null +++ b/cdc/cdc/model/sink_gen.go @@ -0,0 +1,59 @@ +package model + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *MqMessageType) DecodeMsg(dc *msgp.Reader) (err error) { + { + var zb0001 int + zb0001, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = MqMessageType(zb0001) + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z MqMessageType) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteInt(int(z)) + if err != nil { + err = msgp.WrapError(err) + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z MqMessageType) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendInt(o, int(z)) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *MqMessageType) UnmarshalMsg(bts []byte) (o []byte, err error) { + { + var zb0001 int + zb0001, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = MqMessageType(zb0001) + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z MqMessageType) Msgsize() (s int) { + s = msgp.IntSize + return +} diff --git a/cdc/cdc/model/sink_gen_test.go b/cdc/cdc/model/sink_gen_test.go new file mode 100644 index 00000000..38768a5c --- /dev/null +++ b/cdc/cdc/model/sink_gen_test.go @@ -0,0 +1,3 @@ +package model + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. diff --git a/cdc/cdc/sink/codec/codec_test.go b/cdc/cdc/sink/codec/codec_test.go new file mode 100644 index 00000000..df655e2f --- /dev/null +++ b/cdc/cdc/sink/codec/codec_test.go @@ -0,0 +1,154 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package codec + +import ( + "bytes" + "compress/zlib" + "testing" + + "github.com/pingcap/check" + "github.com/tikv/migration/cdc/cdc/model" + "github.com/tikv/migration/cdc/pkg/util/testleak" +) + +func Test(t *testing.T) { check.TestingT(t) } + +var ( + codecEntryCases = [][]*model.RawKVEntry{{{ + OpType: model.OpTypePut, + Key: []byte("indexInfo_:_pf01_:_APD0101_:_0000000000000000000"), + Value: []byte("0000000000000000000"), + CRTs: 424316552636792833, + ExpiredTs: 424316552636792833, + }}, {{ + OpType: model.OpTypePut, + Key: []byte("indexInfo_:_pf01_:_APD0101_:_0000000000000000001"), + Value: []byte("0000000000000000001"), + CRTs: 424316553934667777, + ExpiredTs: 424316552636792833, + }, { + OpType: model.OpTypeDelete, + Key: []byte("indexInfo_:_pf01_:_APD0101_:_0000000000000000002"), + CRTs: 424316554327097345, + }, { + OpType: model.OpTypeDelete, + Key: []byte("indexInfo_:_pf01_:_APD0101_:_0000000000000000003"), + CRTs: 424316554746789889, + }, { + OpType: model.OpTypePut, + Key: []byte("indexInfo_:_pf01_:_APD0101_:_0000000000000000004"), + Value: []byte("0000000000000000004"), + CRTs: 424316555073945601, + }}, {}} + + codecResolvedTSCases = [][]uint64{{424316592563683329}, {424316594097225729, 424316594214141953, 424316594345213953}, {}} + + codecBenchmarkKvChanges = codecEntryCases[1] + + codecJSONEncodedKvChanges = []*MQMessage{} +) + +var _ = check.Suite(&codecTestSuite{}) + +type codecTestSuite struct{} + +func (s *codecTestSuite) checkCompressedSize(messages []*MQMessage) (int, int) { + var buff bytes.Buffer + writer := zlib.NewWriter(&buff) + originalSize := 0 + for _, message := range messages { + originalSize += len(message.Key) + len(message.Value) + if len(message.Key) > 0 { + _, _ = writer.Write(message.Key) + } + _, _ = writer.Write(message.Value) + } + writer.Close() + return originalSize, buff.Len() +} + +func (s *codecTestSuite) encodeKvCase(c *check.C, encoder EventBatchEncoder, events []*model.RawKVEntry) []*MQMessage { + msg, err := codecEncodeKvCase(encoder, events) + c.Assert(err, check.IsNil) + return msg +} + +func (s *codecTestSuite) TestJsonVsNothing(c *check.C) { + defer testleak.AfterTest(c)() + c.Logf("| case | json size | json compressed |") + c.Logf("| :--- | :-------- | :-------------- |") + for i, cs := range codecEntryCases { + if len(cs) == 0 { + continue + } + jsonEncoder := NewJSONEventBatchEncoder() + jsonMessages := s.encodeKvCase(c, jsonEncoder, cs) + jsonOriginal, jsonCompressed := s.checkCompressedSize(jsonMessages) + c.Logf("| case %d | %d | %d (%d%%)- |", i, + jsonOriginal, jsonCompressed, 100-100*jsonCompressed/jsonOriginal) + } +} + +func codecEncodeKvCase(encoder EventBatchEncoder, events []*model.RawKVEntry) ([]*MQMessage, error) { + err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"}) + if err != nil { + return nil, err + } + + for _, event := range events { + _, err := encoder.AppendChangedEvent(event) + if err != nil { + return nil, err + } + } + + if len(events) > 0 { + return encoder.Build(), nil + } + return nil, nil +} + +func init() { + var err error + if codecJSONEncodedKvChanges, err = codecEncodeKvCase(NewJSONEventBatchEncoder(), 
codecBenchmarkKvChanges); err != nil { + panic(err) + } +} + +func BenchmarkJsonEncoding(b *testing.B) { + for i := 0; i < b.N; i++ { + _, _ = codecEncodeKvCase(NewJSONEventBatchEncoder(), codecBenchmarkKvChanges) + } +} + +func BenchmarkJsonDecoding(b *testing.B) { + for i := 0; i < b.N; i++ { + for _, message := range codecJSONEncodedKvChanges { + decoder, err := NewJSONEventBatchDecoder(message.Key, message.Value) + if err != nil { + panic(err) + } + for { + if _, hasNext, err := decoder.HasNext(); err != nil { + panic(err) + } else if hasNext { + _, _ = decoder.NextChangedEvent() + } else { + break + } + } + } + } +} diff --git a/cdc/cdc/sink/codec/interface.go b/cdc/cdc/sink/codec/interface.go new file mode 100644 index 00000000..5b2faaa8 --- /dev/null +++ b/cdc/cdc/sink/codec/interface.go @@ -0,0 +1,164 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package codec + +import ( + "context" + "encoding/binary" + "time" + + "github.com/pingcap/log" + "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/migration/cdc/cdc/model" + "github.com/tikv/migration/cdc/pkg/config" + "github.com/tikv/migration/cdc/pkg/security" + "go.uber.org/zap" +) + +// EventBatchEncoder is an abstraction for events encoder +type EventBatchEncoder interface { + // EncodeCheckpointEvent appends a checkpoint event into the batch. + // This event will be broadcast to all partitions to signal a global checkpoint. + EncodeCheckpointEvent(ts uint64) (*MQMessage, error) + // AppendChangedEvent appends a changed event into the batch + AppendChangedEvent(e *model.RawKVEntry) (EncoderResult, error) + // AppendResolvedEvent appends a resolved event into the batch. + // This event is used to tell the encoder that no event prior to ts will be sent. + AppendResolvedEvent(ts uint64) (EncoderResult, error) + // Build builds the batch and returns the bytes of key and value. + Build() []*MQMessage + // MixedBuild builds the batch and returns the bytes of mixed keys and values. + // This is used for cdc log, to merge key and value into one byte slice + // when first create file, we should set withVersion to true, to tell us that + // the first 8 byte represents the encoder version + // TODO decouple it out + MixedBuild(withVersion bool) []byte + // Size returns the size of the batch(bytes) + // Deprecated: Size is deprecated + Size() int + // Reset reset the kv buffer + Reset() + // SetParams provides the encoder with more info on the sink + SetParams(params map[string]string) error +} + +// MQMessage represents an MQ message to the mqSink +type MQMessage struct { + Key []byte + Value []byte + Ts uint64 // reserved for possible output sorting + Type model.MqMessageType // type + Protocol config.Protocol // protocol + entriesCount int // entries in one MQ Message +} + +// maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client. 
+// reference: https://github.com/Shopify/sarama/blob/66521126c71c522c15a36663ae9cddc2b024c799/async_producer.go#L233 +// for TiKV-CDC, minimum supported kafka version is `0.11.0.2`, which will be treated as `version = 2` by sarama producer. +const maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1 + +// Length returns the expected size of the Kafka message +// We didn't append any `Headers` when send the message, so ignore the calculations related to it. +// If `ProducerMessage` Headers fields used, this method should also adjust. +func (m *MQMessage) Length() int { + return len(m.Key) + len(m.Value) + maximumRecordOverhead +} + +// PhysicalTime returns physical time part of Ts in time.Time +func (m *MQMessage) PhysicalTime() time.Time { + return oracle.GetTimeFromTS(m.Ts) +} + +// GetEntriesCount returns the number of entries batched in one MQMessage +func (m *MQMessage) GetEntriesCount() int { + return m.entriesCount +} + +// SetEntriesCount set the number of entries +func (m *MQMessage) SetEntriesCount(cnt int) { + m.entriesCount = cnt +} + +// IncEntriesCount increase the number of entries +func (m *MQMessage) IncEntriesCount() { + m.entriesCount++ +} + +func newResolvedMQMessage(proto config.Protocol, key, value []byte, ts uint64) *MQMessage { + return NewMQMessage(proto, key, value, ts, model.MqMessageTypeResolved) +} + +// NewMQMessage should be used when creating a MQMessage struct. +// It copies the input byte slices to avoid any surprises in asynchronous MQ writes. +func NewMQMessage(proto config.Protocol, key []byte, value []byte, ts uint64, ty model.MqMessageType) *MQMessage { + ret := &MQMessage{ + Key: nil, + Value: nil, + Ts: ts, + Type: ty, + Protocol: proto, + entriesCount: 0, + } + + if key != nil { + ret.Key = make([]byte, len(key)) + copy(ret.Key, key) + } + + if value != nil { + ret.Value = make([]byte, len(value)) + copy(ret.Value, value) + } + + return ret +} + +// EventBatchDecoder is an abstraction for events decoder +// this interface is only for testing now +type EventBatchDecoder interface { + // HasNext returns + // 1. the type of the next event + // 2. a bool if the next event is exist + // 3. error + HasNext() (model.MqMessageType, bool, error) + // NextResolvedEvent returns the next resolved event if exists + NextResolvedEvent() (uint64, error) + // NextChangedEvent returns the next row changed event if exists + NextChangedEvent() (*model.RawKVEntry, error) +} + +// EncoderResult indicates an action request by the encoder to the mqSink +type EncoderResult uint8 + +// Enum types of EncoderResult +const ( + EncoderNoOperation EncoderResult = iota + EncoderNeedAsyncWrite + EncoderNeedSyncWrite +) + +type EncoderBuilder interface { + Build(ctx context.Context) (EventBatchEncoder, error) +} + +// NewEventBatchEncoderBuilder returns an EncoderBuilder +func NewEventBatchEncoderBuilder(p config.Protocol, credential *security.Credential, opts map[string]string) (EncoderBuilder, error) { + switch p { + case config.ProtocolDefault, config.ProtocolOpen: + return newJSONEventBatchEncoderBuilder(opts), nil + default: + log.Warn("unknown codec protocol value of EventBatchEncoder, use open-protocol as the default", zap.Int("protocol_value", int(p))) + return newJSONEventBatchEncoderBuilder(opts), nil + } +} diff --git a/cdc/cdc/sink/codec/interface_test.go b/cdc/cdc/sink/codec/interface_test.go new file mode 100644 index 00000000..1d501833 --- /dev/null +++ b/cdc/cdc/sink/codec/interface_test.go @@ -0,0 +1,54 @@ +// Copyright 2021 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package codec + +import ( + "github.com/pingcap/check" + "github.com/tikv/migration/cdc/cdc/model" + "github.com/tikv/migration/cdc/pkg/config" + "github.com/tikv/migration/cdc/pkg/util/testleak" +) + +type codecInterfaceSuite struct{} + +var _ = check.Suite(&codecInterfaceSuite{}) + +func (s *codecInterfaceSuite) SetUpSuite(c *check.C) { +} + +func (s *codecInterfaceSuite) TearDownSuite(c *check.C) { +} + +func (s *codecInterfaceSuite) TestCreate(c *check.C) { + defer testleak.AfterTest(c)() + event := &model.RawKVEntry{ + StartTs: 1234, + CRTs: 5678, + } + + msg := NewMQMessage(config.ProtocolOpen, []byte("key1"), []byte("value1"), event.CRTs, model.MqMessageTypeKv) + + c.Assert(msg.Key, check.BytesEquals, []byte("key1")) + c.Assert(msg.Value, check.BytesEquals, []byte("value1")) + c.Assert(msg.Ts, check.Equals, event.CRTs) + c.Assert(msg.Type, check.Equals, model.MqMessageTypeKv) + c.Assert(msg.Protocol, check.Equals, config.ProtocolOpen) + + msg = newResolvedMQMessage(config.ProtocolOpen, nil, nil, 1234) + c.Assert(msg.Key, check.IsNil) + c.Assert(msg.Value, check.IsNil) + c.Assert(msg.Ts, check.Equals, uint64(1234)) + c.Assert(msg.Type, check.Equals, model.MqMessageTypeResolved) + c.Assert(msg.Protocol, check.Equals, config.ProtocolOpen) +} diff --git a/cdc/cdc/sink/codec/json.go b/cdc/cdc/sink/codec/json.go new file mode 100644 index 00000000..6ab9961f --- /dev/null +++ b/cdc/cdc/sink/codec/json.go @@ -0,0 +1,551 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package codec + +import ( + "bytes" + "context" + "encoding/binary" + "encoding/json" + "strconv" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/tikv/migration/cdc/cdc/model" + "github.com/tikv/migration/cdc/pkg/config" + cerror "github.com/tikv/migration/cdc/pkg/errors" + "go.uber.org/zap" +) + +const ( + // BatchVersion1 represents the version of batch format + BatchVersion1 uint64 = 1 + // DefaultMaxBatchSize sets the default value for max-batch-size + DefaultMaxBatchSize int = 16 +) + +type mqMessageKey struct { + // CRTs in key to keep all versions in MQ against compaction. 
+ CRTs uint64 `json:"ts"` + Key []byte `json:"k,omitempty"` + Type model.MqMessageType `json:"t"` +} + +func (m *mqMessageKey) Encode() ([]byte, error) { + data, err := json.Marshal(m) + return data, cerror.WrapError(cerror.ErrMarshalFailed, err) +} + +func (m *mqMessageKey) Decode(data []byte) error { + return cerror.WrapError(cerror.ErrUnmarshalFailed, json.Unmarshal(data, m)) +} + +type mqMessageValue struct { + OpType model.OpType `json:"op"` + Value []byte `json:"v,omitempty"` + ExpiredTs *uint64 `json:"ex,omitempty"` +} + +func (m *mqMessageValue) Encode() ([]byte, error) { + data, err := json.Marshal(m) + return data, cerror.WrapError(cerror.ErrMarshalFailed, err) +} + +func (m *mqMessageValue) Decode(data []byte) error { + decoder := json.NewDecoder(bytes.NewReader(data)) + decoder.UseNumber() + err := decoder.Decode(m) + if err != nil { + return cerror.WrapError(cerror.ErrUnmarshalFailed, err) + } + return nil +} + +func newResolvedMessage(ts uint64) *mqMessageKey { + return &mqMessageKey{ + CRTs: ts, + Type: model.MqMessageTypeResolved, + } +} + +func encodeExpiredTs(e *model.RawKVEntry) *uint64 { + expiredTs := e.ExpiredTs + if expiredTs == 0 { + return nil + } + return &expiredTs +} + +func decodeExpiredTs(expiredTs *uint64) uint64 { + if expiredTs == nil { + return 0 + } + return *expiredTs +} + +func kvEventToMqMessage(e *model.RawKVEntry) (*mqMessageKey, *mqMessageValue) { + key := &mqMessageKey{ + e.CRTs, + e.Key, + model.MqMessageTypeKv, + } + value := &mqMessageValue{ + e.OpType, + e.Value, + encodeExpiredTs(e), + } + return key, value +} + +func mqMessageToKvEvent(key *mqMessageKey, value *mqMessageValue) *model.RawKVEntry { + return &model.RawKVEntry{ + OpType: value.OpType, + Key: key.Key, + Value: value.Value, + CRTs: key.CRTs, + ExpiredTs: decodeExpiredTs(value.ExpiredTs), + } +} + +// JSONEventBatchEncoder encodes the events into the byte of a batch into. +type JSONEventBatchEncoder struct { + // TODO remove deprecated fields + keyBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now + valueBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now + supportMixedBuild bool // TODO decouple this out + + messageBuf []*MQMessage + curBatchSize int + // configs + maxMessageBytes int + maxBatchSize int +} + +// GetMaxMessageBytes is only for unit testing. +func (d *JSONEventBatchEncoder) GetMaxMessageBytes() int { + return d.maxMessageBytes +} + +// GetMaxBatchSize is only for unit testing. 
+func (d *JSONEventBatchEncoder) GetMaxBatchSize() int { + return d.maxBatchSize +} + +// SetMixedBuildSupport is used by CDC Log +func (d *JSONEventBatchEncoder) SetMixedBuildSupport(enabled bool) { + d.supportMixedBuild = enabled +} + +// AppendResolvedEvent is no-op +func (d *JSONEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) { + return EncoderNoOperation, nil +} + +// EncodeCheckpointEvent implements the EventBatchEncoder interface +func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) { + keyMsg := newResolvedMessage(ts) + key, err := keyMsg.Encode() + if err != nil { + return nil, errors.Trace(err) + } + + var keyLenByte [8]byte + binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key))) + var valueLenByte [8]byte + binary.BigEndian.PutUint64(valueLenByte[:], 0) + + if d.supportMixedBuild { + d.keyBuf.Write(keyLenByte[:]) + d.keyBuf.Write(key) + d.valueBuf.Write(valueLenByte[:]) + return nil, nil + } + + keyBuf := new(bytes.Buffer) + var versionByte [8]byte + binary.BigEndian.PutUint64(versionByte[:], BatchVersion1) + keyBuf.Write(versionByte[:]) + keyBuf.Write(keyLenByte[:]) + keyBuf.Write(key) + + valueBuf := new(bytes.Buffer) + valueBuf.Write(valueLenByte[:]) + + ret := newResolvedMQMessage(config.ProtocolOpen, keyBuf.Bytes(), valueBuf.Bytes(), ts) + return ret, nil +} + +// AppendRowChangedEvent implements the EventBatchEncoder interface +func (d *JSONEventBatchEncoder) AppendChangedEvent(e *model.RawKVEntry) (EncoderResult, error) { + keyMsg, valueMsg := kvEventToMqMessage(e) + key, err := keyMsg.Encode() + if err != nil { + return EncoderNoOperation, errors.Trace(err) + } + value, err := valueMsg.Encode() + if err != nil { + return EncoderNoOperation, errors.Trace(err) + } + + var keyLenByte [8]byte + binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key))) + var valueLenByte [8]byte + binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value))) + + if d.supportMixedBuild { + d.keyBuf.Write(keyLenByte[:]) + d.keyBuf.Write(key) + + d.valueBuf.Write(valueLenByte[:]) + d.valueBuf.Write(value) + } else { + // for single message that longer than max-message-size, do not send it. + // 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead` + length := len(key) + len(value) + maximumRecordOverhead + 16 + 8 + if length > d.maxMessageBytes { + log.Warn("Single message too large", + zap.Int("max-message-size", d.maxMessageBytes), zap.Int("length", length)) + return EncoderNoOperation, cerror.ErrJSONCodecKvTooLarge.GenWithStackByArgs() + } + + if len(d.messageBuf) == 0 || + d.curBatchSize >= d.maxBatchSize || + d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxMessageBytes { + + versionHead := make([]byte, 8) + binary.BigEndian.PutUint64(versionHead, BatchVersion1) + + d.messageBuf = append(d.messageBuf, NewMQMessage(config.ProtocolOpen, versionHead, nil, 0, model.MqMessageTypeKv)) + d.curBatchSize = 0 + } + + message := d.messageBuf[len(d.messageBuf)-1] + message.Key = append(message.Key, keyLenByte[:]...) + message.Key = append(message.Key, key...) + message.Value = append(message.Value, valueLenByte[:]...) + message.Value = append(message.Value, value...) + message.Ts = e.CRTs + message.IncEntriesCount() + + if message.Length() > d.maxMessageBytes { + // `len(d.messageBuf) == 1` is implied + log.Debug("Event does not fit into max-message-bytes. 
Adjust relevant configurations to avoid service interruptions.", + zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxMessageBytes)) + } + d.curBatchSize++ + } + return EncoderNoOperation, nil +} + +// Build implements the EventBatchEncoder interface +func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) { + if d.supportMixedBuild { + if d.valueBuf.Len() == 0 { + return nil + } + /* there could be multiple types of event encoded within a single message which means the type is not sure */ + ret := NewMQMessage(config.ProtocolOpen, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeUnknown) + return []*MQMessage{ret} + } + + ret := d.messageBuf + d.messageBuf = make([]*MQMessage, 0) + return ret +} + +// MixedBuild implements the EventBatchEncoder interface +func (d *JSONEventBatchEncoder) MixedBuild(withVersion bool) []byte { + if !d.supportMixedBuild { + log.Panic("mixedBuildSupport not enabled!") + return nil + } + keyBytes := d.keyBuf.Bytes() + valueBytes := d.valueBuf.Bytes() + mixedBytes := make([]byte, len(keyBytes)+len(valueBytes)) + + index := uint64(0) + keyIndex := uint64(0) + valueIndex := uint64(0) + + if withVersion { + // the first 8 bytes is the version, we should copy directly + // then skip 8 bytes for next round key value parse + copy(mixedBytes[:8], keyBytes[:8]) + index = uint64(8) // skip version + keyIndex = uint64(8) // skip version + } + + for { + if keyIndex >= uint64(len(keyBytes)) { + break + } + keyLen := binary.BigEndian.Uint64(keyBytes[keyIndex : keyIndex+8]) + offset := keyLen + 8 + copy(mixedBytes[index:index+offset], keyBytes[keyIndex:keyIndex+offset]) + keyIndex += offset + index += offset + + valueLen := binary.BigEndian.Uint64(valueBytes[valueIndex : valueIndex+8]) + offset = valueLen + 8 + copy(mixedBytes[index:index+offset], valueBytes[valueIndex:valueIndex+offset]) + valueIndex += offset + index += offset + } + return mixedBytes +} + +// Size implements the EventBatchEncoder interface +func (d *JSONEventBatchEncoder) Size() int { + return d.keyBuf.Len() + d.valueBuf.Len() +} + +// Reset implements the EventBatchEncoder interface +func (d *JSONEventBatchEncoder) Reset() { + d.keyBuf.Reset() + d.valueBuf.Reset() +} + +// SetParams reads relevant parameters for Open Protocol +func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error { + var err error + + maxMessageBytes, ok := params["max-message-bytes"] + if !ok { + return cerror.ErrSinkInvalidConfig.Wrap(errors.New("max-message-bytes not found")) + } + + d.maxMessageBytes, err = strconv.Atoi(maxMessageBytes) + if err != nil { + return cerror.ErrSinkInvalidConfig.Wrap(err) + } + if d.maxMessageBytes <= 0 { + return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxMessageBytes)) + } + + d.maxBatchSize = DefaultMaxBatchSize + if maxBatchSize, ok := params["max-batch-size"]; ok { + d.maxBatchSize, err = strconv.Atoi(maxBatchSize) + if err != nil { + return cerror.ErrSinkInvalidConfig.Wrap(err) + } + } + if d.maxBatchSize <= 0 { + return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-batch-size %d", d.maxBatchSize)) + } + + return nil +} + +type jsonEventBatchEncoderBuilder struct { + opts map[string]string +} + +// Build a JSONEventBatchEncoder +func (b *jsonEventBatchEncoderBuilder) Build(ctx context.Context) (EventBatchEncoder, error) { + encoder := NewJSONEventBatchEncoder() + if err := encoder.SetParams(b.opts); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + 
return encoder, nil +} + +func newJSONEventBatchEncoderBuilder(opts map[string]string) EncoderBuilder { + return &jsonEventBatchEncoderBuilder{opts: opts} +} + +// NewJSONEventBatchEncoder creates a new JSONEventBatchEncoder. +func NewJSONEventBatchEncoder() EventBatchEncoder { + batch := &JSONEventBatchEncoder{ + keyBuf: &bytes.Buffer{}, + valueBuf: &bytes.Buffer{}, + } + var versionByte [8]byte + binary.BigEndian.PutUint64(versionByte[:], BatchVersion1) + batch.keyBuf.Write(versionByte[:]) + return batch +} + +// JSONEventBatchMixedDecoder decodes the byte of a batch into the original messages. +type JSONEventBatchMixedDecoder struct { + mixedBytes []byte + nextKey *mqMessageKey + nextKeyLen uint64 +} + +// HasNext implements the EventBatchDecoder interface +func (b *JSONEventBatchMixedDecoder) HasNext() (model.MqMessageType, bool, error) { + if !b.hasNext() { + return 0, false, nil + } + if err := b.decodeNextKey(); err != nil { + return 0, false, err + } + return b.nextKey.Type, true, nil +} + +// NextResolvedEvent implements the EventBatchDecoder interface +func (b *JSONEventBatchMixedDecoder) NextResolvedEvent() (uint64, error) { + if b.nextKey == nil { + if err := b.decodeNextKey(); err != nil { + return 0, err + } + } + b.mixedBytes = b.mixedBytes[b.nextKeyLen+8:] + if b.nextKey.Type != model.MqMessageTypeResolved { + return 0, cerror.ErrJSONCodecInvalidData.GenWithStack("not found resolved event message") + } + valueLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) + b.mixedBytes = b.mixedBytes[valueLen+8:] + resolvedTs := b.nextKey.CRTs + b.nextKey = nil + return resolvedTs, nil +} + +// NextChangedEvent implements the EventBatchDecoder interface +func (b *JSONEventBatchMixedDecoder) NextChangedEvent() (*model.RawKVEntry, error) { + if b.nextKey == nil { + if err := b.decodeNextKey(); err != nil { + return nil, err + } + } + b.mixedBytes = b.mixedBytes[b.nextKeyLen+8:] + if b.nextKey.Type != model.MqMessageTypeKv { + return nil, cerror.ErrJSONCodecInvalidData.GenWithStack("not found kv event message") + } + valueLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) + value := b.mixedBytes[8 : valueLen+8] + b.mixedBytes = b.mixedBytes[valueLen+8:] + kvMsg := new(mqMessageValue) + if err := kvMsg.Decode(value); err != nil { + return nil, errors.Trace(err) + } + kvEvent := mqMessageToKvEvent(b.nextKey, kvMsg) + b.nextKey = nil + return kvEvent, nil +} + +func (b *JSONEventBatchMixedDecoder) hasNext() bool { + return len(b.mixedBytes) > 0 +} + +func (b *JSONEventBatchMixedDecoder) decodeNextKey() error { + keyLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) + key := b.mixedBytes[8 : keyLen+8] + // drop value bytes + msgKey := new(mqMessageKey) + err := msgKey.Decode(key) + if err != nil { + return errors.Trace(err) + } + b.nextKey = msgKey + b.nextKeyLen = keyLen + return nil +} + +// JSONEventBatchDecoder decodes the byte of a batch into the original messages. 
+type JSONEventBatchDecoder struct { + keyBytes []byte + valueBytes []byte + nextKey *mqMessageKey + nextKeyLen uint64 +} + +// HasNext implements the EventBatchDecoder interface +func (b *JSONEventBatchDecoder) HasNext() (model.MqMessageType, bool, error) { + if !b.hasNext() { + return 0, false, nil + } + if err := b.decodeNextKey(); err != nil { + return 0, false, err + } + return b.nextKey.Type, true, nil +} + +// NextResolvedEvent implements the EventBatchDecoder interface +func (b *JSONEventBatchDecoder) NextResolvedEvent() (uint64, error) { + if b.nextKey == nil { + if err := b.decodeNextKey(); err != nil { + return 0, err + } + } + b.keyBytes = b.keyBytes[b.nextKeyLen+8:] + if b.nextKey.Type != model.MqMessageTypeResolved { + return 0, cerror.ErrJSONCodecInvalidData.GenWithStack("not found resolved event message") + } + valueLen := binary.BigEndian.Uint64(b.valueBytes[:8]) + b.valueBytes = b.valueBytes[valueLen+8:] + resolvedTs := b.nextKey.CRTs + b.nextKey = nil + return resolvedTs, nil +} + +// NextChangedEvent implements the EventBatchDecoder interface +func (b *JSONEventBatchDecoder) NextChangedEvent() (*model.RawKVEntry, error) { + if b.nextKey == nil { + if err := b.decodeNextKey(); err != nil { + return nil, err + } + } + b.keyBytes = b.keyBytes[b.nextKeyLen+8:] + if b.nextKey.Type != model.MqMessageTypeKv { + return nil, cerror.ErrJSONCodecInvalidData.GenWithStack("not found row event message") + } + valueLen := binary.BigEndian.Uint64(b.valueBytes[:8]) + value := b.valueBytes[8 : valueLen+8] + b.valueBytes = b.valueBytes[valueLen+8:] + mvMsg := new(mqMessageValue) + if err := mvMsg.Decode(value); err != nil { + return nil, errors.Trace(err) + } + kvEvent := mqMessageToKvEvent(b.nextKey, mvMsg) + b.nextKey = nil + return kvEvent, nil +} + +func (b *JSONEventBatchDecoder) hasNext() bool { + return len(b.keyBytes) > 0 && len(b.valueBytes) > 0 +} + +func (b *JSONEventBatchDecoder) decodeNextKey() error { + keyLen := binary.BigEndian.Uint64(b.keyBytes[:8]) + key := b.keyBytes[8 : keyLen+8] + msgKey := new(mqMessageKey) + err := msgKey.Decode(key) + if err != nil { + return errors.Trace(err) + } + b.nextKey = msgKey + b.nextKeyLen = keyLen + return nil +} + +// NewJSONEventBatchDecoder creates a new JSONEventBatchDecoder. +func NewJSONEventBatchDecoder(key []byte, value []byte) (EventBatchDecoder, error) { + version := binary.BigEndian.Uint64(key[:8]) + key = key[8:] + if version != BatchVersion1 { + return nil, cerror.ErrJSONCodecInvalidData.GenWithStack("unexpected key format version") + } + // if only decode one byte slice, we choose MixedDecoder + if len(key) > 0 && len(value) == 0 { + return &JSONEventBatchMixedDecoder{ + mixedBytes: key, + }, nil + } + return &JSONEventBatchDecoder{ + keyBytes: key, + valueBytes: value, + }, nil +} diff --git a/cdc/cdc/sink/codec/json_test.go b/cdc/cdc/sink/codec/json_test.go new file mode 100644 index 00000000..7b06d03d --- /dev/null +++ b/cdc/cdc/sink/codec/json_test.go @@ -0,0 +1,294 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package codec + +import ( + "context" + "math" + "strconv" + + "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/tikv/migration/cdc/cdc/model" + "github.com/tikv/migration/cdc/pkg/config" + "github.com/tikv/migration/cdc/pkg/util/testleak" +) + +type batchSuite struct { + kvCases [][]*model.RawKVEntry + resolvedTsCases [][]uint64 +} + +var _ = check.Suite(&batchSuite{ + kvCases: codecEntryCases, + resolvedTsCases: codecResolvedTSCases, +}) + +func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder, newDecoder func(key []byte, value []byte) (EventBatchDecoder, error)) { + checkKvDecoder := func(decoder EventBatchDecoder, cs []*model.RawKVEntry) { + index := 0 + for { + tp, hasNext, err := decoder.HasNext() + c.Assert(err, check.IsNil) + if !hasNext { + break + } + c.Assert(tp, check.Equals, model.MqMessageTypeKv) + kv, err := decoder.NextChangedEvent() + c.Assert(err, check.IsNil) + c.Assert(kv, check.DeepEquals, cs[index]) + index++ + } + } + checkTSDecoder := func(decoder EventBatchDecoder, cs []uint64) { + index := 0 + for { + tp, hasNext, err := decoder.HasNext() + c.Assert(err, check.IsNil) + if !hasNext { + break + } + c.Assert(tp, check.Equals, model.MqMessageTypeResolved) + ts, err := decoder.NextResolvedEvent() + c.Assert(err, check.IsNil) + c.Assert(ts, check.DeepEquals, cs[index]) + index++ + } + } + + for _, cs := range s.kvCases { + encoder := newEncoder() + err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"}) + c.Assert(err, check.IsNil) + + mixedEncoder := newEncoder() + mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true) + for _, row := range cs { + _, err := encoder.AppendChangedEvent(row) + c.Assert(err, check.IsNil) + + op, err := mixedEncoder.AppendChangedEvent(row) + c.Assert(op, check.Equals, EncoderNoOperation) + c.Assert(err, check.IsNil) + } + // test mixed decode + mixed := mixedEncoder.MixedBuild(true) + c.Assert(len(mixed), check.Equals, mixedEncoder.Size()) + mixedDecoder, err := newDecoder(mixed, nil) + c.Assert(err, check.IsNil) + checkKvDecoder(mixedDecoder, cs) + // test normal decode + if len(cs) > 0 { + res := encoder.Build() + c.Assert(res, check.HasLen, 1) + decoder, err := newDecoder(res[0].Key, res[0].Value) + c.Assert(err, check.IsNil) + checkKvDecoder(decoder, cs) + } + } + + for _, cs := range s.resolvedTsCases { + encoder := newEncoder() + mixedEncoder := newEncoder() + err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"}) + c.Assert(err, check.IsNil) + + mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true) + for i, ts := range cs { + msg, err := encoder.EncodeCheckpointEvent(ts) + c.Assert(err, check.IsNil) + c.Assert(msg, check.NotNil) + decoder, err := newDecoder(msg.Key, msg.Value) + c.Assert(err, check.IsNil) + checkTSDecoder(decoder, cs[i:i+1]) + + msg, err = mixedEncoder.EncodeCheckpointEvent(ts) + c.Assert(msg, check.IsNil) + c.Assert(err, check.IsNil) + } + + // test mixed encode + mixed := mixedEncoder.MixedBuild(true) + c.Assert(len(mixed), check.Equals, mixedEncoder.Size()) + mixedDecoder, err := newDecoder(mixed, nil) + c.Assert(err, check.IsNil) + checkTSDecoder(mixedDecoder, cs) + } +} + +func (s *batchSuite) TestParamsEdgeCases(c *check.C) { + defer testleak.AfterTest(c)() + encoder := NewJSONEventBatchEncoder().(*JSONEventBatchEncoder) + err := encoder.SetParams(map[string]string{"max-message-bytes": "10485760"}) + c.Assert(err, check.IsNil) + 
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize) + c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes) + + err = encoder.SetParams(map[string]string{"max-message-bytes": "0"}) + c.Assert(err, check.ErrorMatches, ".*invalid.*") + + err = encoder.SetParams(map[string]string{"max-message-bytes": "-1"}) + c.Assert(err, check.ErrorMatches, ".*invalid.*") + + err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxInt32)}) + c.Assert(err, check.IsNil) + c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize) + c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxInt32) + + err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32)}) + c.Assert(err, check.IsNil) + c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize) + c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxUint32) + + err = encoder.SetParams(map[string]string{"max-batch-size": "0"}) + c.Assert(err, check.ErrorMatches, ".*invalid.*") + + err = encoder.SetParams(map[string]string{"max-batch-size": "-1"}) + c.Assert(err, check.ErrorMatches, ".*invalid.*") + + err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxInt32)}) + c.Assert(err, check.IsNil) + c.Assert(encoder.maxBatchSize, check.Equals, math.MaxInt32) + c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes) + + err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxUint32)}) + c.Assert(err, check.IsNil) + c.Assert(encoder.maxBatchSize, check.Equals, math.MaxUint32) + c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes) +} + +func (s *batchSuite) TestSetParams(c *check.C) { + defer testleak.AfterTest(c) + + opts := make(map[string]string) + encoderBuilder := newJSONEventBatchEncoderBuilder(opts) + c.Assert(encoderBuilder, check.NotNil) + encoder, err := encoderBuilder.Build(context.Background()) + c.Assert(encoder, check.IsNil) + c.Assert( + errors.Cause(err), + check.ErrorMatches, + ".*max-message-bytes not found.*", + ) + + opts["max-message-bytes"] = "1" + encoderBuilder = newJSONEventBatchEncoderBuilder(opts) + c.Assert(encoderBuilder, check.NotNil) + encoder, err = encoderBuilder.Build(context.Background()) + c.Assert(err, check.IsNil) + c.Assert(encoder, check.NotNil) + + jsonEncoder, ok := encoder.(*JSONEventBatchEncoder) + c.Assert(ok, check.IsTrue) + c.Assert(jsonEncoder.GetMaxMessageBytes(), check.Equals, 1) +} + +func (s *batchSuite) TestMaxMessageBytes(c *check.C) { + defer testleak.AfterTest(c)() + encoder := NewJSONEventBatchEncoder() + + // the size of `testEvent` is 75 + testEvent := &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: []byte("key"), + Value: []byte("value"), + CRTs: 100, + ExpiredTs: 200, + } + eventSize := 75 + + // for a single message, the overhead is 36(maximumRecordOverhead) + 8(versionHea) = 44, just can hold it. 
+ a := strconv.Itoa(eventSize + 44) + err := encoder.SetParams(map[string]string{"max-message-bytes": a}) + c.Check(err, check.IsNil) + r, err := encoder.AppendChangedEvent(testEvent) + c.Check(err, check.IsNil) + c.Check(r, check.Equals, EncoderNoOperation) + + a = strconv.Itoa(eventSize + 43) + err = encoder.SetParams(map[string]string{"max-message-bytes": a}) + c.Assert(err, check.IsNil) + r, err = encoder.AppendChangedEvent(testEvent) + c.Check(err, check.NotNil) + c.Check(r, check.Equals, EncoderNoOperation) + + // make sure each batch's `Length` not greater than `max-message-bytes` + err = encoder.SetParams(map[string]string{"max-message-bytes": "256"}) + c.Check(err, check.IsNil) + + for i := 0; i < 10000; i++ { + r, err := encoder.AppendChangedEvent(testEvent) + c.Check(r, check.Equals, EncoderNoOperation) + c.Check(err, check.IsNil) + } + + messages := encoder.Build() + for _, msg := range messages { + c.Assert(msg.Length(), check.LessEqual, 256) + } +} + +func (s *batchSuite) TestMaxBatchSize(c *check.C) { + defer testleak.AfterTest(c)() + encoderBuilder := newJSONEventBatchEncoderBuilder(map[string]string{"max-message-bytes": "1048576", "max-batch-size": "64"}) + c.Assert(encoderBuilder, check.NotNil) + encoder, err := encoderBuilder.Build(context.Background()) + c.Assert(err, check.IsNil) + c.Assert(encoder, check.NotNil) + + testEvent := &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: []byte("key"), + Value: []byte("value"), + CRTs: 1, + ExpiredTs: 20, + } + + for i := 0; i < 10000; i++ { + r, err := encoder.AppendChangedEvent(testEvent) + c.Check(r, check.Equals, EncoderNoOperation) + c.Check(err, check.IsNil) + } + + messages := encoder.Build() + sum := 0 + for _, msg := range messages { + decoder, err := NewJSONEventBatchDecoder(msg.Key, msg.Value) + c.Check(err, check.IsNil) + count := 0 + for { + t, hasNext, err := decoder.HasNext() + c.Check(err, check.IsNil) + if !hasNext { + break + } + + c.Check(t, check.Equals, model.MqMessageTypeKv) + _, err = decoder.NextChangedEvent() + c.Check(err, check.IsNil) + count++ + } + c.Check(count, check.LessEqual, 64) + sum += count + } + c.Check(sum, check.Equals, 10000) +} + +func (s *batchSuite) TestDefaultEventBatchCodec(c *check.C) { + defer testleak.AfterTest(c)() + s.testBatchCodec(c, func() EventBatchEncoder { + encoder := NewJSONEventBatchEncoder() + return encoder + }, NewJSONEventBatchDecoder) +} diff --git a/cdc/cdc/sink/mq.go b/cdc/cdc/sink/mq.go new file mode 100644 index 00000000..289d241d --- /dev/null +++ b/cdc/cdc/sink/mq.go @@ -0,0 +1,370 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sink + +import ( + "context" + "net/url" + "strings" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/tikv/migration/cdc/cdc/model" + "github.com/tikv/migration/cdc/cdc/sink/codec" + "github.com/tikv/migration/cdc/cdc/sink/producer" + "github.com/tikv/migration/cdc/cdc/sink/producer/kafka" + "github.com/tikv/migration/cdc/pkg/config" + cerror "github.com/tikv/migration/cdc/pkg/errors" + + "github.com/tikv/migration/cdc/pkg/notify" + "github.com/tikv/migration/cdc/pkg/security" + "github.com/twmb/murmur3" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +type mqEvent struct { + entry *model.RawKVEntry + resolvedTs uint64 +} + +const ( + defaultPartitionInputChSize = 12800 +) + +type mqSink struct { + mqProducer producer.Producer + encoderBuilder codec.EncoderBuilder + protocol config.Protocol + + partitionNum int32 + partitionInput []chan mqEvent + partitionResolvedTs []uint64 + checkpointTs uint64 + resolvedNotifier *notify.Notifier + resolvedReceiver *notify.Receiver + + statistics *Statistics +} + +func newMqSink( + ctx context.Context, credential *security.Credential, mqProducer producer.Producer, + replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error, +) (*mqSink, error) { + var protocol config.Protocol + err := protocol.FromString(replicaConfig.Sink.Protocol) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + encoderBuilder, err := codec.NewEventBatchEncoderBuilder(protocol, credential, opts) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + // pre-flight verification of encoder parameters + if _, err := encoderBuilder.Build(ctx); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + partitionNum := mqProducer.GetPartitionNum() + partitionInput := make([]chan mqEvent, partitionNum) + for i := 0; i < int(partitionNum); i++ { + partitionInput[i] = make(chan mqEvent, defaultPartitionInputChSize) + } + + notifier := new(notify.Notifier) + resolvedReceiver, err := notifier.NewReceiver(50 * time.Millisecond) + if err != nil { + return nil, errors.Trace(err) + } + + s := &mqSink{ + mqProducer: mqProducer, + encoderBuilder: encoderBuilder, + protocol: protocol, + + partitionNum: partitionNum, + partitionInput: partitionInput, + partitionResolvedTs: make([]uint64, partitionNum), + resolvedNotifier: notifier, + resolvedReceiver: resolvedReceiver, + + statistics: NewStatistics(ctx, "MQ", opts), + } + + go func() { + if err := s.run(ctx); err != nil && errors.Cause(err) != context.Canceled { + select { + case <-ctx.Done(): + return + case errCh <- err: + default: + log.Error("error channel is full", zap.Error(err)) + } + } + }() + return s, nil +} + +func (k *mqSink) dispatch(entry *model.RawKVEntry) uint32 { + hasher := murmur3.New32() + hasher.Write(entry.Key) + return hasher.Sum32() % uint32(k.partitionNum) +} + +func (k *mqSink) EmitChangedEvents(ctx context.Context, rawKVEntries ...*model.RawKVEntry) error { + entriesCount := 0 + + for _, entry := range rawKVEntries { + partition := k.dispatch(entry) + select { + case <-ctx.Done(): + return ctx.Err() + case k.partitionInput[partition] <- struct { + entry *model.RawKVEntry + resolvedTs uint64 + }{entry: entry}: + } + entriesCount++ + } + k.statistics.AddEntriesCount(entriesCount) + return nil +} + +func (k *mqSink) FlushChangedEvents(ctx context.Context, keyspanID model.KeySpanID, resolvedTs uint64) (uint64, error) { + if resolvedTs <= 
k.checkpointTs { + return k.checkpointTs, nil + } + + for i := 0; i < int(k.partitionNum); i++ { + select { + case <-ctx.Done(): + return 0, ctx.Err() + case k.partitionInput[i] <- struct { + entry *model.RawKVEntry + resolvedTs uint64 + }{resolvedTs: resolvedTs}: + } + } + + // waiting for all events are sent to mq producer +flushLoop: + for { + select { + case <-ctx.Done(): + return 0, ctx.Err() + case <-k.resolvedReceiver.C: + for i := 0; i < int(k.partitionNum); i++ { + if resolvedTs > atomic.LoadUint64(&k.partitionResolvedTs[i]) { + continue flushLoop + } + } + break flushLoop + } + } + err := k.mqProducer.Flush(ctx) + if err != nil { + return 0, errors.Trace(err) + } + k.checkpointTs = resolvedTs + k.statistics.PrintStatus(ctx) + return k.checkpointTs, nil +} + +func (k *mqSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { + encoder, err := k.encoderBuilder.Build(ctx) + if err != nil { + return errors.Trace(err) + } + msg, err := encoder.EncodeCheckpointEvent(ts) + if err != nil { + return errors.Trace(err) + } + if msg == nil { + return nil + } + err = k.writeToProducer(ctx, msg, codec.EncoderNeedSyncWrite, -1) + return errors.Trace(err) +} + +func (k *mqSink) Close(ctx context.Context) error { + err := k.mqProducer.Close() + return errors.Trace(err) +} + +func (k *mqSink) Barrier(cxt context.Context, keyspanID model.KeySpanID) error { + // Barrier does nothing because FlushChangedEvents in mq sink has flushed + // all buffered events by force. + return nil +} + +func (k *mqSink) run(ctx context.Context) error { + defer k.resolvedReceiver.Stop() + wg, ctx := errgroup.WithContext(ctx) + for i := int32(0); i < k.partitionNum; i++ { + partition := i + wg.Go(func() error { + return k.runWorker(ctx, partition) + }) + } + return wg.Wait() +} + +const batchSizeLimit = 4 * 1024 * 1024 // 4MB + +func (k *mqSink) runWorker(ctx context.Context, partition int32) error { + log.Info("mqSink worker start", zap.Int32("partition", partition)) + + input := k.partitionInput[partition] + encoder, err := k.encoderBuilder.Build(ctx) + if err != nil { + return errors.Trace(err) + } + tick := time.NewTicker(500 * time.Millisecond) + defer tick.Stop() + + flushToProducer := func(op codec.EncoderResult) error { + return k.statistics.RecordBatchExecution(func() (int, error) { + messages := encoder.Build() + thisBatchSize := 0 + if len(messages) == 0 { + return 0, nil + } + + for _, msg := range messages { + err := k.writeToProducer(ctx, msg, codec.EncoderNeedAsyncWrite, partition) + if err != nil { + return 0, err + } + thisBatchSize += msg.GetEntriesCount() + } + + if op == codec.EncoderNeedSyncWrite { + err := k.mqProducer.Flush(ctx) + if err != nil { + return 0, err + } + } + log.Debug("MQSink flushed", zap.Int("thisBatchSize", thisBatchSize)) + return thisBatchSize, nil + }) + } + for { + var e mqEvent + select { + case <-ctx.Done(): + return ctx.Err() + case <-tick.C: + if err := flushToProducer(codec.EncoderNeedAsyncWrite); err != nil { + return errors.Trace(err) + } + continue + case e = <-input: + } + if e.entry == nil { + if e.resolvedTs != 0 { + op, err := encoder.AppendResolvedEvent(e.resolvedTs) + if err != nil { + return errors.Trace(err) + } + + if err := flushToProducer(op); err != nil { + return errors.Trace(err) + } + + atomic.StoreUint64(&k.partitionResolvedTs[partition], e.resolvedTs) + k.resolvedNotifier.Notify() + } + continue + } + op, err := encoder.AppendChangedEvent(e.entry) + if err != nil { + return errors.Trace(err) + } + + if encoder.Size() >= batchSizeLimit { + op = 
codec.EncoderNeedAsyncWrite + } + + if encoder.Size() >= batchSizeLimit || op != codec.EncoderNoOperation { + if err := flushToProducer(op); err != nil { + return errors.Trace(err) + } + } + } +} + +func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage, op codec.EncoderResult, partition int32) error { + switch op { + case codec.EncoderNeedAsyncWrite: + if partition >= 0 { + return k.mqProducer.AsyncSendMessage(ctx, message, partition) + } + return cerror.ErrAsyncBroadcastNotSupport.GenWithStackByArgs() + case codec.EncoderNeedSyncWrite: + if partition >= 0 { + err := k.mqProducer.AsyncSendMessage(ctx, message, partition) + if err != nil { + return err + } + return k.mqProducer.Flush(ctx) + } + return k.mqProducer.SyncBroadcastMessage(ctx, message) + } + + log.Warn("writeToProducer called with no-op", + zap.ByteString("key", message.Key), + zap.ByteString("value", message.Value), + zap.Int32("partition", partition)) + return nil +} + +func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) { + producerConfig, topic, err := parseKafkaSinkConfig(sinkURI, replicaConfig, opts) + if err != nil { + return nil, err + } + + sProducer, err := kafka.NewKafkaSaramaProducer(ctx, topic, producerConfig, opts, errCh) + if err != nil { + return nil, errors.Trace(err) + } + sink, err := newMqSink(ctx, producerConfig.Credential, sProducer, replicaConfig, opts, errCh) + if err != nil { + return nil, errors.Trace(err) + } + return sink, nil +} + +func parseKafkaSinkConfig(sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string) (producerConfig *kafka.Config, topic string, err error) { + producerConfig = kafka.NewConfig() + if err := kafka.CompleteConfigsAndOpts(sinkURI, producerConfig, replicaConfig, opts); err != nil { + return nil, "", cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + // NOTICE: Please check after the completion, as we may get the configuration from the sinkURI. + err = replicaConfig.Validate() + if err != nil { + return nil, "", err + } + + topic = strings.TrimFunc(sinkURI.Path, func(r rune) bool { + return r == '/' + }) + if topic == "" { + return nil, "", cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri") + } + + return producerConfig, topic, nil +} diff --git a/cdc/cdc/sink/mq_test.go b/cdc/cdc/sink/mq_test.go new file mode 100644 index 00000000..e337e5c6 --- /dev/null +++ b/cdc/cdc/sink/mq_test.go @@ -0,0 +1,212 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sink + +import ( + "context" + "fmt" + "net/url" + + "github.com/Shopify/sarama" + "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/tikv/migration/cdc/cdc/model" + "github.com/tikv/migration/cdc/cdc/sink/codec" + kafkap "github.com/tikv/migration/cdc/cdc/sink/producer/kafka" + "github.com/tikv/migration/cdc/pkg/config" + + "github.com/tikv/migration/cdc/pkg/kafka" + "github.com/tikv/migration/cdc/pkg/util/testleak" +) + +type mqSinkSuite struct{} + +var _ = check.Suite(&mqSinkSuite{}) + +func (s mqSinkSuite) TestKafkaSink(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel := context.WithCancel(context.Background()) + + topic := kafka.DefaultMockTopicName + leader := sarama.NewMockBroker(c, 1) + defer leader.Close() + metadataResponse := new(sarama.MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) + leader.Returns(metadataResponse) + leader.Returns(metadataResponse) + + prodSuccess := new(sarama.ProduceResponse) + prodSuccess.AddTopicPartition(topic, 0, sarama.ErrNoError) + + uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" + + "&max-message-bytes=1048576&partition-num=1" + + "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol" + uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic) + sinkURI, err := url.Parse(uri) + c.Assert(err, check.IsNil) + replicaConfig := config.GetDefaultReplicaConfig() + opts := map[string]string{} + errCh := make(chan error, 1) + + kafkap.NewAdminClientImpl = kafka.NewMockAdminClient + defer func() { + kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient + }() + + sink, err := newKafkaSaramaSink(ctx, sinkURI, replicaConfig, opts, errCh) + c.Assert(err, check.IsNil) + + encoder, err := sink.encoderBuilder.Build(ctx) + c.Assert(err, check.IsNil) + + c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{}) + c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1) + c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes(), check.Equals, 1048576) + + // mock kafka broker processes 1 row changed event + leader.Returns(prodSuccess) + keyspaceID := model.KeySpanID(1) + kv := &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: []byte("key"), + Value: []byte("value"), + StartTs: 100, + CRTs: 120, + } + err = sink.EmitChangedEvents(ctx, kv) + c.Assert(err, check.IsNil) + checkpointTs, err := sink.FlushChangedEvents(ctx, keyspaceID, uint64(120)) + c.Assert(err, check.IsNil) + c.Assert(checkpointTs, check.Equals, uint64(120)) + // flush older resolved ts + checkpointTs, err = sink.FlushChangedEvents(ctx, keyspaceID, uint64(110)) + c.Assert(err, check.IsNil) + c.Assert(checkpointTs, check.Equals, uint64(120)) + + // mock kafka broker processes 1 checkpoint ts event + leader.Returns(prodSuccess) + err = sink.EmitCheckpointTs(ctx, uint64(120)) + c.Assert(err, check.IsNil) + + cancel() + err = sink.EmitChangedEvents(ctx, kv) + if err != nil { + c.Assert(errors.Cause(err), check.Equals, context.Canceled) + } + err = sink.EmitCheckpointTs(ctx, uint64(140)) + if err != nil { + c.Assert(errors.Cause(err), check.Equals, context.Canceled) + } + + err = sink.Close(ctx) + if err != nil { + c.Assert(errors.Cause(err), check.Equals, context.Canceled) + } +} + +func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel := context.WithCancel(context.Background()) + 
defer cancel() + + topic := kafka.DefaultMockTopicName + leader := sarama.NewMockBroker(c, 1) + defer leader.Close() + + metadataResponse := new(sarama.MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) + leader.Returns(metadataResponse) + leader.Returns(metadataResponse) + + prodSuccess := new(sarama.ProduceResponse) + prodSuccess.AddTopicPartition(topic, 0, sarama.ErrNoError) + + uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" + + "&max-message-bytes=1048576&partition-num=1" + + "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol" + uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic) + sinkURI, err := url.Parse(uri) + c.Assert(err, check.IsNil) + replicaConfig := config.GetDefaultReplicaConfig() + opts := map[string]string{} + errCh := make(chan error, 1) + + kafkap.NewAdminClientImpl = kafka.NewMockAdminClient + defer func() { + kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient + }() + + sink, err := newKafkaSaramaSink(ctx, sinkURI, replicaConfig, opts, errCh) + c.Assert(err, check.IsNil) + + // mock kafka broker processes 1 row changed event + leader.Returns(prodSuccess) + keyspanID1 := model.KeySpanID(1) + kv1 := &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: []byte("key1"), + Value: []byte("value1"), + StartTs: 100, + CRTs: 120, + } + err = sink.EmitChangedEvents(ctx, kv1) + c.Assert(err, check.IsNil) + + kv2 := &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: []byte("key"), + Value: []byte("value"), + StartTs: 90, + CRTs: 125, + } + err = sink.EmitChangedEvents(ctx, kv2) + c.Assert(err, check.IsNil) + + kv3 := &model.RawKVEntry{ + OpType: model.OpTypeDelete, + Key: []byte("key3"), + Value: []byte("value3"), + StartTs: 110, + CRTs: 130, + } + + err = sink.EmitChangedEvents(ctx, kv3) + c.Assert(err, check.IsNil) + + // mock kafka broker processes 1 row resolvedTs event + leader.Returns(prodSuccess) + checkpointTs1, err := sink.FlushChangedEvents(ctx, keyspanID1, kv1.CRTs) + c.Assert(err, check.IsNil) + c.Assert(checkpointTs1, check.Equals, kv1.CRTs) + + checkpointTs2, err := sink.FlushChangedEvents(ctx, keyspanID1, kv2.CRTs) + c.Assert(err, check.IsNil) + c.Assert(checkpointTs2, check.Equals, kv2.CRTs) + + checkpointTs3, err := sink.FlushChangedEvents(ctx, keyspanID1, kv3.CRTs) + c.Assert(err, check.IsNil) + c.Assert(checkpointTs3, check.Equals, kv3.CRTs) + + // flush older resolved ts + checkpointTsOld, err := sink.FlushChangedEvents(ctx, keyspanID1, uint64(110)) + c.Assert(err, check.IsNil) + c.Assert(checkpointTsOld, check.Equals, kv3.CRTs) + + err = sink.Close(ctx) + if err != nil { + c.Assert(errors.Cause(err), check.Equals, context.Canceled) + } +} diff --git a/cdc/cdc/sink/producer/kafka/config.go b/cdc/cdc/sink/producer/kafka/config.go new file mode 100644 index 00000000..e33395df --- /dev/null +++ b/cdc/cdc/sink/producer/kafka/config.go @@ -0,0 +1,285 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
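The producer configuration defined in config.go below is driven entirely by the sink URI: the broker list lives in the URI host, the topic in the path, and the tuning knobs (partition-num, replication-factor, max-message-bytes, compression, SASL/TLS settings, and so on) in the query string, as handled by CompleteConfigsAndOpts further down. A rough, self-contained sketch of that URI shape (illustrative only, not part of the patch; brokers and topic name are placeholders):

package main

import (
	"fmt"
	"net/url"
	"strings"
)

// Prints the pieces of a Kafka sink URI that the producer Config below consumes.
func main() {
	uri := "kafka://127.0.0.1:9092,127.0.0.1:9093/cdc-topic" +
		"?kafka-version=2.4.0&partition-num=3&replication-factor=2" +
		"&max-message-bytes=1048576&compression=gzip&protocol=open-protocol"
	u, err := url.Parse(uri)
	if err != nil {
		panic(err)
	}
	fmt.Println("brokers:", u.Host)                          // comma-separated broker endpoints
	fmt.Println("topic:  ", strings.TrimPrefix(u.Path, "/")) // topic name from the path
	for k, v := range u.Query() {
		fmt.Printf("  %s = %s\n", k, v[0])
	}
}
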
+
+package kafka
+
+import (
+	"context"
+	"net/url"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/Shopify/sarama"
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/tikv/migration/cdc/pkg/config"
+	cerror "github.com/tikv/migration/cdc/pkg/errors"
+	"github.com/tikv/migration/cdc/pkg/security"
+	"github.com/tikv/migration/cdc/pkg/util"
+	"go.uber.org/zap"
+)
+
+func init() {
+	sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB
+}
+
+// Config stores user-specified Kafka producer configuration
+type Config struct {
+	BrokerEndpoints []string
+	PartitionNum    int32
+
+	// The user should make sure that `replication-factor` is not greater than the number of Kafka brokers.
+	ReplicationFactor int16
+
+	Version         string
+	MaxMessageBytes int
+	Compression     string
+	ClientID        string
+	Credential      *security.Credential
+	SaslScram       *security.SaslScram
+	// AutoCreate controls whether to create the topic automatically if it does not exist.
+	AutoCreate bool
+}
+
+// NewConfig returns a default Kafka configuration
+func NewConfig() *Config {
+	return &Config{
+		Version: "2.4.0",
+		// MaxMessageBytes will be used to initialize the producer
+		MaxMessageBytes:   config.DefaultMaxMessageBytes,
+		ReplicationFactor: 1,
+		Compression:       "none",
+		Credential:        &security.Credential{},
+		SaslScram:         &security.SaslScram{},
+		AutoCreate:        true,
+	}
+}
+
+// setPartitionNum sets the partition-num based on the topic's real partition count.
+func (c *Config) setPartitionNum(realPartitionCount int32) error {
+	// the user does not specify the `partition-num` in the sink-uri
+	if c.PartitionNum == 0 {
+		c.PartitionNum = realPartitionCount
+		return nil
+	}
+
+	if c.PartitionNum < realPartitionCount {
+		log.Warn("the number of partitions specified in sink-uri is less than that of the actual topic; "+
+			"some partitions will not have messages dispatched to them",
+			zap.Int32("sink-uri partitions", c.PartitionNum),
+			zap.Int32("topic partitions", realPartitionCount))
+		return nil
+	}
+
+	// Make sure that the user-specified `partition-num` is not greater than
+	// the real partition count; otherwise messages would be dispatched to
+	// partitions that do not exist, which could cause correctness problems.
+	if c.PartitionNum > realPartitionCount {
+		return cerror.ErrKafkaInvalidPartitionNum.GenWithStack(
+			"the number of partition (%d) specified in sink-uri is more than that of actual topic (%d)",
+			c.PartitionNum, realPartitionCount)
+	}
+	return nil
+}
+
+// CompleteConfigsAndOpts completes the Kafka producer configuration, the replica configuration and opts from the sink URI.
+func CompleteConfigsAndOpts(sinkURI *url.URL, producerConfig *Config, replicaConfig *config.ReplicaConfig, opts map[string]string) error { + producerConfig.BrokerEndpoints = strings.Split(sinkURI.Host, ",") + params := sinkURI.Query() + s := params.Get("partition-num") + if s != "" { + a, err := strconv.ParseInt(s, 10, 32) + if err != nil { + return err + } + producerConfig.PartitionNum = int32(a) + if producerConfig.PartitionNum <= 0 { + return cerror.ErrKafkaInvalidPartitionNum.GenWithStackByArgs(producerConfig.PartitionNum) + } + } + + s = params.Get("replication-factor") + if s != "" { + a, err := strconv.ParseInt(s, 10, 16) + if err != nil { + return err + } + producerConfig.ReplicationFactor = int16(a) + } + + s = params.Get("kafka-version") + if s != "" { + producerConfig.Version = s + } + + s = params.Get("max-message-bytes") + if s != "" { + a, err := strconv.Atoi(s) + if err != nil { + return err + } + producerConfig.MaxMessageBytes = a + opts["max-message-bytes"] = s + } + + s = params.Get("max-batch-size") + if s != "" { + opts["max-batch-size"] = s + } + + s = params.Get("compression") + if s != "" { + producerConfig.Compression = s + } + + producerConfig.ClientID = params.Get("kafka-client-id") + + s = params.Get("ca") + if s != "" { + producerConfig.Credential.CAPath = s + } + + s = params.Get("cert") + if s != "" { + producerConfig.Credential.CertPath = s + } + + s = params.Get("key") + if s != "" { + producerConfig.Credential.KeyPath = s + } + + s = params.Get("sasl-user") + if s != "" { + producerConfig.SaslScram.SaslUser = s + } + + s = params.Get("sasl-password") + if s != "" { + producerConfig.SaslScram.SaslPassword = s + } + + s = params.Get("sasl-mechanism") + if s != "" { + producerConfig.SaslScram.SaslMechanism = s + } + + s = params.Get("auto-create-topic") + if s != "" { + autoCreate, err := strconv.ParseBool(s) + if err != nil { + return err + } + producerConfig.AutoCreate = autoCreate + } + + s = params.Get(config.ProtocolKey) + if s != "" { + replicaConfig.Sink.Protocol = s + } + + return nil +} + +// newSaramaConfig return the default config and set the according version and metrics +func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { + config := sarama.NewConfig() + + version, err := sarama.ParseKafkaVersion(c.Version) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidVersion, err) + } + var role string + if util.IsOwnerFromCtx(ctx) { + role = "owner" + } else { + role = "processor" + } + captureAddr := util.CaptureAddrFromCtx(ctx) + changefeedID := util.ChangefeedIDFromCtx(ctx) + + config.ClientID, err = kafkaClientID(role, captureAddr, changefeedID, c.ClientID) + if err != nil { + return nil, errors.Trace(err) + } + config.Version = version + // See: https://kafka.apache.org/documentation/#replication + // When one of the brokers in a Kafka cluster is down, the partition leaders + // in this broker is broken, Kafka will election a new partition leader and + // replication logs, this process will last from a few seconds to a few minutes. + // Kafka cluster will not provide a writing service in this process. + // Time out in one minute. + config.Metadata.Retry.Max = 120 + config.Metadata.Retry.Backoff = 500 * time.Millisecond + // If it is not set, this means a metadata request against an unreachable + // cluster (all brokers are unreachable or unresponsive) can take up to + // `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) + + // Metadata.Retry.Backoff * Metadata.Retry.Max` + // to fail. 
+ // See: https://github.com/Shopify/sarama/issues/765 + // and https://github.com/tikv/migration/cdc/issues/3352. + config.Metadata.Timeout = 1 * time.Minute + + config.Producer.Partitioner = sarama.NewManualPartitioner + config.Producer.MaxMessageBytes = c.MaxMessageBytes + config.Producer.Return.Successes = true + config.Producer.Return.Errors = true + config.Producer.RequiredAcks = sarama.WaitForAll + // Time out in five minutes(600 * 500ms). + config.Producer.Retry.Max = 600 + config.Producer.Retry.Backoff = 500 * time.Millisecond + switch strings.ToLower(strings.TrimSpace(c.Compression)) { + case "none": + config.Producer.Compression = sarama.CompressionNone + case "gzip": + config.Producer.Compression = sarama.CompressionGZIP + case "snappy": + config.Producer.Compression = sarama.CompressionSnappy + case "lz4": + config.Producer.Compression = sarama.CompressionLZ4 + case "zstd": + config.Producer.Compression = sarama.CompressionZSTD + default: + log.Warn("Unsupported compression algorithm", zap.String("compression", c.Compression)) + config.Producer.Compression = sarama.CompressionNone + } + + // Time out in one minute(120 * 500ms). + config.Admin.Retry.Max = 120 + config.Admin.Retry.Backoff = 500 * time.Millisecond + config.Admin.Timeout = 1 * time.Minute + + if c.Credential != nil && len(c.Credential.CAPath) != 0 { + config.Net.TLS.Enable = true + config.Net.TLS.Config, err = c.Credential.ToTLSConfig() + if err != nil { + return nil, errors.Trace(err) + } + } + if c.SaslScram != nil && len(c.SaslScram.SaslUser) != 0 { + config.Net.SASL.Enable = true + config.Net.SASL.User = c.SaslScram.SaslUser + config.Net.SASL.Password = c.SaslScram.SaslPassword + config.Net.SASL.Mechanism = sarama.SASLMechanism(c.SaslScram.SaslMechanism) + if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-256") { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA256} } + } else if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-512") { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA512} } + } else { + return nil, errors.New("Unsupported sasl-mechanism, should be SCRAM-SHA-256 or SCRAM-SHA-512") + } + } + + return config, err +} diff --git a/cdc/cdc/sink/producer/kafka/config_test.go b/cdc/cdc/sink/producer/kafka/config_test.go new file mode 100644 index 00000000..c83c2ea9 --- /dev/null +++ b/cdc/cdc/sink/producer/kafka/config_test.go @@ -0,0 +1,162 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
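newSaramaConfig above wires the SASL section of the sarama config from Config.SaslScram and only accepts SCRAM-SHA-256 or SCRAM-SHA-512 as the mechanism. A minimal sketch of driving it directly (illustrative only, not part of the patch; the credentials, client id and helper name are placeholders):

package kafka

import (
	"context"

	"github.com/tikv/migration/cdc/pkg/security"
)

// exampleScramConfig builds a sarama config with SASL/SCRAM enabled.
func exampleScramConfig() error {
	cfg := NewConfig()
	cfg.ClientID = "cdc-example" // letters, digits, '.', '_' and '-' only
	cfg.SaslScram = &security.SaslScram{
		SaslUser:      "cdc-user",
		SaslPassword:  "cdc-password",
		SaslMechanism: "SCRAM-SHA-256", // "SCRAM-SHA-512" is the only other accepted value
	}
	_, err := newSaramaConfig(context.Background(), cfg)
	return err
}

In the sink URI these settings map onto the sasl-user, sasl-password and sasl-mechanism parameters handled by CompleteConfigsAndOpts above.
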
+ +package kafka + +import ( + "context" + "fmt" + "net/url" + + "github.com/Shopify/sarama" + "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/tikv/migration/cdc/pkg/config" + cerror "github.com/tikv/migration/cdc/pkg/errors" + "github.com/tikv/migration/cdc/pkg/security" + "github.com/tikv/migration/cdc/pkg/util" + "github.com/tikv/migration/cdc/pkg/util/testleak" +) + +func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { + defer testleak.AfterTest(c)() + ctx := context.Background() + config := NewConfig() + config.Version = "invalid" + _, err := newSaramaConfigImpl(ctx, config) + c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") + + ctx = util.SetOwnerInCtx(ctx) + config.Version = "2.6.0" + config.ClientID = "^invalid$" + _, err = newSaramaConfigImpl(ctx, config) + c.Assert(cerror.ErrKafkaInvalidClientID.Equal(err), check.IsTrue) + + config.ClientID = "test-kafka-client" + compressionCases := []struct { + algorithm string + expected sarama.CompressionCodec + }{ + {"none", sarama.CompressionNone}, + {"gzip", sarama.CompressionGZIP}, + {"snappy", sarama.CompressionSnappy}, + {"lz4", sarama.CompressionLZ4}, + {"zstd", sarama.CompressionZSTD}, + {"others", sarama.CompressionNone}, + } + for _, cc := range compressionCases { + config.Compression = cc.algorithm + cfg, err := newSaramaConfigImpl(ctx, config) + c.Assert(err, check.IsNil) + c.Assert(cfg.Producer.Compression, check.Equals, cc.expected) + } + + config.Credential = &security.Credential{ + CAPath: "/invalid/ca/path", + } + _, err = newSaramaConfigImpl(ctx, config) + c.Assert(errors.Cause(err), check.ErrorMatches, ".*no such file or directory") + + saslConfig := NewConfig() + saslConfig.Version = "2.6.0" + saslConfig.ClientID = "test-sasl-scram" + saslConfig.SaslScram = &security.SaslScram{ + SaslUser: "user", + SaslPassword: "password", + SaslMechanism: sarama.SASLTypeSCRAMSHA256, + } + + cfg, err := newSaramaConfigImpl(ctx, saslConfig) + c.Assert(err, check.IsNil) + c.Assert(cfg, check.NotNil) + c.Assert(cfg.Net.SASL.User, check.Equals, "user") + c.Assert(cfg.Net.SASL.Password, check.Equals, "password") + c.Assert(cfg.Net.SASL.Mechanism, check.Equals, sarama.SASLMechanism("SCRAM-SHA-256")) +} + +func (s *kafkaSuite) TestCompleteConfigByOpts(c *check.C) { + defer testleak.AfterTest(c) + cfg := NewConfig() + + // Normal config. + uriTemplate := "kafka://127.0.0.1:9092/kafka-test?kafka-version=2.6.0&max-batch-size=5" + + "&max-message-bytes=%s&partition-num=1&replication-factor=3" + + "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip" + maxMessageSize := "4096" // 4kb + uri := fmt.Sprintf(uriTemplate, maxMessageSize) + sinkURI, err := url.Parse(uri) + c.Assert(err, check.IsNil) + opts := make(map[string]string) + err = CompleteConfigsAndOpts(sinkURI, cfg, config.GetDefaultReplicaConfig(), opts) + c.Assert(err, check.IsNil) + c.Assert(cfg.PartitionNum, check.Equals, int32(1)) + c.Assert(cfg.ReplicationFactor, check.Equals, int16(3)) + c.Assert(cfg.Version, check.Equals, "2.6.0") + c.Assert(cfg.MaxMessageBytes, check.Equals, 4096) + expectedOpts := map[string]string{ + "max-message-bytes": maxMessageSize, + "max-batch-size": "5", + } + for k, v := range opts { + c.Assert(v, check.Equals, expectedOpts[k]) + } + + // Illegal replication-factor. 
+ uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&replication-factor=a" + sinkURI, err = url.Parse(uri) + c.Assert(err, check.IsNil) + cfg = NewConfig() + err = CompleteConfigsAndOpts(sinkURI, cfg, config.GetDefaultReplicaConfig(), opts) + c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid syntax.*") + + // Illegal max-message-bytes. + uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&max-message-bytes=a" + sinkURI, err = url.Parse(uri) + c.Assert(err, check.IsNil) + cfg = NewConfig() + err = CompleteConfigsAndOpts(sinkURI, cfg, config.GetDefaultReplicaConfig(), opts) + c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid syntax.*") + + // Illegal partition-num. + uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=a" + sinkURI, err = url.Parse(uri) + c.Assert(err, check.IsNil) + cfg = NewConfig() + err = CompleteConfigsAndOpts(sinkURI, cfg, config.GetDefaultReplicaConfig(), opts) + c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid syntax.*") + + // Out of range partition-num. + uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + sinkURI, err = url.Parse(uri) + c.Assert(err, check.IsNil) + cfg = NewConfig() + err = CompleteConfigsAndOpts(sinkURI, cfg, config.GetDefaultReplicaConfig(), opts) + c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid partition num.*") +} + +func (s *kafkaSuite) TestSetPartitionNum(c *check.C) { + defer testleak.AfterTest(c)() + cfg := NewConfig() + err := cfg.setPartitionNum(2) + c.Assert(err, check.IsNil) + c.Assert(cfg.PartitionNum, check.Equals, int32(2)) + + cfg.PartitionNum = 1 + err = cfg.setPartitionNum(2) + c.Assert(err, check.IsNil) + c.Assert(cfg.PartitionNum, check.Equals, int32(1)) + + cfg.PartitionNum = 3 + err = cfg.setPartitionNum(2) + c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue) +} diff --git a/cdc/cdc/sink/producer/kafka/kafka.go b/cdc/cdc/sink/producer/kafka/kafka.go new file mode 100644 index 00000000..d1a937b8 --- /dev/null +++ b/cdc/cdc/sink/producer/kafka/kafka.go @@ -0,0 +1,470 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "fmt" + "regexp" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/Shopify/sarama" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/tikv/migration/cdc/cdc/sink/codec" + cerror "github.com/tikv/migration/cdc/pkg/errors" + "github.com/tikv/migration/cdc/pkg/kafka" + "github.com/tikv/migration/cdc/pkg/notify" + "go.uber.org/zap" +) + +const ( + // defaultPartitionNum specifies the default number of partitions when we create the topic. + defaultPartitionNum = 3 +) + +const ( + kafkaProducerRunning = 0 + kafkaProducerClosing = 1 +) + +type kafkaSaramaProducer struct { + // clientLock is used to protect concurrent access of asyncClient and syncClient. + // Since we don't close these two clients (which have an input chan) from the + // sender routine, data race or send on closed chan could happen. 
+ clientLock sync.RWMutex + asyncClient sarama.AsyncProducer + syncClient sarama.SyncProducer + // producersReleased records whether asyncClient and syncClient have been closed properly + producersReleased bool + topic string + partitionNum int32 + + partitionOffset []struct { + flushed uint64 + sent uint64 + } + flushedNotifier *notify.Notifier + flushedReceiver *notify.Receiver + + failpointCh chan error + + closeCh chan struct{} + // atomic flag indicating whether the producer is closing + closing kafkaProducerClosingFlag +} + +type kafkaProducerClosingFlag = int32 + +func (k *kafkaSaramaProducer) AsyncSendMessage(ctx context.Context, message *codec.MQMessage, partition int32) error { + k.clientLock.RLock() + defer k.clientLock.RUnlock() + + // Checks whether the producer is closing. + // The atomic flag must be checked under `clientLock.RLock()` + if atomic.LoadInt32(&k.closing) == kafkaProducerClosing { + return nil + } + + msg := &sarama.ProducerMessage{ + Topic: k.topic, + Key: sarama.ByteEncoder(message.Key), + Value: sarama.ByteEncoder(message.Value), + Partition: partition, + } + msg.Metadata = atomic.AddUint64(&k.partitionOffset[partition].sent, 1) + + failpoint.Inject("KafkaSinkAsyncSendError", func() { + // simulate sending message to input channel successfully but flushing + // message to Kafka meets error + log.Info("failpoint error injected") + k.failpointCh <- errors.New("kafka sink injected error") + failpoint.Return(nil) + }) + + failpoint.Inject("SinkFlushDMLPanic", func() { + time.Sleep(time.Second) + log.Panic("SinkFlushDMLPanic") + }) + + select { + case <-ctx.Done(): + return ctx.Err() + case <-k.closeCh: + return nil + case k.asyncClient.Input() <- msg: + } + return nil +} + +func (k *kafkaSaramaProducer) SyncBroadcastMessage(ctx context.Context, message *codec.MQMessage) error { + k.clientLock.RLock() + defer k.clientLock.RUnlock() + msgs := make([]*sarama.ProducerMessage, k.partitionNum) + for i := 0; i < int(k.partitionNum); i++ { + msgs[i] = &sarama.ProducerMessage{ + Topic: k.topic, + Key: sarama.ByteEncoder(message.Key), + Value: sarama.ByteEncoder(message.Value), + Partition: int32(i), + } + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-k.closeCh: + return nil + default: + err := k.syncClient.SendMessages(msgs) + return cerror.WrapError(cerror.ErrKafkaSendMessage, err) + } +} + +func (k *kafkaSaramaProducer) Flush(ctx context.Context) error { + targetOffsets := make([]uint64, k.partitionNum) + for i := 0; i < len(k.partitionOffset); i++ { + targetOffsets[i] = atomic.LoadUint64(&k.partitionOffset[i].sent) + } + + noEventsToFLush := true + for i, target := range targetOffsets { + if target > atomic.LoadUint64(&k.partitionOffset[i].flushed) { + noEventsToFLush = false + break + } + } + if noEventsToFLush { + // no events to flush + return nil + } + + // checkAllPartitionFlushed checks whether data in each partition is flushed + checkAllPartitionFlushed := func() bool { + for i, target := range targetOffsets { + if target > atomic.LoadUint64(&k.partitionOffset[i].flushed) { + return false + } + } + return true + } + +flushLoop: + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-k.closeCh: + if checkAllPartitionFlushed() { + return nil + } + return cerror.ErrKafkaFlushUnfinished.GenWithStackByArgs() + case <-k.flushedReceiver.C: + if !checkAllPartitionFlushed() { + continue flushLoop + } + return nil + } + } +} + +func (k *kafkaSaramaProducer) GetPartitionNum() int32 { + return k.partitionNum +} + +// stop closes the closeCh to 
signal other routines to exit +// It SHOULD NOT be called under `clientLock`. +func (k *kafkaSaramaProducer) stop() { + if atomic.SwapInt32(&k.closing, kafkaProducerClosing) == kafkaProducerClosing { + return + } + log.Info("kafka producer closing...") + close(k.closeCh) +} + +// Close closes the sync and async clients. +func (k *kafkaSaramaProducer) Close() error { + k.stop() + + k.clientLock.Lock() + defer k.clientLock.Unlock() + + if k.producersReleased { + // We need to guard against double closing the clients, + // which could lead to panic. + return nil + } + k.producersReleased = true + // In fact close sarama sync client doesn't return any error. + // But close async client returns error if error channel is not empty, we + // don't populate this error to the upper caller, just add a log here. + err1 := k.syncClient.Close() + err2 := k.asyncClient.Close() + if err1 != nil { + log.Error("close sync client with error", zap.Error(err1)) + } + if err2 != nil { + log.Error("close async client with error", zap.Error(err2)) + } + return nil +} + +func (k *kafkaSaramaProducer) run(ctx context.Context) error { + defer func() { + k.flushedReceiver.Stop() + k.stop() + }() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-k.closeCh: + return nil + case err := <-k.failpointCh: + log.Warn("receive from failpoint chan", zap.Error(err)) + return err + case msg := <-k.asyncClient.Successes(): + if msg == nil || msg.Metadata == nil { + continue + } + flushedOffset := msg.Metadata.(uint64) + atomic.StoreUint64(&k.partitionOffset[msg.Partition].flushed, flushedOffset) + k.flushedNotifier.Notify() + case err := <-k.asyncClient.Errors(): + // We should not wrap a nil pointer if the pointer is of a subtype of `error` + // because Go would store the type info and the resulted `error` variable would not be nil, + // which will cause the pkg/error library to malfunction. 
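+			// For example, passing a nil *sarama.ProducerError into cerror.WrapError
+			// would produce a non-nil `error` value, so the concrete pointer is
+			// checked against nil first.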
+ if err == nil { + return nil + } + return cerror.WrapError(cerror.ErrKafkaAsyncSendMessage, err) + } + } +} + +var ( + newSaramaConfigImpl = newSaramaConfig + NewAdminClientImpl kafka.ClusterAdminClientCreator = kafka.NewSaramaAdminClient +) + +// NewKafkaSaramaProducer creates a kafka sarama producer +func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) { + log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config)) + cfg, err := newSaramaConfigImpl(ctx, config) + if err != nil { + return nil, err + } + + admin, err := NewAdminClientImpl(config.BrokerEndpoints, cfg) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + defer func() { + if err := admin.Close(); err != nil { + log.Warn("close kafka cluster admin failed", zap.Error(err)) + } + }() + + if err := validateMaxMessageBytesAndCreateTopic(admin, topic, config, cfg, opts); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + asyncClient, err := sarama.NewAsyncProducer(config.BrokerEndpoints, cfg) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + syncClient, err := sarama.NewSyncProducer(config.BrokerEndpoints, cfg) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + notifier := new(notify.Notifier) + flushedReceiver, err := notifier.NewReceiver(50 * time.Millisecond) + if err != nil { + return nil, err + } + k := &kafkaSaramaProducer{ + asyncClient: asyncClient, + syncClient: syncClient, + topic: topic, + partitionNum: config.PartitionNum, + partitionOffset: make([]struct { + flushed uint64 + sent uint64 + }, config.PartitionNum), + flushedNotifier: notifier, + flushedReceiver: flushedReceiver, + closeCh: make(chan struct{}), + failpointCh: make(chan error, 1), + closing: kafkaProducerRunning, + } + go func() { + if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled { + select { + case <-ctx.Done(): + return + case errCh <- err: + default: + log.Error("error channel is full", zap.Error(err)) + } + } + }() + return k, nil +} + +var ( + validClientID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`) + commonInvalidChar = regexp.MustCompile(`[\?:,"]`) +) + +func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (clientID string, err error) { + if configuredClientID != "" { + clientID = configuredClientID + } else { + clientID = fmt.Sprintf("TiKVCDC_sarama_producer_%s_%s_%s", role, captureAddr, changefeedID) + clientID = commonInvalidChar.ReplaceAllString(clientID, "_") + } + if !validClientID.MatchString(clientID) { + return "", cerror.ErrKafkaInvalidClientID.GenWithStackByArgs(clientID) + } + return +} + +func validateMaxMessageBytesAndCreateTopic(admin kafka.ClusterAdminClient, topic string, config *Config, saramaConfig *sarama.Config, opts map[string]string) error { + topics, err := admin.ListTopics() + if err != nil { + return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + info, exists := topics[topic] + // once we have found the topic, no matter `auto-create-topic`, make sure user input parameters are valid. 
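+	// Concretely: the producer's `max-message-bytes` is capped by the topic's
+	// `max.message.bytes` (falling back to the broker's `message.max.bytes` when
+	// the topic does not carry that config entry), and the user-specified
+	// `partition-num` must not exceed the topic's real partition count.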
+ if exists { + // make sure that producer's `MaxMessageBytes` smaller than topic's `max.message.bytes` + topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info) + if err != nil { + return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + if topicMaxMessageBytes < config.MaxMessageBytes { + log.Warn("topic's `max.message.bytes` less than the user set `max-message-bytes`,"+ + "use topic's `max.message.bytes` to initialize the Kafka producer", + zap.Int("max.message.bytes", topicMaxMessageBytes), + zap.Int("max-message-bytes", config.MaxMessageBytes)) + saramaConfig.Producer.MaxMessageBytes = topicMaxMessageBytes + } + opts["max-message-bytes"] = strconv.Itoa(saramaConfig.Producer.MaxMessageBytes) + + // no need to create the topic, but we would have to log user if they found enter wrong topic name later + if config.AutoCreate { + log.Warn("topic already exist, TiKV CDC will not create the topic", + zap.String("topic", topic), zap.Any("detail", info)) + } + + if err := config.setPartitionNum(info.NumPartitions); err != nil { + return errors.Trace(err) + } + + return nil + } + + if !config.AutoCreate { + return cerror.ErrKafkaInvalidConfig.GenWithStack("`auto-create-topic` is false, and topic not found") + } + + brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin) + if err != nil { + log.Warn("TiKV CDC cannot find `message.max.bytes` from broker's configuration") + return errors.Trace(err) + } + + // when create the topic, `max.message.bytes` is decided by the broker, + // it would use broker's `message.max.bytes` to set topic's `max.message.bytes`. + // TiKV CDC need to make sure that the producer's `MaxMessageBytes` won't larger than + // broker's `message.max.bytes`. + if brokerMessageMaxBytes < config.MaxMessageBytes { + log.Warn("broker's `message.max.bytes` less than the user set `max-message-bytes`,"+ + "use broker's `message.max.bytes` to initialize the Kafka producer", + zap.Int("message.max.bytes", brokerMessageMaxBytes), + zap.Int("max-message-bytes", config.MaxMessageBytes)) + saramaConfig.Producer.MaxMessageBytes = brokerMessageMaxBytes + } + opts["max-message-bytes"] = strconv.Itoa(saramaConfig.Producer.MaxMessageBytes) + + // topic not exists yet, and user does not specify the `partition-num` in the sink uri. 
+ if config.PartitionNum == 0 { + config.PartitionNum = defaultPartitionNum + log.Warn("partition-num is not set, use the default partition count", + zap.String("topic", topic), zap.Int32("partitions", config.PartitionNum)) + } + + err = admin.CreateTopic(topic, &sarama.TopicDetail{ + NumPartitions: config.PartitionNum, + ReplicationFactor: config.ReplicationFactor, + }, false) + // TODO identify the cause of "Topic with this name already exists" + if err != nil && !strings.Contains(err.Error(), "already exists") { + return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + log.Info("TiKV CDC create the topic", + zap.Int32("partition-num", config.PartitionNum), + zap.Int16("replication-factor", config.ReplicationFactor)) + + return nil +} + +func getBrokerMessageMaxBytes(admin kafka.ClusterAdminClient) (int, error) { + _, controllerID, err := admin.DescribeCluster() + if err != nil { + return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + configEntries, err := admin.DescribeConfig(sarama.ConfigResource{ + Type: sarama.BrokerResource, + Name: strconv.Itoa(int(controllerID)), + ConfigNames: []string{kafka.BrokerMessageMaxBytesConfigName}, + }) + if err != nil { + return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + if len(configEntries) == 0 || configEntries[0].Name != kafka.BrokerMessageMaxBytesConfigName { + return 0, cerror.ErrKafkaNewSaramaProducer.GenWithStack( + "since cannot find the `message.max.bytes` from the broker's configuration, " + + "tikv cdc decline to create the topic and changefeed to prevent potential error") + } + + result, err := strconv.Atoi(configEntries[0].Value) + if err != nil { + return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + return result, nil +} + +func getTopicMaxMessageBytes(admin kafka.ClusterAdminClient, info sarama.TopicDetail) (int, error) { + if a, ok := info.ConfigEntries[kafka.TopicMaxMessageBytesConfigName]; ok { + result, err := strconv.Atoi(*a) + if err != nil { + return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + return result, nil + } + + return getBrokerMessageMaxBytes(admin) +} diff --git a/cdc/cdc/sink/producer/kafka/kafka_test.go b/cdc/cdc/sink/producer/kafka/kafka_test.go new file mode 100644 index 00000000..98bc6c39 --- /dev/null +++ b/cdc/cdc/sink/producer/kafka/kafka_test.go @@ -0,0 +1,445 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
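kafka.go above exposes the producer through four calls: AsyncSendMessage for per-partition events, SyncBroadcastMessage for messages that must reach every partition, Flush to wait for acknowledgement, and Close for shutdown. A rough usage sketch of that sequence (illustrative only, not part of the patch; the broker address, topic and helper name are placeholders, and error handling is minimal):

package kafka

import (
	"context"

	"github.com/tikv/migration/cdc/cdc/sink/codec"
)

// exampleProducerUsage shows the expected call sequence on the Kafka producer.
func exampleProducerUsage(ctx context.Context) error {
	cfg := NewConfig()
	cfg.BrokerEndpoints = []string{"127.0.0.1:9092"} // placeholder broker

	errCh := make(chan error, 1)
	opts := make(map[string]string)
	p, err := NewKafkaSaramaProducer(ctx, "cdc-topic", cfg, opts, errCh)
	if err != nil {
		return err
	}
	defer p.Close()

	msg := &codec.MQMessage{Key: []byte("key"), Value: []byte("value")}
	if err := p.AsyncSendMessage(ctx, msg, 0); err != nil {
		return err
	}
	// Blocks until everything buffered so far has been acknowledged by Kafka
	// (or the producer is closed / the context is cancelled).
	return p.Flush(ctx)
}

The MQ sink is expected to drive exactly this interface, encoding RawKV events into MQMessages via the codec package before handing them to the producer.
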
+ +package kafka + +import ( + "context" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/tikv/migration/cdc/cdc/sink/codec" + "github.com/tikv/migration/cdc/pkg/kafka" + "github.com/tikv/migration/cdc/pkg/util/testleak" +) + +type kafkaSuite struct{} + +var _ = check.Suite(&kafkaSuite{}) + +func Test(t *testing.T) { check.TestingT(t) } + +func (s *kafkaSuite) TestClientID(c *check.C) { + defer testleak.AfterTest(c)() + testCases := []struct { + role string + addr string + changefeedID string + configuredID string + hasError bool + expected string + }{ + {"owner", "domain:1234", "123-121-121-121", "", false, "TiKVCDC_sarama_producer_owner_domain_1234_123-121-121-121"}, + {"owner", "127.0.0.1:1234", "123-121-121-121", "", false, "TiKVCDC_sarama_producer_owner_127.0.0.1_1234_123-121-121-121"}, + {"owner", "127.0.0.1:1234?:,\"", "123-121-121-121", "", false, "TiKVCDC_sarama_producer_owner_127.0.0.1_1234_____123-121-121-121"}, + {"owner", "中文", "123-121-121-121", "", true, ""}, + {"owner", "127.0.0.1:1234", "123-121-121-121", "cdc-changefeed-1", false, "cdc-changefeed-1"}, + } + for _, tc := range testCases { + id, err := kafkaClientID(tc.role, tc.addr, tc.changefeedID, tc.configuredID) + if tc.hasError { + c.Assert(err, check.NotNil) + } else { + c.Assert(err, check.IsNil) + c.Assert(id, check.Equals, tc.expected) + } + } +} + +func (s *kafkaSuite) TestNewSaramaProducer(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel := context.WithCancel(context.Background()) + + topic := kafka.DefaultMockTopicName + leader := sarama.NewMockBroker(c, 2) + defer leader.Close() + metadataResponse := new(sarama.MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) + metadataResponse.AddTopicPartition(topic, 1, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) + leader.Returns(metadataResponse) + leader.Returns(metadataResponse) + + prodSuccess := new(sarama.ProduceResponse) + prodSuccess.AddTopicPartition(topic, 0, sarama.ErrNoError) + prodSuccess.AddTopicPartition(topic, 1, sarama.ErrNoError) + // 200 async messages and 2 sync message, Kafka flush could be in batch, + // we can set flush.max.messages to 1 to control message count exactly. + for i := 0; i < 202; i++ { + leader.Returns(prodSuccess) + } + + errCh := make(chan error, 1) + config := NewConfig() + // Because the sarama mock broker is not compatible with version larger than 1.0.0 + // We use a smaller version in the following producer tests. 
+ // Ref: https://github.com/Shopify/sarama/blob/89707055369768913defac030c15cf08e9e57925/async_producer_test.go#L1445-L1447 + config.Version = "0.9.0.0" + config.PartitionNum = int32(2) + config.AutoCreate = false + config.BrokerEndpoints = strings.Split(leader.Addr(), ",") + + newSaramaConfigImplBak := newSaramaConfigImpl + newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) { + cfg, err := newSaramaConfigImplBak(ctx, config) + c.Assert(err, check.IsNil) + cfg.Producer.Flush.MaxMessages = 1 + return cfg, err + } + NewAdminClientImpl = kafka.NewMockAdminClient + defer func() { + NewAdminClientImpl = kafka.NewSaramaAdminClient + }() + + opts := make(map[string]string) + producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) + c.Assert(err, check.IsNil) + c.Assert(producer.GetPartitionNum(), check.Equals, int32(2)) + c.Assert(opts, check.HasKey, "max-message-bytes") + for i := 0; i < 100; i++ { + err = producer.AsyncSendMessage(ctx, &codec.MQMessage{ + Key: []byte("test-key-1"), + Value: []byte("test-value"), + }, int32(0)) + c.Assert(err, check.IsNil) + err = producer.AsyncSendMessage(ctx, &codec.MQMessage{ + Key: []byte("test-key-1"), + Value: []byte("test-value"), + }, int32(1)) + c.Assert(err, check.IsNil) + } + + // In TiCDC logic, resolved ts event will always notify the flush loop. Here we + // trigger the flushedNotifier periodically to prevent the flush loop block. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Millisecond * 100): + producer.flushedNotifier.Notify() + } + } + }() + + err = producer.Flush(ctx) + c.Assert(err, check.IsNil) + expected := []struct { + flushed uint64 + sent uint64 + }{ + {100, 100}, + {100, 100}, + } + c.Assert(producer.partitionOffset, check.DeepEquals, expected) + select { + case err := <-errCh: + c.Fatalf("unexpected err: %s", err) + default: + } + // check no events to flush + err = producer.Flush(ctx) + c.Assert(err, check.IsNil) + + err = producer.SyncBroadcastMessage(ctx, &codec.MQMessage{ + Key: []byte("test-broadcast"), + Value: nil, + }) + c.Assert(err, check.IsNil) + + err = producer.Close() + c.Assert(err, check.IsNil) + // check reentrant close + err = producer.Close() + c.Assert(err, check.IsNil) + cancel() + wg.Wait() + + // check send messages when context is canceled or producer closed + err = producer.AsyncSendMessage(ctx, &codec.MQMessage{ + Key: []byte("cancel"), + Value: nil, + }, int32(0)) + if err != nil { + c.Assert(err, check.Equals, context.Canceled) + } + err = producer.SyncBroadcastMessage(ctx, &codec.MQMessage{ + Key: []byte("cancel"), + Value: nil, + }) + if err != nil { + c.Assert(err, check.Equals, context.Canceled) + } +} + +func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { + defer testleak.AfterTest(c) + config := NewConfig() + adminClient := kafka.NewClusterAdminClientMockImpl() + defer func() { + _ = adminClient.Close() + }() + + // When topic exists and max message bytes is set correctly. 
+ config.MaxMessageBytes = adminClient.GetDefaultMaxMessageBytes() + cfg, err := newSaramaConfigImpl(context.Background(), config) + c.Assert(err, check.IsNil) + opts := make(map[string]string) + err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts) + c.Assert(err, check.IsNil) + c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes)) + + // When topic exists and max message bytes is not set correctly. + // use the smaller one. + defaultMaxMessageBytes := adminClient.GetDefaultMaxMessageBytes() + config.MaxMessageBytes = defaultMaxMessageBytes + 1 + cfg, err = newSaramaConfigImpl(context.Background(), config) + c.Assert(err, check.IsNil) + opts = make(map[string]string) + err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts) + c.Assert(err, check.IsNil) + c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, defaultMaxMessageBytes) + c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes)) + + config.MaxMessageBytes = defaultMaxMessageBytes - 1 + cfg, err = newSaramaConfigImpl(context.Background(), config) + c.Assert(err, check.IsNil) + opts = make(map[string]string) + err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts) + c.Assert(err, check.IsNil) + c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes) + c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes)) + + // When topic does not exist and auto-create is not enabled. + config.AutoCreate = false + cfg, err = newSaramaConfigImpl(context.Background(), config) + c.Assert(err, check.IsNil) + opts = make(map[string]string) + err = validateMaxMessageBytesAndCreateTopic(adminClient, "non-exist", config, cfg, opts) + c.Assert( + errors.Cause(err), + check.ErrorMatches, + ".*auto-create-topic` is false, and topic not found.*", + ) + + // When the topic does not exist, use the broker's configuration to create the topic. + // It is less than the value of broker. + config.AutoCreate = true + config.MaxMessageBytes = defaultMaxMessageBytes - 1 + cfg, err = newSaramaConfigImpl(context.Background(), config) + c.Assert(err, check.IsNil) + opts = make(map[string]string) + err = validateMaxMessageBytesAndCreateTopic(adminClient, "create-new-success", config, cfg, opts) + c.Assert(err, check.IsNil) + c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes) + c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes)) + + // When the topic does not exist, use the broker's configuration to create the topic. + // It is larger than the value of broker. + config.MaxMessageBytes = defaultMaxMessageBytes + 1 + config.AutoCreate = true + cfg, err = newSaramaConfigImpl(context.Background(), config) + c.Assert(err, check.IsNil) + opts = make(map[string]string) + err = validateMaxMessageBytesAndCreateTopic(adminClient, "create-new-fail", config, cfg, opts) + c.Assert(err, check.IsNil) + c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, defaultMaxMessageBytes) + c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes)) + + // When the topic exists, but the topic does not store max message bytes info, + // the check of parameter succeeds. + // It is less than the value of broker. 
+ config.MaxMessageBytes = defaultMaxMessageBytes - 1 + cfg, err = newSaramaConfigImpl(context.Background(), config) + c.Assert(err, check.IsNil) + detail := &sarama.TopicDetail{ + NumPartitions: 3, + // Does not contain max message bytes information. + ConfigEntries: make(map[string]*string), + } + err = adminClient.CreateTopic("test-topic", detail, false) + c.Assert(err, check.IsNil) + opts = make(map[string]string) + err = validateMaxMessageBytesAndCreateTopic(adminClient, "test-topic", config, cfg, opts) + c.Assert(err, check.IsNil) + c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes) + c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes)) + + // When the topic exists, but the topic does not store max message bytes info, + // the check of parameter fails. + // It is larger than the value of broker. + config.MaxMessageBytes = defaultMaxMessageBytes + 1 + cfg, err = newSaramaConfigImpl(context.Background(), config) + c.Assert(err, check.IsNil) + opts = make(map[string]string) + err = validateMaxMessageBytesAndCreateTopic(adminClient, "test-topic", config, cfg, opts) + c.Assert(err, check.IsNil) + c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, defaultMaxMessageBytes) + c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes)) +} + +func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) { + defer testleak.AfterTest(c)() + ctx := context.Background() + errCh := make(chan error, 1) + config := NewConfig() + config.Version = "invalid" + config.BrokerEndpoints = []string{"127.0.0.1:1111"} + topic := "topic" + NewAdminClientImpl = kafka.NewMockAdminClient + defer func() { + NewAdminClientImpl = kafka.NewSaramaAdminClient + }() + opts := make(map[string]string) + _, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) + c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") +} + +func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { + defer testleak.AfterTest(c)() + topic := kafka.DefaultMockTopicName + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + leader := sarama.NewMockBroker(c, 2) + defer leader.Close() + metadataResponse := new(sarama.MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) + metadataResponse.AddTopicPartition(topic, 1, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) + leader.Returns(metadataResponse) + leader.Returns(metadataResponse) + + config := NewConfig() + // Because the sarama mock broker is not compatible with version larger than 1.0.0 + // We use a smaller version in the following producer tests. 
+ // Ref: https://github.com/Shopify/sarama/blob/89707055369768913defac030c15cf08e9e57925/async_producer_test.go#L1445-L1447 + config.Version = "0.9.0.0" + config.PartitionNum = int32(2) + config.AutoCreate = false + config.BrokerEndpoints = strings.Split(leader.Addr(), ",") + + NewAdminClientImpl = kafka.NewMockAdminClient + defer func() { + NewAdminClientImpl = kafka.NewSaramaAdminClient + }() + + newSaramaConfigImplBak := newSaramaConfigImpl + newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) { + cfg, err := newSaramaConfigImplBak(ctx, config) + c.Assert(err, check.IsNil) + cfg.Producer.Flush.MaxMessages = 1 + cfg.Producer.Retry.Max = 2 + cfg.Producer.MaxMessageBytes = 8 + return cfg, err + } + defer func() { + newSaramaConfigImpl = newSaramaConfigImplBak + }() + + errCh := make(chan error, 1) + opts := make(map[string]string) + producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) + c.Assert(opts, check.HasKey, "max-message-bytes") + defer func() { + err := producer.Close() + c.Assert(err, check.IsNil) + }() + + c.Assert(err, check.IsNil) + c.Assert(producer, check.NotNil) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 20; i++ { + err = producer.AsyncSendMessage(ctx, &codec.MQMessage{ + Key: []byte("test-key-1"), + Value: []byte("test-value"), + }, int32(0)) + c.Assert(err, check.IsNil) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-ctx.Done(): + c.Fatal("TestProducerSendMessageFailed timed out") + case err := <-errCh: + c.Assert(err, check.ErrorMatches, ".*too large.*") + } + }() + + wg.Wait() +} + +func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) { + defer testleak.AfterTest(c)() + topic := kafka.DefaultMockTopicName + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + leader := sarama.NewMockBroker(c, 2) + defer leader.Close() + metadataResponse := new(sarama.MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) + metadataResponse.AddTopicPartition(topic, 1, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) + leader.Returns(metadataResponse) + leader.Returns(metadataResponse) + + config := NewConfig() + // Because the sarama mock broker is not compatible with version larger than 1.0.0 + // We use a smaller version in the following producer tests. 
+ // Ref: https://github.com/Shopify/sarama/blob/89707055369768913defac030c15cf08e9e57925/async_producer_test.go#L1445-L1447 + config.Version = "0.9.0.0" + config.PartitionNum = int32(2) + config.AutoCreate = false + config.BrokerEndpoints = strings.Split(leader.Addr(), ",") + + NewAdminClientImpl = kafka.NewMockAdminClient + defer func() { + NewAdminClientImpl = kafka.NewSaramaAdminClient + }() + + errCh := make(chan error, 1) + opts := make(map[string]string) + producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) + c.Assert(opts, check.HasKey, "max-message-bytes") + defer func() { + err := producer.Close() + c.Assert(err, check.IsNil) + }() + + c.Assert(err, check.IsNil) + c.Assert(producer, check.NotNil) + + err = producer.Close() + c.Assert(err, check.IsNil) + + err = producer.Close() + c.Assert(err, check.IsNil) +} diff --git a/cdc/cdc/sink/producer/mq_producer.go b/cdc/cdc/sink/producer/mq_producer.go new file mode 100644 index 00000000..e406af7d --- /dev/null +++ b/cdc/cdc/sink/producer/mq_producer.go @@ -0,0 +1,35 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package producer + +import ( + "context" + + "github.com/tikv/migration/cdc/cdc/sink/codec" +) + +// Producer is an interface of mq producer +type Producer interface { + // AsyncSendMessage sends a message asynchronously. + AsyncSendMessage(ctx context.Context, message *codec.MQMessage, partition int32) error + // SyncBroadcastMessage broadcasts a message synchronously. + SyncBroadcastMessage(ctx context.Context, message *codec.MQMessage) error + // Flush all the messages buffered in the client and wait until all messages have been successfully + // persisted. + Flush(ctx context.Context) error + // GetPartitionNum gets partition number of topic. + GetPartitionNum() int32 + // Close closes the producer and client(s). 
+ Close() error +} diff --git a/cdc/cdc/sink/sink.go b/cdc/cdc/sink/sink.go index 3e4153bd..224ce840 100644 --- a/cdc/cdc/sink/sink.go +++ b/cdc/cdc/sink/sink.go @@ -69,7 +69,10 @@ type Sink interface { Barrier(ctx context.Context, keyspanID model.KeySpanID) error } -var sinkIniterMap = make(map[string]sinkInitFunc) +var ( + sinkIniterMap = make(map[string]sinkInitFunc) + sinkURICheckerMap = make(map[string]sinkInitFunc) +) type sinkInitFunc func(context.Context, model.ChangeFeedID, *url.URL, *config.ReplicaConfig, map[string]string, chan error) (Sink, error) @@ -81,11 +84,33 @@ func init() { return newBlackHoleSink(ctx, opts), nil } + // register tikv sink sinkIniterMap["tikv"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, config *config.ReplicaConfig, opts map[string]string, errCh chan error, ) (Sink, error) { return newTiKVSink(ctx, sinkURI, config, opts, errCh) } + sinkURICheckerMap["tikv"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, + config *config.ReplicaConfig, opts map[string]string, errCh chan error, + ) (Sink, error) { + _, _, err := parseTiKVUri(sinkURI, opts) + return nil, err + } + + // register kafka sink + sinkIniterMap["kafka"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, + config *config.ReplicaConfig, opts map[string]string, errCh chan error, + ) (Sink, error) { + return newKafkaSaramaSink(ctx, sinkURI, config, opts, errCh) + } + sinkURICheckerMap["kafka"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, + config *config.ReplicaConfig, opts map[string]string, errCh chan error, + ) (Sink, error) { + _, _, err := parseKafkaSinkConfig(sinkURI, config, opts) + return nil, err + } + sinkIniterMap["kafka+ssl"] = sinkIniterMap["kafka"] + sinkURICheckerMap["kafka+ssl"] = sinkURICheckerMap["kafka"] } // New creates a new sink with the sink-uri @@ -103,17 +128,33 @@ func New(ctx context.Context, changefeedID model.ChangeFeedID, sinkURIStr string } // Validate sink if given valid parameters. 
-func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig, opts map[string]string) error { +func Validate(ctx context.Context, sinkURIStr string, cfg *config.ReplicaConfig, opts map[string]string) error { errCh := make(chan error) - // TODO: find a better way to verify a sinkURI is valid - s, err := New(ctx, "sink-verify", sinkURI, cfg, opts, errCh) + + // parse sinkURI as a URI + sinkURI, err := url.Parse(sinkURIStr) if err != nil { - return err + return cerror.WrapError(cerror.ErrSinkURIInvalid, err) + } + scheme := strings.ToLower(sinkURI.Scheme) + newSink, ok := sinkURICheckerMap[scheme] + if !ok { + newSink, ok = sinkIniterMap[scheme] + if !ok { + return cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", sinkURI.Scheme) + } } - err = s.Close(ctx) + + s, err := newSink(ctx, "sink-verify", sinkURI, cfg, opts, errCh) if err != nil { return err } + if s != nil { + err = s.Close(ctx) + if err != nil { + return err + } + } select { case err = <-errCh: if err != nil { diff --git a/cdc/cdc/sink/sink_test.go b/cdc/cdc/sink/sink_test.go index 5dfafa54..f9877fee 100644 --- a/cdc/cdc/sink/sink_test.go +++ b/cdc/cdc/sink/sink_test.go @@ -17,12 +17,15 @@ import ( "context" "testing" + "github.com/pingcap/check" "github.com/pkg/errors" "github.com/stretchr/testify/require" "github.com/tikv/migration/cdc/pkg/config" "github.com/tikv/migration/cdc/pkg/util/testleak" ) +func TestSink(t *testing.T) { check.TestingT(t) } + func TestValidateSink(t *testing.T) { defer testleak.AfterTestT(t)() @@ -38,6 +41,7 @@ func TestValidateSink(t *testing.T) { "parse \"tikv://127.0.0.1:3306a/\": invalid port \":3306a\" after host", "parse \"tikv://127.0.0.1:3306, tikv://127.0.0.1:3307/\": invalid character \" \" in host name", "parse \"tikv://hostname:3306x\": invalid port \":3306x\" after host", + "[CDC:ErrKafkaInvalidConfig]no topic is specified in sink-uri", } testCases := []struct { @@ -45,21 +49,35 @@ func TestValidateSink(t *testing.T) { hasError bool expectedErr string }{ - {"tikv://127.0.0.1:3306/", true, expectedErrs[0]}, - {"tikv://127.0.0.1:3306/?concurrency=4", true, expectedErrs[0]}, + // tikv + {"tikv://127.0.0.1:3306/", false, ""}, + {"tikv://127.0.0.1:3306/?concurrency=4", false, ""}, {"blackhole://", false, ""}, - {"tikv://127.0.0.1:3306,127.0.0.1:3307/", true, expectedErrs[0]}, - {"tikv://hostname:3306", true, expectedErrs[0]}, + {"tikv://127.0.0.1:3306,127.0.0.1:3307/", false, ""}, + {"tikv://hostname:3306", false, ""}, {"tikv://http://127.0.0.1:3306/", true, expectedErrs[1]}, {"tikv://127.0.0.1:3306a/", true, expectedErrs[2]}, {"tikv://127.0.0.1:3306, tikv://127.0.0.1:3307/", true, expectedErrs[3]}, {"tikv://hostname:3306x", true, expectedErrs[4]}, + // kafka + {"kafka://127.0.0.1:9092/topic-name?protocol=open-protocol&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1", false, ""}, + {"kafka://127.0.0.1:9092/?partition-num=6", true, expectedErrs[5]}, } for _, tc := range testCases { err := Validate(ctx, tc.sinkURI, replicateConfig, opts) if tc.hasError { - require.Equal(t, tc.expectedErr, errors.Cause(err).Error()) + require.Error(t, err) + + // Test case kafka://127.0.0.1:9092/?partition-num=6 return an error with errors.Cause(err) == nil, which should not happen. 
+ // TODO: fix this case + var msg string + if errors.Cause(err) != nil { + msg = errors.Cause(err).Error() + } else { + msg = err.Error() + } + require.Equal(t, tc.expectedErr, msg) } else { require.NoError(t, err) } diff --git a/cdc/errors.toml b/cdc/errors.toml index 77899d95..cc51956b 100755 --- a/cdc/errors.toml +++ b/cdc/errors.toml @@ -391,9 +391,9 @@ error = ''' json codec invalid data ''' -["CDC:ErrJSONCodecRowTooLarge"] +["CDC:ErrJSONCodecKvTooLarge"] error = ''' -json codec single row too large +json codec single key-value too large ''' ["CDC:ErrKVStorageBackoffFailed"] diff --git a/cdc/pkg/errors/errors.go b/cdc/pkg/errors/errors.go index 1f5b2413..abaf9394 100644 --- a/cdc/pkg/errors/errors.go +++ b/cdc/pkg/errors/errors.go @@ -108,7 +108,7 @@ var ( ErrMaxwellDecodeFailed = errors.Normalize("maxwell decode failed", errors.RFCCodeText("CDC:ErrMaxwellDecodeFailed")) ErrMaxwellInvalidData = errors.Normalize("maxwell invalid data", errors.RFCCodeText("CDC:ErrMaxwellInvalidData")) ErrJSONCodecInvalidData = errors.Normalize("json codec invalid data", errors.RFCCodeText("CDC:ErrJSONCodecInvalidData")) - ErrJSONCodecRowTooLarge = errors.Normalize("json codec single row too large", errors.RFCCodeText("CDC:ErrJSONCodecRowTooLarge")) + ErrJSONCodecKvTooLarge = errors.Normalize("json codec single key-value too large", errors.RFCCodeText("CDC:ErrJSONCodecKvTooLarge")) ErrCanalDecodeFailed = errors.Normalize("canal decode failed", errors.RFCCodeText("CDC:ErrCanalDecodeFailed")) ErrCanalEncodeFailed = errors.Normalize("canal encode failed", errors.RFCCodeText("CDC:ErrCanalEncodeFailed")) ErrOldValueNotEnabled = errors.Normalize("old value is not enabled", errors.RFCCodeText("CDC:ErrOldValueNotEnabled"))
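
With the registrations added to sink.go above, `kafka://` and `kafka+ssl://` URIs resolve through sinkIniterMap and sinkURICheckerMap, and Validate can now vet a Kafka sink URI by parsing its configuration instead of constructing a full sink (per TestValidateSink this succeeds without a reachable broker). A rough sketch of the call (illustrative only, not part of the patch; broker, topic and helper name are placeholders):

package sink

import (
	"context"

	"github.com/tikv/migration/cdc/pkg/config"
)

// exampleValidateKafkaURI checks a Kafka sink URI without creating the sink.
func exampleValidateKafkaURI(ctx context.Context) error {
	uri := "kafka://127.0.0.1:9092/cdc-topic?protocol=open-protocol&kafka-version=2.4.0" +
		"&partition-num=3&replication-factor=1&max-message-bytes=1048576"
	opts := make(map[string]string)
	return Validate(ctx, uri, config.GetDefaultReplicaConfig(), opts)
}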