Skip to content

Commit

Permalink
feat: change naming of properties
Browse files Browse the repository at this point in the history
  • Loading branch information
Emrehzl94 committed Nov 27, 2024
1 parent dc98f1d commit 3ff3d4b
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,20 @@ public class BigQuerySpec {
private final String readDescription;
private final String castDescription;
private final String sql;
private final String tempProjectId;
private final String tempDatasetId;
private final String tempDataBeamProject;
private final String tempDataBeamDataset;

public BigQuerySpec(
String readDescription,
String castDescription,
String sql,
String tempProjectId,
String tempDatasetId) {
String tempDataBeamProject,
String tempDataBeamDataset) {
this.readDescription = readDescription;
this.castDescription = castDescription;
this.sql = sql;
this.tempProjectId = tempProjectId;
this.tempDatasetId = tempDatasetId;
this.tempDataBeamProject = tempDataBeamProject;
this.tempDataBeamDataset = tempDataBeamDataset;
}

public String getReadDescription() {
Expand All @@ -52,21 +52,21 @@ public String getSql() {
return sql;
}

public String getTempProjectId() {
return tempProjectId;
public String getTempDataBeamProject() {
return tempDataBeamProject;
}

public String getTempDatasetId() {
return tempDatasetId;
public String getTempDataBeamDataset() {
return tempDataBeamDataset;
}

public static class BigQuerySpecBuilder {

private String readDescription;
private String castDescription;
private String sql;
private String tempProjectId;
private String tempDatasetId;
private String tempDataBeamProject;
private String tempDataBeamDataset;

public BigQuerySpecBuilder readDescription(String readDescription) {
this.readDescription = readDescription;
Expand All @@ -83,18 +83,19 @@ public BigQuerySpecBuilder sql(String sql) {
return this;
}

public BigQuerySpecBuilder tempProjectId(String projectId) {
this.tempProjectId = projectId;
public BigQuerySpecBuilder tempDataBeamProject(String tempDataBeamProject) {
this.tempDataBeamProject = tempDataBeamProject;
return this;
}

public BigQuerySpecBuilder tempDatasetId(String datasetId) {
this.tempDatasetId = datasetId;
public BigQuerySpecBuilder tempDataBeamDataset(String tempDataBeamDataset) {
this.tempDataBeamDataset = tempDataBeamDataset;
return this;
}

public BigQuerySpec build() {
return new BigQuerySpec(readDescription, castDescription, sql, tempProjectId, tempDatasetId);
return new BigQuerySpec(
readDescription, castDescription, sql, tempDataBeamProject, tempDataBeamDataset);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,19 @@ public class BigQuerySource implements Source {

private final String name;
private final String query;
private final String tempProjectId;
private final String tempDatasetId;
private final String tempDataBeamProject;
private final String tempDataBeamDataset;

public BigQuerySource(String name, String query) {
this(name, query, null, null);
}

public BigQuerySource(String name, String query, String tempProjectId, String tempDatasetId) {
public BigQuerySource(
String name, String query, String tempDataBeamProject, String tempDataBeamDataset) {
this.name = name;
this.query = query;
this.tempProjectId = tempProjectId;
this.tempDatasetId = tempDatasetId;
this.tempDataBeamProject = tempDataBeamProject;
this.tempDataBeamDataset = tempDataBeamDataset;
}

@Override
Expand All @@ -50,12 +51,12 @@ public String getQuery() {
return query;
}

public String getTempProjectId() {
return tempProjectId;
public String getTempDataBeamProject() {
return tempDataBeamProject;
}

public String getTempDatasetId() {
return tempDatasetId;
public String getTempDataBeamDataset() {
return tempDataBeamDataset;
}

@Override
Expand Down Expand Up @@ -84,11 +85,11 @@ public String toString() {
+ ", query='"
+ query
+ '\''
+ ", tempProjectId='"
+ tempProjectId
+ ", tempDataBeamProject='"
+ tempDataBeamProject
+ '\''
+ ", tempDatasetId='"
+ tempDatasetId
+ ", tempDataBeamDataset='"
+ tempDataBeamDataset
+ '\''
+ '}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ public BigQuerySource provide(ObjectNode node) {
return new BigQuerySource(
node.get("name").textValue(),
node.get("query").textValue(),
Optional.ofNullable(node.get("tempProjectId")).map(JsonNode::textValue).orElse(null),
Optional.ofNullable(node.get("tempDatasetId")).map(JsonNode::textValue).orElse(null));
Optional.ofNullable(node.get("temp_data_beam_project"))
.map(JsonNode::textValue)
.orElse(null),
Optional.ofNullable(node.get("temp_data_beam_dataset"))
.map(JsonNode::textValue)
.orElse(null));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ public void visitSource(int index, Source source) {
return;
}

var tempProjectId = ((BigQuerySource) source).getTempProjectId();
var tempDatasetId = ((BigQuerySource) source).getTempDatasetId();
var tempDataBeamProject = ((BigQuerySource) source).getTempDataBeamProject();
var tempDataBeamDataset = ((BigQuerySource) source).getTempDataBeamDataset();

if (tempProjectId != null && tempDatasetId == null) {
if (tempDataBeamProject != null && tempDataBeamDataset == null) {
paths.add(String.format("$.sources[%d]", index));
}
}
Expand All @@ -48,7 +48,9 @@ public boolean report(SpecificationValidationResult.Builder builder) {
builder.addError(
path,
ERROR_CODE,
String.format("%s tempProjectId is provided, but tempDatasetId is missing", path)));
String.format(
"%s temp_data_beam_project is provided, but temp_data_beam_dataset is missing",
path)));
return paths.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public BigQuerySpec getMetadataQueryBeamSpec() {
.readDescription("Read from BQ " + source.getName())
.castDescription("Cast to BeamRow " + source.getName())
.sql(zeroRowSql)
.tempProjectId(source.getTempProjectId())
.tempDatasetId(source.getTempDatasetId())
.tempDataBeamProject(source.getTempDataBeamProject())
.tempDataBeamDataset(source.getTempDataBeamDataset())
.build();
}

Expand All @@ -104,8 +104,8 @@ private BigQuerySpec getSourceQueryBeamSpec() {
.castDescription("Cast to BeamRow " + source.getName())
.readDescription("Read from BQ " + source.getName())
.sql(source.getQuery())
.tempProjectId(source.getTempProjectId())
.tempDatasetId(source.getTempDatasetId())
.tempDataBeamProject(source.getTempDataBeamProject())
.tempDataBeamDataset(source.getTempDataBeamDataset())
.build();
}

Expand All @@ -128,8 +128,8 @@ private BigQuerySpec getTargetQueryBeamSpec(TargetQuerySpec spec) {
.castDescription(
targetSequence.getSequenceNumber(target) + ": Cast to BeamRow " + target.getName())
.sql(sql)
.tempProjectId(source.getTempProjectId())
.tempDatasetId(source.getTempDatasetId())
.tempDataBeamProject(source.getTempDataBeamProject())
.tempDataBeamDataset(source.getTempDataBeamDataset())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ public PCollection<Row> expand(PBegin input) {
.usingStandardSql()
.withTemplateCompatibility();

var tempProjectId = this.bqQuerySpec.getTempProjectId();
var tempDatasetId = this.bqQuerySpec.getTempDatasetId();
var tempDataBeamProject = this.bqQuerySpec.getTempDataBeamProject();
var tempDataBeamDataset = this.bqQuerySpec.getTempDataBeamDataset();

if (tempProjectId != null && tempDatasetId != null) {
read.withQueryTempProjectAndDataset(tempProjectId, tempDatasetId);
} else if (tempDatasetId != null) {
read.withQueryTempDataset(tempDatasetId);
if (tempDataBeamProject != null && tempDataBeamDataset != null) {
read = read.withQueryTempProjectAndDataset(tempDataBeamProject, tempDataBeamDataset);
} else if (tempDataBeamDataset != null) {
read = read.withQueryTempDataset(tempDataBeamDataset);
}

PCollection<TableRow> sourceRows = input.apply(bqQuerySpec.getReadDescription(), read);

Schema beamSchema = sourceRows.getSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void fails_if_bigquery_source_only_has_temp_project_id_but_not_temp_datas
+ " \"type\": \"bigquery\",\n"
+ " \"name\": \"a-source\",\n"
+ " \"query\": \"SELECT field_1 FROM project.dataset.table\",\n"
+ " \"tempProjectId\": \"temp-project-id\"\n"
+ " \"temp_data_beam_project\": \"project\"\n"
+ " }],\n"
+ " \"targets\": {\n"
+ " \"nodes\": [{\n"
Expand Down Expand Up @@ -61,6 +61,7 @@ public void fails_if_bigquery_source_only_has_temp_project_id_but_not_temp_datas
assertThat(exception).hasMessageThat().contains("0 warning(s)");
assertThat(exception)
.hasMessageThat()
.contains("$.sources[0] tempProjectId is provided, but tempDatasetId is missing");
.contains(
"$.sources[0] temp_data_beam_project is provided, but temp_data_beam_dataset is missing");
}
}

0 comments on commit 3ff3d4b

Please sign in to comment.