Skip to content

Commit

Permalink
FMWK-496 Resolve potential memory leak related to object references (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Jul 15, 2024
1 parent 9e89592 commit 1562be5
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 47 deletions.
20 changes: 8 additions & 12 deletions src/main/java/com/aerospike/mapper/tools/AeroMapper.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
package com.aerospike.mapper.tools;

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import javax.validation.constraints.NotNull;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.AerospikeException.ScanTerminated;
import com.aerospike.client.Bin;
Expand All @@ -29,7 +21,13 @@
import com.aerospike.mapper.tools.converters.MappingConverter;
import com.aerospike.mapper.tools.utils.MapperUtils;
import com.aerospike.mapper.tools.virtuallist.VirtualList;
import reactor.core.publisher.Mono;

import javax.validation.constraints.NotNull;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

public class AeroMapper implements IAeroMapper {

Expand All @@ -42,7 +40,7 @@ private AeroMapper(@NotNull IAerospikeClient client) {
}

/**
* Create a new Builder to instantiate the AeroMapper.
* Create a new Builder to instantiate the AeroMapper.
* @author tfaulkes
*
*/
Expand Down Expand Up @@ -233,12 +231,10 @@ private <T> T read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Key key,
} else {
try {
ThreadLocalKeySaver.save(key);
LoadedObjectResolver.begin();
return mappingConverter.convertToObject(clazz, key, record, entry, resolveDependencies);
} catch (ReflectiveOperationException e) {
throw new AerospikeException(e);
} finally {
LoadedObjectResolver.end();
ThreadLocalKeySaver.clear();
}
}
Expand Down
21 changes: 12 additions & 9 deletions src/main/java/com/aerospike/mapper/tools/LoadedObjectResolver.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
package com.aerospike.mapper.tools;

import com.aerospike.client.Key;

import java.util.HashMap;
import java.util.Map;

import com.aerospike.client.Key;

public class LoadedObjectResolver {

private static class LoadedObjectMap {
private int referenceCount = 0;
private final Map<Key, Object> objectMap = new HashMap<>();
}

private static final ThreadLocal<LoadedObjectMap> threadLocalObjects = ThreadLocal.withInitial(LoadedObjectMap::new);

private LoadedObjectResolver() {
}

public static void begin() {
LoadedObjectMap map = threadLocalObjects.get();
map.referenceCount++;
Expand All @@ -23,7 +21,7 @@ public static void end() {
LoadedObjectMap map = threadLocalObjects.get();
map.referenceCount--;
if (map.referenceCount == 0) {
map.objectMap.clear();
threadLocalObjects.remove();
}
}

Expand All @@ -39,4 +37,9 @@ public static Object get(Key key) {
LoadedObjectMap map = threadLocalObjects.get();
return map.objectMap.get(key);
}
}

private static class LoadedObjectMap {
private final Map<Key, Object> objectMap = new HashMap<>();
private int referenceCount = 0;
}
}
47 changes: 26 additions & 21 deletions src/main/java/com/aerospike/mapper/tools/ReactiveAeroMapper.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
package com.aerospike.mapper.tools;

import java.util.Arrays;
import java.util.Objects;
import java.util.function.Function;

import javax.validation.constraints.NotNull;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
Expand All @@ -24,28 +18,20 @@
import com.aerospike.mapper.tools.converters.MappingConverter;
import com.aerospike.mapper.tools.utils.MapperUtils;
import com.aerospike.mapper.tools.virtuallist.ReactiveVirtualList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.validation.constraints.NotNull;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Function;

public class ReactiveAeroMapper implements IReactiveAeroMapper {

private final IAerospikeReactorClient reactorClient;
private final IAeroMapper aeroMapper;
private final MappingConverter mappingConverter;

/**
* Create a new Builder to instantiate the AeroMapper.
* @author tfaulkes
*
*/
public static class Builder extends AbstractBuilder<ReactiveAeroMapper> {
public Builder(IAerospikeReactorClient reactorClient) {
super(new ReactiveAeroMapper(reactorClient));
ClassCache.getInstance().setReactiveDefaultPolicies(reactorClient);
}
}

private ReactiveAeroMapper(@NotNull IAerospikeReactorClient reactorClient) {
this.reactorClient = reactorClient;
this.aeroMapper = new AeroMapper.Builder(reactorClient.getAerospikeClient()).build();
Expand Down Expand Up @@ -205,7 +191,15 @@ public <T> Flux<T> read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNu
return readBatch(batchPolicy, clazz, keys, entry, operations);
}

private <T> Mono<T> read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Key key, @NotNull ClassCacheEntry<T> entry, boolean resolveDependencies) {
@SuppressWarnings("unchecked")
private <T> Mono<T> read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Key key,
@NotNull ClassCacheEntry<T> entry, boolean resolveDependencies) {
if (readPolicy == null || readPolicy.filterExp == null) {
Object objectForKey = LoadedObjectResolver.get(key);
if (objectForKey != null) {
return Mono.just((T) objectForKey);
}
}
if (readPolicy == null) {
readPolicy = entry.getReadPolicy();
}
Expand Down Expand Up @@ -368,7 +362,8 @@ public <T> ReactiveVirtualList<T> asBackedList(@NotNull Object object, @NotNull
}

@Override
public <T> ReactiveVirtualList<T> asBackedList(@NotNull Class<?> owningClazz, @NotNull Object key, @NotNull String binName, Class<T> elementClazz) {
public <T> ReactiveVirtualList<T> asBackedList(@NotNull Class<?> owningClazz, @NotNull Object key,
@NotNull String binName, Class<T> elementClazz) {
return new ReactiveVirtualList<>(this, owningClazz, key, binName, elementClazz);
}

Expand Down Expand Up @@ -454,4 +449,14 @@ public Mono<Key> getRecordKey(Object obj) {
ClassCacheEntry<?> entry = ClassCache.getInstance().loadClass(obj.getClass(), this);
return entry == null ? null : Mono.just(new Key(entry.getNamespace(), entry.getSetName(), Value.get(entry.getKey(obj))));
}

/**
* Create a new Builder to instantiate the AeroMapper.
*/
public static class Builder extends AbstractBuilder<ReactiveAeroMapper> {
public Builder(IAerospikeReactorClient reactorClient) {
super(new ReactiveAeroMapper(reactorClient));
ClassCache.getInstance().setReactiveDefaultPolicies(reactorClient);
}
}
}
18 changes: 13 additions & 5 deletions src/main/java/com/aerospike/mapper/tools/ThreadLocalKeySaver.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,32 @@
package com.aerospike.mapper.tools;

import com.aerospike.client.Key;

import java.util.ArrayDeque;
import java.util.Deque;

import com.aerospike.client.Key;

/**
* Save the keys. Note that this is effectively a stack of keys, as A can load B which can load C, and C needs B's key, not A's.
*
* @author timfaulkes
* Save the keys. Note that this is effectively a stack of keys, as A can load B which can load C, and C needs B's key,
* not A's.
*/
public class ThreadLocalKeySaver {

private static final ThreadLocal<Deque<Key>> threadLocalKeys = ThreadLocal.withInitial(ArrayDeque::new);

private ThreadLocalKeySaver() {
}

public static void save(Key key) {
threadLocalKeys.get().addLast(key);
LoadedObjectResolver.begin();
}

public static void clear() {
LoadedObjectResolver.end();
threadLocalKeys.get().removeLast();
if (threadLocalKeys.get().isEmpty()) {
threadLocalKeys.remove();
}
}

public static Key get() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.aerospike.mapper.reactive;

import com.aerospike.mapper.RecursiveObjectTest;
import com.aerospike.mapper.tools.ReactiveAeroMapper;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

public class ReactiveRecursiveObjectTest extends ReactiveAeroMapperBaseTest {

@Test
public void runTest() {
RecursiveObjectTest.A a1 = new RecursiveObjectTest.A("a", 10, 1);
a1.a = a1;

ReactiveAeroMapper reactiveMapper = new ReactiveAeroMapper.Builder(reactorClient).build();
reactiveMapper.save(a1).block();

RecursiveObjectTest.A a2 = reactiveMapper.read(RecursiveObjectTest.A.class, a1.id).block();
assertNotNull(a2);
assertEquals(a1.age, a2.age);
assertEquals(a1.name, a2.name);
assertEquals(a1.id, a2.id);
}

@Test
public void runMultipleObjectTest() {
RecursiveObjectTest.A a1 = new RecursiveObjectTest.A("a", 10, 1);
a1.a = a1;
RecursiveObjectTest.B b = new RecursiveObjectTest.B();
b.id = 10;
b.a = a1;

ReactiveAeroMapper reactiveMapper = new ReactiveAeroMapper.Builder(reactorClient).build();
reactiveMapper.save(a1).block();
reactiveMapper.save(b).block();

RecursiveObjectTest.B b2 = reactiveMapper.read(RecursiveObjectTest.B.class, b.id).block();
assertNotNull(b2);
assertEquals(b.id, b2.id);
assertEquals(b.a.age, b2.a.age);
assertEquals(b.a.name, b2.a.name);
assertEquals(b.a.id, b2.a.id);
}

@Test
public void runTest2() {
RecursiveObjectTest.A a1 = new RecursiveObjectTest.A("a", 10, 1);
a1.a = new RecursiveObjectTest.A("a2", 11, 11);

ReactiveAeroMapper reactiveMapper = new ReactiveAeroMapper.Builder(reactorClient).build();
reactiveMapper.save(a1, a1.a).blockLast();

RecursiveObjectTest.A a2 = reactiveMapper.read(RecursiveObjectTest.A.class, a1.id).block();
assertNotNull(a2);
assertEquals(a1.age, a2.age);
assertEquals(a1.name, a2.name);
assertEquals(a1.id, a2.id);
assertEquals(a1.a.id, a2.a.id);
}
}

0 comments on commit 1562be5

Please sign in to comment.