Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.17] Force create last run context in monitor workflow metadata when workflow is re-enabled #1782

Open
wants to merge 1 commit into
base: 2.17
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,15 @@ object MonitorMetadataService :
monitor: Monitor,
createWithRunContext: Boolean = true,
skipIndex: Boolean = false,
workflowMetadataId: String? = null
workflowMetadataId: String? = null,
forceCreateLastRunContext: Boolean = false
): Pair<MonitorMetadata, Boolean> {
try {
val created = true
val metadata = getMetadata(monitor, workflowMetadataId)
var metadata = getMetadata(monitor, workflowMetadataId)
if (forceCreateLastRunContext) {
metadata = metadata?.copy(lastRunContext = createUpdatedRunContext(monitor))
}
return if (metadata != null) {
metadata to !created
} else {
Expand All @@ -159,6 +163,20 @@ object MonitorMetadataService :
}
}

private suspend fun createUpdatedRunContext(
monitor: Monitor
): Map<String, MutableMap<String, Any>> {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR.value)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
(monitor.inputs[0] as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices[0]
else null
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
createFullRunContext(monitorIndex)
else emptyMap()
return runContext
}

suspend fun getMetadata(monitor: Monitor, workflowMetadataId: String? = null): MonitorMetadata? {
try {
val metadataId = MonitorMetadata.getId(monitor, workflowMetadataId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,8 +709,21 @@ class TransportIndexMonitorAction @Inject constructor(
)
return
}
var isDocLevelMonitorRestarted = false
// Force re-creation of last run context if monitor is of type standard doc-level/threat-intel
// And monitor is re-enabled
if (request.monitor.enabled && !currentMonitor.enabled &&
request.monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value)
) {
isDocLevelMonitorRestarted = true
}

var updatedMetadata: MonitorMetadata
val (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor)
val (metadata, created) = MonitorMetadataService.getOrCreateMetadata(
request.monitor,
forceCreateLastRunContext = isDocLevelMonitorRestarted
)

// Recreate runContext if metadata exists
// Delete and insert all queries from/to queryIndex

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,10 +551,17 @@ class TransportIndexWorkflowAction @Inject constructor(
val monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size)

for (monitor in monitors) {
var isWorkflowRestarted = false

if (request.workflow.enabled && !currentWorkflow.enabled) {
isWorkflowRestarted = true
}

val (monitorMetadata, created) = MonitorMetadataService.getOrCreateMetadata(
monitor = monitor,
createWithRunContext = true,
workflowMetadataId = workflowMetadata.id
workflowMetadataId = workflowMetadata.id,
forceCreateLastRunContext = isWorkflowRestarted
)
if (!created &&
Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6351,4 +6351,186 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}
}
}

fun `test execute workflow when monitor is disabled and re-enabled`() {
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)

val index1 = "index_123"
createIndex(index1, Settings.EMPTY)
val q1 = DocLevelQuery(query = "properties:\"abcd\"", name = "1", fields = listOf())

val docLevelInput = DocLevelMonitorInput(
"description",
listOf(index1),
listOf(q1)
)

val customQueryIndex = "custom_alerts_index"

val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(
queryIndex = customQueryIndex
)
)

val monitorResponse = createMonitor(monitor)!!

val workflowRequest = randomWorkflow(
monitorIds = listOf(monitorResponse.id)
)
val workflowResponse = upsertWorkflow(workflowRequest)!!
val workflowId = workflowResponse.id
val getWorkflowResponse = getWorkflowById(id = workflowResponse.id)

assertNotNull(getWorkflowResponse)
assertEquals(workflowId, getWorkflowResponse.id)

// Verify that monitor workflow metadata exists
assertNotNull(searchMonitorMetadata("${workflowResponse.id}-metadata-${monitorResponse.id}-metadata"))

val testDoc1 = """{
"properties": "abcd"
}"""
indexDoc(index1, "1", testDoc1)
indexDoc(index1, "2", testDoc1)
indexDoc(index1, "3", testDoc1)

// Run workflow
var executeWorkflowResponse = executeWorkflow(workflowRequest, workflowId, false)
Assert.assertNotNull(executeWorkflowResponse)
var findings = searchFindings(monitorResponse.id)
assertEquals(3, findings.size)

// Verify that monitor workflow metadata is updated with lastRunContext
var monitorWokflowMetadata = searchMonitorMetadata("${workflowResponse.id}-metadata-${monitorResponse.id}-metadata")
val lastRunContextBeforeDisable = (monitorWokflowMetadata?.lastRunContext?.get(index1) as? Map<String, Any>)
assertEquals(2, lastRunContextBeforeDisable?.get("0"))

