Skip to content

Commit fc757f0

Browse files
committed
Update access to the Vert.x local context for Panache
In preparation to the upgrade to Vertx. 5, Hibernate Reactive doesn't use anymore the deprecated API to access the local context. This commits updates the way Panache access the context so that the two extensions can comunicate. Fix #47314 Relats to the Hibernate Reactive issues: * hibernate/hibernate-reactive#2176 * hibernate/hibernate-reactive#2174 * hibernate/hibernate-reactive#2207
1 parent 49e1ebe commit fc757f0

File tree

4 files changed

+220
-28
lines changed

4 files changed

+220
-28
lines changed

extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/SessionOperations.java

+33-28
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,25 @@
11
package io.quarkus.hibernate.reactive.panache.common.runtime;
22

3+
import static org.hibernate.reactive.context.impl.ContextualDataStorage.contextualDataMap;
4+
5+
import java.util.Map;
36
import java.util.function.Function;
47
import java.util.function.Supplier;
58

69
import org.hibernate.reactive.common.spi.Implementor;
710
import org.hibernate.reactive.context.Context.Key;
811
import org.hibernate.reactive.context.impl.BaseKey;
12+
import org.hibernate.reactive.context.impl.ContextualDataStorage;
913
import org.hibernate.reactive.mutiny.Mutiny;
1014
import org.hibernate.reactive.mutiny.Mutiny.Session;
11-
import org.hibernate.reactive.mutiny.Mutiny.SessionFactory;
1215
import org.hibernate.reactive.mutiny.Mutiny.Transaction;
1316

1417
import io.quarkus.arc.Arc;
1518
import io.quarkus.arc.ClientProxy;
1619
import io.quarkus.arc.impl.LazyValue;
1720
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
1821
import io.smallrye.mutiny.Uni;
19-
import io.vertx.core.Context;
20-
import io.vertx.core.Vertx;
22+
import io.vertx.core.impl.ContextInternal;
2123

