From fd675a1d3ff07fd161940198da53a56efd176182 Mon Sep 17 00:00:00 2001 From: Ben Pirt Date: Sun, 3 Mar 2024 23:05:16 +0000 Subject: [PATCH] Enable finer-grained control over the postgres indexing and searching (#93) * Enable the searching of the JSON field in Postgres to be configured * Enable the indexing of tasks to be enabled / disabled --- .../core/config/ConductorProperties.java | 11 ++++++ .../core/dal/ExecutionDAOFacade.java | 2 +- .../config/PostgresConfiguration.java | 5 ++- .../postgres/config/PostgresProperties.java | 20 ++++++++++ .../postgres/dao/PostgresIndexDAO.java | 14 +++++-- .../util/PostgresIndexQueryBuilder.java | 20 ++++++++-- .../util/PostgresIndexQueryBuilderTest.java | 37 +++++++++++-------- 7 files changed, 84 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index 80b127372..b1bfb4adb 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -81,6 +81,9 @@ public class ConductorProperties { @DurationUnit(ChronoUnit.SECONDS) private Duration taskExecutionPostponeDuration = Duration.ofSeconds(60); + /** Used to enable/disable the indexing of tasks. */ + private boolean taskIndexingEnabled = true; + /** Used to enable/disable the indexing of task execution logs. */ private boolean taskExecLogIndexingEnabled = true; @@ -333,6 +336,14 @@ public void setTaskExecLogIndexingEnabled(boolean taskExecLogIndexingEnabled) { this.taskExecLogIndexingEnabled = taskExecLogIndexingEnabled; } + public boolean isTaskIndexingEnabled() { + return taskIndexingEnabled; + } + + public void setTaskIndexingEnabled(boolean taskIndexingEnabled) { + this.taskIndexingEnabled = taskIndexingEnabled; + } + public boolean isAsyncIndexingEnabled() { return asyncIndexingEnabled; } diff --git a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java index da843222d..3180dfc5b 100644 --- a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java +++ b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java @@ -513,7 +513,7 @@ public void updateTask(TaskModel taskModel) { * of tasks on a system failure. So only index for each update if async indexing is not enabled. * If it *is* enabled, tasks will be indexed only when a workflow is in terminal state. */ - if (!properties.isAsyncIndexingEnabled()) { + if (!properties.isAsyncIndexingEnabled() && properties.isTaskIndexingEnabled()) { indexDAO.indexTask(new TaskSummary(taskModel.toTask())); } } catch (TerminateWorkflowException e) { diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java index 0309df43f..ecb62941e 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java @@ -98,8 +98,9 @@ public PostgresQueueDAO postgresQueueDAO( @ConditionalOnProperty(name = "conductor.indexing.type", havingValue = "postgres") public PostgresIndexDAO postgresIndexDAO( @Qualifier("postgresRetryTemplate") RetryTemplate retryTemplate, - ObjectMapper objectMapper) { - return new PostgresIndexDAO(retryTemplate, objectMapper, dataSource); + ObjectMapper objectMapper, + PostgresProperties properties) { + return new PostgresIndexDAO(retryTemplate, objectMapper, dataSource, properties); } @Bean diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresProperties.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresProperties.java index 5c392cb52..daae7766c 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresProperties.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresProperties.java @@ -29,6 +29,10 @@ public class PostgresProperties { public String schema = "public"; + public boolean allowFullTextQueries = true; + + public boolean allowJsonQueries = true; + public Duration getTaskDefCacheRefreshInterval() { return taskDefCacheRefreshInterval; } @@ -52,4 +56,20 @@ public String getSchema() { public void setSchema(String schema) { this.schema = schema; } + + public boolean getAllowFullTextQueries() { + return allowFullTextQueries; + } + + public void setAllowFullTextQueries(boolean allowFullTextQueries) { + this.allowFullTextQueries = allowFullTextQueries; + } + + public boolean getAllowJsonQueries() { + return allowJsonQueries; + } + + public void setAllowJsonQueries(boolean allowJsonQueries) { + this.allowJsonQueries = allowJsonQueries; + } } diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java index cbd36da28..a169eb152 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java @@ -30,15 +30,22 @@ import com.netflix.conductor.common.run.WorkflowSummary; import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.dao.IndexDAO; +import com.netflix.conductor.postgres.config.PostgresProperties; import com.netflix.conductor.postgres.util.PostgresIndexQueryBuilder; import com.fasterxml.jackson.databind.ObjectMapper; public class PostgresIndexDAO extends PostgresBaseDAO implements IndexDAO { + private final PostgresProperties properties; + public PostgresIndexDAO( - RetryTemplate retryTemplate, ObjectMapper objectMapper, DataSource dataSource) { + RetryTemplate retryTemplate, + ObjectMapper objectMapper, + DataSource dataSource, + PostgresProperties properties) { super(retryTemplate, objectMapper, dataSource); + this.properties = properties; } @Override @@ -69,7 +76,7 @@ public SearchResult searchWorkflowSummary( String query, String freeText, int start, int count, List sort) { PostgresIndexQueryBuilder queryBuilder = new PostgresIndexQueryBuilder( - "workflow_index", query, freeText, start, count, sort); + "workflow_index", query, freeText, start, count, sort, properties); List results = queryWithTransaction( @@ -117,7 +124,8 @@ public void indexTask(TaskSummary task) { public SearchResult searchTaskSummary( String query, String freeText, int start, int count, List sort) { PostgresIndexQueryBuilder queryBuilder = - new PostgresIndexQueryBuilder("task_index", query, freeText, start, count, sort); + new PostgresIndexQueryBuilder( + "task_index", query, freeText, start, count, sort, properties); List results = queryWithTransaction( diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/util/PostgresIndexQueryBuilder.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/util/PostgresIndexQueryBuilder.java index d73ef943f..706716a8f 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/util/PostgresIndexQueryBuilder.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/util/PostgresIndexQueryBuilder.java @@ -24,6 +24,8 @@ import org.apache.commons.lang3.StringUtils; +import com.netflix.conductor.postgres.config.PostgresProperties; + public class PostgresIndexQueryBuilder { private final String table; @@ -33,6 +35,10 @@ public class PostgresIndexQueryBuilder { private final List sort; private final List conditions = new ArrayList<>(); + private boolean allowJsonQueries; + + private boolean allowFullTextQueries; + private static final String[] VALID_FIELDS = { "workflow_id", "correlation_id", @@ -128,12 +134,20 @@ public void setValues(List values) { } public PostgresIndexQueryBuilder( - String table, String query, String freeText, int start, int count, List sort) { + String table, + String query, + String freeText, + int start, + int count, + List sort, + PostgresProperties properties) { this.table = table; this.freeText = freeText; this.start = start; this.count = count; this.sort = sort; + this.allowFullTextQueries = properties.getAllowFullTextQueries(); + this.allowJsonQueries = properties.getAllowJsonQueries(); this.parseQuery(query); this.parseFreeText(freeText); } @@ -177,14 +191,14 @@ private void parseQuery(String query) { private void parseFreeText(String freeText) { if (!StringUtils.isEmpty(freeText) && !freeText.equals("*")) { - if (freeText.startsWith("{") && freeText.endsWith("}")) { + if (allowJsonQueries && freeText.startsWith("{") && freeText.endsWith("}")) { Condition cond = new Condition(); cond.setAttribute("json_data"); cond.setOperator("@>"); String[] values = {freeText}; cond.setValues(Arrays.asList(values)); conditions.add(cond); - } else { + } else if (allowFullTextQueries) { Condition cond = new Condition(); cond.setAttribute("jsonb_to_tsvector('english', json_data, '[\"all\"]')"); cond.setOperator("@@"); diff --git a/postgres-persistence/src/test/java/com/netflix/conductor/postgres/util/PostgresIndexQueryBuilderTest.java b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/util/PostgresIndexQueryBuilderTest.java index 3ce48add9..2a1f8a394 100644 --- a/postgres-persistence/src/test/java/com/netflix/conductor/postgres/util/PostgresIndexQueryBuilderTest.java +++ b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/util/PostgresIndexQueryBuilderTest.java @@ -21,16 +21,21 @@ import org.mockito.InOrder; import org.mockito.Mockito; +import com.netflix.conductor.postgres.config.PostgresProperties; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.*; public class PostgresIndexQueryBuilderTest { + + private PostgresProperties properties = new PostgresProperties(); + @Test void shouldGenerateQueryForEmptyString() throws SQLException { String inputQuery = ""; PostgresIndexQueryBuilder builder = new PostgresIndexQueryBuilder( - "table_name", inputQuery, "", 0, 15, new ArrayList<>()); + "table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties); String generatedQuery = builder.getQuery(); assertEquals("SELECT json_data::TEXT FROM table_name LIMIT ? OFFSET ?", generatedQuery); Query mockQuery = mock(Query.class); @@ -46,7 +51,7 @@ void shouldGenerateQueryForNull() throws SQLException { String inputQuery = null; PostgresIndexQueryBuilder builder = new PostgresIndexQueryBuilder( - "table_name", inputQuery, "", 0, 15, new ArrayList<>()); + "table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties); String generatedQuery = builder.getQuery(); assertEquals("SELECT json_data::TEXT FROM table_name LIMIT ? OFFSET ?", generatedQuery); Query mockQuery = mock(Query.class); @@ -62,7 +67,7 @@ void shouldGenerateQueryForWorkflowId() throws SQLException { String inputQuery = "workflowId=\"abc123\""; PostgresIndexQueryBuilder builder = new PostgresIndexQueryBuilder( - "table_name", inputQuery, "", 0, 15, new ArrayList<>()); + "table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties); String generatedQuery = builder.getQuery(); assertEquals( "SELECT json_data::TEXT FROM table_name WHERE workflow_id = ? LIMIT ? OFFSET ?", @@ -81,7 +86,7 @@ void shouldGenerateQueryForMultipleInClause() throws SQLException { String inputQuery = "status IN (COMPLETED,RUNNING)"; PostgresIndexQueryBuilder builder = new PostgresIndexQueryBuilder( - "table_name", inputQuery, "", 0, 15, new ArrayList<>()); + "table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties); String generatedQuery = builder.getQuery(); assertEquals( "SELECT json_data::TEXT FROM table_name WHERE status = ANY(?) LIMIT ? OFFSET ?", @@ -100,7 +105,7 @@ void shouldGenerateQueryForSingleInClause() throws SQLException { String inputQuery = "status IN (COMPLETED)"; PostgresIndexQueryBuilder builder = new PostgresIndexQueryBuilder( - "table_name", inputQuery, "", 0, 15, new ArrayList<>()); + "table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties); String generatedQuery = builder.getQuery(); assertEquals( "SELECT json_data::TEXT FROM table_name WHERE status = ? LIMIT ? OFFSET ?", @@ -119,7 +124,7 @@ void shouldGenerateQueryForStartTimeGt() throws SQLException { String inputQuery = "startTime>1675702498000"; PostgresIndexQueryBuilder builder = new PostgresIndexQueryBuilder( - "table_name", inputQuery, "", 0, 15, new ArrayList<>()); + "table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties); String generatedQuery = builder.getQuery(); assertEquals( "SELECT json_data::TEXT FROM table_name WHERE start_time > ?::TIMESTAMPTZ LIMIT ? OFFSET ?", @@ -138,7 +143,7 @@ void shouldGenerateQueryForStartTimeLt() throws SQLException { String inputQuery = "startTime<1675702498000"; PostgresIndexQueryBuilder builder = new PostgresIndexQueryBuilder( - "table_name", inputQuery, "", 0, 15, new ArrayList<>()); + "table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties); String generatedQuery = builder.getQuery(); assertEquals( "SELECT json_data::TEXT FROM table_name WHERE start_time < ?::TIMESTAMPTZ LIMIT ? OFFSET ?", @@ -157,7 +162,7 @@ void shouldGenerateQueryForUpdateTimeGt() throws SQLException { String inputQuery = "updateTime>1675702498000"; PostgresIndexQueryBuilder builder = new PostgresIndexQueryBuilder( - "table_name", inputQuery, "", 0, 15, new ArrayList<>()); + "table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties); String generatedQuery = builder.getQuery(); assertEquals( "SELECT json_data::TEXT FROM table_name WHERE update_time > ?::TIMESTAMPTZ LIMIT ? OFFSET ?", @@ -176,7 +181,7 @@ void shouldGenerateQueryForUpdateTimeLt() throws SQLException { String inputQuery = "updateTime<1675702498000"; PostgresIndexQueryBuilder builder = new PostgresIndexQueryBuilder( - "table_name", inputQuery, "", 0, 15, new ArrayList<>()); + "table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties); String generatedQuery = builder.getQuery(); assertEquals( "SELECT json_data::TEXT FROM table_name WHERE update_time < ?::TIMESTAMPTZ LIMIT ? OFFSET ?", @@ -196,7 +201,7 @@ void shouldGenerateQueryForMultipleConditions() throws SQLException { "workflowId=\"abc123\" AND workflowType IN (one,two) AND status IN (COMPLETED,RUNNING) AND startTime>1675701498000 AND startTime<1675702498000"; PostgresIndexQueryBuilder builder = new PostgresIndexQueryBuilder( - "table_name", inputQuery, "", 0, 15, new ArrayList<>()); + "table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties); String generatedQuery = builder.getQuery(); assertEquals( "SELECT json_data::TEXT FROM table_name WHERE start_time < ?::TIMESTAMPTZ AND start_time > ?::TIMESTAMPTZ AND status = ANY(?) AND workflow_id = ? AND workflow_type = ANY(?) LIMIT ? OFFSET ?", @@ -220,7 +225,7 @@ void shouldGenerateOrderBy() throws SQLException { String[] query = {"updateTime:DESC"}; PostgresIndexQueryBuilder builder = new PostgresIndexQueryBuilder( - "table_name", inputQuery, "", 0, 15, Arrays.asList(query)); + "table_name", inputQuery, "", 0, 15, Arrays.asList(query), properties); String expectedQuery = "SELECT json_data::TEXT FROM table_name WHERE update_time < ?::TIMESTAMPTZ ORDER BY update_time DESC LIMIT ? OFFSET ?"; assertEquals(expectedQuery, builder.getQuery()); @@ -232,7 +237,7 @@ void shouldGenerateOrderByMultiple() throws SQLException { String[] query = {"updateTime:DESC", "correlationId:ASC"}; PostgresIndexQueryBuilder builder = new PostgresIndexQueryBuilder( - "table_name", inputQuery, "", 0, 15, Arrays.asList(query)); + "table_name", inputQuery, "", 0, 15, Arrays.asList(query), properties); String expectedQuery = "SELECT json_data::TEXT FROM table_name WHERE update_time < ?::TIMESTAMPTZ ORDER BY update_time DESC, correlation_id ASC LIMIT ? OFFSET ?"; assertEquals(expectedQuery, builder.getQuery()); @@ -243,7 +248,7 @@ void shouldNotAllowInvalidColumns() throws SQLException { String inputQuery = "sqlInjection<1675702498000"; PostgresIndexQueryBuilder builder = new PostgresIndexQueryBuilder( - "table_name", inputQuery, "", 0, 15, new ArrayList<>()); + "table_name", inputQuery, "", 0, 15, new ArrayList<>(), properties); String expectedQuery = "SELECT json_data::TEXT FROM table_name LIMIT ? OFFSET ?"; assertEquals(expectedQuery, builder.getQuery()); } @@ -254,7 +259,7 @@ void shouldNotAllowInvalidSortColumn() throws SQLException { String[] query = {"sqlInjection:DESC"}; PostgresIndexQueryBuilder builder = new PostgresIndexQueryBuilder( - "table_name", inputQuery, "", 0, 15, Arrays.asList(query)); + "table_name", inputQuery, "", 0, 15, Arrays.asList(query), properties); String expectedQuery = "SELECT json_data::TEXT FROM table_name WHERE update_time < ?::TIMESTAMPTZ LIMIT ? OFFSET ?"; assertEquals(expectedQuery, builder.getQuery()); @@ -266,7 +271,7 @@ void shouldAllowFullTextSearch() throws SQLException { String[] query = {"sqlInjection:DESC"}; PostgresIndexQueryBuilder builder = new PostgresIndexQueryBuilder( - "table_name", "", freeText, 0, 15, Arrays.asList(query)); + "table_name", "", freeText, 0, 15, Arrays.asList(query), properties); String expectedQuery = "SELECT json_data::TEXT FROM table_name WHERE jsonb_to_tsvector('english', json_data, '[\"all\"]') @@ to_tsquery(?) LIMIT ? OFFSET ?"; assertEquals(expectedQuery, builder.getQuery()); @@ -278,7 +283,7 @@ void shouldAllowJsonSearch() throws SQLException { String[] query = {"sqlInjection:DESC"}; PostgresIndexQueryBuilder builder = new PostgresIndexQueryBuilder( - "table_name", "", freeText, 0, 15, Arrays.asList(query)); + "table_name", "", freeText, 0, 15, Arrays.asList(query), properties); String expectedQuery = "SELECT json_data::TEXT FROM table_name WHERE json_data @> ?::JSONB LIMIT ? OFFSET ?"; assertEquals(expectedQuery, builder.getQuery());