@@ -2,6 +2,7 @@ use std::sync::{LazyLock, OnceLock};
2
2
3
3
pub use jobserver_crate:: { Acquired , Client , HelperThread } ;
4
4
use jobserver_crate:: { FromEnv , FromEnvErrorKind } ;
5
+ use parking_lot:: { Condvar , Mutex } ;
5
6
6
7
// We can only call `from_env_ext` once per process
7
8
@@ -52,8 +53,6 @@ fn default_client() -> Client {
52
53
client
53
54
}
54
55
55
- static GLOBAL_CLIENT_CHECKED : OnceLock < Client > = OnceLock :: new ( ) ;
56
-
57
56
pub fn initialize_checked ( report_warning : impl FnOnce ( & ' static str ) ) {
58
57
let client_checked = match & * GLOBAL_CLIENT {
59
58
Ok ( client) => client. clone ( ) ,
@@ -62,19 +61,112 @@ pub fn initialize_checked(report_warning: impl FnOnce(&'static str)) {
62
61
default_client ( )
63
62
}
64
63
} ;
65
- GLOBAL_CLIENT_CHECKED . set ( client_checked) . ok ( ) ;
64
+ let proxy = Proxy {
65
+ client : client_checked,
66
+ data : Mutex :: new ( ProxyData { total : 1 , used : 1 , needed : 0 } ) ,
67
+ wake_needer : Condvar :: new ( ) ,
68
+ wake_helper : Condvar :: new ( ) ,
69
+ } ;
70
+ GLOBAL_PROXY . set ( proxy) . ok ( ) ;
71
+
72
+ std:: thread:: spawn ( || {
73
+ GLOBAL_PROXY . get ( ) . unwrap ( ) . helper ( ) ;
74
+ } ) ;
66
75
}
67
76
68
77
const ACCESS_ERROR : & str = "jobserver check should have been called earlier" ;
69
78
70
79
pub fn client ( ) -> Client {
71
- GLOBAL_CLIENT_CHECKED . get ( ) . expect ( ACCESS_ERROR ) . clone ( )
80
+ GLOBAL_PROXY . get ( ) . expect ( ACCESS_ERROR ) . client . clone ( )
72
81
}
73
82
74
83
pub fn acquire_thread ( ) {
75
- GLOBAL_CLIENT_CHECKED . get ( ) . expect ( ACCESS_ERROR ) . acquire_raw ( ) . ok ( ) ;
84
+ GLOBAL_PROXY . get ( ) . expect ( ACCESS_ERROR ) . acquire_thread ( ) ;
76
85
}
77
86
78
87
pub fn release_thread ( ) {
79
- GLOBAL_CLIENT_CHECKED . get ( ) . expect ( ACCESS_ERROR ) . release_raw ( ) . ok ( ) ;
88
+ GLOBAL_PROXY . get ( ) . expect ( ACCESS_ERROR ) . release_thread ( ) ;
89
+ }
90
+
91
+ static GLOBAL_PROXY : OnceLock < Proxy > = OnceLock :: new ( ) ;
92
+
93
+ struct ProxyData {
94
+ /// The number of tokens assigned to this process.
95
+ total : u16 ,
96
+
97
+ /// The number of tokens assigned to threads.
98
+ used : u16 ,
99
+
100
+ /// The number of threads requesting a token
101
+ needed : u16 ,
102
+ }
103
+
104
+ /// This is a jobserver proxy used to ensure that we hold on to at least one token.
105
+ struct Proxy {
106
+ client : Client ,
107
+ data : Mutex < ProxyData > ,
108
+
109
+ /// Threads which are waiting on a token will wait on this.
110
+ wake_needer : Condvar ,
111
+
112
+ /// This is used to wake the helper thread when tokens are needed.
113
+ wake_helper : Condvar ,
114
+ }
115
+
116
+ impl Proxy {
117
+ fn helper ( & self ) {
118
+ let mut data = self . data . lock ( ) ;
119
+ loop {
120
+ while data. needed > 0 {
121
+ drop ( data) ;
122
+ self . client . acquire_raw ( ) . ok ( ) ;
123
+ data = self . data . lock ( ) ;
124
+ if data. needed > 0 {
125
+ data. total += 1 ;
126
+ data. used += 1 ;
127
+ data. needed -= 1 ;
128
+ self . wake_needer . notify_one ( ) ;
129
+ } else {
130
+ drop ( data) ;
131
+ self . client . release_raw ( ) . ok ( ) ;
132
+ data = self . data . lock ( ) ;
133
+ }
134
+ }
135
+ self . wake_helper . wait ( & mut data) ;
136
+ }
137
+ }
138
+
139
+ fn acquire_thread ( & self ) {
140
+ let mut data = self . data . lock ( ) ;
141
+
142
+ if data. total > data. used {
143
+ assert_eq ! ( data. needed, 0 ) ;
144
+ data. used += 1 ;
145
+ } else {
146
+ if data. needed == 0 {
147
+ self . wake_helper . notify_one ( ) ;
148
+ }
149
+ data. needed += 1 ;
150
+ self . wake_needer . wait ( & mut data) ;
151
+ }
152
+ }
153
+
154
+ fn release_thread ( & self ) {
155
+ let mut data = self . data . lock ( ) ;
156
+
157
+ if data. needed > 0 {
158
+ // Give the token to a waiting thread
159
+ data. needed -= 1 ;
160
+ self . wake_needer . notify_one ( ) ;
161
+ } else {
162
+ data. used -= 1 ;
163
+
164
+ // Release the token unless it's the last one in the process
165
+ if data. total > 1 {
166
+ data. total -= 1 ;
167
+ drop ( data) ;
168
+ self . client . release_raw ( ) . ok ( ) ;
169
+ }
170
+ }
171
+ }
80
172
}
0 commit comments