20
20
21
21
import com .google .common .annotations .VisibleForTesting ;
22
22
import io .prometheus .client .Gauge ;
23
+ import java .util .ArrayDeque ;
23
24
import java .util .Optional ;
24
25
import java .util .Queue ;
25
26
import java .util .concurrent .ScheduledExecutorService ;
26
27
import java .util .concurrent .TimeUnit ;
27
28
import java .util .function .Consumer ;
28
29
import lombok .extern .slf4j .Slf4j ;
29
- import org .jctools .queues .SpscArrayQueue ;
30
30
31
31
@ Slf4j
32
32
public class InflightReadsLimiter {
@@ -37,6 +37,7 @@ public class InflightReadsLimiter {
37
37
.help ("Estimated number of bytes retained by data read from storage or cache" )
38
38
.register ();
39
39
40
+ private final int maxReadsInFlightAcquireQueueSize ;
40
41
private static final Gauge PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE = Gauge
41
42
.build ()
42
43
.name ("pulsar_ml_reads_available_inflight_bytes" )
@@ -64,9 +65,10 @@ public InflightReadsLimiter(long maxReadsInFlightSize, int maxReadsInFlightAcqui
64
65
this .remainingBytes = maxReadsInFlightSize ;
65
66
this .acquireTimeoutMillis = acquireTimeoutMillis ;
66
67
this .timeOutExecutor = timeOutExecutor ;
68
+ this .maxReadsInFlightAcquireQueueSize = maxReadsInFlightAcquireQueueSize ;
67
69
if (maxReadsInFlightSize > 0 ) {
68
70
enabled = true ;
69
- this .queuedHandles = new SpscArrayQueue <>(maxReadsInFlightAcquireQueueSize );
71
+ this .queuedHandles = new ArrayDeque <>();
70
72
} else {
71
73
enabled = false ;
72
74
this .queuedHandles = null ;
@@ -129,13 +131,14 @@ private synchronized Optional<Handle> internalAcquire(long permits, Consumer<Han
129
131
updateMetrics ();
130
132
return Optional .of (new Handle (maxReadsInFlightSize , handle .creationTime , true ));
131
133
} else {
132
- if (queuedHandles .offer (new QueuedHandle (handle , callback ))) {
133
- scheduleTimeOutCheck (acquireTimeoutMillis );
134
- return Optional .empty ();
135
- } else {
134
+ if (queuedHandles .size () >= maxReadsInFlightAcquireQueueSize ) {
136
135
log .warn ("Failed to queue handle for acquiring permits: {}, creationTime: {}, remainingBytes:{}" ,
137
136
permits , handle .creationTime , remainingBytes );
138
137
return Optional .of (new Handle (0 , handle .creationTime , false ));
138
+ } else {
139
+ queuedHandles .offer (new QueuedHandle (handle , callback ));
140
+ scheduleTimeOutCheck (acquireTimeoutMillis );
141
+ return Optional .empty ();
139
142
}
140
143
}
141
144
}
0 commit comments