From 88c692f3c26aba1223b7dc339e17460f43289e99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20de=20la=20Pe=C3=B1a?= Date: Tue, 22 Nov 2016 13:52:54 +0000 Subject: [PATCH] Upgrade to Scala 2.12.0 (#242) --- CHANGELOG.md | 3 +- plugin/pom.xml | 6 +- .../cassandra/lucene/schema/Schema.java | 24 ++------ .../lucene/schema/SchemaAnalyzer.java | 1 + .../analysis/SnowballAnalyzerBuilder.java | 2 +- .../search/condition/LuceneCondition.java | 2 +- .../condition/SingleColumnCondition.java | 2 +- .../condition/SingleMapperCondition.java | 2 +- .../com/stratio/cassandra/lucene/Index.scala | 8 +-- .../cassandra/lucene/IndexPagingState.scala | 11 ++-- .../cassandra/lucene/IndexPostProcessor.scala | 2 +- .../cassandra/lucene/IndexQueryHandler.scala | 4 +- .../cassandra/lucene/IndexService.scala | 6 +- .../cassandra/lucene/IndexWriterWide.scala | 6 +- .../lucene/mapping/ClusteringMapper.scala | 30 ++++------ .../lucene/mapping/ExpressionMapper.scala | 4 +- .../cassandra/lucene/mapping/KeyMapper.scala | 4 +- .../lucene/mapping/PartitionMapper.scala | 18 ++---- .../lucene/util/JavaConversions.scala | 56 ------------------- .../cassandra/lucene/util/TaskQueue.scala | 13 ++--- .../lucene/schema/SchemaBuilderTest.java | 18 +++--- .../cassandra/lucene/schema/SchemaTest.java | 4 +- 22 files changed, 68 insertions(+), 158 deletions(-) delete mode 100644 plugin/src/main/scala/com/stratio/cassandra/lucene/util/JavaConversions.scala diff --git a/CHANGELOG.md b/CHANGELOG.md index e51a0f2b0..a9b61e2f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,8 @@ # Changelog -## 3.9.2 (Upcoming) +## 3.9.3 (Upcoming) +* Upgrade to Scala 2.12.0 * Avoid not required string interpolations in logging * Avoid not required string interpolations in tracing diff --git a/plugin/pom.xml b/plugin/pom.xml index 73be38b6c..448c3b101 100644 --- a/plugin/pom.xml +++ b/plugin/pom.xml @@ -35,10 +35,10 @@ Cassandra Lucene Index plugin - 2.11.8 - 2.11 + 2.12.0 + 2.12 3.5.0 - 3.0.0 + 3.0.1 3.9 5.5.1 1.14.0 diff --git a/plugin/src/main/java/com/stratio/cassandra/lucene/schema/Schema.java b/plugin/src/main/java/com/stratio/cassandra/lucene/schema/Schema.java index 7d8326ba6..f2f969a54 100644 --- a/plugin/src/main/java/com/stratio/cassandra/lucene/schema/Schema.java +++ b/plugin/src/main/java/com/stratio/cassandra/lucene/schema/Schema.java @@ -46,7 +46,10 @@ public class Schema implements Closeable { public final Map mappers; /** The wrapping all-in-one {@link Analyzer}. */ - private final SchemaAnalyzer analyzer; + public final SchemaAnalyzer analyzer; + + /** The default {@link Analyzer}. */ + public final Analyzer defaultAnalyzer; /** The names of the mapped cells. */ private final Set mappedCells; @@ -60,6 +63,7 @@ public class Schema implements Closeable { */ public Schema(Analyzer defaultAnalyzer, Map mappers, Map analyzers) { this.mappers = mappers; + this.defaultAnalyzer = defaultAnalyzer; this.analyzer = new SchemaAnalyzer(defaultAnalyzer, analyzers, mappers); mappedCells = mappers.values() .stream() @@ -68,24 +72,6 @@ public Schema(Analyzer defaultAnalyzer, Map mappers, Map} */ public class SchemaAnalyzer extends DelegatingAnalyzerWrapper { + private final TokenLengthAnalyzer defaultAnalyzer; private final Map fieldAnalyzers; diff --git a/plugin/src/main/java/com/stratio/cassandra/lucene/schema/analysis/SnowballAnalyzerBuilder.java b/plugin/src/main/java/com/stratio/cassandra/lucene/schema/analysis/SnowballAnalyzerBuilder.java index a8ea8c525..c27a811cc 100644 --- a/plugin/src/main/java/com/stratio/cassandra/lucene/schema/analysis/SnowballAnalyzerBuilder.java +++ b/plugin/src/main/java/com/stratio/cassandra/lucene/schema/analysis/SnowballAnalyzerBuilder.java @@ -138,7 +138,7 @@ public SnowballAnalyzer(String language, CharArraySet stopwords) { /** {@inheritDoc} */ @Override - protected TokenStreamComponents createComponents(String fieldName) { + protected Analyzer.TokenStreamComponents createComponents(String fieldName) { final Tokenizer source = new StandardTokenizer(); TokenStream result = new StandardFilter(source); result = new LowerCaseFilter(result); diff --git a/plugin/src/main/java/com/stratio/cassandra/lucene/search/condition/LuceneCondition.java b/plugin/src/main/java/com/stratio/cassandra/lucene/search/condition/LuceneCondition.java index 934b23978..c67e1c128 100644 --- a/plugin/src/main/java/com/stratio/cassandra/lucene/search/condition/LuceneCondition.java +++ b/plugin/src/main/java/com/stratio/cassandra/lucene/search/condition/LuceneCondition.java @@ -78,7 +78,7 @@ public Set postProcessingFields() { @Override public Query doQuery(Schema schema) { try { - Analyzer analyzer = schema.analyzer(); + Analyzer analyzer = schema.analyzer; QueryParser queryParser = new QueryParser(defaultField, analyzer); queryParser.setAllowLeadingWildcard(true); queryParser.setLowercaseExpandedTerms(false); diff --git a/plugin/src/main/java/com/stratio/cassandra/lucene/search/condition/SingleColumnCondition.java b/plugin/src/main/java/com/stratio/cassandra/lucene/search/condition/SingleColumnCondition.java index ee09cb27c..7ec49d360 100644 --- a/plugin/src/main/java/com/stratio/cassandra/lucene/search/condition/SingleColumnCondition.java +++ b/plugin/src/main/java/com/stratio/cassandra/lucene/search/condition/SingleColumnCondition.java @@ -56,7 +56,7 @@ public final Query doQuery(Schema schema) { SingleColumnMapper.class.getSimpleName(), mapper); } - return doQuery((SingleColumnMapper) mapper, schema.analyzer()); + return doQuery((SingleColumnMapper) mapper, schema.analyzer); } /** diff --git a/plugin/src/main/java/com/stratio/cassandra/lucene/search/condition/SingleMapperCondition.java b/plugin/src/main/java/com/stratio/cassandra/lucene/search/condition/SingleMapperCondition.java index 3997a0b5c..626b1e896 100644 --- a/plugin/src/main/java/com/stratio/cassandra/lucene/search/condition/SingleMapperCondition.java +++ b/plugin/src/main/java/com/stratio/cassandra/lucene/search/condition/SingleMapperCondition.java @@ -58,7 +58,7 @@ public final Query doQuery(Schema schema) { } else if (!type.isAssignableFrom(mapper.getClass())) { throw new IndexException("Field '{}' requires a mapper of type '{}' but found '{}'", field, type, mapper); } - return doQuery((T) mapper, schema.analyzer()); + return doQuery((T) mapper, schema.analyzer); } /** diff --git a/plugin/src/main/scala/com/stratio/cassandra/lucene/Index.scala b/plugin/src/main/scala/com/stratio/cassandra/lucene/Index.scala index ec12ef7ca..bad9bbdfb 100644 --- a/plugin/src/main/scala/com/stratio/cassandra/lucene/Index.scala +++ b/plugin/src/main/scala/com/stratio/cassandra/lucene/Index.scala @@ -22,7 +22,6 @@ import java.util.{Collections, Optional} import java.{util => java} import com.stratio.cassandra.lucene.search.Search -import com.stratio.cassandra.lucene.util.JavaConversions._ import com.stratio.cassandra.lucene.util.Logging import org.apache.cassandra.config.{CFMetaData, ColumnDefinition} import org.apache.cassandra.cql3.Operator @@ -117,7 +116,7 @@ class Index(table: ColumnFamilyStore, indexMetadata: IndexMetadata) * * @return the Index's backing storage table */ - override def getBackingTable: Optional[ColumnFamilyStore] = None + override def getBackingTable: Optional[ColumnFamilyStore] = Optional.empty() /** Return a task which performs a blocking flush of the index's data to persistent storage. * @@ -310,10 +309,7 @@ class Index(table: ColumnFamilyStore, indexMetadata: IndexMetadata) override def searcherFor(command: ReadCommand): Searcher = { logger.trace(s"Getting searcher for $command") try { - new Searcher { - override def search(controller: ReadExecutionController): UnfilteredPartitionIterator = - service.search(command, controller) - } + controller => service.search(command, controller) } catch { case e: Exception => logger.error(s"Error getting searcher for command: $command", e) diff --git a/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexPagingState.scala b/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexPagingState.scala index 42a289590..dbd7d8b69 100644 --- a/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexPagingState.scala +++ b/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexPagingState.scala @@ -70,9 +70,8 @@ class IndexPagingState(var remaining: Int) { val cfs = Keyspace.open(command.metadata.ksName).getColumnFamilyStore(command.metadata.cfName) for (expr <- command.rowFilter.getExpressions.asScala) { for (index <- cfs.indexManager.listIndexes.asScala) { - if (index.isInstanceOf[Index] && index.supportsExpression( - expr.column, - expr.operator)) return expr + if (index.isInstanceOf[Index] && index.supportsExpression(expr.column, expr.operator)) + return expr } } throw new IndexException("Not found expression") @@ -86,7 +85,7 @@ class IndexPagingState(var remaining: Int) { @throws[ReflectiveOperationException] def rewrite(query: ReadQuery): Unit = query match { case group: SinglePartitionReadCommand.Group => - group.commands.asScala.foreach(rewrite) + group.commands.forEach(rewrite) case read: ReadCommand => val expression = indexExpression(read) val oldValue = expressionValueField.get(expression).asInstanceOf[ByteBuffer] @@ -123,7 +122,7 @@ class IndexPagingState(var remaining: Int) { while (partition.hasNext) { val newRowIterator = new SingleRowIterator(partition) rowIterators += newRowIterator - entries.put(key, newRowIterator.row.clustering) + entries.put(key, newRowIterator.row.clustering()) if (remaining > 0) remaining -= 1 count += 1 } @@ -175,7 +174,7 @@ class IndexPagingState(var remaining: Int) { if (hasMorePages) new PagingState(toByteBuffer, null, remaining, remaining) else null } - /** @inheritdoc */ + /** @inheritdoc*/ override def toString: String = { MoreObjects.toStringHelper(this).add("remaining", remaining).add("entries", entries).toString } diff --git a/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexPostProcessor.scala b/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexPostProcessor.scala index d3b85d17f..09725a657 100644 --- a/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexPostProcessor.scala +++ b/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexPostProcessor.scala @@ -139,7 +139,7 @@ sealed abstract class IndexPostProcessor[A <: ReadQuery](service: IndexService) val doc = new Document val cols = service.columns(key, row) service.keyIndexableFields(key, row).foreach(doc.add) - service.schema.postProcessingIndexableFields(cols, search).asScala.foreach(doc.add) + service.schema.postProcessingIndexableFields(cols, search).forEach(doc add _) doc } } diff --git a/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexQueryHandler.scala b/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexQueryHandler.scala index 03acbfed7..1ad0b55d5 100644 --- a/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexQueryHandler.scala +++ b/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexQueryHandler.scala @@ -133,14 +133,14 @@ class IndexQueryHandler extends QueryHandler with Logging { val expressions = select.getRowFilter(options).getExpressions val cfs = Keyspace.open(select.keyspace).getColumnFamilyStore(select.columnFamily) val indexes = cfs.indexManager.listIndexes.asScala.collect { case index: Index => index } - expressions.asScala.foreach { + expressions.forEach { case expression: CustomExpression => val clazz = expression.getTargetIndex.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME) if (clazz == classOf[Index].getCanonicalName) { val index = cfs.indexManager.getIndex(expression.getTargetIndex).asInstanceOf[Index] map += expression -> index } - case expr => + case expr: Expression => indexes.filter(_.supportsExpression(expr.column, expr.operator)).foreach(map.put(expr, _)) } map.toMap diff --git a/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexService.scala b/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexService.scala index 677b066a2..22642db22 100644 --- a/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexService.scala +++ b/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexService.scala @@ -70,7 +70,7 @@ abstract class IndexService(val table: ColumnFamilyStore, val indexMetadata: Ind val lucene = new FSIndex( idxName, options.path, - options.schema.analyzer(), + options.schema.analyzer, options.refreshSeconds, options.ramBufferMB, options.maxMergeMB, @@ -227,7 +227,7 @@ abstract class IndexService(val table: ColumnFamilyStore, val indexMetadata: Ind } else { val doc = new Document keyIndexableFields(key, row).foreach(doc.add) - fields.asScala.foreach(doc.add) + fields.forEach(doc add _) lucene.upsert(t, doc) } }) @@ -358,7 +358,7 @@ abstract class IndexService(val table: ColumnFamilyStore, val indexMetadata: Ind */ def validate(update: PartitionUpdate) { val key = update.partitionKey - update.asScala.foreach(row => schema.validate(columns(key, row))) + update.forEach(row => schema.validate(columns(key, row))) } /** @inheritdoc */ diff --git a/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexWriterWide.scala b/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexWriterWide.scala index 7112e872e..9c019fe50 100644 --- a/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexWriterWide.scala +++ b/plugin/src/main/scala/com/stratio/cassandra/lucene/IndexWriterWide.scala @@ -78,10 +78,10 @@ class IndexWriterWide( read(key, clusterings) .asScala .map(_.asInstanceOf[Row]) - .foreach(row => rows.put(row.clustering, row)) + .foreach(row => rows.put(row.clustering(), row)) // Write rows - for ((clustering, row) <- rows.asScala) { + rows.forEach((clustering, row) => { if (row.hasLiveData(nowInSec)) { tracer.trace("Lucene index writing document") service.upsert(key, row, nowInSec) @@ -89,7 +89,7 @@ class IndexWriterWide( tracer.trace("Lucene index deleting document") service.delete(key, row) } - } + }) } } \ No newline at end of file diff --git a/plugin/src/main/scala/com/stratio/cassandra/lucene/mapping/ClusteringMapper.scala b/plugin/src/main/scala/com/stratio/cassandra/lucene/mapping/ClusteringMapper.scala index 6b86a26ac..1afb8b45f 100644 --- a/plugin/src/main/scala/com/stratio/cassandra/lucene/mapping/ClusteringMapper.scala +++ b/plugin/src/main/scala/com/stratio/cassandra/lucene/mapping/ClusteringMapper.scala @@ -97,7 +97,7 @@ class ClusteringMapper(metadata: CFMetaData) { * @return a byte buffer representing `clustering` */ def byteBuffer(clustering: Clustering): ByteBuffer = { - (clusteringType.builder /: clustering.getRawValues) (_ add _) build + (clusteringType.builder /: clustering.getRawValues) (_ add _) build() } /** Returns the [[String]] human-readable representation of the specified [[ClusteringPrefix]]. @@ -168,7 +168,7 @@ class ClusteringMapper(metadata: CFMetaData) { */ def query(key: DecoratedKey, filter: ClusteringIndexSliceFilter): Query = { (new BooleanQuery.Builder /: filter.requestedSlices.asScala) ( - (builder, slice) => builder.add(query(key, slice), SHOULD)) build + (builder, slice) => builder.add(query(key, slice), SHOULD)).build() } } @@ -247,23 +247,15 @@ object ClusteringMapper { * @param mapper the primary key mapper to be used */ class ClusteringSort(mapper: ClusteringMapper) extends SortField( - FIELD_NAME, new FieldComparatorSource { - override def newComparator( - field: String, - hits: Int, - sortPos: Int, - reversed: Boolean): FieldComparator[_] = { - new TermValComparator(hits, field, false) { - override def compareValues(t1: BytesRef, t2: BytesRef): Int = { - val comp = compareUnsigned(t1.bytes, 0, PREFIX_SIZE, t2.bytes, 0, PREFIX_SIZE) - if (comp != 0) return comp - val bb1 = ByteBuffer.wrap(t1.bytes, PREFIX_SIZE, t1.length - PREFIX_SIZE) - val bb2 = ByteBuffer.wrap(t2.bytes, PREFIX_SIZE, t2.length - PREFIX_SIZE) - val clustering1 = mapper.clustering(bb1) - val clustering2 = mapper.clustering(bb2) - mapper.comparator.compare(clustering1, clustering2) - } - } + FIELD_NAME, (field, hits, sortPos, reversed) => new TermValComparator(hits, field, false) { + override def compareValues(t1: BytesRef, t2: BytesRef): Int = { + val comp = compareUnsigned(t1.bytes, 0, PREFIX_SIZE, t2.bytes, 0, PREFIX_SIZE) + if (comp != 0) return comp + val bb1 = ByteBuffer.wrap(t1.bytes, PREFIX_SIZE, t1.length - PREFIX_SIZE) + val bb2 = ByteBuffer.wrap(t2.bytes, PREFIX_SIZE, t2.length - PREFIX_SIZE) + val clustering1 = mapper.clustering(bb1) + val clustering2 = mapper.clustering(bb2) + mapper.comparator.compare(clustering1, clustering2) } }) { diff --git a/plugin/src/main/scala/com/stratio/cassandra/lucene/mapping/ExpressionMapper.scala b/plugin/src/main/scala/com/stratio/cassandra/lucene/mapping/ExpressionMapper.scala index 7f640cfb8..1d1104253 100644 --- a/plugin/src/main/scala/com/stratio/cassandra/lucene/mapping/ExpressionMapper.scala +++ b/plugin/src/main/scala/com/stratio/cassandra/lucene/mapping/ExpressionMapper.scala @@ -133,10 +133,10 @@ case class ExpressionMapper(tableMetadata: CFMetaData, indexMetadata: IndexMetad // Copy row val builder = BTreeRow.unsortedBuilder(nowInSec) - builder.newRow(row.clustering) + builder.newRow(row.clustering()) builder.addRowDeletion(row.deletion) builder.addPrimaryKeyLivenessInfo(row.primaryKeyLivenessInfo) - row.cells.asScala.foreach(builder.addCell) + row.cells.forEach(builder addCell _) // Add score cell val timestamp = row.primaryKeyLivenessInfo.timestamp diff --git a/plugin/src/main/scala/com/stratio/cassandra/lucene/mapping/KeyMapper.scala b/plugin/src/main/scala/com/stratio/cassandra/lucene/mapping/KeyMapper.scala index bb7596390..c7451f604 100644 --- a/plugin/src/main/scala/com/stratio/cassandra/lucene/mapping/KeyMapper.scala +++ b/plugin/src/main/scala/com/stratio/cassandra/lucene/mapping/KeyMapper.scala @@ -53,7 +53,7 @@ class KeyMapper(metadata: CFMetaData) { * @return the byte buffer representing `clustering` */ private def byteBuffer(clustering: Clustering): ByteBuffer = { - (clusteringType.builder /: clustering.getRawValues) (_ add _) build + (clusteringType.builder /: clustering.getRawValues) (_ add _) build() } /** Returns the Lucene [[IndexableField]] representing the specified primary key. @@ -98,7 +98,7 @@ class KeyMapper(metadata: CFMetaData) { */ def query(key: DecoratedKey, filter: ClusteringIndexNamesFilter): Query = { (new BooleanQuery.Builder /: filter.requestedRows.asScala) ( - (builder, clustering) => builder.add(query(key, clustering), SHOULD)) build + (builder, clustering) => builder.add(query(key, clustering), SHOULD)) build() } } diff --git a/plugin/src/main/scala/com/stratio/cassandra/lucene/mapping/PartitionMapper.scala b/plugin/src/main/scala/com/stratio/cassandra/lucene/mapping/PartitionMapper.scala index 896b37762..81a48378a 100644 --- a/plugin/src/main/scala/com/stratio/cassandra/lucene/mapping/PartitionMapper.scala +++ b/plugin/src/main/scala/com/stratio/cassandra/lucene/mapping/PartitionMapper.scala @@ -154,19 +154,11 @@ object PartitionMapper { * @author Andres de la Pena `adelapena@stratio.com` */ class PartitionSort(mapper: PartitionMapper) extends SortField( - FIELD_NAME, new FieldComparatorSource { - override def newComparator( - field: String, - hits: Int, - sortPos: Int, - reversed: Boolean): FieldComparator[_] = { - new TermValComparator(hits, field, false) { - override def compareValues(t1: BytesRef, t2: BytesRef): Int = { - val bb1 = ByteBufferUtils.byteBuffer(t1) - val bb2 = ByteBufferUtils.byteBuffer(t2) - mapper.validator.compare(bb1, bb2) - } - } + FIELD_NAME, (field, hits, sortPos, reversed) => new TermValComparator(hits, field, false) { + override def compareValues(t1: BytesRef, t2: BytesRef): Int = { + val bb1 = ByteBufferUtils.byteBuffer(t1) + val bb2 = ByteBufferUtils.byteBuffer(t2) + mapper.validator.compare(bb1, bb2) } }) { diff --git a/plugin/src/main/scala/com/stratio/cassandra/lucene/util/JavaConversions.scala b/plugin/src/main/scala/com/stratio/cassandra/lucene/util/JavaConversions.scala deleted file mode 100644 index 9c6a3ec90..000000000 --- a/plugin/src/main/scala/com/stratio/cassandra/lucene/util/JavaConversions.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (C) 2014 Stratio (http://stratio.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.stratio.cassandra.lucene.util - -import java.util.Optional -import java.util.concurrent.Callable -import java.util.function.{BiFunction, Function} - -/** Implicit Scala to Java conversions. - * - * @author Andres de la Pena `adelapena@stratio.com` - */ -object JavaConversions { - - implicit def asJavaCallable[A](f: () => A): Callable[A] = { - new Callable[A] { - override def call: A = f.apply - } - } - - implicit def asJavaRunnable(f: () => Unit): Runnable = { - new Runnable { - override def run(): Unit = f.apply - } - } - - implicit def asJavaFunction[A, B](f: A => B): Function[A, B] = { - new Function[A, B] { - override def apply(a: A): B = f(a) - } - } - - implicit def asJavaBiFunction[A, B, C](sf: (A, B) => C): BiFunction[A, B, C] = { - new BiFunction[A, B, C] { - override def apply(a: A, b: B): C = sf(a, b) - } - } - - implicit def asJavaOptional[A](o: Option[A]): Optional[A] = o match { - case Some(a) => Optional.of(a) - case None => Optional.empty() - } -} diff --git a/plugin/src/main/scala/com/stratio/cassandra/lucene/util/TaskQueue.scala b/plugin/src/main/scala/com/stratio/cassandra/lucene/util/TaskQueue.scala index bd1fcac8f..00e591f85 100644 --- a/plugin/src/main/scala/com/stratio/cassandra/lucene/util/TaskQueue.scala +++ b/plugin/src/main/scala/com/stratio/cassandra/lucene/util/TaskQueue.scala @@ -21,7 +21,8 @@ import java.util.concurrent._ import java.util.concurrent.locks.ReentrantReadWriteLock import com.stratio.cassandra.lucene.IndexException -import com.stratio.cassandra.lucene.util.JavaConversions.asJavaCallable + +//import com.stratio.cassandra.lucene.util.JavaConversions.asJavaCallable import scala.concurrent.ExecutionException @@ -77,16 +78,14 @@ private class TaskQueueAsync(numThreads: Int, queuesSize: Int) extends TaskQueue private val lock = new ReentrantReadWriteLock(true) private val pools = (1 to numThreads) .map(index => new ArrayBlockingQueue[Runnable](queuesSize, true)) - .map(queue => new ThreadPoolExecutor(1, 1, 1, DAYS, queue, new RejectedExecutionHandler() { - override def rejectedExecution(task: Runnable, executor: ThreadPoolExecutor) = - if (!executor.isShutdown) executor.getQueue.put(task) - })) + .map(queue => new ThreadPoolExecutor(1, 1, 1, DAYS, queue, + (task, executor) => if (!executor.isShutdown) executor.getQueue.put(task))) /** @inheritdoc */ override def submitAsynchronous[A](id: AnyRef, task: () => A): Unit = { lock.readLock.lock() try { - pools(Math.abs(id.hashCode % numThreads)).submit(task) + pools(Math.abs(id.hashCode % numThreads)).submit(() => task.apply()) } catch { case e: Exception => logger.error("Task queue asynchronous submission failed", e) @@ -98,7 +97,7 @@ private class TaskQueueAsync(numThreads: Int, queuesSize: Int) extends TaskQueue override def submitSynchronous[A](task: () => A): A = { lock.writeLock.lock() try { - pools.map(_.submit(() => {})).foreach(_.get) // Wait for queued tasks completion + pools.map(_.submit(() => None)).map(_.get()) // Wait for queued tasks completion task.apply // Run synchronous task } catch { case e: InterruptedException => diff --git a/plugin/src/test/java/com/stratio/cassandra/lucene/schema/SchemaBuilderTest.java b/plugin/src/test/java/com/stratio/cassandra/lucene/schema/SchemaBuilderTest.java index e912fbd13..e1b972783 100644 --- a/plugin/src/test/java/com/stratio/cassandra/lucene/schema/SchemaBuilderTest.java +++ b/plugin/src/test/java/com/stratio/cassandra/lucene/schema/SchemaBuilderTest.java @@ -50,7 +50,7 @@ public void testBuild() throws Exception { .mapper("text", textMapper().analyzer("snowball")) .mapper("uuid", uuidMapper()) .build(); - assertEquals("Failed schema building", EnglishAnalyzer.class, schema.defaultAnalyzer().getClass()); + assertEquals("Failed schema building", EnglishAnalyzer.class, schema.defaultAnalyzer.getClass()); assertEquals("Failed schema building", BlobMapper.class, schema.mapper("blob").getClass()); assertEquals("Failed schema building", BooleanMapper.class, schema.mapper("bool").getClass()); assertEquals("Failed schema building", DateMapper.class, schema.mapper("date").getClass()); @@ -73,7 +73,7 @@ public void testBuildNumeric() throws Exception { .mapper("int", integerMapper().boost(0.3f)) .mapper("long", longMapper()) .build(); - assertEquals("Failed schema building", EnglishAnalyzer.class, schema.defaultAnalyzer().getClass()); + assertEquals("Failed schema building", EnglishAnalyzer.class, schema.defaultAnalyzer.getClass()); assertEquals("Failed schema building", BigIntegerMapper.class, schema.mapper("big_int").getClass()); assertEquals("Failed schema building", BigDecimalMapper.class, schema.mapper("big_dec").getClass()); assertEquals("Failed schema building", DoubleMapper.class, schema.mapper("double").getClass()); @@ -96,7 +96,7 @@ public void testBuildComplex() throws Exception { .mapper("date_range", dateRangeMapper("from", "to")) .mapper("geo", geoPointMapper("lat", "lon")) .build(); - assertEquals("Failed schema building", EnglishAnalyzer.class, schema.defaultAnalyzer().getClass()); + assertEquals("Failed schema building", EnglishAnalyzer.class, schema.defaultAnalyzer.getClass()); assertEquals("Failed schema building", BitemporalMapper.class, schema.mapper("bitemporal").getClass()); assertEquals("Failed schema building", DateRangeMapper.class, schema.mapper("date_range").getClass()); assertEquals("Failed schema building", GeoPointMapper.class, schema.mapper("geo").getClass()); @@ -166,7 +166,7 @@ public void testFromJsonRegular() throws IOException { "uuid:{type:\"uuid\"}" + "}}"; Schema schema = SchemaBuilder.fromJson(json).build(); - assertEquals("Failed schema JSON parsing", EnglishAnalyzer.class, schema.defaultAnalyzer().getClass()); + assertEquals("Failed schema JSON parsing", EnglishAnalyzer.class, schema.defaultAnalyzer.getClass()); assertEquals("Failed schema JSON parsing", BlobMapper.class, schema.mapper("blob").getClass()); assertEquals("Failed schema JSON parsing", BooleanMapper.class, schema.mapper("bool").getClass()); assertEquals("Failed schema JSON parsing", DateMapper.class, schema.mapper("date").getClass()); @@ -192,7 +192,7 @@ public void testFromJsonNumeric() throws IOException { "int:{type:\"integer\"}," + "long:{type:\"long\"}}}"; Schema schema = SchemaBuilder.fromJson(json).build(); - assertEquals("Failed schema JSON parsing", EnglishAnalyzer.class, schema.defaultAnalyzer().getClass()); + assertEquals("Failed schema JSON parsing", EnglishAnalyzer.class, schema.defaultAnalyzer.getClass()); assertEquals("Failed schema JSON parsing", BigIntegerMapper.class, schema.mapper("big_int").getClass()); assertEquals("Failed schema JSON parsing", BigDecimalMapper.class, schema.mapper("big_dec").getClass()); assertEquals("Failed schema JSON parsing", DoubleMapper.class, schema.mapper("double").getClass()); @@ -214,7 +214,7 @@ public void testFromJsonComplex() throws IOException { "geo:{type:\"geo_point\",latitude:\"lat\",longitude:\"lon\"}" + "}}"; Schema schema = SchemaBuilder.fromJson(json).build(); - assertEquals("Failed schema JSON parsing", EnglishAnalyzer.class, schema.defaultAnalyzer().getClass()); + assertEquals("Failed schema JSON parsing", EnglishAnalyzer.class, schema.defaultAnalyzer.getClass()); assertEquals("Failed schema JSON parsing", BitemporalMapper.class, schema.mapper("bitemporal").getClass()); assertEquals("Failed schema JSON parsing", DateRangeMapper.class, schema.mapper("date_range").getClass()); assertEquals("Failed schema JSON parsing", GeoPointMapper.class, schema.mapper("geo").getClass()); @@ -239,7 +239,7 @@ public void testFromJSONWithNullAnalyzers() throws IOException { Schema schema = SchemaBuilder.fromJson(json).build(); - Analyzer defaultAnalyzer = schema.defaultAnalyzer(); + Analyzer defaultAnalyzer = schema.defaultAnalyzer; assertTrue("Expected english analyzer", defaultAnalyzer instanceof EnglishAnalyzer); Mapper idMapper = schema.mapper("id"); @@ -276,7 +276,7 @@ public void testFromJSONWithEmptyAnalyzers() throws IOException { " }'"; Schema schema = SchemaBuilder.fromJson(json).build(); - Analyzer defaultAnalyzer = schema.defaultAnalyzer(); + Analyzer defaultAnalyzer = schema.defaultAnalyzer; assertTrue("Expected EnglishAnalyzer", defaultAnalyzer instanceof EnglishAnalyzer); Mapper idMapper = schema.mapper("id"); @@ -310,7 +310,7 @@ public void testParseJSONWithNullDefaultAnalyzer() throws IOException { " }'"; Schema schema = SchemaBuilder.fromJson(json).build(); - Analyzer defaultAnalyzer = schema.defaultAnalyzer(); + Analyzer defaultAnalyzer = schema.defaultAnalyzer; assertEquals("Expected default analyzer", StandardAnalyzers.DEFAULT.get().getClass(), defaultAnalyzer.getClass()); diff --git a/plugin/src/test/java/com/stratio/cassandra/lucene/schema/SchemaTest.java b/plugin/src/test/java/com/stratio/cassandra/lucene/schema/SchemaTest.java index 866dd6e2b..55398ec41 100644 --- a/plugin/src/test/java/com/stratio/cassandra/lucene/schema/SchemaTest.java +++ b/plugin/src/test/java/com/stratio/cassandra/lucene/schema/SchemaTest.java @@ -40,7 +40,7 @@ public void testGetDefaultAnalyzer() { Map mappers = new HashMap<>(); Map analyzers = new HashMap<>(); Schema schema = new Schema(new EnglishAnalyzer(), mappers, analyzers); - Analyzer analyzer = schema.defaultAnalyzer(); + Analyzer analyzer = schema.defaultAnalyzer; assertEquals("Expected english analyzer", EnglishAnalyzer.class, analyzer.getClass()); schema.close(); } @@ -50,7 +50,7 @@ public void testGetDefaultAnalyzerNotSpecified() { Map mappers = new HashMap<>(); Map analyzers = new HashMap<>(); Schema schema = new Schema(new EnglishAnalyzer(), mappers, analyzers); - Analyzer analyzer = schema.defaultAnalyzer(); + Analyzer analyzer = schema.defaultAnalyzer; assertEquals("Expected default analyzer", EnglishAnalyzer.class, analyzer.getClass()); schema.close(); }