From ed8bbb6be677a0500fd1993c0393a7c0c2362a73 Mon Sep 17 00:00:00 2001 From: Shashank Reddy Boyapally Date: Wed, 2 Oct 2024 13:02:10 -0500 Subject: [PATCH] updated types Signed-off-by: Shashank Reddy Boyapally --- fmatch/splunk_matcher.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/fmatch/splunk_matcher.py b/fmatch/splunk_matcher.py index 3e70514..fb61d96 100644 --- a/fmatch/splunk_matcher.py +++ b/fmatch/splunk_matcher.py @@ -1,21 +1,26 @@ -#pylint: disable = C0209, R0913, E0401 +# pylint: disable = C0209, R0913, E0401 """ Matcher for splunk datasource """ import orjson from splunklib import client, results +from typing import Dict, Any class SplunkMatcher: - """Splunk data source matcher - """ - def __init__(self, host, port, username, password, indice): #pylint: disable = R0917 + """Splunk data source matcher""" + + def __init__( + self, host: str, port: int, username: str, password: str, indice: str + ): # pylint: disable = R0917 self.indice = indice self.service = client.connect( host=host, port=port, username=username, password=password ) - async def query(self, query, searchList="", max_results=10000): + async def query( + self, query: Dict[Any, Any], searchList: str = "", max_results: int = 10000 + ): """ Query data from splunk server using splunk lib sdk @@ -33,7 +38,7 @@ async def query(self, query, searchList="", max_results=10000): ) try: oneshotsearch_results = self.service.jobs.oneshot(searchindex, **query) - except Exception as e: #pylint: disable = W0718 + except Exception as e: # pylint: disable = W0718 print("Error querying splunk: {}".format(e)) return None @@ -43,7 +48,7 @@ async def query(self, query, searchList="", max_results=10000): try: res_array.append( { - "data": orjson.loads(record["_raw"]), #pylint: disable = E1101 + "data": orjson.loads(record["_raw"]), # pylint: disable = E1101 "host": record["host"], "source": record["source"], "sourcetype": record["sourcetype"], @@ -52,12 +57,12 @@ async def query(self, query, searchList="", max_results=10000): "timestamp": record["_indextime"], } ) - except Exception as e: #pylint: disable = W0718 + except Exception as e: # pylint: disable = W0718 print(f"Error on including Splunk record query in results array: {e}") return res_array - async def _stream_results(self, oneshotsearch_results): + async def _stream_results(self, oneshotsearch_results: Any) -> Any: for record in results.JSONResultsReader(oneshotsearch_results): yield record