diff --git a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala
index be2e0c3a..dfe984d8 100644
--- a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala
+++ b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala
@@ -40,32 +40,16 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) e
     hudiSchemaParser = new HudiSchemaParser()
 
     // Register Flink metrics for inputEventCount and failedEventCount
-    inputEventCount = getRuntimeContext.getMetricGroup.counter("inputEventCountdata")
-    failedEventCount = getRuntimeContext.getMetricGroup.counter("failedEventCountdata")
+    inputEventCount = getRuntimeContext.getMetricGroup.counter("input-event-count")
+    failedEventCount = getRuntimeContext.getMetricGroup.counter("input-event-count")
   }
 
   override def map(event: MMap[String, AnyRef]): RowData = {
     startTime = System.currentTimeMillis()
 
     try {
-      // Increment the input event count
-      inputEventCount.inc()
-
-      // Optionally, increment by the size of the event (if it's a collection)
       inputEventCount.inc(event.size)
-
-      // Convert the event data into RowData for further processing
       val rowData = convertToRowData(event)
-
-      logger.info("Metric inputEventCount: " + inputEventCount)
-      logger.info("Metric failedEventCount: " + failedEventCount)
-
-      logger.info("Before increment: inputEventCount=" + inputEventCount.getCount)
-      inputEventCount.inc()
-      logger.info("After increment: inputEventCount=" + inputEventCount.getCount)
-
-
-
       rowData
     } catch {
       case ex: Exception =>
@@ -89,8 +73,8 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) e
   // Custom method to retrieve the metric values (e.g., for exposing them in a monitoring system)
   def getMetrics: Map[String, Long] = {
     Map(
-      "inputEventCountdata" -> inputEventCount.getCount,
-      "failedEventCountdata" -> failedEventCount.getCount
+      "input-event-count" -> inputEventCount.getCount,
+      "failed-event-count" -> failedEventCount.getCount
     )
   }
 }