Skip to content

Commit

Permalink
GEOMESA-3439 Converters - fix thread-safety of md5 function (#3265)
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz committed Jan 27, 2025
1 parent 1fec6a4 commit 3cd4a34
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 2 deletions.
5 changes: 5 additions & 0 deletions build/copyright/ccri-2025.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Copyright (c) ${project.inceptionYear}-2025 ${owner}
All rights reserved. This program and the accompanying materials
are made available under the terms of the Apache License, Version 2.0
which accompanies this distribution and is available at
http://www.opensource.org/licenses/apache2.0.php.
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,16 @@ class IdFunctionFactory extends TransformerFunctionFactory with LazyLogging {
}

// Transformer function registered under the name "md5". It takes the first argument, which must be a
// String (encoded as UTF-8) or a byte[], computes its MD5 digest and returns the result of
// ByteArrays.toHex applied to that digest. Any other argument type raises IllegalArgumentException.
//
// NOTE(review): this block looks like a rendered diff hunk without +/- markers - the single shared
// `hasher` and the `hasher.digest` call appear to be the removed lines, and `hashers` / `hashers.get`
// their replacements. Confirm against the actual file before relying on either version.
private val md5: TransformerFunction = new NamedTransformerFunction(Seq("md5"), pure = true) {
// single MessageDigest instance held by the function object (presumably the pre-fix version; the commit
// this hunk belongs to is titled as a thread-safety fix for the md5 function)
private val hasher = MessageDigest.getInstance("MD5")
// per-thread MessageDigest: each thread lazily creates its own MD5 instance on first `get`
private val hashers = new ThreadLocal[MessageDigest]() {
override def initialValue(): MessageDigest = MessageDigest.getInstance("MD5")
}
override def apply(args: Array[AnyRef]): AnyRef = {
// normalize the first argument to raw bytes; only String and byte[] are accepted
val bytes = args(0) match {
case s: String => s.getBytes(StandardCharsets.UTF_8)
case b: Array[Byte] => b
case a => throw new IllegalArgumentException(s"Expected String or byte[] but got: $a")
}
// digests with the shared instance (presumably the removed line - its result is not the last expression)
ByteArrays.toHex(hasher.digest(bytes))
// digests with the calling thread's own instance; this is the value returned
ByteArrays.toHex(hashers.get.digest(bytes))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/***********************************************************************
* Copyright (c) 2013-2025 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.convert2.transforms

import org.locationtech.geomesa.utils.concurrent.CachedThreadPool
import org.specs2.matcher.MatchResult
import org.specs2.mutable.Specification

import java.nio.charset.StandardCharsets
import java.util.Collections
import java.util.concurrent.{ConcurrentHashMap, CopyOnWriteArrayList, TimeUnit}
import scala.util.Random

/**
 * Verifies that the hashing transformer functions return consistent results when a single compiled
 * expression is invoked concurrently from multiple threads.
 */
class IdFunctionFactoryTest extends Specification {

  "IdFunctionFactoryTest" should {
    "generate hashes in a thread-safe way" in {
      testHash("md5")
      testHash("murmurHash3")
      testHash("murmur3_32")
      testHash("murmur3_64")
    }
  }

  /**
   * Evaluates `alg($0)` against three fixed inputs, first sequentially and then 1000 times from a
   * thread pool, collecting every distinct result per input.
   *
   * @param alg name of the hash function to invoke in the expression
   * @return the result of the final expectation (no exceptions were recorded); earlier expectations
   *         verify that the pool finished and that each input only ever produced one distinct hash
   */
  def testHash(alg: String): MatchResult[_] = {
    val exp = Expression(s"$alg($$0)")
    // one concurrent set per input - a set with more than one element means the same input hashed
    // to different values at some point
    val results = Array.fill(3)(Collections.newSetFromMap(new ConcurrentHashMap[AnyRef, java.lang.Boolean]()))
    val exceptions = new CopyOnWriteArrayList[Throwable]()
    val runnables = Array("foo", "bar", "blubaz").map(_.getBytes(StandardCharsets.UTF_8)).zipWithIndex.map { case (input, i) =>
      new Runnable() {
        override def run(): Unit = {
          // record any failure so it can be asserted on from the test thread, instead of being lost
          // inside a pool thread
          try { results(i).add(exp.apply(Array(input))) } catch {
            case e: Throwable => exceptions.add(e)
          }
        }
      }
    }
    // baseline results
    runnables.foreach(_.run())

    // fixed seed so the sequence of submitted inputs is the same on every run
    val r = new Random(-1)
    val pool = new CachedThreadPool(10)
    try {
      (0 until 1000).foreach { _ => pool.submit(runnables(r.nextInt(3))) }
    } finally {
      pool.shutdown()
    }
    // fail the test if the tasks did not all complete - otherwise the checks below could run against
    // partial results and pass spuriously. awaitTermination returns as soon as the pool is done, so
    // the generous timeout costs nothing in the normal case
    pool.awaitTermination(10, TimeUnit.SECONDS) must beTrue
    foreach(results)(_ must haveSize(1))
    exceptions must haveSize(0)
  }
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3143,6 +3143,7 @@
<configuration>
<header>build/copyright/ccri.txt</header>
<validHeaders>
<validHeader>build/copyright/ccri-2025.txt</validHeader>
<validHeader>build/copyright/ccri-azavea.txt</validHeader>
<validHeader>build/copyright/ccri-dstl.txt</validHeader>
<validHeader>build/copyright/ccri-dstl-mitre.txt</validHeader>
Expand Down

0 comments on commit 3cd4a34

Please sign in to comment.