Add support for maxcompute as source type (batch) #130

Merged · 28 commits · Feb 11, 2025

Commits:
- 84cc988 · add support for BATCH_MAXCOMPUTE (vinoth-gojek, Jan 14, 2025)
- bc0658b · modify caraml-store-sdk to enable maxcompute as data source type (vinoth-gojek, Jan 16, 2025)
- 054cf37 · Add support for maxcompute - read maxcompute tables in spark jobs (vinoth-gojek, Jan 21, 2025)
- fa0fb7a · read data from maxcompute using read.format - similar to bq (vinoth-gojek, Jan 22, 2025)
- a4e018d · add maxcompute related jars (vinoth-gojek, Jan 28, 2025)
- c648478 · add custom dialect jar (vinoth-gojek, Jan 28, 2025)
- 58f3659 · add odps jdbc related class paths (vinoth-gojek, Jan 30, 2025)
- 5b1007e · add odps jdbc jar with dependencies (vinoth-gojek, Jan 31, 2025)
- a74c65c · format files (vinoth-gojek, Jan 31, 2025)
- 78b4dc1 · fix format (vinoth-gojek, Jan 31, 2025)
- 6813a58 · fix format (vinoth-gojek, Jan 31, 2025)
- 085195e · switch to use implementation to add dependency (vinoth-gojek, Feb 4, 2025)
- fc758c3 · try fix the test by adding antlr as dependency (vinoth-gojek, Feb 5, 2025)
- 5e4ed17 · trial and error - change antlr version to 4.8 (vinoth-gojek, Feb 5, 2025)
- 19a9dcd · fix test failures (vinoth-gojek, Feb 6, 2025)
- a9853b8 · remove comments (vinoth-gojek, Feb 6, 2025)
- 52f3ea7 · changes to fetch records (vinoth-gojek, Feb 7, 2025)
- 1a3c7c9 · add interactive mode as true and enable limit as false (vinoth-gojek, Feb 7, 2025)
- 9f47e8c · print number of rows (vinoth-gojek, Feb 7, 2025)
- 3143b99 · refactor (vinoth-gojek, Feb 10, 2025)
- ce502e1 · apply lint and format (vinoth-gojek, Feb 10, 2025)
- 4f61ad0 · changes to trigger batch job from registry (vinoth-gojek, Feb 3, 2025)
- 62b6579 · fix lint (vinoth-gojek, Feb 3, 2025)
- 2bf0605 · fix lint (vinoth-gojek, Feb 3, 2025)
- 83fd1f9 · add secrets to access maxcompute (vinoth-gojek, Feb 4, 2025)
- 1729f9a · fix failing tests (vinoth-gojek, Feb 6, 2025)
- 031f46d · use the pre built custom dialect jar (vinoth-gojek, Feb 10, 2025)
- b0d52bc · remove artifacts (vinoth-gojek, Feb 11, 2025)
@@ -16,6 +16,7 @@ message DataSource {
BATCH_FILE = 1;
BATCH_BIGQUERY = 2;
STREAM_KAFKA = 3;
BATCH_MAXCOMPUTE = 4;
}
SourceType type = 1;

@@ -56,6 +57,11 @@ message DataSource {
SparkOverride spark_override = 2;
}

// Defines options for DataSource that sources features from a MaxCompute Query
message MaxComputeOptions {
string table_ref = 1;
}

// Defines options for DataSource that sources features from Kafka messages.
// Each message should be a Protobuf that can be decoded with the generated
// Java Protobuf class at the given class path
@@ -78,5 +84,6 @@ message DataSource {
FileOptions file_options = 11;
BigQueryOptions bigquery_options = 12;
KafkaOptions kafka_options = 13;
MaxComputeOptions maxcompute_options = 14;
}
}
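For orientation, here is a minimal sketch of a DataSource spec that exercises the new option. The builder methods follow from the generated Java classes for this proto; the table reference itself is hypothetical.

```java
import dev.caraml.store.protobuf.core.DataSourceProto;
import dev.caraml.store.protobuf.core.DataSourceProto.DataSource.MaxComputeOptions;
import dev.caraml.store.protobuf.core.DataSourceProto.DataSource.SourceType;

// Hypothetical table reference; the expected format is <project>:<dataset>.<table>.
DataSourceProto.DataSource source =
    DataSourceProto.DataSource.newBuilder()
        .setType(SourceType.BATCH_MAXCOMPUTE)
        .setEventTimestampColumn("event_timestamp")
        .setMaxcomputeOptions(
            MaxComputeOptions.newBuilder().setTableRef("my_project:my_dataset.my_table"))
        .build();
```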
@@ -10,6 +10,7 @@
import dev.caraml.store.protobuf.core.DataSourceProto.DataSource.BigQueryOptions;
import dev.caraml.store.protobuf.core.DataSourceProto.DataSource.FileOptions;
import dev.caraml.store.protobuf.core.DataSourceProto.DataSource.KafkaOptions;
import dev.caraml.store.protobuf.core.DataSourceProto.DataSource.MaxComputeOptions;
import dev.caraml.store.protobuf.core.DataSourceProto.DataSource.SourceType;
import dev.caraml.store.protobuf.core.SparkOverrideProto.SparkOverride;
import java.util.HashMap;
@@ -86,6 +87,9 @@ public static DataSource fromProto(DataSourceProto.DataSource spec) {
populateDatasourceConfigMapWithSparkOverride(
dataSourceConfigMap, spec.getBigqueryOptions().getSparkOverride());
}
case BATCH_MAXCOMPUTE -> {
dataSourceConfigMap.put("table_ref", spec.getMaxcomputeOptions().getTableRef());
}
case STREAM_KAFKA -> {
dataSourceConfigMap.put("bootstrap_servers", spec.getKafkaOptions().getBootstrapServers());
dataSourceConfigMap.put(
@@ -167,6 +171,11 @@ public DataSourceProto.DataSource toProto() {
parseDatasourceConfigMapToSparkOverride(dataSourceConfigMap));
spec.setBigqueryOptions(bigQueryOptions.build());
}
case BATCH_MAXCOMPUTE -> {
MaxComputeOptions.Builder maxComputeOptions = MaxComputeOptions.newBuilder();
maxComputeOptions.setTableRef(dataSourceConfigMap.get("table_ref"));
spec.setMaxcomputeOptions(maxComputeOptions.build());
}
case STREAM_KAFKA -> {
KafkaOptions.Builder kafkaOptions = KafkaOptions.newBuilder();
kafkaOptions.setBootstrapServers(dataSourceConfigMap.get("bootstrap_servers"));
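As with BigQuery sources, the converter keeps the MaxCompute table reference in the entity's generic config map under the key table_ref, so specs survive a round trip through the store model. A minimal sketch, reusing the source built above and the class names from this converter:

```java
// fromProto stores {"table_ref": "..."} in the config map; toProto reads it back.
DataSource entity = DataSource.fromProto(source);
DataSourceProto.DataSource roundTripped = entity.toProto();
// The table reference is preserved across the round trip.
assert roundTripped
    .getMaxcomputeOptions()
    .getTableRef()
    .equals(source.getMaxcomputeOptions().getTableRef());
```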
@@ -17,6 +17,7 @@ public static void validate(DataSource spec) {
}
case BATCH_BIGQUERY -> Matchers.checkValidBigQueryTableRef(
spec.getBigqueryOptions().getTableRef(), "FeatureTable");
case BATCH_MAXCOMPUTE -> {} // validation for table_ref to be added
case STREAM_KAFKA -> {
StreamFormat.FormatCase messageFormat =
spec.getKafkaOptions().getMessageFormat().getFormatCase();
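The BATCH_MAXCOMPUTE branch is a deliberate no-op for now; the inline comment flags the pending table_ref validation. As a sketch only (this check is not part of the PR), it could mirror the format the Spark job later parses:

```java
case BATCH_MAXCOMPUTE -> {
  // Hypothetical check: enforce <project>:<dataset>.<table> before a job tries to parse it.
  String tableRef = spec.getMaxcomputeOptions().getTableRef();
  if (!tableRef.matches("[^:]+:[^.]+\\..+")) {
    throw new IllegalArgumentException(
        String.format(
            "Table ref '%s' is not in the form of <project>:<dataset>.<table>", tableRef));
  }
}
```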
@@ -397,6 +397,7 @@ private Job createOrUpdateIngestionJob(
case BATCH_FILE -> dataSource.getFileOptions().getSparkOverride();
case BATCH_BIGQUERY -> dataSource.getBigqueryOptions().getSparkOverride();
case STREAM_KAFKA -> dataSource.getKafkaOptions().getSparkOverride();
case BATCH_MAXCOMPUTE -> dataSource.getBigqueryOptions().getSparkOverride();
default -> throw new IllegalArgumentException(
String.format("%s is not a valid data source", dataSource.getType()));
};
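Note that MaxComputeOptions carries no spark_override field of its own, so this arm reads the override through the BigQuery accessor. In proto3 the getter for an unset message field returns the default instance, so for a MaxCompute source this resolves to an empty SparkOverride rather than throwing:

```java
// For a BATCH_MAXCOMPUTE source bigquery_options is never populated, so this
// resolves to SparkOverride.getDefaultInstance() (no resource overrides applied).
SparkOverride override = dataSource.getBigqueryOptions().getSparkOverride();
```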
@@ -52,6 +52,19 @@ static class BigQuerySource extends DataSource {
@Nullable String datePartitionColumn;
}

@RequiredArgsConstructor
@Getter
@Setter
static class MaxComputeSource extends DataSource {
private final String project;
private final String dataset;
private final String table;
private final String eventTimestampColumn;
private final Map<String, String> fieldMapping;
@Nullable String createdTimestampColumn;
@Nullable String datePartitionColumn;
}

@Getter
@Setter
@AllArgsConstructor
@@ -117,6 +130,37 @@ public Map<String, DataSource> sourceToArgument(DataSourceProto.DataSource sourc
yield Map.of("bq", bqSource);
}

case MAXCOMPUTE_OPTIONS -> {
DataSourceProto.DataSource.MaxComputeOptions options =
sourceProtobuf.getMaxcomputeOptions();
Pattern pattern = Pattern.compile("(?<project>[^:]+):(?<dataset>[^.]+).(?<table>.+)");
Matcher matcher = pattern.matcher(options.getTableRef());
matcher.find();
if (!matcher.matches()) {
throw new IllegalArgumentException(
String.format(
"Table ref '%s' is not in the form of <project>:<dataset>.<table>",
options.getTableRef()));
}
String project = matcher.group("project");
String dataset = matcher.group("dataset");
String table = matcher.group("table");
MaxComputeSource maxComputeSource =
new MaxComputeSource(
project,
dataset,
table,
sourceProtobuf.getEventTimestampColumn(),
sourceProtobuf.getFieldMappingMap());
if (!sourceProtobuf.getDatePartitionColumn().isEmpty()) {
maxComputeSource.setDatePartitionColumn(sourceProtobuf.getDatePartitionColumn());
}
if (!sourceProtobuf.getCreatedTimestampColumn().isEmpty()) {
maxComputeSource.setCreatedTimestampColumn(sourceProtobuf.getCreatedTimestampColumn());
}
yield Map.of("maxCompute", maxComputeSource);
}

case KAFKA_OPTIONS -> {
DataSourceProto.DataSource.KafkaOptions options = sourceProtobuf.getKafkaOptions();
StreamFormat messageFormat = options.getMessageFormat();
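Given a reference such as my_project:my_dataset.my_table (hypothetical), the named-group pattern splits out project, dataset, and table; note that the dot between the dataset and table groups is unescaped, so it matches any character. A standalone sketch of the same parse:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TableRefParseSketch {
  public static void main(String[] args) {
    // Same pattern as the converter above; groups are project, dataset, table.
    Pattern pattern = Pattern.compile("(?<project>[^:]+):(?<dataset>[^.]+).(?<table>.+)");
    Matcher matcher = pattern.matcher("my_project:my_dataset.my_table"); // hypothetical ref
    if (matcher.matches()) {
      System.out.println(matcher.group("project")); // my_project
      System.out.println(matcher.group("dataset")); // my_dataset
      System.out.println(matcher.group("table"));   // my_table
    }
  }
}
```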
@@ -25,4 +25,5 @@ public class SparkDriverSpec {
private Map<String, String> annotations;
private Map<String, String> serviceAnnotations;
private List<SecretInfo> secrets;
private Map<String, Map<String, String>> envSecretKeyRefs;
}
@@ -22,4 +22,5 @@ public class SparkExecutorSpec {
private Map<String, String> labels;
private Map<String, String> annotations;
private List<SecretInfo> secrets;
private Map<String, Map<String, String>> envSecretKeyRefs;
}
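Both pod specs gain an envSecretKeyRefs field, which supports the "add secrets to access maxcompute" commit by injecting credentials as environment variables drawn from Kubernetes secret keys. A sketch of the nested-map shape, with all secret and variable names hypothetical:

```java
import java.util.Map;

// Env var name -> {"name": <k8s secret name>, "key": <key within that secret>}.
// Hypothetical MaxCompute credential wiring:
Map<String, Map<String, String>> envSecretKeyRefs =
    Map.of(
        "MAXCOMPUTE_ACCESS_KEY_ID",
        Map.of("name", "maxcompute-credentials", "key", "access-key-id"),
        "MAXCOMPUTE_ACCESS_KEY_SECRET",
        Map.of("name", "maxcompute-credentials", "key", "access-key-secret"));
```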
Generated Go SDK files (diffs not rendered by default):

2 changes: 1 addition & 1 deletion caraml-store-sdk/go/protos/feast/core/CoreService.pb.go
8 changes: 3 additions & 5 deletions caraml-store-sdk/go/protos/feast/core/CoreService_grpc.pb.go
2 changes: 1 addition & 1 deletion caraml-store-sdk/go/protos/feast/core/DataFormat.pb.go