Skip to content

Commit

Permalink
[Feature-#1904][sqlserver] fixed No suitable driver found for jdbc jtds
Browse files Browse the repository at this point in the history
  • Loading branch information
libailin authored and lihongwei committed Jul 1, 2024
1 parent 2dcd8db commit 1bc5439
Showing 1 changed file with 22 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import com.dtstack.chunjun.connector.sqlserver.sink.SqlserverOutputFormat;
import com.dtstack.chunjun.connector.sqlserver.source.SqlserverInputFormat;

import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.FactoryUtil;

import org.apache.commons.lang3.StringUtils;

Expand All @@ -38,13 +40,20 @@ public class SqlserverDynamicTableFactory extends JdbcDynamicTableFactory {

private static final String IDENTIFIER = "sqlserver-x";

private JdbcConfig jdbcConfig;

@Override
public String factoryIdentifier() {
return IDENTIFIER;
}

@Override
protected JdbcDialect getDialect() {
if (jdbcConfig != null) {
return new SqlserverDialect(
jdbcConfig.isWithNoLock(),
jdbcConfig.getJdbcUrl().startsWith("jdbc:jtds:sqlserver"));
}
return new SqlserverDialect();
}

Expand All @@ -60,11 +69,24 @@ protected JdbcOutputFormatBuilder getOutputFormatBuilder() {

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
this.jdbcConfig = getSourceConnectionConfig(helper.getOptions());
Map<String, String> prop = context.getCatalogTable().getOptions();
prop.put("druid.validation-query", "SELECT 1");
return super.createDynamicTableSource(context);
}

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
this.jdbcConfig =
getSinkConnectionConfig(
helper.getOptions(), context.getCatalogTable().getResolvedSchema());
return super.createDynamicTableSink(context);
}

/** table字段有可能是[schema].[table]格式 需要转换为对应的schema 和 table 字段* */
@Override
protected void resetTableInfo(JdbcConfig jdbcConfig) {
Expand Down

0 comments on commit 1bc5439

Please sign in to comment.