// Disable workflow
val disabledWorkflowRequest = randomWorkflow(
monitorIds = listOf(monitorResponse.id),
id = workflowId,
enabled = false
)
upsertWorkflow(disabledWorkflowRequest, method = RestRequest.Method.PUT, id = workflowId)

// Index doc. Since workflow is disabled, monitor workflow metadata shouldn't be updated
indexDoc(index1, "4", testDoc1)

// re-enable workflow
val enabledWorkflowRequest = randomWorkflow(
monitorIds = listOf(monitorResponse.id),
id = workflowId,
enabled = true
)
upsertWorkflow(enabledWorkflowRequest, method = RestRequest.Method.PUT, id = workflowId)

// Assert no new findings generated after workflow is re-enabled
executeWorkflowResponse = executeWorkflow(workflowRequest, workflowId, false)
Assert.assertNotNull(executeWorkflowResponse)
findings = searchFindings(monitorResponse.id)
assertEquals(3, findings.size)

// Verify that monitor workflow metadata exists
// Since workflow is re-enabled, last run context should be updated with latest sequence number
monitorWokflowMetadata = searchMonitorMetadata("${workflowResponse.id}-metadata-${monitorResponse.id}-metadata")
assertNotNull(monitorWokflowMetadata)
val lastRunContext = (monitorWokflowMetadata?.lastRunContext?.get(index1) as? Map<String, Any>)
assertEquals(3, lastRunContext?.get("0"))
}

fun `test doc level monitor when it is disabled and re-enabled`() {
// Setup doc level monitor
val docQuery = DocLevelQuery(query = "eventType:\"login\"", name = "3", fields = listOf())

val docLevelInput = DocLevelMonitorInput(
"description", listOf(index), listOf(docQuery)
)
val customFindingsIndex = "custom_findings_index"
val customFindingsIndexPattern = "custom_findings_index-1"
val customQueryIndex = "custom_alerts_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(),
dataSources = DataSources(
queryIndex = customQueryIndex,
findingsIndex = customFindingsIndex,
findingsIndexPattern = customFindingsIndexPattern
)
)
val monitorResponse = createMonitor(monitor)
assertFalse(monitorResponse?.id.isNullOrEmpty())

val testDoc = """{
"eventType" : "login"
}"""
indexDoc(index, "1", testDoc)

monitor = monitorResponse!!.monitor
val id = monitorResponse.id

// Execute monitor
var executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertNotNull(executeMonitorResponse)

// Assert findings generated and last run context in monitor metadata is updated
var findings = searchFindings(id, customFindingsIndex)
assertEquals(1, findings.size)

var monitorMetadata = searchMonitorMetadata("${monitorResponse.id}-metadata")
val lastRunContextBeforeDisable = (monitorMetadata?.lastRunContext?.get(index) as? Map<String, Any>)
assertEquals(0, lastRunContextBeforeDisable?.get("0"))

// Disable monitor
var updateMonitorResponse = updateMonitor(
monitor.copy(
id = monitorResponse.id,
dataSources = DataSources(
queryIndex = customQueryIndex,
),
enabled = false,
enabledTime = null
),
monitorResponse.id
)
Assert.assertNotNull(updateMonitorResponse)

// Index doc. Since monitor is disabled, monitor workflow metadata shouldn't be updated
indexDoc(index, "2", testDoc)
indexDoc(index, "3", testDoc)
indexDoc(index, "4", testDoc)

executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertNotNull(executeMonitorResponse)

// Assert no new findings since monitor was disabled
findings = searchFindings(id, customFindingsIndex)
assertEquals(1, findings.size)

// re-enable monitor
updateMonitorResponse = updateMonitor(
monitor.copy(
id = monitorResponse.id,
dataSources = DataSources(
queryIndex = customQueryIndex,
),
enabled = true,
enabledTime = Instant.now().truncatedTo(ChronoUnit.MILLIS)
),
monitorResponse.id
)
Assert.assertNotNull(updateMonitorResponse)
executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertNotNull(executeMonitorResponse)

// Assert no new findings since monitor didnt run
findings = searchFindings(id, customFindingsIndex)
assertEquals(1, findings.size)
// Assert last run context in monitor metadata updated on enabling it, with no new findings generated
monitorMetadata = searchMonitorMetadata("${monitorResponse.id}-metadata")
val lastRunContextAfterEnable = (monitorMetadata?.lastRunContext?.get(index) as? Map<String, Any>)
assertEquals(3, lastRunContextAfterEnable?.get("0"))
}
}
Loading