From 8750f3140ff239e989204acaa77c40e64f41bc59 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 28 Jan 2025 03:49:16 +0000 Subject: [PATCH] Force create last run context in monitor workflow metadata when workflow is re-enabled (#1778) * Force create last run context in monitor worflow metadata when workflow is re-enabled Signed-off-by: Nishtha Mehrotra * Force creation of re-enabled doc level monitors Signed-off-by: Nishtha Mehrotra * Updated IT Signed-off-by: Nishtha Mehrotra --------- Signed-off-by: Nishtha Mehrotra Co-authored-by: Nishtha Mehrotra (cherry picked from commit e2e01645744a31cc724e290663d17618c279d683) Signed-off-by: github-actions[bot] --- .../alerting/MonitorMetadataService.kt | 22 ++- .../transport/TransportIndexMonitorAction.kt | 15 +- .../transport/TransportIndexWorkflowAction.kt | 9 +- .../alerting/MonitorDataSourcesIT.kt | 182 ++++++++++++++++++ 4 files changed, 224 insertions(+), 4 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt index dabd3069d..dd5ddbda8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt @@ -139,11 +139,15 @@ object MonitorMetadataService : monitor: Monitor, createWithRunContext: Boolean = true, skipIndex: Boolean = false, - workflowMetadataId: String? = null + workflowMetadataId: String? = null, + forceCreateLastRunContext: Boolean = false ): Pair { try { val created = true - val metadata = getMetadata(monitor, workflowMetadataId) + var metadata = getMetadata(monitor, workflowMetadataId) + if (forceCreateLastRunContext) { + metadata = metadata?.copy(lastRunContext = createUpdatedRunContext(monitor)) + } return if (metadata != null) { metadata to !created } else { @@ -159,6 +163,20 @@ object MonitorMetadataService : } } + private suspend fun createUpdatedRunContext( + monitor: Monitor + ): Map> { + val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR.value) + (monitor.inputs[0] as DocLevelMonitorInput).indices[0] + else if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value)) + (monitor.inputs[0] as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices[0] + else null + val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value)) + createFullRunContext(monitorIndex) + else emptyMap() + return runContext + } + suspend fun getMetadata(monitor: Monitor, workflowMetadataId: String? = null): MonitorMetadata? { try { val metadataId = MonitorMetadata.getId(monitor, workflowMetadataId) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index a29ac1a9d..b1e84046c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -709,8 +709,21 @@ class TransportIndexMonitorAction @Inject constructor( ) return } + var isDocLevelMonitorRestarted = false + // Force re-creation of last run context if monitor is of type standard doc-level/threat-intel + // And monitor is re-enabled + if (request.monitor.enabled && !currentMonitor.enabled && + request.monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value) + ) { + isDocLevelMonitorRestarted = true + } + var updatedMetadata: MonitorMetadata - val (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor) + val (metadata, created) = MonitorMetadataService.getOrCreateMetadata( + request.monitor, + forceCreateLastRunContext = isDocLevelMonitorRestarted + ) + // Recreate runContext if metadata exists // Delete and insert all queries from/to queryIndex diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt index 0538aa207..3bdc1588e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt @@ -551,10 +551,17 @@ class TransportIndexWorkflowAction @Inject constructor( val monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size) for (monitor in monitors) { + var isWorkflowRestarted = false + + if (request.workflow.enabled && !currentWorkflow.enabled) { + isWorkflowRestarted = true + } + val (monitorMetadata, created) = MonitorMetadataService.getOrCreateMetadata( monitor = monitor, createWithRunContext = true, - workflowMetadataId = workflowMetadata.id + workflowMetadataId = workflowMetadata.id, + forceCreateLastRunContext = isWorkflowRestarted ) if (!created && Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 1ba9b55e9..c5981403a 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -6351,4 +6351,186 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { } } } + + fun `test execute workflow when monitor is disabled and re-enabled`() { + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val index1 = "index_123" + createIndex(index1, Settings.EMPTY) + val q1 = DocLevelQuery(query = "properties:\"abcd\"", name = "1", fields = listOf()) + + val docLevelInput = DocLevelMonitorInput( + "description", + listOf(index1), + listOf(q1) + ) + + val customQueryIndex = "custom_alerts_index" + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex + ) + ) + + val monitorResponse = createMonitor(monitor)!! + + val workflowRequest = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflowRequest)!! + val workflowId = workflowResponse.id + val getWorkflowResponse = getWorkflowById(id = workflowResponse.id) + + assertNotNull(getWorkflowResponse) + assertEquals(workflowId, getWorkflowResponse.id) + + // Verify that monitor workflow metadata exists + assertNotNull(searchMonitorMetadata("${workflowResponse.id}-metadata-${monitorResponse.id}-metadata")) + + val testDoc1 = """{ + "properties": "abcd" + }""" + indexDoc(index1, "1", testDoc1) + indexDoc(index1, "2", testDoc1) + indexDoc(index1, "3", testDoc1) + + // Run workflow + var executeWorkflowResponse = executeWorkflow(workflowRequest, workflowId, false) + Assert.assertNotNull(executeWorkflowResponse) + var findings = searchFindings(monitorResponse.id) + assertEquals(3, findings.size) + + // Verify that monitor workflow metadata is updated with lastRunContext + var monitorWokflowMetadata = searchMonitorMetadata("${workflowResponse.id}-metadata-${monitorResponse.id}-metadata") + val lastRunContextBeforeDisable = (monitorWokflowMetadata?.lastRunContext?.get(index1) as? Map) + assertEquals(2, lastRunContextBeforeDisable?.get("0")) + + // Disable workflow + val disabledWorkflowRequest = randomWorkflow( + monitorIds = listOf(monitorResponse.id), + id = workflowId, + enabled = false + ) + upsertWorkflow(disabledWorkflowRequest, method = RestRequest.Method.PUT, id = workflowId) + + // Index doc. Since workflow is disabled, monitor workflow metadata shouldn't be updated + indexDoc(index1, "4", testDoc1) + + // re-enable workflow + val enabledWorkflowRequest = randomWorkflow( + monitorIds = listOf(monitorResponse.id), + id = workflowId, + enabled = true + ) + upsertWorkflow(enabledWorkflowRequest, method = RestRequest.Method.PUT, id = workflowId) + + // Assert no new findings generated after workflow is re-enabled + executeWorkflowResponse = executeWorkflow(workflowRequest, workflowId, false) + Assert.assertNotNull(executeWorkflowResponse) + findings = searchFindings(monitorResponse.id) + assertEquals(3, findings.size) + + // Verify that monitor workflow metadata exists + // Since workflow is re-enabled, last run context should be updated with latest sequence number + monitorWokflowMetadata = searchMonitorMetadata("${workflowResponse.id}-metadata-${monitorResponse.id}-metadata") + assertNotNull(monitorWokflowMetadata) + val lastRunContext = (monitorWokflowMetadata?.lastRunContext?.get(index1) as? Map) + assertEquals(3, lastRunContext?.get("0")) + } + + fun `test doc level monitor when it is disabled and re-enabled`() { + // Setup doc level monitor + val docQuery = DocLevelQuery(query = "eventType:\"login\"", name = "3", fields = listOf()) + + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery) + ) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse = createMonitor(monitor) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + + val testDoc = """{ + "eventType" : "login" + }""" + indexDoc(index, "1", testDoc) + + monitor = monitorResponse!!.monitor + val id = monitorResponse.id + + // Execute monitor + var executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertNotNull(executeMonitorResponse) + + // Assert findings generated and last run context in monitor metadata is updated + var findings = searchFindings(id, customFindingsIndex) + assertEquals(1, findings.size) + + var monitorMetadata = searchMonitorMetadata("${monitorResponse.id}-metadata") + val lastRunContextBeforeDisable = (monitorMetadata?.lastRunContext?.get(index) as? Map) + assertEquals(0, lastRunContextBeforeDisable?.get("0")) + + // Disable monitor + var updateMonitorResponse = updateMonitor( + monitor.copy( + id = monitorResponse.id, + dataSources = DataSources( + queryIndex = customQueryIndex, + ), + enabled = false, + enabledTime = null + ), + monitorResponse.id + ) + Assert.assertNotNull(updateMonitorResponse) + + // Index doc. Since monitor is disabled, monitor workflow metadata shouldn't be updated + indexDoc(index, "2", testDoc) + indexDoc(index, "3", testDoc) + indexDoc(index, "4", testDoc) + + executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertNotNull(executeMonitorResponse) + + // Assert no new findings since monitor was disabled + findings = searchFindings(id, customFindingsIndex) + assertEquals(1, findings.size) + + // re-enable monitor + updateMonitorResponse = updateMonitor( + monitor.copy( + id = monitorResponse.id, + dataSources = DataSources( + queryIndex = customQueryIndex, + ), + enabled = true, + enabledTime = Instant.now().truncatedTo(ChronoUnit.MILLIS) + ), + monitorResponse.id + ) + Assert.assertNotNull(updateMonitorResponse) + executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertNotNull(executeMonitorResponse) + + // Assert no new findings since monitor didnt run + findings = searchFindings(id, customFindingsIndex) + assertEquals(1, findings.size) + // Assert last run context in monitor metadata updated on enabling it, with no new findings generated + monitorMetadata = searchMonitorMetadata("${monitorResponse.id}-metadata") + val lastRunContextAfterEnable = (monitorMetadata?.lastRunContext?.get(index) as? Map) + assertEquals(3, lastRunContextAfterEnable?.get("0")) + } }