Skip to content
This repository has been archived by the owner on Jan 23, 2025. It is now read-only.

Commit

Permalink
DBZ-923 Misc. clean-up;
Browse files Browse the repository at this point in the history
* Exposing new option via MySqlConnectorConfig
* Formatting
* Using GtidSet type in signatures
* Adding new option to configDef() and ALL_FIELDS
  • Loading branch information
gunnarmorling committed Nov 22, 2018
1 parent 972c08f commit 2fd16a6
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 38 deletions.
1 change: 1 addition & 0 deletions COPYRIGHT.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Denis Mikhaylov
Dennis Persson
Duncan Sands
Echo Xu
Eero Koplimets
Emrul Islam
Eric S. Kreiseir
Ewen Cheslack-Postava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,7 @@ protected void doStart() {
GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr);

// also take into account purged GTID logs
String purgedServerGtidStr = connectionContext.purgedGtidSet();
GtidSet purgedServerGtidSet = new GtidSet(purgedServerGtidStr);
GtidSet purgedServerGtidSet = connectionContext.purgedGtidSet();
logger.info("GTID set purged on server: {}", purgedServerGtidSet);

GtidSet filteredGtidSet = context.filterGtidSet(availableServerGtidSet, purgedServerGtidSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
/**
* A set of MySQL GTIDs. This is an improvement of {@link com.github.shyiko.mysql.binlog.GtidSet} that is immutable,
* and more properly supports comparisons.
*
*
* @author Randall Hauch
*/
@Immutable
Expand Down Expand Up @@ -52,7 +52,7 @@ public GtidSet(String gtids) {
/**
* Obtain a copy of this {@link GtidSet} except with only the GTID ranges that have server UUIDs that match the given
* predicate.
*
*
* @param sourceFilter the predicate that returns whether a server UUID is to be included
* @return the new GtidSet, or this object if {@code sourceFilter} is null; never null
*/
Expand All @@ -67,7 +67,7 @@ public GtidSet retainAll(Predicate<String> sourceFilter) {

/**
* Get an immutable collection of the {@link UUIDSet range of GTIDs for a single server}.
*
*
* @return the {@link UUIDSet GTID ranges for each server}; never null
*/
public Collection<UUIDSet> getUUIDSets() {
Expand All @@ -76,7 +76,7 @@ public Collection<UUIDSet> getUUIDSets() {

/**
* Find the {@link UUIDSet} for the server with the specified Uuid.
*
*
* @param uuid the Uuid of the server
* @return the {@link UUIDSet} for the identified server, or {@code null} if there are no GTIDs from that server.
*/
Expand All @@ -86,7 +86,7 @@ public UUIDSet forServerWithId(String uuid) {

/**
* Determine if the GTIDs represented by this object are contained completely within the supplied set of GTIDs.
*
*
* @param other the other set of GTIDs; may be null
* @return {@code true} if all of the GTIDs in this set are completely contained within the supplied set of GTIDs, or
* {@code false} otherwise
Expand Down Expand Up @@ -118,14 +118,13 @@ public GtidSet with(GtidSet other) {
* Returns a copy with all intervals set to beginning
* @return
*/
public GtidSet getGTIDSetBeginning() {
public GtidSet getGtidSetBeginning() {
Map<String, UUIDSet> newSet = new HashMap<>();

for (UUIDSet uuidSet : uuidSetsByServerId.values()) {
newSet.put(uuidSet.getUUID(), uuidSet.asIntervalBeginning());
}


return new GtidSet(newSet);
}

Expand Down Expand Up @@ -159,8 +158,8 @@ public String toString() {
@Immutable
public static class UUIDSet {

private String uuid;
private LinkedList<Interval> intervals = new LinkedList<>();
private final String uuid;
private final LinkedList<Interval> intervals = new LinkedList<>();

protected UUIDSet(com.github.shyiko.mysql.binlog.GtidSet.UUIDSet uuidSet) {
this.uuid = uuidSet.getUUID();
Expand Down Expand Up @@ -192,7 +191,7 @@ public UUIDSet asIntervalBeginning() {

/**
* Get the Uuid for the server that generated the GTIDs.
*
*
* @return the server's Uuid; never null
*/
public String getUUID() {
Expand All @@ -201,7 +200,7 @@ public String getUUID() {

/**
* Get the intervals of transaction numbers.
*
*
* @return the immutable transaction intervals; never null
*/
public List<Interval> getIntervals() {
Expand All @@ -211,7 +210,7 @@ public List<Interval> getIntervals() {
/**
* Determine if the set of transaction numbers from this server is completely within the set of transaction numbers from
* the set of transaction numbers in the supplied set.
*
*
* @param other the set to compare with this set
* @return {@code true} if this server's transaction numbers are a subset of the transaction numbers of the supplied set,
* or false otherwise
Expand Down Expand Up @@ -284,7 +283,7 @@ public Interval(long start, long end) {

/**
* Get the starting transaction number in this interval.
*
*
* @return this interval's first transaction number
*/
public long getStart() {
Expand All @@ -293,7 +292,7 @@ public long getStart() {

/**
* Get the ending transaction number in this interval.
*
*
* @return this interval's last transaction number
*/
public long getEnd() {
Expand All @@ -302,7 +301,7 @@ public long getEnd() {

/**
* Determine if this interval is completely within the supplied interval.
*
*
* @param other the interval to compare with
* @return {@code true} if the {@link #getStart() start} is greater than or equal to the supplied interval's
* {@link #getStart() start} and the {@link #getEnd() end} is less than or equal to the supplied interval's
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ public static DdlParsingMode parse(String value, String defaultValue) {
/**
* If set to 'latest', connector when encountering new GTID channel after job restart will start reading it from the
* latest executed position (default). When set to 'earliest' the connector will start reading new GTID channels from the first available position.
* This is useful when in active-passive mysql setup during failover new GTID channel starts receiving writes, see #DBZ-923
* This is useful when in active-passive mysql setup during failover new GTID channel starts receiving writes, see DBZ-923.
*
* Defaults to latest.
*/
Expand All @@ -757,7 +757,6 @@ public static DdlParsingMode parse(String value, String defaultValue) {
.withImportance(Importance.MEDIUM)
.withDescription("If set to 'latest', when connector sees new GTID, it will start consuming gtid channel from the server latest executed gtid position. If 'earliest' connector starts reading channel from first available (not purged) gtid position on the server.");


public static final Field CONNECTION_TIMEOUT_MS = Field.create("connect.timeout.ms")
.withDisplayName("Connection Timeout (ms)")
.withType(Type.INT)
Expand Down Expand Up @@ -1008,6 +1007,7 @@ public static final Field MASK_COLUMN(int length) {
COLUMN_BLACKLIST, SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, SNAPSHOT_LOCKING_MODE,
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES,
GTID_SOURCE_FILTER_DML_EVENTS,
GTID_NEW_CHANNEL_POSITION,
TIME_PRECISION_MODE, RelationalDatabaseConnectorConfig.DECIMAL_HANDLING_MODE,
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD,
SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD, JDBC_DRIVER,
Expand All @@ -1034,6 +1034,7 @@ public static final Field MASK_COLUMN(int length) {

private final SnapshotLockingMode snapshotLockingMode;
private final DdlParsingMode ddlParsingMode;
private final GtidNewChannelPosition gitIdNewChannelPosition;

public MySqlConnectorConfig(Configuration config) {
super(
Expand All @@ -1058,6 +1059,9 @@ public MySqlConnectorConfig(Configuration config) {

String ddlParsingModeStr = config.getString(MySqlConnectorConfig.DDL_PARSER_MODE);
this.ddlParsingMode = DdlParsingMode.parse(ddlParsingModeStr, MySqlConnectorConfig.DDL_PARSER_MODE.defaultValueAsString());

String gitIdNewChannelPosition = config.getString(MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION);
this.gitIdNewChannelPosition = GtidNewChannelPosition.parse(gitIdNewChannelPosition, MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION.defaultValueAsString());
}

public SnapshotLockingMode getSnapshotLockingMode() {
Expand All @@ -1068,6 +1072,10 @@ public DdlParsingMode getDdlParsingMode() {
return ddlParsingMode;
}

public GtidNewChannelPosition gtidNewChannelPosition() {
return gitIdNewChannelPosition;
}

protected static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
Field.group(config, "MySQL", HOSTNAME, PORT, USER, PASSWORD, ON_CONNECT_STATEMENTS, SERVER_NAME, SERVER_ID,
Expand All @@ -1079,7 +1087,7 @@ protected static ConfigDef configDef() {
DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL);
Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, INCLUDE_SQL_QUERY, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST,
COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST,
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, GTID_SOURCE_FILTER_DML_EVENTS, BUFFER_SIZE_FOR_BINLOG_READER,
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, GTID_SOURCE_FILTER_DML_EVENTS, GTID_NEW_CHANNEL_POSITION, BUFFER_SIZE_FOR_BINLOG_READER,
Heartbeat.HEARTBEAT_INTERVAL, Heartbeat.HEARTBEAT_TOPICS_PREFIX, EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE, INCONSISTENT_SCHEMA_HANDLING_MODE,
CommonConnectorConfig.TOMBSTONES_ON_DELETE);
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, KEEP_ALIVE_INTERVAL_MS, CommonConnectorConfig.MAX_QUEUE_SIZE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,24 +178,29 @@ public String knownGtidSet() {
}

/**
* Get the purged gtid values from MySQL (gtid_purged value)
* Get the purged GTID values from MySQL (gtid_purged value)
*
* @return string representation of GTID set or empty string
* @return A GTID set; may be empty if not using GTIDs or none have been purged yet
*/
public String purgedGtidSet() {
public GtidSet purgedGtidSet() {
AtomicReference<String> gtidSetStr = new AtomicReference<String>();
try {
jdbc.query("SHOW GLOBAL VARIABLES LIKE \"gtid_purged\"", rs -> {
if (rs.next() && rs.getMetaData().getColumnCount() > 1) {
gtidSetStr.set(rs.getString(2));// GTID set, may be null, blank, or contain a GTID set
}
});
} catch (SQLException e) {
}
catch (SQLException e) {
throw new ConnectException("Unexpected error while connecting to MySQL and looking at gtid_purged variable: ", e);
}

String result = gtidSetStr.get();
return result != null ? result : "";
if (result == null) {
result = "";
}

return new GtidSet(result);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.function.Predicates;
import io.debezium.relational.TableId;
import io.debezium.relational.history.DatabaseHistory;
Expand Down Expand Up @@ -241,12 +241,6 @@ protected SnapshotMode snapshotMode() {
return SnapshotMode.parse(value, MySqlConnectorConfig.SNAPSHOT_MODE.defaultValueAsString());
}

protected GtidNewChannelPosition gtidNewChannelPosition() {
String value = config.getString(MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION);
return GtidNewChannelPosition.parse(value, MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION.defaultValueAsString());
}


public String getSnapshotSelectOverrides() {
return config.getString(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE);
}
Expand Down Expand Up @@ -321,13 +315,14 @@ public GtidSet filterGtidSet(GtidSet availableServerGtidSet, GtidSet purgedServe

GtidSet mergedGtidSet;

if (this.gtidNewChannelPosition() == GtidNewChannelPosition.EARLIEST) {
if (connectorConfig.gtidNewChannelPosition() == GtidNewChannelPosition.EARLIEST) {
LOGGER.info("Using first available positions for new GTID channels");
mergedGtidSet = availableServerGtidSet
.getGTIDSetBeginning()
.getGtidSetBeginning()
.with(purgedServerGtid)
.with(filteredGtidSet);
} else {
}
else {
mergedGtidSet = availableServerGtidSet.with(filteredGtidSet);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
import org.junit.Test;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
import io.debezium.doc.FixFor;
import io.debezium.document.Document;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.HistoryRecord;
Expand Down Expand Up @@ -238,16 +239,19 @@ public void shouldFilterAndMergeGtidSet() throws Exception {
}

@Test
@FixFor("DBZ-923")
public void shouldMergeToFirstAvailableGtidSetPositions() throws Exception {
String gtidStr = "036d85a9-64e5-11e6-9b48-42010af0000c:1-2,"
+ "7c1de3f2-3fd2-11e6-9cdc-42010af000bc:5-41";

String availableServerGtidStr = "036d85a9-64e5-11e6-9b48-42010af0000c:1-20,"
+ "7145bf69-d1ca-11e5-a588-0242ac110004:1-3200,"
+ "123e4567-e89b-12d3-a456-426655440000:1-41";

String purgedServerGtidStr = "7145bf69-d1ca-11e5-a588-0242ac110004:1-1234";

config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES,
"036d85a9-64e5-11e6-9b48-42010af0000c")
config = simpleConfig()
.with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES, "036d85a9-64e5-11e6-9b48-42010af0000c")
.with(MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION, GtidNewChannelPosition.EARLIEST)
.build();

Expand Down

0 comments on commit 2fd16a6

Please sign in to comment.