Skip to content

Commit

Permalink
Backend supports recalibrating checks that caused a data quality inci…
Browse files Browse the repository at this point in the history
…dent.
  • Loading branch information
piotrczarnas committed May 22, 2024
1 parent 6ee2ef5 commit d3f8660
Show file tree
Hide file tree
Showing 140 changed files with 2,971 additions and 32 deletions.
44 changes: 44 additions & 0 deletions dqops/src/main/java/com/dqops/checks/AbstractCheckSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.dqops.checks.comparison.AbstractComparisonCheckCategorySpecMap;
import com.dqops.core.secrets.SecretValueProvider;
import com.dqops.data.checkresults.normalization.CheckResultsNormalizedResult;
import com.dqops.metadata.basespecs.AbstractSpec;
import com.dqops.metadata.comments.CommentsListSpec;
import com.dqops.metadata.id.ChildHierarchyNodeFieldMapImpl;
Expand All @@ -36,6 +37,7 @@
import com.google.common.base.Strings;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import tech.tablesaw.api.IntColumn;

import java.util.Objects;

Expand Down Expand Up @@ -456,4 +458,46 @@ public boolean isStandard() {
public boolean isDefault() {
return false; // we serialize all checks, even when they have no parameters (because they are too simple to have parameters) and have no alert thresholds (because they are only capturing values)
}

/**
* Checks if any rules (warning, error, fatal) are configured.
* @return True when any severity rule is configured, false otherwise.
*/
public boolean hasAnyRulesEnabled() {
return this.getWarning() != null || this.getError() != null || this.getFatal() != null;
}

/**
* Changes the rule parameters to decrease rule severity and generate less alerts.
* @param checkResultsSingleCheck History of check results for this check for the time period used for analysis.
*/
public void decreaseCheckSensitivity(CheckResultsNormalizedResult checkResultsSingleCheck) {
if (checkResultsSingleCheck.isEmpty()) {
return;
}

if (checkResultsSingleCheck.getActualValueColumn().isNotMissing().isEmpty()) {
return; // no results, most calculations will fail
}

IntColumn severityColumn = checkResultsSingleCheck.getSeverityColumn();

if (this.getFatal() != null) {
if (!severityColumn.isEqualTo(3.0).isEmpty()) {
this.getFatal().decreaseRuleSensitivity(checkResultsSingleCheck);
}
}

if (this.getError() != null) {
if (!severityColumn.isEqualTo(2.0).isEmpty()) {
this.getError().decreaseRuleSensitivity(checkResultsSingleCheck);
}
}

if (this.getWarning() != null) {
if (!severityColumn.isEqualTo(1.0).isEmpty()) {
this.getWarning().decreaseRuleSensitivity(checkResultsSingleCheck);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public class CheckResultsColumnNames extends SensorReadoutsColumnNames {
* List of column names that should be loaded from the parquet files when the recent result overview is needed.
* We only want to read the statuses.
*/
public static final String[] COLUMN_NAMES_FOR_RESULTS_OVERVIEW = new String[] {
@Deprecated
public static final String[] COLUMN_NAMES_FOR_RESULTS_OVERVIEW_OBSOLETE = new String[] {
COLUMN_NAME_COLUMN_NAME,
CHECK_TYPE_COLUMN_NAME,
TIME_GRADIENT_COLUMN_NAME,
Expand Down Expand Up @@ -162,4 +163,10 @@ public class CheckResultsColumnNames extends SensorReadoutsColumnNames {

INCIDENT_HASH_COLUMN_NAME
};

/**
* List of column names that should be loaded from the parquet files when the recent result overview is needed.
* We only want to read the statuses.
*/
public static final String[] COLUMN_NAMES_FOR_RESULTS_OVERVIEW = CHECK_RESULTS_COLUMN_NAMES_FOR_READ_ONLY_ACCESS;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@

package com.dqops.data.incidents.models;

import com.dqops.checks.CheckType;
import com.dqops.data.incidents.factory.IncidentStatus;
import com.dqops.data.incidents.factory.IncidentsColumnNames;
import com.dqops.metadata.search.CheckSearchFilters;
import com.dqops.metadata.search.StringPatternComparer;
import com.dqops.metadata.sources.PhysicalTableName;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import com.google.common.base.Strings;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.apache.parquet.Strings;
import tech.tablesaw.api.Row;

import java.time.Instant;
Expand Down Expand Up @@ -281,4 +284,23 @@ public static Comparator<IncidentModel> makeSortComparator(IncidentSortOrder sor
throw new NoSuchElementException("Unsupported sort order on: " + sortOrder);
}
}

/**
* Creates a check search filter that will find all data quality checks that are covered by this incident.
* @return Check search filter that matches all checks that are related to this incident.
*/
public CheckSearchFilters toCheckSearchFilter() {
CheckSearchFilters checkSearchFilters = new CheckSearchFilters();
checkSearchFilters.setConnection(this.getConnection());
checkSearchFilters.setPhysicalTableName(new PhysicalTableName(this.schema, this.table));
checkSearchFilters.setQualityDimension(this.getQualityDimension());
checkSearchFilters.setCheckCategory(this.getCheckCategory());
checkSearchFilters.setCheckName(this.getCheckName());

if (!Strings.isNullOrEmpty(this.checkType)) {
checkSearchFilters.setCheckType(CheckType.valueOf(this.checkType));
}

return checkSearchFilters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@

import com.dqops.data.checkresults.factory.CheckResultsColumnNames;
import com.dqops.data.readouts.factory.SensorReadoutsColumnNames;
import com.dqops.metadata.search.CheckSearchFilters;
import com.dqops.metadata.timeseries.TimePeriodGradient;
import com.dqops.utils.tables.TableColumnUtility;
import com.google.common.base.Strings;
import tech.tablesaw.api.*;
import tech.tablesaw.columns.Column;
import tech.tablesaw.selection.Selection;

import java.time.Instant;

/**
* Describes the dataset (dataframe) returned from the sensor. Identifies the time series column, data stream columns, etc.
Expand Down Expand Up @@ -519,4 +524,84 @@ public TextColumn getUpdatedByColumn() {
public IntColumn getSeverityColumn() {
return severityColumn;
}

/**
* Finds sensor result rows matching check search filters.
* @param searchFilters Check search filters.
* @return Selection (bitmap filter) of rows with results for that check.
*/
public Selection findResults(CheckSearchFilters searchFilters) {
Table table = this.getTable();
Selection selection = Selection.withRange(0, table.rowCount());

if (!Strings.isNullOrEmpty(searchFilters.getConnection())) {
selection = selection.and(this.getConnectionNameColumn().isEqualTo(searchFilters.getConnection()));
}

if (searchFilters.getPhysicalTableName() != null) {
selection = selection.and(this.getSchemaNameColumn().isEqualTo(searchFilters.getPhysicalTableName().getSchemaName()));
selection = selection.and(this.getTableNameColumn().isEqualTo(searchFilters.getPhysicalTableName().getTableName()));
}

if (!Strings.isNullOrEmpty(searchFilters.getColumn())) {
selection = selection.and(this.getColumnNameColumn().isEqualTo(searchFilters.getColumn()));
}

if (searchFilters.getCheckType() != null) {
selection = selection.and(this.getCheckTypeColumn().isEqualTo(searchFilters.getCheckType().getDisplayName()));
}

if (searchFilters.getTimeScale() != null) {
selection = selection.and(this.getTimeGradientColumn().isEqualTo(searchFilters.getTimeScale().toTimeSeriesGradient().toString()));
}

if (!Strings.isNullOrEmpty(searchFilters.getCheckCategory())) {
selection = selection.and(this.getCheckCategoryColumn().isEqualTo(searchFilters.getCheckCategory()));
}

if (!Strings.isNullOrEmpty(searchFilters.getTableComparisonName())) {
selection = selection.and(this.getTableComparisonNameColumn().isEqualTo(searchFilters.getTableComparisonName()));
}

if (!Strings.isNullOrEmpty(searchFilters.getCheckName())) {
selection = selection.and(this.getCheckNameColumn().isEqualTo(searchFilters.getCheckName()));
}

if (!Strings.isNullOrEmpty(searchFilters.getQualityDimension())) {
selection = selection.and(this.getQualityDimensionColumn().isEqualTo(searchFilters.getQualityDimension()));
}

if (!Strings.isNullOrEmpty(searchFilters.getSensorName())) {
selection = selection.and(this.getSensorNameColumn().isEqualTo(searchFilters.getSensorName()));
}

return selection;
}

/**
* Returns true if the table with results is empty.
* @return True when empty, false when there are some rows.
*/
public boolean isEmpty() {
return this.getTable().isEmpty();
}

/**
* Finds the row index of the row that contains the most recent result.
* @return The row index that contains the most recent result or null, when no results are present.
*/
public Integer getRowIndexWithMostRecentResult() {
if (this.isEmpty()) {
return null;
}

Instant mostRecentExecutedAt = this.getExecutedAtColumn().max();
Selection selectionOfMostRecentResults = this.getExecutedAtColumn().isEqualTo(mostRecentExecutedAt);
if (selectionOfMostRecentResults.isEmpty()) {
return null; // rather not possible
}

int firstIndex = selectionOfMostRecentResults.get(selectionOfMostRecentResults.size() - 1);
return firstIndex;
}
}
36 changes: 34 additions & 2 deletions dqops/src/main/java/com/dqops/metadata/id/HierarchyNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,46 @@
import com.dqops.utils.serialization.YamlNotRenderWhenDefault;
import org.apache.commons.collections.IteratorUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

/**
* Interface implemented by objects that are represented on the hierarchy ID tree.
*/
public interface HierarchyNode extends DirtyStatus, ReadOnlyStatus {
/**
* Find an element in an array that is of a given data type or a derived type.
* @param nodes Array of hierarchy nodes to search inside.
* @param targetType Target type to find or a base type.
* @return Found node or null.
* @param <T> Target type.
*/
static <T extends HierarchyNode> T findNodeOfType(HierarchyNode[] nodes, Class<T> targetType) {
for (HierarchyNode node : nodes) {
if (targetType.isAssignableFrom(node.getClass())) {
return (T)node;
}
}

return null;
}

/**
* Find an element in an array that matches a predicate.
* @param nodes Array of hierarchy nodes to search inside.
* @param filter Filter predicate
* @return Found node or null.
*/
static HierarchyNode findNode(HierarchyNode[] nodes, Predicate<HierarchyNode> filter) {
for (HierarchyNode node : nodes) {
if (filter.test(node)) {
return node;
}
}

return null;
}

/**
* Detach all child nodes that are default (empty) and will not be rendered into YAML anyway.
* The purpose of this method is to get rid of extra nodes that were created for a short time to avoid a serialization/deserialization approach for dropping empty nodes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/
package com.dqops.metadata.search;

import com.dqops.checks.CheckTarget;
import com.dqops.checks.CheckTimeScale;
import com.dqops.checks.CheckType;
import com.dqops.checks.*;
import com.dqops.checks.comparison.AbstractComparisonCheckCategorySpec;
import com.dqops.metadata.id.HierarchyId;
import com.dqops.metadata.id.HierarchyIdModel;
import com.dqops.metadata.id.HierarchyNode;
import com.dqops.metadata.search.pattern.SearchPattern;
import com.dqops.metadata.sources.ColumnTypeSnapshotSpec;
import com.dqops.metadata.sources.ConnectionSpec;
import com.dqops.metadata.sources.PhysicalTableName;
import com.dqops.utils.docs.generators.SampleStringsRegistry;
import com.dqops.utils.docs.generators.SampleValueFactory;
import com.fasterxml.jackson.annotation.JsonIgnore;
Expand All @@ -31,12 +33,10 @@
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import io.swagger.annotations.ApiModel;
import lombok.EqualsAndHashCode;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.parquet.Strings;

import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -423,6 +423,42 @@ public CheckSearchFilters clone() {
}
}

/**
* Creates a check search filter given an instance of a check inside a given connection.
* @param connectionSpec Connection specification where the check is applied.
* @param checkSpec Check specification instance, must be inside a user home and the connection that was given.
* @return Check search filters with all filters that identify the check.
*/
public static CheckSearchFilters fromCheckSpecInstance(ConnectionSpec connectionSpec, AbstractCheckSpec<?,?,?,?> checkSpec) {
HierarchyId checkHierarchyId = checkSpec.getHierarchyId();
HierarchyNode[] allNodesToCheck = checkHierarchyId.getNodesOnPath(connectionSpec);
AbstractRootChecksContainerSpec rootChecksContainerSpec = HierarchyNode.findNodeOfType(allNodesToCheck, AbstractRootChecksContainerSpec.class);

CheckSearchFilters checkSearchFilters = new CheckSearchFilters();
checkSearchFilters.setConnection(connectionSpec.getConnectionName());
PhysicalTableName physicalTableName = checkHierarchyId.getPhysicalTableName();
checkSearchFilters.setPhysicalTableName(physicalTableName);
checkSearchFilters.setFullTableName(physicalTableName.toTableSearchFilter());
checkSearchFilters.setColumn(checkHierarchyId.getColumnName());
checkSearchFilters.setCheckCategory(checkSearchFilters.getCheckCategory());
checkSearchFilters.setCheckName(checkSpec.getCheckName());
checkSearchFilters.setCheckTarget(rootChecksContainerSpec.getCheckTarget());
checkSearchFilters.setCheckType(rootChecksContainerSpec.getCheckType());
checkSearchFilters.setTimeScale(rootChecksContainerSpec.getCheckTimeScale());

if (checkSpec.isTableComparisonCheck()) {
AbstractComparisonCheckCategorySpec comparisonCheckCategorySpec = HierarchyNode.findNodeOfType(allNodesToCheck, AbstractComparisonCheckCategorySpec.class);
checkSearchFilters.setTableComparisonName(comparisonCheckCategorySpec.getComparisonName());
}

return checkSearchFilters;
}

/**
* Create a check search filter from a table search filter object, copying shared attributes.
* @param tableSearchFilters Source table search filter.
* @return Check search filter.
*/
public static CheckSearchFilters fromTableSearchFilters(TableSearchFilters tableSearchFilters) {
return new CheckSearchFilters() {{
setConnection(tableSearchFilters.getConnection());
Expand Down
Loading

0 comments on commit d3f8660

Please sign in to comment.