
Commit

#102 | Refactor. Use BigQueryClientNew to get rows from BigQuery. Flush out LahiStudent
vinayvenu committed Nov 6, 2023
1 parent 612610e commit 2c9872a
Showing 12 changed files with 147 additions and 201 deletions.
@@ -1,35 +1,35 @@
package org.avni_integration_service.glific.bigQuery;

import com.google.cloud.bigquery.*;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.log4j.Logger;
import org.avni_integration_service.glific.bigQuery.config.BigQueryConnector;
import org.avni_integration_service.glific.bigQuery.mapper.BigQueryResultMapper;
import org.avni_integration_service.glific.bigQuery.mapper.BigQueryResultsMapper;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.Iterator;
import java.util.UUID;

@Component
public class BigQueryClient {
public static final String RESULTS = "results";
private final BigQueryConnector bigQueryConnector;
private static final Logger logger = Logger.getLogger(BigQueryClient.class);

public BigQueryClient(BigQueryConnector bigQueryConnector) {
this.bigQueryConnector = bigQueryConnector;
}

public List<Map<String, Object>> queryWithPagination(String query, String date, int limit, List<String> fields) {
public <T> Iterator<T> getResults(String query, String date, int limit, BigQueryResultMapper<T> resultMapper) {
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(query)
.addNamedParameter("updated_at", QueryParameterValue.string(date))
.addNamedParameter("limit_count", QueryParameterValue.int64(limit))
.build();
TableResult tableResult = queryCall(queryConfig);
return this.filterData(tableResult, fields);
TableResult tableResult = run(queryConfig);
return new BigQueryResultsMapper<T>().map(tableResult, resultMapper);
}

private TableResult queryCall(QueryJobConfiguration queryJobConfiguration) {
private TableResult run(QueryJobConfiguration queryJobConfiguration) {
try {
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigQueryConnector.getBigQuery().create(JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build());
@@ -50,34 +50,4 @@ private TableResult queryCall(QueryJobConfiguration queryJobConfiguration) {
throw new RuntimeException(e);
}
}

private List<Map<String, Object>> filterData(TableResult response, List<String> resultFields) {
Schema schema = response.getSchema();
List<Map<String, Object>> list1 = new LinkedList<>();
for (FieldValueList row : response.iterateAll()) {
Map<String, Object> resultMap = new HashMap<>();
for (int i = 0; i < schema.getFields().size(); i++) {
Field field = schema.getFields().get(i);
FieldValue fieldValue = row.get(i);
String fieldName = field.getName();
if (fieldName.equals(RESULTS)) {
getResultData(resultMap, fieldValue.getStringValue(), resultFields);
}
resultMap.put(fieldName, fieldValue.getStringValue());
}
list1.add(resultMap);
}
return list1;
}

private void getResultData(Map<String, Object> map, String result, List<String> resultFields) {
JsonObject jsonObject = new JsonParser().parse(result).getAsJsonObject();
resultFields.forEach(field -> {
map.put(field, getDataFromJson(jsonObject, field));
});
}

private String getDataFromJson(JsonObject jsonObject, String field) {
return (jsonObject.has(field)) ? jsonObject.getAsJsonObject(field).get("input").getAsString() : null;
}
}

This file was deleted.

@@ -34,6 +34,18 @@ public String getCategory(String key) {
return (String) getResult(key).get("category");
}

public String getFlowResultId() {
return (String) fields.get("flowresult_id");
}

public String lastUpdatedAt() {
return (String) fields.get("updated_at");
}

public String insertedAt() {
return (String) fields.get("inserted_at");
}

private Map<String, Object> getResult(String key) {
return (Map<String, Object>) results.getOrDefault(key, new HashMap<>());
}
@@ -14,7 +14,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class BigQueryClientNewTest {
public class BigQueryClientTest {

@Test
public void shouldMapResponseToMapper() throws InterruptedException {
@@ -31,7 +31,7 @@ public void shouldMapResponseToMapper() throws InterruptedException {
when(job.getStatus()).thenReturn(jobStatus);
when(jobStatus.getError()).thenReturn(null);

Iterator<FlowResult> results = new BigQueryClientNew(bqConnector).getResults("query", "2023-01-01", 5, flowResultMapper);
Iterator<FlowResult> results = new BigQueryClient(bqConnector).getResults("query", "2023-01-01", 5, flowResultMapper);

FlowResult firstFlowResult = results.next();
assertEquals("919317217785", firstFlowResult.getContactPhone());
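For orientation, a minimal caller sketch against the merged client. Only the getResults signature, the BigQueryConnector constructor argument and FlowResult.getContactPhone are taken from this diff and the test above; the table name, date and limit are assumptions, and the query must reference the @updated_at and @limit_count named parameters that getResults binds.

// Sketch only — table name, date and limit are placeholders.
import java.util.Iterator;
import org.avni_integration_service.glific.bigQuery.BigQueryClient;
import org.avni_integration_service.glific.bigQuery.config.BigQueryConnector;
import org.avni_integration_service.glific.bigQuery.domain.FlowResult;
import org.avni_integration_service.glific.bigQuery.mapper.BigQueryResultMapper;

class FlowResultFetchSketch {
    static void printNewResults(BigQueryConnector connector, BigQueryResultMapper<FlowResult> mapper) {
        BigQueryClient client = new BigQueryClient(connector);
        // The query text is illustrative; it must use the named parameters bound in getResults.
        String query = "SELECT * FROM `project.dataset.flow_results` "
                + "WHERE updated_at > @updated_at ORDER BY updated_at LIMIT @limit_count";
        Iterator<FlowResult> results = client.getResults(query, "2023-01-01 00:00:00", 100, mapper);
        while (results.hasNext()) {
            System.out.println(results.next().getContactPhone());
        }
    }
}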
2 changes: 2 additions & 0 deletions integrator/src/main/resources/application.properties
@@ -1,3 +1,5 @@
spring.config.import=classpath:/integrator-application.properties

## Dummy value to enable service start. Set up environment variable GOOGLE_APPLICATION_CREDENTIALS
## if you need it
spring.cloud.gcp.credentials.location=classpath:gcp.json
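In a deployment that actually needs BigQuery, the dummy entry would be overridden — either by exporting GOOGLE_APPLICATION_CREDENTIALS as the comment suggests, or by pointing the property at a real key file. A sketch with an assumed path:

## Sketch only — the path is an assumption; any Spring resource location (file:, classpath:) works here.
spring.cloud.gcp.credentials.location=file:/etc/avni/gcp-service-account.json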
@@ -2,37 +2,80 @@

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.log4j.Logger;
import org.avni_integration_service.glific.bigQuery.domain.FlowResult;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.*;

@JsonIgnoreProperties(ignoreUnknown = true)
public class LahiStudent extends LahiEntity implements LahiStudentConstants {
public class LahiStudent implements LahiStudentConstants {
private static final Logger logger = Logger.getLogger(LahiStudent.class);
private static final List<String> Core_Fields = Arrays.asList(FIRST_NAME,LAST_NAME,DATE_OF_BIRTH, DATE_OF_REGISTRATION,GENDER);
private static final List<String> Core_Fields = Arrays.asList(FIRST_NAME,LAST_NAME,DATE_OF_BIRTH, GENDER);
private static final List<String> PRIMITIVE_OBS_FIELDS = Arrays.asList(OTHER_STATE, DISTRICT, CITY_NAME, SCHOOL,
// STUDENT_CONTACT_NUMBER, ALTERNATE_NUMBER,
EMAIL, OTHER_QUALIFICATION, FATHER_NAME, QUALIFICATION_STREAM);
private static final List<String> CODED_OBS_FIELDS = Arrays.asList(STATE, HIGHEST_QUALIFICATION,
QUALIFICATION_STATUS, ACADEMIC_YEAR, VOCATIONAL, TRADE, STREAM);

public LahiStudent(Map<String, Object> response) {
super(response);
this.response = response;
private FlowResult flowResult;

public LahiStudent(FlowResult flowResult) {
this.flowResult = flowResult;
}

public String getContactPhone() {
return flowResult.getContactPhone();
}

public String getAlternatePhoneNumber() {
return getInput(ALTERNATE_NUMBER);
}

public String getFirstName() {
return getInput(FIRST_NAME);
}

public String getLastName() {
return getInput(LAST_NAME);
}

public String getGender() {
return getCategory(GENDER);
}

public Map<String, Object> getResponse() {
return response;
public String getDateOfRegistration() {
return getInsertedAt();
}

public void setResponse(Map<String, Object> response) {
this.response = response;
public String getDateOfBirth() {
return getInput(DATE_OF_BIRTH);
}

@Override
public List<String> getObservationFields() {
return response.keySet().stream().filter(s -> !Core_Fields.contains(s)).collect(Collectors.toList());
public String getInput(String key) {
return flowResult.getInput(key);
}

@Override
public Object getValue(String responseField) {
return this.response.get(responseField);
public String getCategory(String key) {
return flowResult.getCategory(key);
}

public String getLastUpdatedAt() {
return flowResult.getUpdatedAt();
}

public String getInsertedAt() {
return flowResult.getInsertedAt();
}

public String getFlowResultId() {
return flowResult.getFlowResultId();
}

public Map<String, String> getObservations() {
HashMap<String, String> observations = new HashMap<>();

PRIMITIVE_OBS_FIELDS.forEach(fieldName -> observations.put(fieldName, getInput(fieldName)));
CODED_OBS_FIELDS.forEach(fieldName -> observations.put(fieldName, getCategory(fieldName)));

return observations;
}
}
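A small sketch of how the reworked LahiStudent reads once it wraps a FlowResult; every accessor used below appears in this diff, and the FlowResult instance is assumed to come from BigQueryClient.getResults.

// Sketch only — exercises the accessors introduced by this commit.
import java.util.Map;
import org.avni_integration_service.glific.bigQuery.domain.FlowResult;
import org.avni_integration_service.lahi.domain.LahiStudent;

class LahiStudentSketch {
    static void describe(FlowResult flowResult) {
        LahiStudent student = new LahiStudent(flowResult);
        // Core fields come straight off the flow result: inputs for free-text answers, categories for coded ones.
        String name = student.getFirstName() + " " + student.getLastName();
        String gender = student.getGender();                    // category of avni_gender
        String registeredOn = student.getDateOfRegistration();  // FlowResult inserted_at
        // Primitive observation fields map to their input, coded fields to their category.
        Map<String, String> observations = student.getObservations();
        System.out.println(name + " (" + gender + "), registered " + registeredOn
                + ", flow result " + student.getFlowResultId()
                + ", " + observations.size() + " observations");
    }
}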
@@ -7,8 +7,6 @@ public interface LahiStudentConstants {
String FIRST_NAME = "avni_first_name";
String LAST_NAME = "avni_last_name";
String DATE_OF_BIRTH = "avni_date_of_birth";
String DATE_OF_REGISTRATION = "inserted_at";
String FLOW_RESULT_UPDATED_AT = "updated_at";
String GENDER = "avni_gender";
String STATE = "avni_state";
String OTHER_STATE = "avni_other_state";
@@ -25,38 +23,11 @@ public interface LahiStudentConstants {
String VOCATIONAL = "avni_vocational";
String TRADE = "avni_trade";
String STUDENT_ADDRESS = "Other, Other, Other, Other";
String AVNI_OPTIN = "avni_optin";
String FATHER_NAME = "avni_father_name";
String STREAM = "avni_stream";
String QUALIFICATION_STREAM = "avni_other_qualification_stream";
String FLOWRESULT_ID = "flowresult_id";

List<String> ResultFieldList =
Arrays.asList(
FIRST_NAME,
LAST_NAME,
FATHER_NAME,
DATE_OF_BIRTH,
GENDER,
STATE,
OTHER_STATE,
DISTRICT,
CITY_NAME,
SCHOOL,
ALTERNATE_NUMBER,
EMAIL,
HIGHEST_QUALIFICATION,
OTHER_QUALIFICATION,
QUALIFICATION_STATUS,
ACADEMIC_YEAR,
VOCATIONAL,
TRADE,
AVNI_OPTIN,
STREAM,
QUALIFICATION_STREAM,
FLOWRESULT_ID
);

List<String> MandatoryFields =
Arrays.asList(
FIRST_NAME,
@@ -71,5 +42,4 @@ public interface LahiStudentConstants {
OTHER_QUALIFICATION,
VOCATIONAL
);

}
@@ -1,7 +1,6 @@
package org.avni_integration_service.lahi.domain;

import org.apache.log4j.Logger;
import org.avni_integration_service.glific.bigQuery.BigQueryClient;
import org.avni_integration_service.lahi.util.DateTimeUtil;
import org.springframework.stereotype.Component;

@@ -11,7 +10,7 @@

@Component
public class StudentValidator {
private static final Logger logger = Logger.getLogger(BigQueryClient.class);
private static final Logger logger = Logger.getLogger(StudentValidator.class);

public void validateMandatoryField(Map<String, Object> map) {
long count = LahiStudentConstants.MandatoryFields.stream().filter(field -> {
@@ -9,7 +9,6 @@

import java.util.Date;

import static org.avni_integration_service.lahi.domain.LahiStudentConstants.FLOW_RESULT_UPDATED_AT;
import static org.avni_integration_service.lahi.service.LahiStudentService.ENTITYTYPE;

@Service
@@ -22,7 +21,7 @@ public LahiIntegrationDataService(IntegratingEntityStatusRepository integratingE
}

public void studentProcessed(LahiStudent student) {
Date date = DateTimeUtil.lastUpdatedDate(student.getResponse().get(FLOW_RESULT_UPDATED_AT).toString());
Date date = DateTimeUtil.lastUpdatedDate(student.getLastUpdatedAt());
IntegratingEntityStatus integratingEntityStatus = integratingEntityStatusRepository.findByEntityType(ENTITYTYPE);
integratingEntityStatus.setReadUptoDateTime(date);
integratingEntityStatusRepository.save(integratingEntityStatus);
