Skip to content

Commit

Permalink
#000 - removing comments and unnecessary methods.
Browse files Browse the repository at this point in the history
  • Loading branch information
petmongrels committed Oct 27, 2023
1 parent e9f6ca7 commit bb87dcf
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 171 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.avni_integration_service.lahi.domain;

import org.apache.log4j.Logger;
import org.avni_integration_service.lahi.service.DataExtractorService;
import org.avni_integration_service.lahi.service.BigQueryClient;
import org.avni_integration_service.lahi.util.DateTimeUtil;
import org.springframework.stereotype.Component;

Expand All @@ -12,7 +12,7 @@

@Component
public class StudentValidator {
private static final Logger logger = Logger.getLogger(DataExtractorService.class);
private static final Logger logger = Logger.getLogger(BigQueryClient.class);
public void validateMandatoryField(Map<String,Object> map){
long count = StudentConstants.MandatoryField.stream().filter(field->{
if(map.getOrDefault(field,null)==null){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public void execute() {
try {
logger.info("Lahi Main Job Started !!!!!");
avniHttpClient.setAvniSession(lahiAvniSessionFactory.createSession());
studentWorker.fetchDetails();
// TODO: 08/09/23
studentWorker.processStudent();
// TODO: 08/09/23
healthCheckService.success(HEALTHCHECK_SLUG);
logger.info("Lahi Main Job Ended !!!!!");
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.avni_integration_service.lahi.service;

import com.google.cloud.bigquery.*;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.log4j.Logger;
import org.avni_integration_service.lahi.config.BigQueryConnector;
import org.avni_integration_service.lahi.domain.StudentConstants;
import org.springframework.stereotype.Component;

import java.util.*;

@Component
public class BigQueryClient {
    /** Column name whose cell holds a JSON payload of flow results. */
    public static final String RESULTS = "results";
    /** JSON member inside each result field that carries the raw user input. */
    private static final String INPUT = "input";

    private final BigQueryConnector bigQueryConnector;
    private static final Logger logger = Logger.getLogger(BigQueryClient.class);

    public BigQueryClient(BigQueryConnector bigQueryConnector) {
        this.bigQueryConnector = bigQueryConnector;
    }

    /**
     * Runs the given parameterized query with an {@code updated_at} date filter and a
     * {@code limit_count} row cap bound as named parameters.
     *
     * @param query SQL with {@code @updated_at} and {@code @limit_count} named parameters
     * @param date  value bound to {@code updated_at} (string form)
     * @param limit value bound to {@code limit_count}
     * @return the query result set
     */
    public TableResult queryWithPagination(String query, String date, int limit) {
        QueryJobConfiguration queryConfig =
                QueryJobConfiguration.newBuilder(query)
                        .addNamedParameter("updated_at", QueryParameterValue.string(date))
                        .addNamedParameter("limit_count", QueryParameterValue.int64(limit))
                        .build();
        return queryCall(queryConfig);
    }

    /**
     * Submits the job to BigQuery and blocks until it completes.
     *
     * @throws RuntimeException if the job disappears, finishes with an error, or the
     *                          waiting thread is interrupted
     */
    public TableResult queryCall(QueryJobConfiguration queryJobConfiguration) {
        try {
            // Random job id so a retry never collides with a previous submission.
            JobId jobId = JobId.of(UUID.randomUUID().toString());
            Job queryJob = bigQueryConnector.getBigQuery().create(JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build());
            queryJob = queryJob.waitFor();

            if (queryJob == null) {
                logger.error("BigQuery job no longer exists");
                throw new RuntimeException("Job no longer exists");
            } else if (queryJob.getStatus().getError() != null) {
                // queryJob.getStatus().getExecutionErrors() has all errors, not just the latest one.
                logger.error(queryJob.getStatus().getError().toString());
                throw new RuntimeException(queryJob.getStatus().getError().toString());
            }

            return queryJob.getQueryResults();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Flattens a {@link TableResult} into one map per row, keyed by column name.
     * For the {@value #RESULTS} column the embedded JSON payload is additionally
     * exploded into individual entries via {@link #getResultData(Map, String)}.
     */
    public List<Map<String, Object>> filterData(TableResult response) {
        Schema schema = response.getSchema();
        List<Map<String, Object>> rows = new LinkedList<>();
        for (FieldValueList row : response.iterateAll()) {
            Map<String, Object> resultMap = new HashMap<>();
            for (int i = 0; i < schema.getFields().size(); i++) {
                Field field = schema.getFields().get(i);
                FieldValue fieldValue = row.get(i);
                String fieldName = field.getName();
                if (fieldName.equals(RESULTS)) {
                    getResultData(resultMap, fieldValue.getStringValue());
                }
                // The raw string value is kept as well, including for the RESULTS column.
                resultMap.put(fieldName, fieldValue.getStringValue());
            }
            rows.add(resultMap);
        }
        return rows;
    }

    /**
     * Parses the {@value #RESULTS} JSON payload and copies each field listed in
     * {@link StudentConstants#ResultFieldList} into {@code map} (null when absent).
     */
    public void getResultData(Map<String, Object> map, String result) {
        JsonObject jsonObject = new JsonParser().parse(result).getAsJsonObject();
        StudentConstants.ResultFieldList.forEach(field -> map.put(field, getDataFromJson(jsonObject, field)));
    }

    /**
     * Extracts {@code field.input} from the payload.
     *
     * @return the input string, or null when the field or its "input" member is missing
     */
    public String getDataFromJson(JsonObject jsonObject, String field) {
        if (!jsonObject.has(field)) return null;
        JsonObject fieldObject = jsonObject.getAsJsonObject(field);
        // Guard against a result object without an "input" member (previously threw NPE).
        return (fieldObject != null && fieldObject.has(INPUT)) ? fieldObject.get(INPUT).getAsString() : null;
    }
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
public class StudentService {
public static final String ENTITYTYPE = "Student";
private final StudentMappingService studentMappingService;
private final DataExtractorService dataExtractorService;
private final BigQueryClient dataExtractorService;
private final StudentValidator studentValidator;
private final StudentRepository studentRepository;
private final IntegratingEntityStatusRepository integratingEntityStatusRepository;
private final StudentErrorService studentErrorService;

public StudentService(StudentMappingService studentMappingService,
DataExtractorService dataExtractorService,
BigQueryClient dataExtractorService,
StudentValidator studentValidator,
StudentRepository studentRepository,
IntegratingEntityStatusRepository integratingEntityStatusRepository,
Expand Down Expand Up @@ -59,111 +59,73 @@ public StudentService(StudentMappingService studentMappingService,
public static final int LIMIT = 1000;
private static final Logger logger = Logger.getLogger(StudentService.class);

public void extractDataFromBigdata(){
try {
// TODO: 10/10/23 get date avni_entity_status
String fetchtime = getIntegratingEntityStatus().getReadUptoDateTime().toString();
TableResult response = dataExtractorService.queryWithPagination(BULK_FETCH_QUERY,fetchtime, LIMIT);
List<Map<String,Object>> filterData = dataExtractorService.filterData(response);
logger.info(String.format("%s Data get after fetching from glific",filterData.size()));
splitAndProcess(filterData);
} catch (Throwable t) {
//TODO invoke LAHI Failure HealthCheck
}
}

private void splitAndProcess(List<Map<String,Object>> filterData){
public void extractDataFromBigdata() {
String fetchtime = getIntegratingEntityStatus().getReadUptoDateTime().toString();
TableResult response = dataExtractorService.queryWithPagination(BULK_FETCH_QUERY, fetchtime, LIMIT);
List<Map<String, Object>> filterData = dataExtractorService.filterData(response);
logger.info(String.format("%s Data get after fetching from glific", filterData.size()));
logger.info("Splitting the record and doing next step !!!");
filterData.forEach(this::processing);
}

private void processing(Map<String,Object> data){
private void processing(Map<String, Object> data) {
try {
logger.info("record preprocessing started");
preprocessing(data);
logger.info("record syncprocessing started");
syncprocessing(data);
syncProcessing(data);
logger.info("record postprocessing started");
postprocessing();
throw new RuntimeException("error log testing");
} catch (Throwable t) {
//TODO handle error by creating errorRecord
String entity_id = data.get(FLOWRESULT_ID).toString();
studentErrorService.errorOccurred(entity_id, StudentErrorType.CommonError, AvniEntityType.Subject,t.getMessage());
studentErrorService.errorOccurred(entity_id, StudentErrorType.CommonError, AvniEntityType.Subject, t.getMessage());
}
}


/*
in preprocessing we will handle
mandatory field check
validation of age
*/
private void preprocessing(Map<String,Object> data){
private void preprocessing(Map<String, Object> data) {
checkAge(data);
//TODO introduce additional validations
}

private void checkMandatory(Map<String,Object> data){
studentValidator.validateMandatoryField(data);
private void checkMandatory(Map<String, Object> data) {
studentValidator.validateMandatoryField(data);
}

private void checkAge(Map<String,Object> data) {
private void checkAge(Map<String, Object> data) {
studentValidator.checkAge(data.get(DATE_OF_BIRTH).toString());
}


/*
In syncprocessing we are doing following task
set subject field
set observation field
set other field
*/
private void syncprocessing(Map<String,Object> data){
Student student = Student.from(data);
Subject subject = student.subjectWithoutObservations();
studentMappingService.populateObservations(subject,student, LahiMappingDbConstants.MAPPINGGROUP_STUDENT);
studentMappingService.setOtherObservation(subject,student);
insert(subject,student);
private void syncProcessing(Map<String, Object> data) {
Student student = Student.from(data);
Subject subject = student.subjectWithoutObservations();
studentMappingService.populateObservations(subject, student, LahiMappingDbConstants.MAPPINGGROUP_STUDENT);
studentMappingService.setOtherObservation(subject, student);
insert(subject, student);
}

/*
In postprocessing
updating integrating_entity_status
*/
private void postprocessing() {
// updateIntegrationStatus(new Date());
}

private void updateIntegrationStatus(Date readUptoDateTime){
private void updateIntegrationStatus(Date readUptoDateTime) {
IntegratingEntityStatus integratingEntityStatus = getIntegratingEntityStatus();
// TODO: 10/10/23 getFetching record
integratingEntityStatus.setReadUptoDateTime(readUptoDateTime);
integratingEntityStatusRepository.save(integratingEntityStatus);
logger.info(String.format("Updating integrating_entity_status with %s date",integratingEntityStatus.getReadUptoDateTime()));
logger.info(String.format("Updating integrating_entity_status with %s date", integratingEntityStatus.getReadUptoDateTime()));
}


private void insert(Subject subject, Student student){
studentRepository.insert(subject);
Date date = DateTimeUtil.lastUpdatedDate(student.getResponse().get(FLOW_RESULT_UPDATED_AT).toString());
updateIntegrationStatus(date);
private void insert(Subject subject, Student student) {
studentRepository.insert(subject);
Date date = DateTimeUtil.lastUpdatedDate(student.getResponse().get(FLOW_RESULT_UPDATED_AT).toString());
updateIntegrationStatus(date);
}

private IntegratingEntityStatus getIntegratingEntityStatus(){
private IntegratingEntityStatus getIntegratingEntityStatus() {
IntegratingEntityStatus integratingEntityStatus = integratingEntityStatusRepository.findByEntityType(ENTITYTYPE);
if(integratingEntityStatus == null) {
if (integratingEntityStatus == null) {
throw new RuntimeException("unable to find IntegratingEntityStatus");
}
return integratingEntityStatus;
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,14 @@

@Component
public class StudentWorker {

private static final Logger logger = Logger.getLogger(StudentWorker.class);
private final StudentService studentService;

public StudentWorker(StudentService studentService) {
this.studentService = studentService;
}

public void fetchDetails() throws InterruptedException {
logger.info("fetch detail starting !!!!!!!!!!");
public void processStudent() {
studentService.extractDataFromBigdata();
logger.info("fetching ended");
}


}

0 comments on commit bb87dcf

Please sign in to comment.