Skip to content

Commit

Permalink
test: fix, refactor, cleanup (#65)
Browse files Browse the repository at this point in the history
* test: fix, refactor, cleanup

* test: fix, refactor, cleanup part2

* upgrade junit minor version

* refactor: test for package protohandler (#73)

Co-authored-by: kevin.bheda <[email protected]>

* refactor: test for package processors.external.pg (#70)

* refactor: test for package processors.external.pg

* refactor: fix checkstyle whitespace error

Co-authored-by: kevin.bheda <[email protected]>

* refactor: test for package processors.external.grpc (#69)

Co-authored-by: kevin.bheda <[email protected]>

* refactor: test for package processors.internal (#71)

Co-authored-by: kevin.bheda <[email protected]>

* refactor: test for package source and core (#75)

* refactor: test for package source and core

* refactor: fix test and remove expected exception convention

Co-authored-by: kevin.bheda <[email protected]>

* refactor: test for package sink (#74)

* refactor: test for package sink

* refactor: move field to test

Co-authored-by: kevin.bheda <[email protected]>

* refactor: test for package processors.longbow (#72)

* refactor: test for package processors.longbow

* refactor: implement todo

Co-authored-by: kevin.bheda <[email protected]>

* refactor: tests for es package (#68)

* refactor: test for package processors.external.es

* refactor: fix checkstyle errors in test package

* refactor: use asserthrows instead of expectedException

Co-authored-by: kevin.bheda <[email protected]>

* refactor: test cleanup for package metrics and processors.common (#67)

* test: fix, refactor, cleanup for package config

* refactor: use asserthrows instead of expectedException

Co-authored-by: kevin.bheda <[email protected]>

* test: fix, refactor, cleanup for package config (#66)

Co-authored-by: kevin.bheda <[email protected]>

* refactor: optimize imports

* refactor: use static imports for metrics package

* remove already implemented todo comment

* refactor: implement todo's for processors.common package

* refactor: remove todo as the object needs to be mocked

* refactor: implement todo for external.grpc package

* refactor: implement todo for external.http package

* refactor: remove implemented todo

* refactor: remove unused rules in package external.pg

* refactor: implement todo for external package

* refactor: implement todo for telemetry package

* refactor: implement todo for transformers package

* refactor: implement todo and remove unwanted todos

* bump up the version

* fix: ignore the failing postgres integration test

Co-authored-by: kevin.bheda <[email protected]>
Co-authored-by: Gaurav Singhania <[email protected]>
  • Loading branch information
3 people authored Oct 1, 2021
1 parent f00e3c7 commit 028b917
Show file tree
Hide file tree
Showing 107 changed files with 2,067 additions and 1,753 deletions.
3 changes: 2 additions & 1 deletion dagger-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ dependencies {
dependenciesJar 'org.apache.commons:commons-pool2:2.4.3'

testImplementation project(':dagger-common').sourceSets.test.output
testImplementation 'junit:junit:4.12'
testImplementation 'junit:junit:4.13'
testImplementation 'org.apache.flink:flink-test-utils_2.11:' + flinkVersion
testImplementation 'org.apache.kafka:kafka-clients:2.5.0'
testImplementation 'com.github.tomakehurst:wiremock-standalone:2.27.0'
Expand All @@ -95,6 +95,7 @@ dependencies {
testImplementation 'org.powermock:powermock-api-mockito2:2.0.0-beta.5'
testImplementation 'com.google.guava:guava:27.0.1-jre'
testImplementation 'org.grpcmock:grpcmock-junit5:0.5.0'
testImplementation 'com.github.stefanbirkner:system-rules:1.19.0'
}

test {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package io.odpf.dagger.core.processors.external.es;

import io.odpf.dagger.core.processors.common.OutputMapping;

import java.util.Map;

public class EsSourceConfigBuilder {
private String host;
private String port;
private String user;
private String password;
private String endpointPattern;
private String endpointVariables;
private String type;
private String capacity;
private String connectTimeout;
private String retryTimeout;
private String socketTimeout;
private String streamTimeout;
private boolean failOnErrors;
private Map<String, OutputMapping> outputMapping;
private String metricId;
private boolean retainResponseType;

public EsSourceConfigBuilder setHost(String host) {
this.host = host;
return this;
}

public EsSourceConfigBuilder setPort(String port) {
this.port = port;
return this;
}

public EsSourceConfigBuilder setUser(String user) {
this.user = user;
return this;
}

public EsSourceConfigBuilder setPassword(String password) {
this.password = password;
return this;
}

public EsSourceConfigBuilder setEndpointPattern(String endpointPattern) {
this.endpointPattern = endpointPattern;
return this;
}

public EsSourceConfigBuilder setEndpointVariables(String endpointVariables) {
this.endpointVariables = endpointVariables;
return this;
}

public EsSourceConfigBuilder setType(String type) {
this.type = type;
return this;
}

public EsSourceConfigBuilder setCapacity(String capacity) {
this.capacity = capacity;
return this;
}

public EsSourceConfigBuilder setConnectTimeout(String connectTimeout) {
this.connectTimeout = connectTimeout;
return this;
}

public EsSourceConfigBuilder setRetryTimeout(String retryTimeout) {
this.retryTimeout = retryTimeout;
return this;
}

public EsSourceConfigBuilder setSocketTimeout(String socketTimeout) {
this.socketTimeout = socketTimeout;
return this;
}

public EsSourceConfigBuilder setStreamTimeout(String streamTimeout) {
this.streamTimeout = streamTimeout;
return this;
}

public EsSourceConfigBuilder setFailOnErrors(boolean failOnErrors) {
this.failOnErrors = failOnErrors;
return this;
}

public EsSourceConfigBuilder setOutputMapping(Map<String, OutputMapping> outputMapping) {
this.outputMapping = outputMapping;
return this;
}

public EsSourceConfigBuilder setMetricId(String metricId) {
this.metricId = metricId;
return this;
}

public EsSourceConfigBuilder setRetainResponseType(boolean retainResponseType) {
this.retainResponseType = retainResponseType;
return this;
}

public EsSourceConfig createEsSourceConfig() {
return new EsSourceConfig(host, port, user, password, endpointPattern, endpointVariables, type, capacity, connectTimeout, retryTimeout, socketTimeout, streamTimeout, failOnErrors, outputMapping, metricId, retainResponseType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package io.odpf.dagger.core.processors.external.grpc;

import io.odpf.dagger.core.processors.common.OutputMapping;

import java.util.Map;

public class GrpcSourceConfigBuilder {
private String endpoint;
private int servicePort;
private String grpcRequestProtoSchema;
private String grpcResponseProtoSchema;
private String grpcMethodUrl;
private String requestPattern;
private String requestVariables;
private Map<String, OutputMapping> outputMapping;
private String streamTimeout;
private String connectTimeout;
private boolean failOnErrors;
private String grpcStencilUrl;
private String type;
private boolean retainResponseType;
private Map<String, String> headers;
private String metricId;
private int capacity;

public GrpcSourceConfigBuilder setEndpoint(String endpoint) {
this.endpoint = endpoint;
return this;
}

public GrpcSourceConfigBuilder setServicePort(int servicePort) {
this.servicePort = servicePort;
return this;
}

public GrpcSourceConfigBuilder setGrpcRequestProtoSchema(String grpcRequestProtoSchema) {
this.grpcRequestProtoSchema = grpcRequestProtoSchema;
return this;
}

public GrpcSourceConfigBuilder setGrpcResponseProtoSchema(String grpcResponseProtoSchema) {
this.grpcResponseProtoSchema = grpcResponseProtoSchema;
return this;
}

public GrpcSourceConfigBuilder setGrpcMethodUrl(String grpcMethodUrl) {
this.grpcMethodUrl = grpcMethodUrl;
return this;
}

public GrpcSourceConfigBuilder setRequestPattern(String requestPattern) {
this.requestPattern = requestPattern;
return this;
}

public GrpcSourceConfigBuilder setRequestVariables(String requestVariables) {
this.requestVariables = requestVariables;
return this;
}

public GrpcSourceConfigBuilder setOutputMapping(Map<String, OutputMapping> outputMapping) {
this.outputMapping = outputMapping;
return this;
}

public GrpcSourceConfigBuilder setStreamTimeout(String streamTimeout) {
this.streamTimeout = streamTimeout;
return this;
}

public GrpcSourceConfigBuilder setConnectTimeout(String connectTimeout) {
this.connectTimeout = connectTimeout;
return this;
}

public GrpcSourceConfigBuilder setFailOnErrors(boolean failOnErrors) {
this.failOnErrors = failOnErrors;
return this;
}

public GrpcSourceConfigBuilder setGrpcStencilUrl(String grpcStencilUrl) {
this.grpcStencilUrl = grpcStencilUrl;
return this;
}

public GrpcSourceConfigBuilder setType(String type) {
this.type = type;
return this;
}

public GrpcSourceConfigBuilder setRetainResponseType(boolean retainResponseType) {
this.retainResponseType = retainResponseType;
return this;
}

public GrpcSourceConfigBuilder setHeaders(Map<String, String> headers) {
this.headers = headers;
return this;
}

public GrpcSourceConfigBuilder setMetricId(String metricId) {
this.metricId = metricId;
return this;
}

public GrpcSourceConfigBuilder setCapacity(int capacity) {
this.capacity = capacity;
return this;
}

public GrpcSourceConfig createGrpcSourceConfig() {
return new GrpcSourceConfig(endpoint, servicePort, grpcRequestProtoSchema, grpcResponseProtoSchema, grpcMethodUrl, requestPattern, requestVariables,
streamTimeout, connectTimeout, failOnErrors, grpcStencilUrl, type, retainResponseType, headers, outputMapping, metricId, capacity);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package io.odpf.dagger.core.processors.external.pg;

import java.util.Map;

public class PgSourceConfigBuilder {
private String host;
private String port;
private String user;
private String password;
private String database;
private String type;
private String capacity;
private String streamTimeout;
private Map<String, String> outputMapping;
private String connectTimeout;
private String idleTimeout;
private String queryVariables;
private String queryPattern;
private boolean failOnErrors;
private String metricId;
private boolean retainResponseType;

public PgSourceConfigBuilder setHost(String host) {
this.host = host;
return this;
}

public PgSourceConfigBuilder setPort(String port) {
this.port = port;
return this;
}

public PgSourceConfigBuilder setUser(String user) {
this.user = user;
return this;
}

public PgSourceConfigBuilder setPassword(String password) {
this.password = password;
return this;
}

public PgSourceConfigBuilder setDatabase(String database) {
this.database = database;
return this;
}

public PgSourceConfigBuilder setType(String type) {
this.type = type;
return this;
}

public PgSourceConfigBuilder setCapacity(String capacity) {
this.capacity = capacity;
return this;
}

public PgSourceConfigBuilder setStreamTimeout(String streamTimeout) {
this.streamTimeout = streamTimeout;
return this;
}

public PgSourceConfigBuilder setOutputMapping(Map<String, String> outputMapping) {
this.outputMapping = outputMapping;
return this;
}

public PgSourceConfigBuilder setConnectTimeout(String connectTimeout) {
this.connectTimeout = connectTimeout;
return this;
}

public PgSourceConfigBuilder setIdleTimeout(String idleTimeout) {
this.idleTimeout = idleTimeout;
return this;
}

public PgSourceConfigBuilder setQueryVariables(String queryVariables) {
this.queryVariables = queryVariables;
return this;
}

public PgSourceConfigBuilder setQueryPattern(String queryPattern) {
this.queryPattern = queryPattern;
return this;
}

public PgSourceConfigBuilder setFailOnErrors(boolean failOnErrors) {
this.failOnErrors = failOnErrors;
return this;
}

public PgSourceConfigBuilder setMetricId(String metricId) {
this.metricId = metricId;
return this;
}

public PgSourceConfigBuilder setRetainResponseType(boolean retainResponseType) {
this.retainResponseType = retainResponseType;
return this;
}

public PgSourceConfig createPgSourceConfig() {
return new PgSourceConfig(host, port, user, password, database, type, capacity, streamTimeout, outputMapping, connectTimeout, idleTimeout, queryVariables, queryPattern, failOnErrors, metricId, retainResponseType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class LongbowReadColumnModifier implements ColumnModifier {
public String[] modifyColumnNames(String[] inputColumnNames) {
ArrayList<String> inputColumnList = new ArrayList<>(Arrays.asList(inputColumnNames));
inputColumnList.add(inputColumnList.size(), Constants.LONGBOW_PROTO_DATA_KEY);

return inputColumnList.toArray(new String[0]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import io.odpf.dagger.common.core.StencilClientOrchestrator;
import io.odpf.dagger.core.source.FlinkKafkaConsumerCustom;
import org.apache.flink.configuration.Configuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static io.odpf.dagger.common.core.Constants.*;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -171,6 +174,6 @@ public void shouldReturnProtoClassName() {
configuration.setString("STREAMS", configString);
Streams streams = new Streams(configuration, "rowtime", stencilClientOrchestrator, false, 0);

Assert.assertEquals(protoClassForTable, streams.getProtos());
assertEquals(protoClassForTable, streams.getProtos());
}
}
Loading

0 comments on commit 028b917

Please sign in to comment.