From 3cd4a347dd3aadea276e4853fe2a6c9cd25ed11c Mon Sep 17 00:00:00 2001 From: Emilio Date: Mon, 27 Jan 2025 13:00:21 -0500 Subject: [PATCH] GEOMESA-3439 Converters - fix thread-safety of md5 function (#3265) --- build/copyright/ccri-2025.txt | 5 ++ .../transforms/IdFunctionFactory.scala | 6 +- .../transforms/IdFunctionFactoryTest.scala | 62 +++++++++++++++++++ pom.xml | 1 + 4 files changed, 72 insertions(+), 2 deletions(-) create mode 100644 build/copyright/ccri-2025.txt create mode 100644 geomesa-convert/geomesa-convert-common/src/test/scala/org/locationtech/geomesa/convert2/transforms/IdFunctionFactoryTest.scala diff --git a/build/copyright/ccri-2025.txt b/build/copyright/ccri-2025.txt new file mode 100644 index 000000000000..2f728dff9b16 --- /dev/null +++ b/build/copyright/ccri-2025.txt @@ -0,0 +1,5 @@ +Copyright (c) ${project.inceptionYear}-2025 ${owner} +All rights reserved. This program and the accompanying materials +are made available under the terms of the Apache License, Version 2.0 +which accompanies this distribution and is available at +http://www.opensource.org/licenses/apache2.0.php. \ No newline at end of file diff --git a/geomesa-convert/geomesa-convert-common/src/main/scala/org/locationtech/geomesa/convert2/transforms/IdFunctionFactory.scala b/geomesa-convert/geomesa-convert-common/src/main/scala/org/locationtech/geomesa/convert2/transforms/IdFunctionFactory.scala index 03ebc415bca2..14a5c400f5b0 100644 --- a/geomesa-convert/geomesa-convert-common/src/main/scala/org/locationtech/geomesa/convert2/transforms/IdFunctionFactory.scala +++ b/geomesa-convert/geomesa-convert-common/src/main/scala/org/locationtech/geomesa/convert2/transforms/IdFunctionFactory.scala @@ -61,14 +61,16 @@ class IdFunctionFactory extends TransformerFunctionFactory with LazyLogging { } private val md5: TransformerFunction = new NamedTransformerFunction(Seq("md5"), pure = true) { - private val hasher = MessageDigest.getInstance("MD5") + private val hashers = new ThreadLocal[MessageDigest]() { + override def initialValue(): MessageDigest = MessageDigest.getInstance("MD5") + } override def apply(args: Array[AnyRef]): AnyRef = { val bytes = args(0) match { case s: String => s.getBytes(StandardCharsets.UTF_8) case b: Array[Byte] => b case a => throw new IllegalArgumentException(s"Expected String or byte[] but got: $a") } - ByteArrays.toHex(hasher.digest(bytes)) + ByteArrays.toHex(hashers.get.digest(bytes)) } } diff --git a/geomesa-convert/geomesa-convert-common/src/test/scala/org/locationtech/geomesa/convert2/transforms/IdFunctionFactoryTest.scala b/geomesa-convert/geomesa-convert-common/src/test/scala/org/locationtech/geomesa/convert2/transforms/IdFunctionFactoryTest.scala new file mode 100644 index 000000000000..b4d1b722f3dc --- /dev/null +++ b/geomesa-convert/geomesa-convert-common/src/test/scala/org/locationtech/geomesa/convert2/transforms/IdFunctionFactoryTest.scala @@ -0,0 +1,62 @@ +/*********************************************************************** + * Copyright (c) 2013-2025 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.convert2.transforms + +import org.locationtech.geomesa.utils.concurrent.CachedThreadPool +import org.specs2.matcher.MatchResult +import org.specs2.mutable.Specification + +import java.nio.charset.StandardCharsets +import java.util.Collections +import java.util.concurrent.{ConcurrentHashMap, CopyOnWriteArrayList, TimeUnit} +import scala.util.Random + +class IdFunctionFactoryTest extends Specification { + + "IdFunctionFactoryTest" should { + "generate hashes in a thread-safe way" in { + testHash("md5") + testHash("murmurHash3") + testHash("murmur3_32") + testHash("murmur3_64") + } + } + + def testHash(alg: String): MatchResult[_] = { + val exp = Expression(s"$alg($$0)") + val results = Array.fill(3)(Collections.newSetFromMap(new ConcurrentHashMap[AnyRef, java.lang.Boolean]())) + val exceptions = new CopyOnWriteArrayList[Throwable]() + val runnables = Array("foo", "bar", "blubaz").map(_.getBytes(StandardCharsets.UTF_8)).zipWithIndex.map { case (input, i) => + new Runnable() { + override def run(): Unit = { + try { results(i).add(exp.apply(Array(input))) } catch { + case e: Throwable => exceptions.add(e) + } + } + } + } + // baseline results + runnables.foreach(_.run()) + + val r = new Random(-1) + val pool = new CachedThreadPool(10) + try { + var i = 0 + while (i < 1000) { + pool.submit(runnables(r.nextInt(3))) + i += 1 + } + } finally { + pool.shutdown() + } + pool.awaitTermination(1, TimeUnit.SECONDS) + foreach(results)(_ must haveSize(1)) + exceptions must haveSize(0) + } +} diff --git a/pom.xml b/pom.xml index eca3f0353802..2cd8d59263d1 100644 --- a/pom.xml +++ b/pom.xml @@ -3143,6 +3143,7 @@
build/copyright/ccri.txt
+ build/copyright/ccri-2025.txt build/copyright/ccri-azavea.txt build/copyright/ccri-dstl.txt build/copyright/ccri-dstl-mitre.txt