Skip to content

Commit

Permalink
#102 | Refactor. Introduce a new BigQueryClient and domain object for…
Browse files Browse the repository at this point in the history
… FlowResult. Ability to use default BigQuery iterator.
  • Loading branch information
vinayvenu committed Nov 3, 2023
1 parent d51f8e1 commit 23977c4
Show file tree
Hide file tree
Showing 16 changed files with 418 additions and 2 deletions.
7 changes: 5 additions & 2 deletions glific/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,18 @@ repositories {
}

dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
testImplementation 'org.junit.jupiter:junit-jupiter:5.10.0'

// https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-gcp-starter
implementation 'org.springframework.cloud:spring-cloud-gcp-starter:1.2.6.RELEASE'
// https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-gcp-starter-bigquery
implementation 'org.springframework.cloud:spring-cloud-gcp-starter-bigquery:1.2.6.RELEASE'

implementation "log4j:log4j:1.2.17"
implementation project(':util')

testImplementation 'com.fasterxml.jackson.core:jackson-databind:2.0.1'
testImplementation 'org.mockito:mockito-core:3.+'
}

dependencyManagement {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.avni_integration_service.glific.bigQuery;

import com.google.cloud.bigquery.*;
import org.apache.log4j.Logger;
import org.avni_integration_service.glific.bigQuery.config.BigQueryConnector;
import org.avni_integration_service.glific.bigQuery.mapper.BigQueryResultMapper;
import org.avni_integration_service.glific.bigQuery.mapper.BigQueryResultsMapper;
import org.springframework.stereotype.Component;

import java.util.Iterator;
import java.util.UUID;

@Component
public class BigQueryClientNew {
private final BigQueryConnector bigQueryConnector;
private static final Logger logger = Logger.getLogger(BigQueryClientNew.class);

public BigQueryClientNew(BigQueryConnector bigQueryConnector) {
this.bigQueryConnector = bigQueryConnector;
}

public <T> Iterator<T> getResults(String query, String date, int limit, BigQueryResultMapper<T> resultMapper) {
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(query)
.addNamedParameter("updated_at", QueryParameterValue.string(date))
.addNamedParameter("limit_count", QueryParameterValue.int64(limit))
.build();
TableResult tableResult = run(queryConfig);
return new BigQueryResultsMapper<T>().map(tableResult, resultMapper);
}

private TableResult run(QueryJobConfiguration queryJobConfiguration) {
try {
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigQueryConnector.getBigQuery().create(JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build());
queryJob = queryJob.waitFor();

if (queryJob == null) {
logger.info("query job is null");
throw new RuntimeException("Job no longer exists");
} else if (queryJob.getStatus().getError() != null) {
// You can also look at queryJob.getStatus().getExecutionErrors() for all
// errors, not just the latest one.
logger.info(queryJob.getStatus().getError().toString());
throw new RuntimeException(queryJob.getStatus().getError().toString());
}

return queryJob.getQueryResults();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.avni_integration_service.glific.bigQuery.domain;

import java.util.HashMap;
import java.util.Map;

import static org.avni_integration_service.util.ObjectJsonMapper.readValue;

public class FlowResult {
private final Map<String, Object> fields;
private final Map<String, Object> results;

public FlowResult(Map<String, Object> fields) {
this.fields = fields;
results = readValue((String)this.fields.get("results"), Map.class);
}

public String getContactPhone() {
return (String) fields.get("contact_phone");
}

public String getInsertedAt() {
return (String) fields.get("inserted_at");
}

public String getUpdatedAt() {
return (String) fields.get("updated_at");
}

public String getInput(String key) {
return (String) getResult(key).get("input");
}

public String getCategory(String key) {
return (String) getResult(key).get("category");
}

private Map<String, Object> getResult(String key) {
return (Map<String, Object>) results.getOrDefault(key, new HashMap<>());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.avni_integration_service.glific.bigQuery.mapper;

import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Schema;

public interface BigQueryResultMapper<T> {
T map(Schema schema, FieldValueList fieldValues);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.avni_integration_service.glific.bigQuery.mapper;

import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableResult;

import java.util.Iterator;

public class BigQueryResultsMapper<T> {

public Iterator<T> map(TableResult response, BigQueryResultMapper<T> resultMapper) {
Schema schema = response.getSchema();
Iterator<FieldValueList> iterator = response.iterateAll().iterator();

return new Iterator<T>() {
@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public T next() {
return resultMapper.map(schema, iterator.next());
}
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.avni_integration_service.glific.bigQuery.mapper;

import com.google.cloud.bigquery.*;
import org.avni_integration_service.glific.bigQuery.domain.FlowResult;

import java.util.*;

public class FlowResultMapper implements BigQueryResultMapper<FlowResult> {

@Override
public FlowResult map(Schema schema, FieldValueList fieldValues) {
HashMap<String, Object> fields = schema.getFields().stream()
.reduce(new HashMap<>(),
(hashMap, field) -> {
hashMap.merge(field.getName(), fieldValues.get(field.getName()).getValue(), Objects::equals);
return hashMap;
}, (first, second) -> {first.putAll(second);return first;});
return new FlowResult(fields);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.avni_integration_service.glific.bigQuery;

import com.google.cloud.bigquery.*;
import org.avni_integration_service.glific.bigQuery.builder.TableResultBuilder;
import org.avni_integration_service.glific.bigQuery.config.BigQueryConnector;
import org.avni_integration_service.glific.bigQuery.domain.FlowResult;
import org.avni_integration_service.glific.bigQuery.mapper.FlowResultMapper;
import org.junit.jupiter.api.Test;

import java.util.Iterator;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class BigQueryClientNewTest {

@Test
public void shouldMapResponseToMapper() throws InterruptedException {
FlowResultMapper flowResultMapper = new FlowResultMapper();
TableResult tableResult = new TableResultBuilder().build();
BigQueryConnector bqConnector = mock(BigQueryConnector.class);
BigQuery bigQuery = mock(BigQuery.class);
when(bqConnector.getBigQuery()).thenReturn(bigQuery);
Job job = mock(Job.class);
when(bigQuery.create(any(JobInfo.class))).thenReturn(job);
when(job.getQueryResults()).thenReturn(tableResult);
when(job.waitFor()).thenReturn(job);
JobStatus jobStatus = mock(JobStatus.class);
when(job.getStatus()).thenReturn(jobStatus);
when(jobStatus.getError()).thenReturn(null);

Iterator<FlowResult> results = new BigQueryClientNew(bqConnector).getResults("query", "2023-01-01", 5, flowResultMapper);

FlowResult firstFlowResult = results.next();
assertEquals("919317217785", firstFlowResult.getContactPhone());
assertEquals("919317217785", firstFlowResult.getContactPhone());
assertEquals("Suresh", firstFlowResult.getInput("avni_first_name"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.avni_integration_service.glific.bigQuery.builder;

import com.google.cloud.bigquery.*;

import java.util.Arrays;

public class FieldValueListBuilder {
public FieldValueList buildFlowResult(String contactPhone, String flowResultId, String results) {
return FieldValueList.of(Arrays.asList(
FieldValue.of(FieldValue.Attribute.PRIMITIVE, contactPhone),
FieldValue.of(FieldValue.Attribute.PRIMITIVE, flowResultId),
FieldValue.of(FieldValue.Attribute.PRIMITIVE, "2023-08-02T14:21:25.695844"),
FieldValue.of(FieldValue.Attribute.PRIMITIVE, results),
FieldValue.of(FieldValue.Attribute.PRIMITIVE, "2023-08-02T14:21:25.695844")),
new SchemaBuilder().flowResultSchema().getFields());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.avni_integration_service.glific.bigQuery.builder;

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;

public class SchemaBuilder {

public Schema flowResultSchema() {
return Schema.of(Field.of("contact_phone", LegacySQLTypeName.STRING),
Field.of("flowresult_id", LegacySQLTypeName.STRING),
Field.of("inserted_at", LegacySQLTypeName.DATETIME),
Field.of("results", LegacySQLTypeName.STRING),
Field.of("updated_at", LegacySQLTypeName.DATETIME));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.avni_integration_service.glific.bigQuery.builder;

import com.google.api.gax.paging.Page;
import com.google.cloud.PageImpl;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.TableResult;
import com.google.common.collect.ImmutableList;

public class TableResultBuilder {
public TableResult build() {
FieldValueListBuilder fieldValueListBuilder = new FieldValueListBuilder();
ImmutableList<FieldValueList> queryResults = ImmutableList.of(
fieldValueListBuilder.buildFlowResult(
"919317217785",
"11041268",
"{\"avni_first_name\":{\"category\":\"^[A-Za-z ]+$\",\"input\":\"Suresh\",\"inserted_at\":\"2023-09-07T06:09:52.961071Z\",\"intent\":null},\"avni_last_name\":{\"category\":\"^[A-Za-z ]+$\",\"input\":\"Yadav\",\"inserted_at\":\"2023-09-07T06:09:59.008525Z\",\"intent\":null},\"avni_optin\":{\"category\":\"Yes\",\"input\":\"Yes\",\"inserted_at\":\"2023-09-07T06:09:44.764400Z\",\"intent\":null,\"interactive\":{\"id\":\"\",\"postbackText\":\"\",\"reply\":\"Yes 1\",\"title\":\"Yes\"}},\"child\":{\"Language\":{\"category\":\"English\",\"input\":\"English\",\"inserted_at\":\"2023-09-07T06:09:37.461253Z\",\"intent\":null,\"interactive\":{\"description\":\"\",\"id\":\"\",\"postbackText\":\"\",\"reply\":\"English 1\",\"title\":\"English\"}}},\"flow_keyword\":{\"category\":\"registerme\",\"input\":\"registerme\",\"inserted_at\":\"2023-09-07T06:09:28.125929Z\"}}"),
fieldValueListBuilder.buildFlowResult(
"919873249733",
"11041211",
"{\"avni_first_name\":{\"category\":\"^[A-Za-z ]+$\",\"input\":\"Sameer\",\"inserted_at\":\"2023-09-07T05:58:34.653513Z\",\"intent\":null},\"avni_last_name\":{\"category\":\"^[A-Za-z ]+$\",\"input\":\"Kohlapuri\",\"inserted_at\":\"2023-09-07T05:58:51.431716Z\",\"intent\":null},\"avni_optin\":{\"category\":\"Yes\",\"input\":\"Yes\",\"inserted_at\":\"2023-09-07T05:58:23.609434Z\",\"intent\":null,\"interactive\":{\"id\":\"\",\"postbackText\":\"\",\"reply\":\"Yes 1\",\"title\":\"Yes\"}},\"child\":{\"Language\":{\"category\":\"English\",\"input\":\"English\",\"inserted_at\":\"2023-09-07T05:58:09.972092Z\",\"intent\":null,\"interactive\":{\"description\":\"\",\"id\":\"\",\"postbackText\":\"\",\"reply\":\"English 1\",\"title\":\"English\"}}},\"flow_keyword\":{\"category\":\"registerme\",\"input\":\"registerme\",\"inserted_at\":\"2023-09-07T05:57:46.237686Z\"}}"),
fieldValueListBuilder.buildFlowResult(
"919900310515",
"10427672",
"{\"avni_first_name\":{\"category\":\"^[A-Za-z ]+$\",\"input\":\"Utkarsh\",\"inserted_at\":\"2023-08-21T08:46:34.140088Z\",\"intent\":null},\"avni_last_name\":{\"category\":\"^[A-Za-z ]+$\",\"input\":\"Hathi\",\"inserted_at\":\"2023-08-21T08:46:44.420595Z\",\"intent\":null},\"avni_optin\":{\"category\":\"Yes\",\"input\":\"Yes\",\"inserted_at\":\"2023-08-21T08:46:26.843587Z\",\"intent\":null,\"interactive\":{\"id\":\"\",\"postbackText\":\"\",\"reply\":\"Yes 1\",\"title\":\"Yes\"}},\"child\":{\"Language\":{\"category\":\"English\",\"input\":\"English\",\"inserted_at\":\"2023-08-21T08:46:17.715479Z\",\"intent\":null,\"interactive\":{\"description\":\"\",\"id\":\"\",\"postbackText\":\"\",\"reply\":\"English 1\",\"title\":\"English\"}}},\"flow_keyword\":{\"category\":\"registerme\",\"input\":\"registerme\",\"inserted_at\":\"2023-08-21T08:45:54.533500Z\"}}"),
fieldValueListBuilder.buildFlowResult(
"919317217785",
"9843637",
"{\"avni_optin\":{\"category\":\"Other\",\"input\":\"skillsonwheels\",\"inserted_at\":\"2023-08-18T09:53:26.753549Z\",\"intent\":null},\"child\":{\"Language\":{\"category\":\"English\",\"input\":\"English\",\"inserted_at\":\"2023-08-08T05:35:17.430486Z\",\"intent\":null,\"interactive\":{\"description\":\"\",\"id\":\"\",\"postbackText\":\"\",\"reply\":\"English 1\",\"title\":\"English\"}}},\"flow_keyword\":{\"category\":\"registerme\",\"input\":\"registerme\",\"inserted_at\":\"2023-08-08T05:34:43.903087Z\"}}")
);

Page<FieldValueList> resultsPage = new PageImpl<>((PageImpl.NextPageFetcher<FieldValueList>) () -> null, null, queryResults);
return new TableResult(new SchemaBuilder().flowResultSchema(), queryResults.size(), resultsPage);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.avni_integration_service.glific.bigQuery.domain;

import org.avni_integration_service.util.ObjectJsonMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

public class FlowResultTest {
private FlowResult flowResult;

@BeforeEach
public void setup() {
Map<String, Object> json = ObjectJsonMapper.readValue(
this.getClass().getResourceAsStream("/sampleFlowResult.json"),
Map.class);
flowResult = new FlowResult(json);
}

@Test
public void shouldGetTopLevelFields() {
assertEquals("913652322176", flowResult.getContactPhone());
}

@Test
public void shouldGetResultItemInputByKey() throws IOException {
assertEquals("Singh", flowResult.getInput("avni_last_name"));
}

@Test
public void shouldGetResultItemCategoryByKey() throws IOException {
assertEquals("Telangana", flowResult.getCategory("avni_state"));
}

@Test
public void shouldReturnNullIfNotAvailable() throws IOException {
assertNull(flowResult.getCategory("non-existent-key"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.avni_integration_service.glific.bigQuery.mapper;

import com.google.cloud.bigquery.TableResult;
import org.avni_integration_service.glific.bigQuery.builder.TableResultBuilder;
import org.avni_integration_service.glific.bigQuery.domain.FlowResult;
import org.junit.jupiter.api.Test;

import java.util.Iterator;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class BigQueryResultsMapperTest {

@Test
public void shouldCleanlyIterateOverABigQueryResult() {
TableResult tableResult = new TableResultBuilder().build();

Iterator<FlowResult> flowResults = new BigQueryResultsMapper<FlowResult>().map(tableResult, new FlowResultMapper());

FlowResult firstFlowResult = flowResults.next();
assertEquals("919317217785", firstFlowResult.getContactPhone());
assertEquals("919317217785", firstFlowResult.getContactPhone());
assertEquals("Suresh", firstFlowResult.getInput("avni_first_name"));

assertEquals(3, remainingItemsIn(flowResults));
}

private int remainingItemsIn(Iterator<FlowResult> iterator) {
int count = 0;
while (iterator.hasNext()) {
count++;
iterator.next();
}
return count;
}

}
Loading

0 comments on commit 23977c4

Please sign in to comment.