Skip to content

Commit

Permalink
feat: cache pojo mapper
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Jan 12, 2024
1 parent 84e9390 commit c7c9eb0
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public final class ServiceLoader<S> implements Iterable<S> {
private LazyIterator lookupIterator;

public static <S> ServiceLoader<S> load(Class<S> service) {
return ServiceLoader.load(service, Thread.currentThread().getContextClassLoader());
return ServiceLoader.load(service, ServiceLoader.class.getClassLoader());
}

public static <S> ServiceLoader<S> load(Class<S> service, ClassLoader loader) {
Expand Down
151 changes: 151 additions & 0 deletions ingester-protocol/src/main/java/io/greptime/CachedPojoMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.greptime;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.greptime.common.SPI;
import io.greptime.common.util.Ensures;
import io.greptime.errors.PojoException;
import io.greptime.models.Column;
import io.greptime.models.DataType;
import io.greptime.models.Metric;
import io.greptime.models.SemanticType;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* This utility class converts POJO objects into {@link Table} objects,
* inspired by <a href="https://github.com/influxdata/influxdb-client-java/blob/master/client/src/main/java/com/influxdb/client/internal/MeasurementMapper.java">InfluxDB client-java</a>.
*
* @author jiachun.fjc
*/
@SPI(priority = 9)
public class CachedPojoMapper implements PojoMapper {

private final LoadingCache<Class<?>, Map<String, Field>> classFieldCache;

public CachedPojoMapper() {
this(1024);
}

public CachedPojoMapper(int maxCachedPOJOs) {
this.classFieldCache =
CacheBuilder.newBuilder().maximumSize(maxCachedPOJOs)
.build(new CacheLoader<Class<?>, Map<String, Field>>() {
@SuppressWarnings("NullableProblems")
@Override
public Map<String, Field> load(Class<?> key) {
return createMetricClass(key);
}
});
}

@Override
public <M> Table mapToTable(List<M> pojos) {
Ensures.ensureNonNull(pojos, "pojos");
Ensures.ensure(!pojos.isEmpty(), "pojos can not be empty");

M first = pojos.get(0);

Class<?> metricType = first.getClass();

Map<String, Field> fieldMap = this.classFieldCache.getUnchecked(metricType);

String metricName = getMetricName(metricType);

TableSchema.Builder schemaBuilder = TableSchema.newBuilder(metricName);
for (Map.Entry<String, Field> entry : fieldMap.entrySet()) {
String name = entry.getKey();
Field field = entry.getValue();
Column column = field.getAnnotation(Column.class);
DataType dataType = column.dataType();
SemanticType semanticType = SemanticType.Field;
if (column.tag()) {
semanticType = SemanticType.Tag;
} else if (column.timestamp()) {
semanticType = SemanticType.Timestamp;
}
schemaBuilder.addColumn(name, semanticType, dataType);
}

Table table = Table.from(schemaBuilder.build());
for (M pojo : pojos) {
Class<?> type = pojo.getClass();
if (!type.equals(metricType)) {
throw new PojoException("All POJOs must be of the same type");
}

Object[] values = new Object[fieldMap.size()];
int j = 0;
for (Map.Entry<String, Field> entry : fieldMap.entrySet()) {
Field field = entry.getValue();
Object value = getObject(pojo, field);
values[j] = value;

j++;
}
table.addRow(values);
}

return table;
}

private String getMetricName(Class<?> metricType) {
// From @Metric annotation
Metric metricAnnotation = metricType.getAnnotation(Metric.class);
if (metricAnnotation != null) {
return metricAnnotation.name();
} else {
String err =
String.format("Unable to determine Metric for '%s'." + " Does it have a @Metric annotation?",
metricType);
throw new PojoException(err);
}
}

private <M> Object getObject(M metric, Field field) {
Object value;
try {
field.setAccessible(true);
value = field.get(metric);
} catch (IllegalAccessException e) {
throw new PojoException(e);
}
return value;
}

private Map<String, Field> createMetricClass(Class<?> metricType) {
Map<String, Field> fieldMap = new HashMap<>();
Class<?> currentType = metricType;
while (currentType != null) {
for (Field field : currentType.getDeclaredFields()) {
Column colAnnotation = field.getAnnotation(Column.class);
if (colAnnotation == null) {
continue;
}
fieldMap.put(colAnnotation.name(), field);
}
currentType = currentType.getSuperclass();
}
return fieldMap;
}
}
19 changes: 14 additions & 5 deletions ingester-protocol/src/main/java/io/greptime/GreptimeDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.greptime.common.Lifecycle;
import io.greptime.common.signal.SignalHandlersLoader;
import io.greptime.common.util.MetricsUtil;
import io.greptime.common.util.ServiceLoader;
import io.greptime.common.util.Strings;
import io.greptime.models.Err;
import io.greptime.models.Result;
Expand Down Expand Up @@ -57,17 +58,16 @@ public class GreptimeDB implements Write, WritePOJO, Lifecycle<GreptimeOptions>,

private static final Logger LOG = LoggerFactory.getLogger(GreptimeDB.class);


private static final Map<Integer, GreptimeDB> INSTANCES = new ConcurrentHashMap<>();
private static final AtomicInteger ID = new AtomicInteger(0);
private static final String VERSION = Util.clientVersion();
private static final String NODE_ID = UUID.randomUUID().toString();

private static final PojoMapper POJO_MAPPER = getDefaultPojoMapper();

private final int id;
private final AtomicBoolean started = new AtomicBoolean(false);

private final PojoMapper pojoMapper = new PojoMapper(65536);

private GreptimeOptions opts;
private RouterClient routerClient;
private WriteClient writeClient;
Expand Down Expand Up @@ -143,7 +143,7 @@ public void ensureInitialized() {
public CompletableFuture<Result<WriteOk, Err>> writePOJOs(Collection<List<?>> pojos, WriteOp writeOp, Context ctx) {
List<Table> rows = new ArrayList<>(pojos.size());
for (List<?> pojo : pojos) {
rows.add(this.pojoMapper.toTableData(pojo));
rows.add(POJO_MAPPER.mapToTable(pojo));
}
return write(rows, writeOp, ctx);
}
Expand All @@ -154,7 +154,7 @@ public StreamWriter<List<?>, WriteOk> streamWriterPOJOs(int maxPointsPerSecond,
return new StreamWriter<List<?>, WriteOk>() {
@Override
public StreamWriter<List<?>, WriteOk> write(List<?> val, WriteOp writeOp) {
Table table = pojoMapper.toTableData(val);
Table table = POJO_MAPPER.mapToTable(val);
delegate.write(table, writeOp);
return this;
}
Expand Down Expand Up @@ -320,6 +320,15 @@ private void truncateBuf() {
}
}

private static PojoMapper getDefaultPojoMapper() {
try {
return ServiceLoader.load(PojoMapper.class).first();
} catch (Throwable t) {
LOG.warn("Failed to load `PojoMapper`, use default: `CachedPojoMapper(1024)`", t);
return new CachedPojoMapper();
}
}

private static void doGlobalInitializeWorks() {
// load all signal handlers
SignalHandlersLoader.load();
Expand Down
119 changes: 3 additions & 116 deletions ingester-protocol/src/main/java/io/greptime/PojoMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,128 +15,15 @@
*/
package io.greptime;

import io.greptime.common.util.Ensures;
import io.greptime.errors.PojoException;
import io.greptime.models.Column;
import io.greptime.models.DataType;
import io.greptime.models.Metric;
import io.greptime.models.SemanticType;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* This utility class converts POJO classes into {@link Table} objects,
* inspired by <a href="https://github.com/influxdata/influxdb-client-java/blob/master/client/src/main/java/com/influxdb/client/internal/MeasurementMapper.java">InfluxDB client-java</a>.
* Converts POJO objects into {@link Table} objects
*
* @author jiachun.fjc
*/
public class PojoMapper {
public interface PojoMapper {

private final ConcurrentMap<String, Map<String, Field>> classFieldCache = new ConcurrentHashMap<>();
private final int maxCachedPOJOs;

public PojoMapper(int maxCachedPOJOs) {
this.maxCachedPOJOs = maxCachedPOJOs;
}

public <M> Table toTableData(List<M> pojos) {
Ensures.ensureNonNull(pojos, "pojos");
Ensures.ensure(!pojos.isEmpty(), "pojos can not be empty");

M first = pojos.get(0);

Class<?> metricType = first.getClass();

Map<String, Field> fieldMap = getAndCacheMetricClass(metricType);

String metricName = getMetricName(metricType);

TableSchema.Builder schemaBuilder = TableSchema.newBuilder(metricName);
for (Map.Entry<String, Field> entry : fieldMap.entrySet()) {
String name = entry.getKey();
Field field = entry.getValue();
Column column = field.getAnnotation(Column.class);
DataType dataType = column.dataType();
SemanticType semanticType = SemanticType.Field;
if (column.tag()) {
semanticType = SemanticType.Tag;
} else if (column.timestamp()) {
semanticType = SemanticType.Timestamp;
}
schemaBuilder.addColumn(name, semanticType, dataType);
}

Table table = Table.from(schemaBuilder.build());
for (M pojo : pojos) {
Class<?> type = pojo.getClass();
if (!type.equals(metricType)) {
throw new PojoException("All POJOs must be of the same type");
}

Object[] values = new Object[fieldMap.size()];
int j = 0;
for (Map.Entry<String, Field> entry : fieldMap.entrySet()) {
Field field = entry.getValue();
Object value = getObject(pojo, field);
values[j] = value;

j++;
}
table.addRow(values);
}

return table;
}

private String getMetricName(Class<?> metricType) {
// From @Metric annotation
Metric metricAnnotation = metricType.getAnnotation(Metric.class);
if (metricAnnotation != null) {
return metricAnnotation.name();
} else {
String err =
String.format("Unable to determine Metric for '%s'." + " Does it have a @Metric annotation?",
metricType);
throw new PojoException(err);
}
}

private <M> Object getObject(M metric, Field field) {
Object value;
try {
field.setAccessible(true);
value = field.get(metric);
} catch (IllegalAccessException e) {
throw new PojoException(e);
}
return value;
}

private Map<String, Field> getAndCacheMetricClass(Class<?> metricType) {
if (this.classFieldCache.size() >= this.maxCachedPOJOs) {
this.classFieldCache.clear();
}

return this.classFieldCache.computeIfAbsent(metricType.getName(), k -> {
Map<String, Field> fieldMap = new HashMap<>();
Class<?> currentType = metricType;
while (currentType != null) {
for (Field field : currentType.getDeclaredFields()) {
Column colAnnotation = field.getAnnotation(Column.class);
if (colAnnotation == null) {
continue;
}
fieldMap.put(colAnnotation.name(), field);
}
currentType = currentType.getSuperclass();
}
return fieldMap;
});
}
<M> Table mapToTable(List<M> pojos);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.greptime.CachedPojoMapper
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void testToTable() {
Pojo1Test pojo1 = createNewPojo1Test();
pojos1.add(pojo1);
}
Table tp1 = new PojoMapper(65536).toTableData(pojos1);
Table tp1 = new CachedPojoMapper().mapToTable(pojos1);
Assert.assertEquals("pojo1", tp1.tableName());
Assert.assertEquals(50, tp1.pointCount());

Expand All @@ -47,7 +47,7 @@ public void testToTable() {
Pojo2Test pojo2 = createNewPojo2Test();
pojos2.add(pojo2);
}
Table tp2 = new PojoMapper(65536).toTableData(pojos2);
Table tp2 = new CachedPojoMapper().mapToTable(pojos2);
Assert.assertEquals("pojo2", tp2.tableName());
Assert.assertEquals(30, tp2.pointCount());
}
Expand Down

0 comments on commit c7c9eb0

Please sign in to comment.