Skip to content

Commit

Permalink
Enable finer-grained control over the postgres indexing and searching (
Browse files Browse the repository at this point in the history
…#93)

* Enable the searching of the JSON field in Postgres to be configured

* Enable the indexing of tasks to be enabled / disabled
  • Loading branch information
bjpirt authored Mar 3, 2024
1 parent 9892516 commit fd675a1
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public class ConductorProperties {
@DurationUnit(ChronoUnit.SECONDS)
private Duration taskExecutionPostponeDuration = Duration.ofSeconds(60);

/** Used to enable/disable the indexing of tasks. */
private boolean taskIndexingEnabled = true;

/** Used to enable/disable the indexing of task execution logs. */
private boolean taskExecLogIndexingEnabled = true;

Expand Down Expand Up @@ -333,6 +336,14 @@ public void setTaskExecLogIndexingEnabled(boolean taskExecLogIndexingEnabled) {
this.taskExecLogIndexingEnabled = taskExecLogIndexingEnabled;
}

public boolean isTaskIndexingEnabled() {
return taskIndexingEnabled;
}

public void setTaskIndexingEnabled(boolean taskIndexingEnabled) {
this.taskIndexingEnabled = taskIndexingEnabled;
}

public boolean isAsyncIndexingEnabled() {
return asyncIndexingEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ public void updateTask(TaskModel taskModel) {
* of tasks on a system failure. So only index for each update if async indexing is not enabled.
* If it *is* enabled, tasks will be indexed only when a workflow is in terminal state.
*/
if (!properties.isAsyncIndexingEnabled()) {
if (!properties.isAsyncIndexingEnabled() && properties.isTaskIndexingEnabled()) {
indexDAO.indexTask(new TaskSummary(taskModel.toTask()));
}
} catch (TerminateWorkflowException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ public PostgresQueueDAO postgresQueueDAO(
@ConditionalOnProperty(name = "conductor.indexing.type", havingValue = "postgres")
public PostgresIndexDAO postgresIndexDAO(
@Qualifier("postgresRetryTemplate") RetryTemplate retryTemplate,
ObjectMapper objectMapper) {
return new PostgresIndexDAO(retryTemplate, objectMapper, dataSource);
ObjectMapper objectMapper,
PostgresProperties properties) {
return new PostgresIndexDAO(retryTemplate, objectMapper, dataSource, properties);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public class PostgresProperties {

public String schema = "public";

public boolean allowFullTextQueries = true;

public boolean allowJsonQueries = true;

public Duration getTaskDefCacheRefreshInterval() {
return taskDefCacheRefreshInterval;
}
Expand All @@ -52,4 +56,20 @@ public String getSchema() {
public void setSchema(String schema) {
this.schema = schema;
}

public boolean getAllowFullTextQueries() {
return allowFullTextQueries;
}

public void setAllowFullTextQueries(boolean allowFullTextQueries) {
this.allowFullTextQueries = allowFullTextQueries;
}

public boolean getAllowJsonQueries() {
return allowJsonQueries;
}

public void setAllowJsonQueries(boolean allowJsonQueries) {
this.allowJsonQueries = allowJsonQueries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,22 @@
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.postgres.config.PostgresProperties;
import com.netflix.conductor.postgres.util.PostgresIndexQueryBuilder;

import com.fasterxml.jackson.databind.ObjectMapper;

public class PostgresIndexDAO extends PostgresBaseDAO implements IndexDAO {

private final PostgresProperties properties;

public PostgresIndexDAO(
RetryTemplate retryTemplate, ObjectMapper objectMapper, DataSource dataSource) {
RetryTemplate retryTemplate,
ObjectMapper objectMapper,
DataSource dataSource,
PostgresProperties properties) {
super(retryTemplate, objectMapper, dataSource);
this.properties = properties;
}

@Override
Expand Down Expand Up @@ -69,7 +76,7 @@ public SearchResult<WorkflowSummary> searchWorkflowSummary(
String query, String freeText, int start, int count, List<String> sort) {
PostgresIndexQueryBuilder queryBuilder =
new PostgresIndexQueryBuilder(
"workflow_index", query, freeText, start, count, sort);
"workflow_index", query, freeText, start, count, sort, properties);

List<WorkflowSummary> results =
queryWithTransaction(
Expand Down Expand Up @@ -117,7 +124,8 @@ public void indexTask(TaskSummary task) {
public SearchResult<TaskSummary> searchTaskSummary(
String query, String freeText, int start, int count, List<String> sort) {
PostgresIndexQueryBuilder queryBuilder =
new PostgresIndexQueryBuilder("task_index", query, freeText, start, count, sort);
new PostgresIndexQueryBuilder(
"task_index", query, freeText, start, count, sort, properties);

List<TaskSummary> results =
queryWithTransaction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

import org.apache.commons.lang3.StringUtils;

import com.netflix.conductor.postgres.config.PostgresProperties;

public class PostgresIndexQueryBuilder {

private final String table;
Expand All @@ -33,6 +35,10 @@ public class PostgresIndexQueryBuilder {
private final List<String> sort;
private final List<Condition> conditions = new ArrayList<>();

private boolean allowJsonQueries;

private boolean allowFullTextQueries;

private static final String[] VALID_FIELDS = {
"workflow_id",
"correlation_id",
Expand Down Expand Up @@ -128,12 +134,20 @@ public void setValues(List<String> values) {
}

public PostgresIndexQueryBuilder(
String table, String query, String freeText, int start, int count, List<String> sort) {
String table,
String query,
String freeText,
int start,
int count,
List<String> sort,
PostgresProperties properties) {
this.table = table;
this.freeText = freeText;
this.start = start;
this.count = count;
this.sort = sort;
this.allowFullTextQueries = properties.getAllowFullTextQueries();
this.allowJsonQueries = properties.getAllowJsonQueries();
this.parseQuery(query);
this.parseFreeText(freeText);
}
Expand Down Expand Up @@ -177,14 +191,14 @@ private void parseQuery(String query) {

private void parseFreeText(String freeText) {
if (!StringUtils.isEmpty(freeText) && !freeText.equals("*")) {
if (freeText.startsWith("{") && freeText.endsWith("}")) {
if (allowJsonQueries && freeText.startsWith("{") && freeText.endsWith("}")) {
Condition cond = new Condition();
cond.setAttribute("json_data");
cond.setOperator("@>");
String[] values = {freeText};
cond.setValues(Arrays.asList(values));
conditions.add(cond);
} else {
} else if (allowFullTextQueries) {
Condition cond = new Condition();
cond.setAttribute("jsonb_to_tsvector('english', json_data, '[\"all\"]')");
cond.setOperator("@@");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@
import org.mockito.InOrder;
import org.mockito.Mockito;

import com.netflix.conductor.postgres.config.PostgresProperties;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.*;

public class PostgresIndexQueryBuilderTest {

private PostgresProperties properties = new PostgresProperties();

@Test
void shouldGenerateQueryForEmptyString() throws SQLException {
String inputQuery = "";
PostgresIndexQueryBuilder builder =
new PostgresIndexQueryBuilder(
"table_name", inputQuery, "", 0, 15, new ArrayList<>());
"table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties);
String generatedQuery = builder.getQuery();
assertEquals("SELECT json_data::TEXT FROM table_name LIMIT ? OFFSET ?", generatedQuery);
Query mockQuery = mock(Query.class);
Expand All @@ -46,7 +51,7 @@ void shouldGenerateQueryForNull() throws SQLException {
String inputQuery = null;
PostgresIndexQueryBuilder builder =
new PostgresIndexQueryBuilder(
"table_name", inputQuery, "", 0, 15, new ArrayList<>());
"table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties);
String generatedQuery = builder.getQuery();
assertEquals("SELECT json_data::TEXT FROM table_name LIMIT ? OFFSET ?", generatedQuery);
Query mockQuery = mock(Query.class);
Expand All @@ -62,7 +67,7 @@ void shouldGenerateQueryForWorkflowId() throws SQLException {
String inputQuery = "workflowId=\"abc123\"";
PostgresIndexQueryBuilder builder =
new PostgresIndexQueryBuilder(
"table_name", inputQuery, "", 0, 15, new ArrayList<>());
"table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties);
String generatedQuery = builder.getQuery();
assertEquals(
"SELECT json_data::TEXT FROM table_name WHERE workflow_id = ? LIMIT ? OFFSET ?",
Expand All @@ -81,7 +86,7 @@ void shouldGenerateQueryForMultipleInClause() throws SQLException {
String inputQuery = "status IN (COMPLETED,RUNNING)";
PostgresIndexQueryBuilder builder =
new PostgresIndexQueryBuilder(
"table_name", inputQuery, "", 0, 15, new ArrayList<>());
"table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties);
String generatedQuery = builder.getQuery();
assertEquals(
"SELECT json_data::TEXT FROM table_name WHERE status = ANY(?) LIMIT ? OFFSET ?",
Expand All @@ -100,7 +105,7 @@ void shouldGenerateQueryForSingleInClause() throws SQLException {
String inputQuery = "status IN (COMPLETED)";
PostgresIndexQueryBuilder builder =
new PostgresIndexQueryBuilder(
"table_name", inputQuery, "", 0, 15, new ArrayList<>());
"table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties);
String generatedQuery = builder.getQuery();
assertEquals(
"SELECT json_data::TEXT FROM table_name WHERE status = ? LIMIT ? OFFSET ?",
Expand All @@ -119,7 +124,7 @@ void shouldGenerateQueryForStartTimeGt() throws SQLException {
String inputQuery = "startTime>1675702498000";
PostgresIndexQueryBuilder builder =
new PostgresIndexQueryBuilder(
"table_name", inputQuery, "", 0, 15, new ArrayList<>());
"table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties);
String generatedQuery = builder.getQuery();
assertEquals(
"SELECT json_data::TEXT FROM table_name WHERE start_time > ?::TIMESTAMPTZ LIMIT ? OFFSET ?",
Expand All @@ -138,7 +143,7 @@ void shouldGenerateQueryForStartTimeLt() throws SQLException {
String inputQuery = "startTime<1675702498000";
PostgresIndexQueryBuilder builder =
new PostgresIndexQueryBuilder(
"table_name", inputQuery, "", 0, 15, new ArrayList<>());
"table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties);
String generatedQuery = builder.getQuery();
assertEquals(
"SELECT json_data::TEXT FROM table_name WHERE start_time < ?::TIMESTAMPTZ LIMIT ? OFFSET ?",
Expand All @@ -157,7 +162,7 @@ void shouldGenerateQueryForUpdateTimeGt() throws SQLException {
String inputQuery = "updateTime>1675702498000";
PostgresIndexQueryBuilder builder =
new PostgresIndexQueryBuilder(
"table_name", inputQuery, "", 0, 15, new ArrayList<>());
"table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties);
String generatedQuery = builder.getQuery();
assertEquals(
"SELECT json_data::TEXT FROM table_name WHERE update_time > ?::TIMESTAMPTZ LIMIT ? OFFSET ?",
Expand All @@ -176,7 +181,7 @@ void shouldGenerateQueryForUpdateTimeLt() throws SQLException {
String inputQuery = "updateTime<1675702498000";
PostgresIndexQueryBuilder builder =
new PostgresIndexQueryBuilder(
"table_name", inputQuery, "", 0, 15, new ArrayList<>());
"table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties);
String generatedQuery = builder.getQuery();
assertEquals(
"SELECT json_data::TEXT FROM table_name WHERE update_time < ?::TIMESTAMPTZ LIMIT ? OFFSET ?",
Expand All @@ -196,7 +201,7 @@ void shouldGenerateQueryForMultipleConditions() throws SQLException {
"workflowId=\"abc123\" AND workflowType IN (one,two) AND status IN (COMPLETED,RUNNING) AND startTime>1675701498000 AND startTime<1675702498000";
PostgresIndexQueryBuilder builder =
new PostgresIndexQueryBuilder(
"table_name", inputQuery, "", 0, 15, new ArrayList<>());
"table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties);
String generatedQuery = builder.getQuery();
assertEquals(
"SELECT json_data::TEXT FROM table_name WHERE start_time < ?::TIMESTAMPTZ AND start_time > ?::TIMESTAMPTZ AND status = ANY(?) AND workflow_id = ? AND workflow_type = ANY(?) LIMIT ? OFFSET ?",
Expand All @@ -220,7 +225,7 @@ void shouldGenerateOrderBy() throws SQLException {
String[] query = {"updateTime:DESC"};
PostgresIndexQueryBuilder builder =
new PostgresIndexQueryBuilder(
"table_name", inputQuery, "", 0, 15, Arrays.asList(query));
"table_name", inputQuery, "", 0, 15, Arrays.asList(query), properties);
String expectedQuery =
"SELECT json_data::TEXT FROM table_name WHERE update_time < ?::TIMESTAMPTZ ORDER BY update_time DESC LIMIT ? OFFSET ?";
assertEquals(expectedQuery, builder.getQuery());
Expand All @@ -232,7 +237,7 @@ void shouldGenerateOrderByMultiple() throws SQLException {
String[] query = {"updateTime:DESC", "correlationId:ASC"};
PostgresIndexQueryBuilder builder =
new PostgresIndexQueryBuilder(
"table_name", inputQuery, "", 0, 15, Arrays.asList(query));
"table_name", inputQuery, "", 0, 15, Arrays.asList(query), properties);
String expectedQuery =
"SELECT json_data::TEXT FROM table_name WHERE update_time < ?::TIMESTAMPTZ ORDER BY update_time DESC, correlation_id ASC LIMIT ? OFFSET ?";
assertEquals(expectedQuery, builder.getQuery());
Expand All @@ -243,7 +248,7 @@ void shouldNotAllowInvalidColumns() throws SQLException {
String inputQuery = "sqlInjection<1675702498000";
PostgresIndexQueryBuilder builder =
new PostgresIndexQueryBuilder(
"table_name", inputQuery, "", 0, 15, new ArrayList<>());
"table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties);
String expectedQuery = "SELECT json_data::TEXT FROM table_name LIMIT ? OFFSET ?";
assertEquals(expectedQuery, builder.getQuery());
}
Expand All @@ -254,7 +259,7 @@ void shouldNotAllowInvalidSortColumn() throws SQLException {
String[] query = {"sqlInjection:DESC"};
PostgresIndexQueryBuilder builder =
new PostgresIndexQueryBuilder(
"table_name", inputQuery, "", 0, 15, Arrays.asList(query));
"table_name", inputQuery, "", 0, 15, Arrays.asList(query), properties);
String expectedQuery =
"SELECT json_data::TEXT FROM table_name WHERE update_time < ?::TIMESTAMPTZ LIMIT ? OFFSET ?";
assertEquals(expectedQuery, builder.getQuery());
Expand All @@ -266,7 +271,7 @@ void shouldAllowFullTextSearch() throws SQLException {
String[] query = {"sqlInjection:DESC"};
PostgresIndexQueryBuilder builder =
new PostgresIndexQueryBuilder(
"table_name", "", freeText, 0, 15, Arrays.asList(query));
"table_name", "", freeText, 0, 15, Arrays.asList(query), properties);
String expectedQuery =
"SELECT json_data::TEXT FROM table_name WHERE jsonb_to_tsvector('english', json_data, '[\"all\"]') @@ to_tsquery(?) LIMIT ? OFFSET ?";
assertEquals(expectedQuery, builder.getQuery());
Expand All @@ -278,7 +283,7 @@ void shouldAllowJsonSearch() throws SQLException {
String[] query = {"sqlInjection:DESC"};
PostgresIndexQueryBuilder builder =
new PostgresIndexQueryBuilder(
"table_name", "", freeText, 0, 15, Arrays.asList(query));
"table_name", "", freeText, 0, 15, Arrays.asList(query), properties);
String expectedQuery =
"SELECT json_data::TEXT FROM table_name WHERE json_data @> ?::JSONB LIMIT ? OFFSET ?";
assertEquals(expectedQuery, builder.getQuery());
Expand Down

0 comments on commit fd675a1

Please sign in to comment.