diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java index 801e27d6b89..2299259c08c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java @@ -233,7 +233,7 @@ protected void setSystemProperty(String property, Field field, boolean showValue * * @return the session variables that are related to sessions ssl version */ - protected String getSessionVariableForSslVersion() { + public String getSessionVariableForSslVersion() { final String sslVersion = "Ssl_version"; LOGGER.debug("Reading MySQL Session variable for Ssl Version"); Map sessionVariables = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java index 4e512a81c67..abdf2d721f1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -27,6 +27,9 @@ import com.github.shyiko.mysql.binlog.event.EventData; import com.github.shyiko.mysql.binlog.event.EventHeaderV4; import com.github.shyiko.mysql.binlog.event.RotateEventData; +import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory; +import com.github.shyiko.mysql.binlog.network.SSLMode; +import com.github.shyiko.mysql.binlog.network.SSLSocketFactory; import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnection; import io.debezium.connector.mysql.MySqlConnectorConfig; @@ -46,7 +49,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.X509TrustManager; + import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.X509Certificate; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; @@ -56,6 +72,8 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.function.Predicate; +import static io.debezium.util.Strings.isNullOrEmpty; + /** Utilities related to Debezium. */ public class DebeziumUtils { private static final String QUOTED_CHARACTER = "`"; @@ -93,13 +111,27 @@ public static MySqlConnection createMySqlConnection( } /** Creates a new {@link BinaryLogClient} for consuming mysql binlog. */ - public static BinaryLogClient createBinaryClient(Configuration dbzConfiguration) { + public static BinaryLogClient createBinaryClient( + Configuration dbzConfiguration, MySqlConnection connection) { final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(dbzConfiguration); - return new BinaryLogClient( - connectorConfig.hostname(), - connectorConfig.port(), - connectorConfig.username(), - connectorConfig.password()); + BinaryLogClient client = + new BinaryLogClient( + connectorConfig.hostname(), + connectorConfig.port(), + connectorConfig.username(), + connectorConfig.password()); + SSLMode sslMode = sslModeFor(connectorConfig.sslMode()); + if (sslMode != null) { + client.setSSLMode(sslMode); + } + if (connectorConfig.sslModeEnabled()) { + SSLSocketFactory sslSocketFactory = + getBinlogSslSocketFactory(connectorConfig, connection); + if (sslSocketFactory != null) { + client.setSslSocketFactory(sslSocketFactory); + } + } + return client; } /** Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql database schemas. */ @@ -242,12 +274,101 @@ private static Map querySystemVariables( return variables; } + static SSLMode sslModeFor(MySqlConnectorConfig.SecureConnectionMode mode) { + try { + return mode == null ? null : SSLMode.valueOf(mode.name()); + } catch (IllegalArgumentException e) { + throw new FlinkRuntimeException( + String.format("Invalid SecureConnectionMode provided: %s ", mode.name()), e); + } + } + + // see + // flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource#getBinlogSslSocketFactory + static SSLSocketFactory getBinlogSslSocketFactory( + MySqlConnectorConfig connectorConfig, MySqlConnection connection) { + String acceptedTlsVersion = connection.getSessionVariableForSslVersion(); + if (!isNullOrEmpty(acceptedTlsVersion)) { + SSLMode sslMode = sslModeFor(connectorConfig.sslMode()); + LOG.info( + "Enable ssl {} mode for connector {}", + sslMode, + connectorConfig.getLogicalName()); + + final char[] keyPasswordArray = connection.connectionConfig().sslKeyStorePassword(); + final String keyFilename = connection.connectionConfig().sslKeyStore(); + final char[] trustPasswordArray = connection.connectionConfig().sslTrustStorePassword(); + final String trustFilename = connection.connectionConfig().sslTrustStore(); + KeyManager[] keyManagers = null; + if (keyFilename != null) { + try { + KeyStore ks = connection.loadKeyStore(keyFilename, keyPasswordArray); + + KeyManagerFactory kmf = KeyManagerFactory.getInstance("NewSunX509"); + kmf.init(ks, keyPasswordArray); + + keyManagers = kmf.getKeyManagers(); + } catch (KeyStoreException + | NoSuchAlgorithmException + | UnrecoverableKeyException e) { + throw new FlinkRuntimeException("Could not load keystore", e); + } + } + TrustManager[] trustManagers; + try { + KeyStore ks = null; + if (trustFilename != null) { + ks = connection.loadKeyStore(trustFilename, trustPasswordArray); + } + + if (ks == null && (sslMode == SSLMode.PREFERRED || sslMode == SSLMode.REQUIRED)) { + trustManagers = + new TrustManager[] { + new X509TrustManager() { + + @Override + public void checkClientTrusted( + X509Certificate[] x509Certificates, String s) {} + + @Override + public void checkServerTrusted( + X509Certificate[] x509Certificates, String s) {} + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + } + }; + } else { + TrustManagerFactory tmf = + TrustManagerFactory.getInstance( + TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(ks); + trustManagers = tmf.getTrustManagers(); + } + } catch (KeyStoreException | NoSuchAlgorithmException e) { + throw new FlinkRuntimeException("Could not load truststore", e); + } + // DBZ-1208 Resembles the logic from the upstream BinaryLogClient, only that + // the accepted TLS version is passed to the constructed factory + final KeyManager[] finalKMS = keyManagers; + return new DefaultSSLSocketFactory(acceptedTlsVersion) { + + @Override + protected void initSSLContext(SSLContext sc) throws GeneralSecurityException { + sc.init(finalKMS, trustManagers, null); + } + }; + } + + return null; + } + public static BinlogOffset findBinlogOffset( long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) { - MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig(); BinaryLogClient client = - new BinaryLogClient( - config.hostname(), config.port(), config.username(), config.password()); + createBinaryClient(mySqlSourceConfig.getDbzConfiguration(), connection); if (mySqlSourceConfig.getServerIdRange() != null) { client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId()); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 77682534f55..0d5301f3f84 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -39,6 +39,7 @@ import com.github.shyiko.mysql.binlog.event.Event; import com.github.shyiko.mysql.binlog.event.EventType; import io.debezium.connector.base.ChangeEventQueue; +import io.debezium.connector.mysql.MySqlConnection; import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics; import io.debezium.pipeline.DataChangeEvent; import io.debezium.relational.TableId; @@ -96,12 +97,15 @@ public class BinlogSplitReader implements DebeziumReader sslModeProvider() { + return Stream.of( + Arguments.of(MySqlConnectorConfig.SecureConnectionMode.DISABLED, SSLMode.DISABLED), + Arguments.of( + MySqlConnectorConfig.SecureConnectionMode.PREFERRED, SSLMode.PREFERRED), + Arguments.of(MySqlConnectorConfig.SecureConnectionMode.REQUIRED, SSLMode.REQUIRED), + Arguments.of( + MySqlConnectorConfig.SecureConnectionMode.VERIFY_CA, SSLMode.VERIFY_CA), + Arguments.of( + MySqlConnectorConfig.SecureConnectionMode.VERIFY_IDENTITY, + SSLMode.VERIFY_IDENTITY)); + } + private void assertJdbcUrl(String expected, String actual) { // Compare after splitting to avoid the orderless jdbc parameters in jdbc url at Java 11 String[] expectedParam = expected.split("&"); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index 0a6417b3e62..1bc0a2ccb2b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -149,8 +149,10 @@ public void after() throws Exception { void testReadSingleBinlogSplit() throws Exception { customerDatabase.createAndInitialize(); MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"}); - binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + sourceConfig.getDbzConfiguration(), mySqlConnection); final DataType dataType = DataTypes.ROW( DataTypes.FIELD("id", DataTypes.BIGINT()), @@ -202,8 +204,10 @@ void testSnapshotScanSkipBackfill(SnapshotPhaseHooks snapshotHooks, String table throws Exception { customerDatabase.createAndInitialize(); MySqlSourceConfig sourceConfig = getConfig(new String[] {tableName}, true); - binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + sourceConfig.getDbzConfiguration(), mySqlConnection); final DataType dataType = DataTypes.ROW( @@ -300,8 +304,10 @@ private SnapshotPhaseHooks getSnapshotPhaseHooksWithPostHighWatermark(String tab void testReadAllBinlogSplitsForOneTable() throws Exception { customerDatabase.createAndInitialize(); MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"}); - binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + sourceConfig.getDbzConfiguration(), mySqlConnection); final DataType dataType = DataTypes.ROW( DataTypes.FIELD("id", DataTypes.BIGINT()), @@ -348,8 +354,10 @@ void testReadAllBinlogSplitsForOneTable() throws Exception { void testReadAllBinlogForTableWithSingleLine() throws Exception { customerDatabase.createAndInitialize(); MySqlSourceConfig sourceConfig = getConfig(new String[] {"customer_card_single_line"}); - binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + sourceConfig.getDbzConfiguration(), mySqlConnection); final DataType dataType = DataTypes.ROW( @@ -384,8 +392,10 @@ void testReadAllBinlogSplitsForTables() throws Exception { customerDatabase.createAndInitialize(); MySqlSourceConfig sourceConfig = getConfig(new String[] {"customer_card", "customer_card_single_line"}); - binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + sourceConfig.getDbzConfiguration(), mySqlConnection); final DataType dataType = DataTypes.ROW( DataTypes.FIELD("card_no", DataTypes.BIGINT()), @@ -441,8 +451,10 @@ void testReadBinlogFromLatestOffset() throws Exception { customerDatabase.createAndInitialize(); MySqlSourceConfig sourceConfig = getConfig(StartupOptions.latest(), new String[] {"customers"}); - binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + sourceConfig.getDbzConfiguration(), mySqlConnection); // Create reader and submit splits MySqlBinlogSplit split = createBinlogSplit(sourceConfig); @@ -488,8 +500,10 @@ void testReadBinlogWithoutGtidFromLatestOffset() throws Exception { customerDatabaseNoGtid, StartupOptions.latest(), new String[] {"customers"}); - binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + sourceConfig.getDbzConfiguration(), mySqlConnection); // Create reader and submit splits MySqlBinlogSplit split = createBinlogSplit(sourceConfig); @@ -531,8 +545,10 @@ void testReadBinlogFromEarliestOffset() throws Exception { customerDatabase.createAndInitialize(); MySqlSourceConfig sourceConfig = getConfig(StartupOptions.earliest(), new String[] {"customers"}); - binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + sourceConfig.getDbzConfiguration(), mySqlConnection); // Create reader and submit splits MySqlBinlogSplit split = createBinlogSplit(sourceConfig); @@ -595,8 +611,10 @@ void testReadBinlogFromEarliestOffsetAfterSchemaChange() throws Exception { customerDatabase.createAndInitialize(); MySqlSourceConfig sourceConfig = getConfig(StartupOptions.earliest(), new String[] {"customers"}); - binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + sourceConfig.getDbzConfiguration(), mySqlConnection); String tableId = customerDatabase.qualifiedTableName("customers"); DataType dataType = DataTypes.ROW( @@ -629,8 +647,10 @@ void testReadBinlogFromBinlogFilePosition() throws Exception { // Preparations customerDatabase.createAndInitialize(); MySqlSourceConfig connectionConfig = getConfig(new String[] {"customers"}); - binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + connectionConfig.getDbzConfiguration(), mySqlConnection); DataType dataType = DataTypes.ROW( DataTypes.FIELD("id", DataTypes.BIGINT()), @@ -683,8 +703,10 @@ void testSkippingEvents() throws Exception { // Preparations customerDatabase.createAndInitialize(); MySqlSourceConfig connectionConfig = getConfig(new String[] {"customers"}); - binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + connectionConfig.getDbzConfiguration(), mySqlConnection); DataType dataType = DataTypes.ROW( DataTypes.FIELD("id", DataTypes.BIGINT()), @@ -738,8 +760,10 @@ void testReadBinlogFromGtidSet() throws Exception { // Preparations customerDatabase.createAndInitialize(); MySqlSourceConfig connectionConfig = getConfig(new String[] {"customers"}); - binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + connectionConfig.getDbzConfiguration(), mySqlConnection); DataType dataType = DataTypes.ROW( DataTypes.FIELD("id", DataTypes.BIGINT()), @@ -791,8 +815,10 @@ void testReadBinlogFromTimestamp() throws Exception { // Preparations customerDatabase.createAndInitialize(); MySqlSourceConfig connectionConfig = getConfig(new String[] {"customers"}); - binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + connectionConfig.getDbzConfiguration(), mySqlConnection); DataType dataType = DataTypes.ROW( DataTypes.FIELD("id", DataTypes.BIGINT()), @@ -846,8 +872,10 @@ void testReadBinlogFromTimestampAfterSchemaChange() throws Exception { // Preparations customerDatabase.createAndInitialize(); MySqlSourceConfig connectionConfig = getConfig(new String[] {"customers"}); - binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + connectionConfig.getDbzConfiguration(), mySqlConnection); DataType dataType = DataTypes.ROW( DataTypes.FIELD("id", DataTypes.BIGINT()), @@ -928,8 +956,10 @@ void testHeartbeatEvent() throws Exception { .heartbeatInterval(heartbeatInterval) .debeziumProperties(dbzProps) .createConfig(0); - binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + sourceConfig.getDbzConfiguration(), mySqlConnection); // Create binlog reader and submit split BinlogSplitReader binlogReader = createBinlogReader(sourceConfig); @@ -962,8 +992,10 @@ void testReadBinlogFromUnavailableBinlog() throws Exception { inventoryDatabase8.createAndInitialize(); MySqlSourceConfig connectionConfig = getConfig(MYSQL8_CONTAINER, inventoryDatabase8, new String[] {"products"}); - binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + connectionConfig.getDbzConfiguration(), mySqlConnection); // Capture the current binlog offset, and we will start the reader from here BinlogOffset startingOffset = DebeziumUtils.currentBinlogOffset(mySqlConnection); @@ -1021,8 +1053,10 @@ void testRestoreFromCheckpointWithTimestampStartingOffset() throws Exception { inventoryDatabase8.createAndInitialize(); MySqlSourceConfig connectionConfig = getConfig(MYSQL8_CONTAINER, inventoryDatabase8, new String[] {"products"}); - binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + connectionConfig.getDbzConfiguration(), mySqlConnection); // Capture the current binlog offset, and use it to mock restoring from checkpoint BinlogOffset checkpointOffset = DebeziumUtils.currentBinlogOffset(mySqlConnection); @@ -1066,8 +1100,10 @@ public void testBinlogOffsetCompareWithSnapshotAndBinlogPhase() throws Exception MySqlSourceConfig sourceConfig = getConfig( MYSQL_CONTAINER_NOGTID, customerDatabaseNoGtid, new String[] {"customers"}); - binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + sourceConfig.getDbzConfiguration(), mySqlConnection); // step-1: split snapshot List snapshotSplits = @@ -1125,8 +1161,10 @@ void testReadBinlogWithException() throws Exception { customerDatabase.createAndInitialize(); MySqlSourceConfig sourceConfig = getConfig(StartupOptions.latest(), new String[] {"customers"}); - binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + sourceConfig.getDbzConfiguration(), mySqlConnection); // Create reader and submit splits StatefulTaskContext statefulTaskContext = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java index 15713861879..e6e8524a789 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java @@ -70,8 +70,10 @@ public static void init() { customer3_0Database.createAndInitialize(); MySqlSourceConfig sourceConfig = getConfig(customerDatabase, new String[] {"customers"}, 10); - binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); + binaryLogClient = + DebeziumUtils.createBinaryClient( + sourceConfig.getDbzConfiguration(), mySqlConnection); } @AfterAll @@ -115,9 +117,10 @@ void testReadSingleSnapshotSplit() throws Exception { void testReadSingleSnapshotSplitWithDotName() throws Exception { MySqlSourceConfig sourceConfig = getConfig(customer3_0Database, new String[] {"customers3.0"}, 4); - BinaryLogClient binaryLogClient = - DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); MySqlConnection mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); + BinaryLogClient binaryLogClient = + DebeziumUtils.createBinaryClient( + sourceConfig.getDbzConfiguration(), mySqlConnection); StatefulTaskContext statefulTaskContext = new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection); final DataType dataType = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceSSLConnectionITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceSSLConnectionITCase.java new file mode 100644 index 00000000000..a133905bd17 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceSSLConnectionITCase.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.CloseableIterator; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.network.SSLMode; +import io.debezium.connector.mysql.MySqlConnection; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** IT Tests for {@link MySqlSource}. */ +@Timeout(value = 20, unit = TimeUnit.SECONDS) +class MySqlSourceSSLConnectionITCase extends MySqlSourceTestBase { + + private final UniqueDatabase inventoryDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw"); + + private final List initialData = + Arrays.asList( + "{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14}", + "{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":8.1}", + "{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8}", + "{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":0.75}", + "{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":0.875}", + "{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0}", + "{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3}", + "{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":0.1}", + "{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2}"); + + @Test + void testSetupMysqlSourceWithSSL() throws Exception { + inventoryDatabase.createAndInitialize(); + + // Enable SSL requirement on the MySQL side, all future connections must use SSL + inventoryDatabase.enableSSLForUser(); + + Properties jdbcConfig = new Properties(); + jdbcConfig.setProperty("useSSL", "true"); + jdbcConfig.setProperty("requireSSL", "true"); + jdbcConfig.setProperty("verifyServerCertificate", "false"); + + Properties debeziumConfig = new Properties(); + debeziumConfig.setProperty("database.ssl.mode", "required"); + debeziumConfig.setProperty("database.ssl.trustServerCertificate", "true"); + + Instant startTime = Instant.now().minusSeconds(100); + + MySqlSource mySqlSource = + MySqlSource.builder() + .hostname(MYSQL_CONTAINER.getHost()) + .port(MYSQL_CONTAINER.getDatabasePort()) + .databaseList(inventoryDatabase.getDatabaseName()) + .tableList(inventoryDatabase.getDatabaseName() + ".products") + .username(inventoryDatabase.getUsername()) + .password(inventoryDatabase.getPassword()) + .jdbcProperties(jdbcConfig) + .debeziumProperties(debeziumConfig) + .serverId("5401-5404") + .deserializer(new JsonDebeziumDeserializationSchema()) + .serverTimeZone("UTC") + .includeSchemaChanges(true) // output the schema changes as well + .startupOptions(StartupOptions.timestamp(startTime.toEpochMilli())) + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // enable checkpoint + env.enableCheckpointing(3000); + // set the source parallelism to 4 + DataStreamSource source = + env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySqlParallelSource") + .setParallelism(4); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + try (CloseableIterator iterator = source.executeAndCollect()) { + List rows = new ArrayList<>(); + int expectedSize = initialData.size(); + long timeoutSeconds = 30; + + while (rows.size() < expectedSize) { + // Wrap the blocking hasNext() call in a CompletableFuture with timeout + CompletableFuture hasNextFuture = + CompletableFuture.supplyAsync(iterator::hasNext, executor); + + try { + Boolean hasNext = hasNextFuture.get(timeoutSeconds, TimeUnit.SECONDS); + if (hasNext) { + String next = iterator.next(); + rows.add(next); + } else { + // No more data available + break; + } + } catch (java.util.concurrent.TimeoutException e) { + throw new TimeoutException( + ("Timeout while waiting for records, application" + + " is likely unable to process data from MySQL over SSL")); + } catch (ExecutionException e) { + throw new RuntimeException( + "Error while checking for next element", e.getCause()); + } + } + + Assertions.assertThat(rows) + .withFailMessage("should read all initial records") + .hasSize(expectedSize); + } finally { + executor.shutdownNow(); + env.close(); + } + } + + @Test + void testCreateBinaryClientWithSSLSocketFactory() throws Exception { + inventoryDatabase.createAndInitialize(); + + // Enable SSL requirement on the MySQL side + inventoryDatabase.enableSSLForUser(); + + Properties jdbcConfig = new Properties(); + jdbcConfig.setProperty("useSSL", "true"); + jdbcConfig.setProperty("requireSSL", "true"); + jdbcConfig.setProperty("verifyServerCertificate", "false"); + + Properties debeziumConfig = new Properties(); + debeziumConfig.setProperty("database.ssl.mode", "required"); + + MySqlSourceConfig sourceConfig = + new MySqlSourceConfigFactory() + .hostname(MYSQL_CONTAINER.getHost()) + .port(MYSQL_CONTAINER.getDatabasePort()) + .databaseList(inventoryDatabase.getDatabaseName()) + .tableList(inventoryDatabase.getDatabaseName() + ".products") + .username(inventoryDatabase.getUsername()) + .password(inventoryDatabase.getPassword()) + .serverId("5501-5504") + .serverTimeZone("UTC") + .jdbcProperties(jdbcConfig) + .debeziumProperties(debeziumConfig) + .startupOptions(StartupOptions.initial()) + .createConfig(0); + + MySqlConnection mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); + + BinaryLogClient binaryLogClient = + DebeziumUtils.createBinaryClient( + sourceConfig.getDbzConfiguration(), mySqlConnection); + + Assertions.assertThat(binaryLogClient.getSSLMode()) + .as("SSL mode should be REQUIRED") + .isEqualTo(SSLMode.REQUIRED); + + try { + binaryLogClient.connect(5000); + Assertions.assertThat(binaryLogClient.isConnected()) + .as("Binary log client should be able to connect with SSL") + .isTrue(); + } finally { + // Clean up + if (binaryLogClient.isConnected()) { + binaryLogClient.disconnect(); + } + mySqlConnection.close(); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/UniqueDatabase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/UniqueDatabase.java index a749bca5329..4075e3efbd7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/UniqueDatabase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/UniqueDatabase.java @@ -162,6 +162,25 @@ public Connection getJdbcConnection() throws SQLException { return DriverManager.getConnection(container.getJdbcUrl(databaseName), username, password); } + /** + * Enable SSL requirement for the database user. This should be called after + * createAndInitialize() to enforce SSL connections for the user. + */ + public void enableSSLForUser() { + try { + try (Connection connection = + DriverManager.getConnection( + container.getJdbcUrl(), username, password); + Statement statement = connection.createStatement()) { + String alterUserSQL = String.format("ALTER USER '%s'@'%%' REQUIRE SSL;", username); + statement.execute(alterUserSQL); + statement.execute("FLUSH PRIVILEGES;"); + } + } catch (final Exception e) { + throw new IllegalStateException("Failed to enable SSL for user: " + username, e); + } + } + private String convertSQL(final String sql) { return sql.replace("$DBNAME$", databaseName); }