2224
/**
2325
* Static util methods for {@link Mutiny.Session}.
@@ -29,7 +31,7 @@ public final class SessionOperations {
2931
private static final LazyValue<Mutiny.SessionFactory> SESSION_FACTORY = new LazyValue<>(
3032
new Supplier<Mutiny.SessionFactory>() {
3133
@Override
32-
public SessionFactory get() {
34+
public Mutiny.SessionFactory get() {
3335
// Note that Mutiny.SessionFactory is @ApplicationScoped bean - it's safe to use the cached client proxy
3436
Mutiny.SessionFactory sessionFactory = Arc.container().instance(Mutiny.SessionFactory.class).get();
3537
if (sessionFactory == null) {
@@ -50,8 +52,10 @@ public Key<Session> get() {
5052
});
5153

5254
// This key is used to indicate that a reactive session should be opened lazily (when needed) in the current vertx context
53-
private static final String SESSION_ON_DEMAND_KEY = "hibernate.reactive.panache.sessionOnDemand";
54-
private static final String SESSION_ON_DEMAND_OPENED_KEY = "hibernate.reactive.panache.sessionOnDemandOpened";
55+
private static final Key<Boolean> SESSION_ON_DEMAND_KEY = new BaseKey<>(Boolean.class,
56+
"hibernate.reactive.panache.sessionOnDemand");
57+
private static final Key<Boolean> SESSION_ON_DEMAND_OPENED_KEY = new BaseKey<>(Boolean.class,
58+
"hibernate.reactive.panache.sessionOnDemandOpened");
5559

5660
/**
5761
* Marks the current vertx duplicated context as "lazy" which indicates that a reactive session should be opened lazily if
@@ -63,17 +67,17 @@ public Key<Session> get() {
6367
* @see #getSession()
6468
*/
6569
static <T> Uni<T> withSessionOnDemand(Supplier<Uni<T>> work) {
66-
Context context = vertxContext();
67-
if (context.getLocal(SESSION_ON_DEMAND_KEY) != null) {
70+
Map<Key<Boolean>, Boolean> contextualDataMap = contextualDataMap(vertxContext());
71+
if (contextualDataMap.get(SESSION_ON_DEMAND_KEY) != null) {
6872
// context already marked - no need to set the key and close the session
6973
return work.get();
7074
} else {
7175
// mark the lazy session
72-
context.putLocal(SESSION_ON_DEMAND_KEY, true);
76+
contextualDataMap.put(SESSION_ON_DEMAND_KEY, Boolean.TRUE);
7377
// perform the work and eventually close the session and remove the key
7478
return work.get().eventually(() -> {
75-
context.removeLocal(SESSION_ON_DEMAND_KEY);
76-
context.removeLocal(SESSION_ON_DEMAND_OPENED_KEY);
79+
contextualDataMap.remove(SESSION_ON_DEMAND_KEY);
80+
contextualDataMap.remove(SESSION_ON_DEMAND_OPENED_KEY);
7781
return closeSession();
7882
});
7983
}
@@ -109,17 +113,17 @@ public static <T> Uni<T> withTransaction(Function<Transaction, Uni<T>> work) {
109113
* @return a new {@link Uni}
110114
*/
111115
public static <T> Uni<T> withSession(Function<Mutiny.Session, Uni<T>> work) {
112-
Context context = vertxContext();
116+
Map<Key<Session>, Session> contextualDataMap = contextualDataMap(vertxContext());
113117
Key<Mutiny.Session> key = getSessionKey();
114-
Mutiny.Session current = context.getLocal(key);
118+
Mutiny.Session current = contextualDataMap.get(key);
115119
if (current != null && current.isOpen()) {
116120
// reactive session exists - reuse this session
117121
return work.apply(current);
118122
} else {
119123
// reactive session does not exist - open a new one and close it when the returned Uni completes
120124
return getSessionFactory()
121125
.openSession()
122-
.invoke(s -> context.putLocal(key, s))
126+
.invoke(s -> contextualDataMap.put(key, s))
123127
.chain(work::apply)
124128
.eventually(SessionOperations::closeSession);
125129
}
@@ -140,22 +144,24 @@ public static <T> Uni<T> withSession(Function<Mutiny.Session, Uni<T>> work) {
140144
* @return the {@link Mutiny.Session}
141145
*/
142146
public static Uni<Mutiny.Session> getSession() {
143-
Context context = vertxContext();
144-
Key<Mutiny.Session> key = getSessionKey();
145-
Mutiny.Session current = context.getLocal(key);
147+
final ContextInternal context = vertxContext();
148+
final Key<Mutiny.Session> key = getSessionKey();
149+
final Mutiny.Session current = ContextualDataStorage.<Session> contextualDataMap(context).get(key);
150+
final Map<Key<Boolean>, Boolean> objectsDataMap = contextualDataMap(context);
146151
if (current != null && current.isOpen()) {
147152
// reuse the existing reactive session
148153
return Uni.createFrom().item(current);
149154
} else {
150-
if (context.getLocal(SESSION_ON_DEMAND_KEY) != null) {
151-
if (context.getLocal(SESSION_ON_DEMAND_OPENED_KEY) != null) {
155+
if (objectsDataMap.get(SESSION_ON_DEMAND_KEY) != null) {
156+
if (objectsDataMap.get(SESSION_ON_DEMAND_OPENED_KEY) != null) {
152157
// a new reactive session is opened in a previous stage
153158
return Uni.createFrom().item(SessionOperations::getCurrentSession);
154159
} else {
155160
// open a new reactive session and store it in the vertx duplicated context
156161
// the context was marked as "lazy" which means that the session will be eventually closed
157-
context.putLocal(SESSION_ON_DEMAND_OPENED_KEY, true);
158-
return getSessionFactory().openSession().invoke(s -> context.putLocal(key, s));
162+
objectsDataMap.put(SESSION_ON_DEMAND_OPENED_KEY, Boolean.TRUE);
163+
return getSessionFactory().openSession().invoke(s -> ContextualDataStorage
164+
.<Session> contextualDataMap(context).put(key, s));
159165
}
160166
} else {
161167
throw new IllegalStateException("No current Mutiny.Session found"
@@ -170,8 +176,7 @@ public static Uni<Mutiny.Session> getSession() {
170176
* @return the current reactive session stored in the context, or {@code null} if no session exists
171177
*/
172178
public static Mutiny.Session getCurrentSession() {
173-
Context context = vertxContext();
174-
Mutiny.Session current = context.getLocal(getSessionKey());
179+
Mutiny.Session current = ContextualDataStorage.<Session> contextualDataMap(vertxContext()).get(getSessionKey());
175180
if (current != null && current.isOpen()) {
176181
return current;
177182
}
@@ -184,8 +189,8 @@ public static Mutiny.Session getCurrentSession() {
184189
* @throws IllegalStateException If no vertx context is found or is not a safe context as mandated by the
185190
* {@link VertxContextSafetyToggle}
186191
*/
187-
private static Context vertxContext() {
188-
Context context = Vertx.currentContext();
192+
private static ContextInternal vertxContext() {
193+
ContextInternal context = ContextInternal.current();
189194
if (context != null) {
190195
VertxContextSafetyToggle.validateContextIfExists(ERROR_MSG, ERROR_MSG);
191196
return context;
@@ -195,11 +200,11 @@ private static Context vertxContext() {
195200
}
196201

197202
static Uni<Void> closeSession() {
198-
Context context = vertxContext();
199203
Key<Mutiny.Session> key = getSessionKey();
200-
Mutiny.Session current = context.getLocal(key);
204+
Map<Key<Session>, Session> contextualDataMap = contextualDataMap(vertxContext());
205+
Mutiny.Session current = contextualDataMap.get(key);
201206
if (current != null && current.isOpen()) {
202-
return current.close().eventually(() -> context.removeLocal(key));
207+
return current.close().eventually(() -> contextualDataMap.remove(key));
203208
}
204209
return Uni.createFrom().voidItem();
205210
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package io.quarkus.it.panache.reactive;
2+
3+
import jakarta.persistence.Entity;
4+
import jakarta.persistence.Id;
5+
6+
import io.quarkus.hibernate.reactive.panache.PanacheEntityBase;
7+
8+
@Entity
9+
public class Counter extends PanacheEntityBase {
10+
11+
@Id
12+
private Long id;
13+
private int count = 0;
14+
15+
public Counter() {
16+
}
17+
18+
public Counter(long id) {
19+
this.id = id;
20+
}
21+
22+
public Long getId() {
23+
return id;
24+
}
25+
26+
public int getCount() {
27+
return count;
28+
}
29+
30+
@Override
31+
public String toString() {
32+
return String.valueOf(count);
33+
}
34+
35+
public void increase() {
36+
this.count++;
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package io.quarkus.it.panache.reactive;
2+
3+
import static jakarta.persistence.LockModeType.PESSIMISTIC_WRITE;
4+
5+
import java.util.function.Function;
6+
import java.util.function.Supplier;
7+
8+
import jakarta.enterprise.context.ApplicationScoped;
9+
import jakarta.inject.Inject;
10+
11+
import org.hibernate.reactive.mutiny.Mutiny;
12+
13+
import io.quarkus.hibernate.reactive.panache.Panache;
14+
import io.quarkus.hibernate.reactive.panache.common.WithTransaction;
15+
import io.smallrye.mutiny.Uni;
16+
17+
/**
18+
* The goal if this class is to test the usage of {@link WithTransaction},
19+
* you should not use {@link io.quarkus.hibernate.reactive.panache.Panache#withTransaction(Supplier)}
20+
* nor {@link org.hibernate.reactive.mutiny.Mutiny.SessionFactory#withTransaction(Function)}
21+
*/
22+
@ApplicationScoped
23+
@WithTransaction
24+
public class WithTransactionCounterBean {
25+
26+
private static final Long ID_COUNTER = 42L;
27+
28+
@Inject
29+
Mutiny.SessionFactory sessionFactory;
30+
31+
public Uni<Counter> createOrResetCounter() {
32+
final Counter counter = new Counter(ID_COUNTER);
33+
return sessionFactory.withStatelessSession(session -> session
34+
.upsert(counter)
35+
.replaceWith(counter));
36+
}
37+
38+
public Uni<Counter> increaseCounterWithHR() {
39+
return sessionFactory.withSession(session -> session
40+
.find(Counter.class, ID_COUNTER, PESSIMISTIC_WRITE)
41+
.invoke(Counter::increase));
42+
}
43+
44+
public Uni<Counter> increaseCounterWithPanache() {
45+
return Counter
46+
.<Counter> findById(ID_COUNTER, PESSIMISTIC_WRITE)
47+
.invoke(Counter::increase);
48+
}
49+
50+
public Uni<Void> assertThatSessionsAreEqual() {
51+
return sessionFactory.withSession(hrSession -> Panache
52+
.getSession().chain(panacheSession -> {
53+
if (panacheSession != hrSession) {
54+
return Uni.createFrom().failure(new AssertionError("Sessions are different!"));
55+
}
56+
return Uni.createFrom().voidItem();
57+
}));
58+
}
59+
60+
public Uni<Counter> findCounter() {
61+
return sessionFactory.withSession(session -> session
62+
.find(Counter.class, ID_COUNTER));
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package io.quarkus.it.panache.reactive;
2+
3+
import static io.quarkus.vertx.VertxContextSupport.subscribeAndAwait;
4+
import static org.assertj.core.api.Assertions.assertThat;
5+
import static org.junit.jupiter.api.Assertions.fail;
6+
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
import java.util.function.Supplier;
10+
11+
import jakarta.inject.Inject;
12+
13+
import org.junit.jupiter.api.BeforeEach;
14+
import org.junit.jupiter.api.Test;
15+
16+
import io.quarkus.test.junit.QuarkusTest;
17+
import io.smallrye.mutiny.Uni;
18+
19+
/**
20+
* Hibernate Reactive and Panache store the created sessions in a local Vert.x context,
21+
* and we need to make sure that they won't get lost.
22+
* See issue <a href="https://github.com/quarkusio/quarkus/issues/47314">47314</a>
23+
*
24+
* @see io.quarkus.hibernate.reactive.panache.common.runtime.SessionOperations
25+
* @see io.quarkus.hibernate.reactive.panache.common.WithTransaction
26+
*/
27+
@QuarkusTest
28+
public class WithTransactionTest {
29+
// How many times we want to increase the counter
30+
private static final int INCREASES_NUM = 10;
31+
32+
@Inject
33+
WithTransactionCounterBean counterBean;
34+
35+
@BeforeEach
36+
public void createOrResetCounter() throws Throwable {
37+
subscribeAndAwait(counterBean::createOrResetCounter);
38+
}
39+
40+
@Test
41+
void increaseCounterWithHibernateReactive() throws Throwable {
42+
subscribeAndAwait(counterBean::increaseCounterWithHR);
43+
Counter counter = subscribeAndAwait(counterBean::findCounter);
44+
45+
assertThat(counter.getCount()).isEqualTo(1);
46+
}
47+
48+
@Test
49+
void increaseCounterWithPanache() throws Throwable {
50+
subscribeAndAwait(counterBean::increaseCounterWithPanache);
51+
Counter counter = subscribeAndAwait(counterBean::findCounter);
52+
53+
assertThat(counter.getCount()).isEqualTo(1);
54+
}
55+
56+
@Test
57+
void shouldReuseExistingSessions() throws Throwable {
58+
subscribeAndAwait(counterBean::assertThatSessionsAreEqual);
59+
}
60+
61+
@Test
62+
void increaseCounter() throws Throwable {
63+
List<Supplier<Uni<Counter>>> suppliers = new ArrayList<>();
64+
for (int i = 0; i < INCREASES_NUM; i++) {
65+
suppliers.add(() -> counterBean.increaseCounterWithHR());
66+
}
67+
68+
final List<Counter> results = new ArrayList<>();
69+
suppliers.stream()
70+
.parallel()
71+
.forEach(uni -> increaseCounter(uni, results));
72+
73+
final Counter dbCounter = subscribeAndAwait(() -> counterBean.findCounter());
74+
assertThat(dbCounter.getCount()).isEqualTo(INCREASES_NUM);
75+
}
76+
77+
private static void increaseCounter(Supplier<Uni<Counter>> func, List<Counter> counters) {
78+
try {
79+
final var counter = subscribeAndAwait(func);
80+
counters.add(counter);
81+
} catch (final Throwable e) {
82+
fail(e);
83+
}
84+
}
85+
}

0 commit comments

Comments
 (0)