Skip to content

Commit

Permalink
Fix memory leak in Concurrent/ThreadSafeLocalContextProvider
Browse files Browse the repository at this point in the history
Uses the Cleaner API to run LocalContext.remove() when a Thread has
terminated, but also eagerly calls all LocalContext.remove() on
terminate().
  • Loading branch information
matthias-fratz-bsz committed Dec 4, 2024
1 parent 9d63c22 commit 3b0c8d1
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,7 @@
* @author Yoko Harada &lt;<a href="mailto:[email protected]">[email protected]</a>&gt;
*/
public class ConcurrentLocalContextProvider extends AbstractLocalContextProvider {
private volatile ConcurrentLinkedQueue<AtomicReference<LocalContext>> contextRefs =
new ConcurrentLinkedQueue<AtomicReference<LocalContext>>();

private final ThreadLocal<AtomicReference<LocalContext>> contextHolder =
new ThreadLocal<AtomicReference<LocalContext>>() {
@Override
public AtomicReference<LocalContext> initialValue() {
AtomicReference<LocalContext> contextRef = null;

try {
contextRef = new AtomicReference<LocalContext>(getInstance());
contextRefs.add(contextRef);
return contextRef;
} catch (NullPointerException npe) {
if (contextRefs == null) {
// contextRefs became null, we've been terminated
if (contextRef != null) {
contextRef.get().remove();
}

return null;
} else {
throw npe;
}
}
}
};
private final ThreadLocalContext contextHolder = new ThreadLocalContext(this::getInstance);

public ConcurrentLocalContextProvider(LocalVariableBehavior behavior) {
super( getGlobalRuntimeConfigOrNew(), behavior );
Expand All @@ -93,12 +67,12 @@ public RubyInstanceConfig getRubyInstanceConfig() {

@Override
public BiVariableMap getVarMap() {
return contextHolder.get().get().getVarMap(this);
return contextHolder.get().getVarMap(this);
}

@Override
public Map getAttributeMap() {
return contextHolder.get().get().getAttributeMap();
return contextHolder.get().getAttributeMap();
}

@Override
Expand All @@ -108,20 +82,7 @@ public boolean isRuntimeInitialized() {

@Override
public void terminate() {
ConcurrentLinkedQueue<AtomicReference<LocalContext>> terminated = contextRefs;
contextRefs = null;

if (terminated != null) {
for (AtomicReference<LocalContext> contextRef : terminated) {
contextRef.get().remove();
contextRef.lazySet(null);
}

terminated.clear();
}

contextHolder.remove();
contextHolder.set(null);
contextHolder.terminate();
}

}
115 changes: 115 additions & 0 deletions core/src/main/java/org/jruby/embed/internal/ThreadLocalContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* **** BEGIN LICENSE BLOCK *****
* Version: EPL 2.0/GPL 2.0/LGPL 2.1
*
* The contents of this file are subject to the Eclipse Public
* License Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of
* the License at http://www.eclipse.org/legal/epl-v20.html
*
* Software distributed under the License is distributed on an "AS
* IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
* implied. See the License for the specific language governing
* rights and limitations under the License.
*
* Copyright (C) 2011 Yoko Harada <[email protected]>
*
* Alternatively, the contents of this file may be used under the terms of
* either of the GNU General Public License Version 2 or later (the "GPL"),
* or the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
* in which case the provisions of the GPL or the LGPL are applicable instead
* of those above. If you wish to allow use of your version of this file only
* under the terms of either the GPL or the LGPL, and not to allow others to
* use your version of this file under the terms of the EPL, indicate your
* decision by deleting the provisions above and replace them with the notice
* and other provisions required by the GPL or the LGPL. If you do not delete
* the provisions above, a recipient may use your version of this file under
* the terms of any one of the EPL, the GPL or the LGPL.
* **** END LICENSE BLOCK *****
*/
package org.jruby.embed.internal;

import java.lang.ref.Cleaner;
import java.lang.ref.Cleaner.Cleanable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
* Utility class that wraps a {@link ThreadLocal} holding a {@link LocalContext}
* for each thread. Uses the {@link Cleaner} API to remove this
* {@code LocalContext} once either the corresponding {@code Thread} has
* terminated, or the {@code ThreadLocalContext} instance itself becomes GC'd.
*/
class ThreadLocalContext {
private final ConcurrentHashMap<LocalContextCleaningAction, Object> contextRefs = new ConcurrentHashMap<>();
private final Cleaner cleaner = Cleaner.create();
private final Supplier<LocalContext> localContextFactory;

public ThreadLocalContext(final Supplier<LocalContext> localContextFactory) {
this.localContextFactory = localContextFactory;
}

private volatile ThreadLocal<AtomicReference<LocalContextCleaningAction>> contextHolder = new ThreadLocal<AtomicReference<LocalContextCleaningAction>>() {
@Override
protected AtomicReference<LocalContextCleaningAction> initialValue() {
final LocalContextCleaningAction ctx = new LocalContextCleaningAction(contextRefs,
localContextFactory.get());
final AtomicReference<LocalContextCleaningAction> ref = new AtomicReference<>(ctx);
// register cleaner to run as soon as the *reference* to it gets GC'd. that
// happens when either the Thread terminates, or the ThreadLocal itself gets
// GC'd (i.e. when this class gets GC'd because terminate() was never called).
// see ThreadLocal JavaDoc: ref will stay reachable "as long as the thread is
// alive and the ThreadLocal instance is accessible"
final Cleanable cleanable = cleaner.register(ref, ctx);
if (contextHolder == null)
// boundary case if we're already terminating: clean up immediately, because
// there is no more cleanup thread to do that later
// the returned context will be null, but that's to be expected when operating
// on an object that has been terminated
cleanable.clean();
return ref;
}
};

public LocalContext get() {
return contextHolder.get().get().get();
}

public void terminate() {
contextHolder = null;
for (final LocalContextCleaningAction ref : contextRefs.keySet())
ref.run();
}

/*
* Runnable that actually performs the cleanup for per-thread LocalContext
* instances. MUST be static because these are registered with a Cleaner, so
* everything that is GC-reachable from them will stay reachable until the
* cleaning action has been run.
*/
private static class LocalContextCleaningAction extends AtomicReference<LocalContext> implements Runnable {
private static final long serialVersionUID = 1L;

private final ConcurrentHashMap<LocalContextCleaningAction, Object> contextRefs;

private LocalContextCleaningAction(final ConcurrentHashMap<LocalContextCleaningAction, Object> contextRefs,
final LocalContext context) {
super(context);
this.contextRefs = contextRefs;
contextRefs.put(this, this);
}

@Override
public void run() {
// terminate() vs. clean() relies on this being safe to run multiple times, and
// possibly concurrently
// both AtomicReference.getAndSet() and ConcurrentHashMap.remove() are fully
// thread safe, so the current implementation is fine
final LocalContext lc = getAndSet(null);
if (lc != null)
lc.remove();
contextRefs.remove(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,33 +40,7 @@
* @author Yoko Harada &lt;<a href="mailto:[email protected]">[email protected]</a>&gt;
*/
public class ThreadSafeLocalContextProvider extends AbstractLocalContextProvider {
private volatile ConcurrentLinkedQueue<AtomicReference<LocalContext>> contextRefs =
new ConcurrentLinkedQueue<AtomicReference<LocalContext>>();

private final ThreadLocal<AtomicReference<LocalContext>> contextHolder =
new ThreadLocal<AtomicReference<LocalContext>>() {
@Override
public AtomicReference<LocalContext> initialValue() {
AtomicReference<LocalContext> contextRef = null;

try {
contextRef = new AtomicReference<LocalContext>(getInstance());
contextRefs.add(contextRef);
return contextRef;
} catch (NullPointerException npe) {
if (contextRefs == null) {
// contextRefs became null, we've been terminated
if (contextRef != null) {
contextRef.get().remove();
}

return null;
} else {
throw npe;
}
}
}
};
private final ThreadLocalContext contextHolder = new ThreadLocalContext(this::getInstance);

public ThreadSafeLocalContextProvider(LocalVariableBehavior behavior) {
super(behavior);
Expand All @@ -79,40 +53,27 @@ public ThreadSafeLocalContextProvider(LocalVariableBehavior behavior, boolean la

@Override
public Ruby getRuntime() {
return contextHolder.get().get().getRuntime();
return contextHolder.get().getRuntime();
}

@Override
public BiVariableMap getVarMap() {
return contextHolder.get().get().getVarMap(this);
return contextHolder.get().getVarMap(this);
}

@Override
public Map getAttributeMap() {
return contextHolder.get().get().getAttributeMap();
return contextHolder.get().getAttributeMap();
}

@Override
public boolean isRuntimeInitialized() {
return contextHolder.get().get().isInitialized();
return contextHolder.get().isInitialized();
}

@Override
public void terminate() {
ConcurrentLinkedQueue<AtomicReference<LocalContext>> terminated = contextRefs;
contextRefs = null;

if (terminated != null) {
for (AtomicReference<LocalContext> contextRef : terminated) {
contextRef.get().remove();
contextRef.lazySet(null);
}

terminated.clear();
}

contextHolder.remove();
contextHolder.set(null);
contextHolder.terminate();
}

}

0 comments on commit 3b0c8d1

Please sign in to comment.