diff --git a/ingester-common/src/main/java/io/greptime/common/util/ServiceLoader.java b/ingester-common/src/main/java/io/greptime/common/util/ServiceLoader.java index 12083b7..fc2c857 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/ServiceLoader.java +++ b/ingester-common/src/main/java/io/greptime/common/util/ServiceLoader.java @@ -59,7 +59,7 @@ public final class ServiceLoader implements Iterable { private LazyIterator lookupIterator; public static ServiceLoader load(Class service) { - return ServiceLoader.load(service, Thread.currentThread().getContextClassLoader()); + return ServiceLoader.load(service, ServiceLoader.class.getClassLoader()); } public static ServiceLoader load(Class service, ClassLoader loader) { diff --git a/ingester-protocol/src/main/java/io/greptime/CachedPojoMapper.java b/ingester-protocol/src/main/java/io/greptime/CachedPojoMapper.java new file mode 100644 index 0000000..39ade93 --- /dev/null +++ b/ingester-protocol/src/main/java/io/greptime/CachedPojoMapper.java @@ -0,0 +1,151 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import io.greptime.common.SPI; +import io.greptime.common.util.Ensures; +import io.greptime.errors.PojoException; +import io.greptime.models.Column; +import io.greptime.models.DataType; +import io.greptime.models.Metric; +import io.greptime.models.SemanticType; +import io.greptime.models.Table; +import io.greptime.models.TableSchema; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This utility class converts POJO objects into {@link Table} objects, + * inspired by InfluxDB client-java. + * + * @author jiachun.fjc + */ +@SPI(priority = 9) +public class CachedPojoMapper implements PojoMapper { + + private final LoadingCache, Map> classFieldCache; + + public CachedPojoMapper() { + this(1024); + } + + public CachedPojoMapper(int maxCachedPOJOs) { + this.classFieldCache = + CacheBuilder.newBuilder().maximumSize(maxCachedPOJOs) + .build(new CacheLoader, Map>() { + @SuppressWarnings("NullableProblems") + @Override + public Map load(Class key) { + return createMetricClass(key); + } + }); + } + + @Override + public Table mapToTable(List pojos) { + Ensures.ensureNonNull(pojos, "pojos"); + Ensures.ensure(!pojos.isEmpty(), "pojos can not be empty"); + + M first = pojos.get(0); + + Class metricType = first.getClass(); + + Map fieldMap = this.classFieldCache.getUnchecked(metricType); + + String metricName = getMetricName(metricType); + + TableSchema.Builder schemaBuilder = TableSchema.newBuilder(metricName); + for (Map.Entry entry : fieldMap.entrySet()) { + String name = entry.getKey(); + Field field = entry.getValue(); + Column column = field.getAnnotation(Column.class); + DataType dataType = column.dataType(); + SemanticType semanticType = SemanticType.Field; + if (column.tag()) { + semanticType = SemanticType.Tag; + } else if (column.timestamp()) { + semanticType = SemanticType.Timestamp; + } + schemaBuilder.addColumn(name, semanticType, dataType); + } + + Table table = Table.from(schemaBuilder.build()); + for (M pojo : pojos) { + Class type = pojo.getClass(); + if (!type.equals(metricType)) { + throw new PojoException("All POJOs must be of the same type"); + } + + Object[] values = new Object[fieldMap.size()]; + int j = 0; + for (Map.Entry entry : fieldMap.entrySet()) { + Field field = entry.getValue(); + Object value = getObject(pojo, field); + values[j] = value; + + j++; + } + table.addRow(values); + } + + return table; + } + + private String getMetricName(Class metricType) { + // From @Metric annotation + Metric metricAnnotation = metricType.getAnnotation(Metric.class); + if (metricAnnotation != null) { + return metricAnnotation.name(); + } else { + String err = + String.format("Unable to determine Metric for '%s'." + " Does it have a @Metric annotation?", + metricType); + throw new PojoException(err); + } + } + + private Object getObject(M metric, Field field) { + Object value; + try { + field.setAccessible(true); + value = field.get(metric); + } catch (IllegalAccessException e) { + throw new PojoException(e); + } + return value; + } + + private Map createMetricClass(Class metricType) { + Map fieldMap = new HashMap<>(); + Class currentType = metricType; + while (currentType != null) { + for (Field field : currentType.getDeclaredFields()) { + Column colAnnotation = field.getAnnotation(Column.class); + if (colAnnotation == null) { + continue; + } + fieldMap.put(colAnnotation.name(), field); + } + currentType = currentType.getSuperclass(); + } + return fieldMap; + } +} diff --git a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java index f0c8426..c53c5be 100644 --- a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java +++ b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java @@ -23,6 +23,7 @@ import io.greptime.common.Lifecycle; import io.greptime.common.signal.SignalHandlersLoader; import io.greptime.common.util.MetricsUtil; +import io.greptime.common.util.ServiceLoader; import io.greptime.common.util.Strings; import io.greptime.models.Err; import io.greptime.models.Result; @@ -57,17 +58,16 @@ public class GreptimeDB implements Write, WritePOJO, Lifecycle, private static final Logger LOG = LoggerFactory.getLogger(GreptimeDB.class); - private static final Map INSTANCES = new ConcurrentHashMap<>(); private static final AtomicInteger ID = new AtomicInteger(0); private static final String VERSION = Util.clientVersion(); private static final String NODE_ID = UUID.randomUUID().toString(); + private static final PojoMapper POJO_MAPPER = getDefaultPojoMapper(); + private final int id; private final AtomicBoolean started = new AtomicBoolean(false); - private final PojoMapper pojoMapper = new PojoMapper(65536); - private GreptimeOptions opts; private RouterClient routerClient; private WriteClient writeClient; @@ -143,7 +143,7 @@ public void ensureInitialized() { public CompletableFuture> writePOJOs(Collection> pojos, WriteOp writeOp, Context ctx) { List rows = new ArrayList<>(pojos.size()); for (List pojo : pojos) { - rows.add(this.pojoMapper.toTableData(pojo)); + rows.add(POJO_MAPPER.mapToTable(pojo)); } return write(rows, writeOp, ctx); } @@ -154,7 +154,7 @@ public StreamWriter, WriteOk> streamWriterPOJOs(int maxPointsPerSecond, return new StreamWriter, WriteOk>() { @Override public StreamWriter, WriteOk> write(List val, WriteOp writeOp) { - Table table = pojoMapper.toTableData(val); + Table table = POJO_MAPPER.mapToTable(val); delegate.write(table, writeOp); return this; } @@ -320,6 +320,15 @@ private void truncateBuf() { } } + private static PojoMapper getDefaultPojoMapper() { + try { + return ServiceLoader.load(PojoMapper.class).first(); + } catch (Throwable t) { + LOG.warn("Failed to load `PojoMapper`, use default: `CachedPojoMapper(1024)`", t); + return new CachedPojoMapper(); + } + } + private static void doGlobalInitializeWorks() { // load all signal handlers SignalHandlersLoader.load(); diff --git a/ingester-protocol/src/main/java/io/greptime/PojoMapper.java b/ingester-protocol/src/main/java/io/greptime/PojoMapper.java index 210f364..a0ceddc 100644 --- a/ingester-protocol/src/main/java/io/greptime/PojoMapper.java +++ b/ingester-protocol/src/main/java/io/greptime/PojoMapper.java @@ -15,128 +15,15 @@ */ package io.greptime; -import io.greptime.common.util.Ensures; -import io.greptime.errors.PojoException; -import io.greptime.models.Column; -import io.greptime.models.DataType; -import io.greptime.models.Metric; -import io.greptime.models.SemanticType; import io.greptime.models.Table; -import io.greptime.models.TableSchema; -import java.lang.reflect.Field; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; /** - * This utility class converts POJO classes into {@link Table} objects, - * inspired by InfluxDB client-java. + * Converts POJO objects into {@link Table} objects * * @author jiachun.fjc */ -public class PojoMapper { +public interface PojoMapper { - private final ConcurrentMap> classFieldCache = new ConcurrentHashMap<>(); - private final int maxCachedPOJOs; - - public PojoMapper(int maxCachedPOJOs) { - this.maxCachedPOJOs = maxCachedPOJOs; - } - - public Table toTableData(List pojos) { - Ensures.ensureNonNull(pojos, "pojos"); - Ensures.ensure(!pojos.isEmpty(), "pojos can not be empty"); - - M first = pojos.get(0); - - Class metricType = first.getClass(); - - Map fieldMap = getAndCacheMetricClass(metricType); - - String metricName = getMetricName(metricType); - - TableSchema.Builder schemaBuilder = TableSchema.newBuilder(metricName); - for (Map.Entry entry : fieldMap.entrySet()) { - String name = entry.getKey(); - Field field = entry.getValue(); - Column column = field.getAnnotation(Column.class); - DataType dataType = column.dataType(); - SemanticType semanticType = SemanticType.Field; - if (column.tag()) { - semanticType = SemanticType.Tag; - } else if (column.timestamp()) { - semanticType = SemanticType.Timestamp; - } - schemaBuilder.addColumn(name, semanticType, dataType); - } - - Table table = Table.from(schemaBuilder.build()); - for (M pojo : pojos) { - Class type = pojo.getClass(); - if (!type.equals(metricType)) { - throw new PojoException("All POJOs must be of the same type"); - } - - Object[] values = new Object[fieldMap.size()]; - int j = 0; - for (Map.Entry entry : fieldMap.entrySet()) { - Field field = entry.getValue(); - Object value = getObject(pojo, field); - values[j] = value; - - j++; - } - table.addRow(values); - } - - return table; - } - - private String getMetricName(Class metricType) { - // From @Metric annotation - Metric metricAnnotation = metricType.getAnnotation(Metric.class); - if (metricAnnotation != null) { - return metricAnnotation.name(); - } else { - String err = - String.format("Unable to determine Metric for '%s'." + " Does it have a @Metric annotation?", - metricType); - throw new PojoException(err); - } - } - - private Object getObject(M metric, Field field) { - Object value; - try { - field.setAccessible(true); - value = field.get(metric); - } catch (IllegalAccessException e) { - throw new PojoException(e); - } - return value; - } - - private Map getAndCacheMetricClass(Class metricType) { - if (this.classFieldCache.size() >= this.maxCachedPOJOs) { - this.classFieldCache.clear(); - } - - return this.classFieldCache.computeIfAbsent(metricType.getName(), k -> { - Map fieldMap = new HashMap<>(); - Class currentType = metricType; - while (currentType != null) { - for (Field field : currentType.getDeclaredFields()) { - Column colAnnotation = field.getAnnotation(Column.class); - if (colAnnotation == null) { - continue; - } - fieldMap.put(colAnnotation.name(), field); - } - currentType = currentType.getSuperclass(); - } - return fieldMap; - }); - } + Table mapToTable(List pojos); } diff --git a/ingester-protocol/src/main/resources/META-INF/services/io.greptime.PojoMapper b/ingester-protocol/src/main/resources/META-INF/services/io.greptime.PojoMapper new file mode 100644 index 0000000..231782d --- /dev/null +++ b/ingester-protocol/src/main/resources/META-INF/services/io.greptime.PojoMapper @@ -0,0 +1 @@ +io.greptime.CachedPojoMapper diff --git a/ingester-protocol/src/test/java/io/greptime/PojoMapperTest.java b/ingester-protocol/src/test/java/io/greptime/PojoMapperTest.java index 8cefe17..280dfc1 100644 --- a/ingester-protocol/src/test/java/io/greptime/PojoMapperTest.java +++ b/ingester-protocol/src/test/java/io/greptime/PojoMapperTest.java @@ -37,7 +37,7 @@ public void testToTable() { Pojo1Test pojo1 = createNewPojo1Test(); pojos1.add(pojo1); } - Table tp1 = new PojoMapper(65536).toTableData(pojos1); + Table tp1 = new CachedPojoMapper().mapToTable(pojos1); Assert.assertEquals("pojo1", tp1.tableName()); Assert.assertEquals(50, tp1.pointCount()); @@ -47,7 +47,7 @@ public void testToTable() { Pojo2Test pojo2 = createNewPojo2Test(); pojos2.add(pojo2); } - Table tp2 = new PojoMapper(65536).toTableData(pojos2); + Table tp2 = new CachedPojoMapper().mapToTable(pojos2); Assert.assertEquals("pojo2", tp2.tableName()); Assert.assertEquals(30, tp2.pointCount()); }