-
Notifications
You must be signed in to change notification settings - Fork 0
/
autoswitcher_multiview.py
302 lines (279 loc) · 13.9 KB
/
autoswitcher_multiview.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
from time import monotonic
from sounddevice import sleep, query_devices, Stream
from numpy import linalg
from obswebsocket import obsws, requests
from random import choice, random
from functools import partial, wraps
from json import load
from contextlib import ExitStack
from argparse import ArgumentParser
from html_writer import write_bubble, write_css
from os import path
from obswebsocket.exceptions import ConnectionFailure
from websocket._exceptions import WebSocketConnectionClosedException
import tkinter as tk
def called(func):
    """Decorate *func* so that its wrapper carries first-call state.

    The returned wrapper exposes a ``called`` attribute that is ``False``
    until the first invocation. On that first invocation only, the wrapper
    attributes ``old_string``, ``new_string`` and ``patounes_active`` are
    (re)set to their initial values before *func* runs; later invocations
    leave them untouched.

    Args:
        func: the callable to wrap.

    Returns:
        The wrapping callable, forwarding all arguments to *func*.
    """
    first_call_state = (
        ("old_string", "Initialisation terminée !"),
        ("new_string", "Initialisation terminée !"),
        ("patounes_active", False),
        ("called", True),
    )

    @wraps(func)
    def wrapper(*args, **kwargs):
        if wrapper.called:
            return func(*args, **kwargs)
        # First invocation: seed the attributes before delegating.
        for attribute, value in first_call_state:
            setattr(wrapper, attribute, value)
        return func(*args, **kwargs)

    wrapper.called = False
    return wrapper
def callback(indata, outdata, frames, time, status, buffer: list, intcode: int) -> None:
    """Audio stream callback that feeds one orator's activity buffer.

    The level of the incoming block is computed as the integer part of ten
    times the norm of ``indata``. A level strictly above ``THRESHOLD`` pushes
    a sample into ``buffer``; any other level takes one sample out of it.

    Args:
        indata: input samples of the current audio block.
        outdata: unused here.
        frames: unused here.
        time: unused here.
        status: unused here.
        buffer (list): activity buffer to update.
        intcode (int): index of the orator; unused here.
    """
    level = int(linalg.norm(indata) * 10)  # *NORMALISER[intcode]
    # Above the threshold the buffer grows, at or below it the buffer shrinks.
    instruction = 'increase' if level > THRESHOLD else 'decrease'
    buffering(instruction, level, buffer)
def buffering(status: str, value: int, buffer: list, max_size: int = 14) -> None:
    """Grow or shrink an activity buffer in place.

    Args:
        status (str): ``'increase'`` to push ``value``, ``'decrease'`` to drop
            the most recent entry. Any other instruction is ignored.
        value (int): level to store when increasing.
        buffer (list): buffer to update.
        max_size (int): maximum number of entries kept in ``buffer``;
            increases are ignored once this size is reached.
    """
    if status == 'increase':
        if len(buffer) < max_size:
            buffer.append(value)
    elif status == 'decrease':
        if buffer:
            # pop() drops the last entry itself; remove(buffer[-1]) would
            # delete the first entry holding the same value instead.
            buffer.pop()
@called
def scene_caller(ws: obsws, delay: int, future_delay: int, requested_name: str, override: bool) -> tuple:
    """Switch OBS Studio to a scene once the current hold time has elapsed.

    Nothing happens while ``monotonic() - delay`` does not exceed
    ``future_delay``. Otherwise the requested scene is activated, the hold
    timer restarts and, when ``scene_caller.patounes_active`` is set, a new
    speech bubble (different from the previous one) is written.

    Args:
        ws (obsws): connected OBS websocket client.
        delay (int): ``monotonic()`` timestamp of the last switch.
        future_delay (int): hold time to exceed before switching again.
        requested_name (str): name of the scene to activate.
        override (bool): when true the next hold time is fixed to 3,
            otherwise it is drawn from 6, 8 or 10.

    Returns:
        tuple: the (possibly updated) ``(delay, future_delay)`` pair.
    """
    if monotonic() - delay <= future_delay:
        # Current scene has not been held long enough yet.
        return delay, future_delay

    old_scene = ws.call(requests.GetCurrentScene()).getName()
    ws.call(requests.SetCurrentScene(requested_name))
    delay = monotonic()
    future_delay = 3 if override else choice([6, 8, 10])

    # The bubble is only written while Patounes is displayed.
    if not scene_caller.patounes_active:
        return delay, future_delay

    # Last '_'-separated component of the scene name.
    speaker = requested_name.split('_')[-1]

    if old_scene != requested_name:
        # The view actually changed.
        candidates = [
            "Oops, trop long sur ce plan !",
            f"Attends, je switch vers {speaker}",
            "Oooh, c'est super joli ici !",
            "Vous vouliez changer de vue ?",
            "On va voir d'autres têtes sympathiques !",
            "Lancement d'un plan de domination du monde... demain",
            "Soyez attentifs, contrôle la prochaine fois !",
            "Lui, c'est mon humain préféré ! Il s'appelle comment déjà ?",
            "Quel sujet intéressant ! Mais je vois pas le rapport avec les chats ?",
            "42 ! Je le savais ! Euh ... C'était quoi la question déjà ?",
            "N'ETES VOUS PAS ASSEZ DIVERTIS !",
            "Rolistes ! Rassemblement !",
        ]
    else:
        # Same view as before: generic quotes plus the speaker-specific ones.
        quotes_by_speaker = {
            'Yoka': [
                "Il devait pas écrire Ja'eel, lui ?",
                "Tueur de dragon... dans ses rêves !",
                "Ça sent le sel par ici non ?",
                "Pitié que quelqu'un lui coupe la parole",
            ],
            'Tharos': [
                "Ouh, ça fait mal à la tête...",
                "J'avais tout compris, puis il m'a perdu !",
                "En tant que tel.",
                "Non, il ne parle pas de D&D",
            ],
        }
        fallback_quotes = [
            "Radieux comme un zénith d'été !",
            "Maman ! Regarde ! Je passe à la télé !",
            "On reste un peu sur ce plan ?",
            "Où sont mes croquettes ?",
        ]
        candidates = [
            f"On reste un peu sur {speaker} ?",
            "C'est pas si mal, ici !",
            "Vous vouliez garder cette vue ?",
            "Ca parle longtemps, par ici !",
            "01101101 01100101 01101111 01110111",
            "Quelqu'un a des questions ?",
            "<i>ronronne</i>",
            "Mon rêve est de devenir un véritable petit chat !",
            "J'ai des articles de qualité ici <a href=https://utip.io/tharos/shop>utip.io/tharos</a>",
            "Vous êtes a couper le souffle !",
            "On va encore dépasser les horaires ...",
            "Et on oublie pas de follow !",
            "Rolls ! Egalement utilisable en mode ASMR !",
            "Ca va partir en débat ca ...",
            "Mais ! c'était pas dans le script !",
            "Par le pouvoir du dé ancestral ! Je suis un chat de combat ?",
            "Pourquoi un corbeau ressemble t-il à un bureau ?",
            "J'étais un chaventurier avant, mais j'ai pris une ligne de code dans le genou",
            "Nous, les Patounes, sommes des bots fiers",
            "Patounes ! Quel est votre métier ! Miaou ! Miaou !",
        ] + quotes_by_speaker.get(speaker, fallback_quotes)

    # Draw again until the text differs from the one currently shown.
    while scene_caller.new_string == scene_caller.old_string:
        scene_caller.new_string = choice(candidates)

    write_bubble(scene_caller.old_string,
                 scene_caller.new_string, future_delay + 0.5)
    scene_caller.old_string = scene_caller.new_string
    return delay, future_delay
###################### MAIN ########################
# Script entry point: parses the command line, maps orators to audio devices,
# declares the scene names and buffers used by the switcher, builds the Tk
# window, then connects to OBS and opens one audio stream per orator.
if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument(
        "-d", "--devices", help="Prints the list of available devices and exits", action='store_true')
    parser.add_argument('creds', nargs='?', help="Optional path to creditentials",
                        default=f"{path.dirname(__file__)}/creditentials.json")
    args = parser.parse_args()

    if args.devices:
        for device in query_devices():
            print(device)
    else:
        write_css()

        # init mapping => you need to set names that are not ambiguous on your system !!!
        ORATOR_DEVICES: list = [
            ('Tharos', 'Chat Mic'),
            ('Yoka', 'VoiceMeeter Output'),
            ('Invité_1', 'VoiceMeeter Aux Output'),
            # ('Invité_2', 'VoiceMeeter VAIO3 Output')
        ]

        # Forming mapping between devices and orators
        assignator: dict = {}
        print("Starting mapping...")
        available_devices = query_devices()  # queried once for all orators
        for orator, target in ORATOR_DEVICES:
            for i, device in enumerate(available_devices):
                # Only the first device whose name contains the target is kept.
                if target in device['name'] and orator not in assignator:
                    print(
                        f" Orator {orator} has been assigned to device #{i}, namely {device['name']}!")
                    if device['max_input_channels'] == 0:
                        print(
                            f" /!\\ Beware, device {device['name']} does not have inputs. It may cause crashes.")
                    assignator[orator] = i

        # Fail with an explicit message instead of a KeyError further down.
        unmapped = [orator for orator, _ in ORATOR_DEVICES
                    if orator not in assignator]
        if unmapped:
            raise SystemExit(
                f"No audio device found for: {', '.join(unmapped)}. "
                "Run with --devices to list the available devices.")

        # init memories
        THRESHOLD: int = 5
        USERS: list[str] = [user for (user, _) in ORATOR_DEVICES]
        DEVICES: list[tuple] = [(assignator[user], None) for user in USERS]
        BUFFERS: list[list] = [[] for _ in USERS]
        NUM_ORATORS: list = [2, 3, 4]

        # init scenes to work with
        # list of solo fullscreen scenes
        SCENE_SPEAKER: list[str] = [f'Rolls_Solo_{user}' for user in USERS]
        # list of solo edito scenes
        SCENE_EDITO: list[str] = [f'Rolls_Edito_{user}' for user in USERS]
        # list of scenes to use when no one's talking
        SCENE_FILL: list[str] = [f'Rolls_Main_{user}' for user in USERS]
        # SCENE_FILL: list[str] = ['Rolls_Multicam']
        # list of scenes software is allowed to switch from
        SUPPORTED_SCENES: list[str] = SCENE_SPEAKER + SCENE_FILL
        # where Patounes websource is
        NAME_OF_EMBED_SCENE: str = "--Patounes"

        scene_caller.patounes_active = False
        # Plain module-level name: there is no `self` at this scope.
        timer: int = 0

        # One button per supported orator count, stacked in a single column.
        root = tk.Tk()
        root.geometry(f"160x{(len(NUM_ORATORS)*160)//3}")
        root.title('Autoswitcher')
        root.resizable(0, 0)
        tk.Grid.columnconfigure(root, 0, weight=1)
        for row, _ in enumerate(NUM_ORATORS):
            tk.Grid.rowconfigure(root, row, weight=1)
        button_list = [tk.Button(
            master=root, text=f"{orator} orators", bg='#2f3136', fg='white', command=exit) for orator in NUM_ORATORS]
        for row, button in enumerate(button_list):
            button.grid(sticky="nswe", column=0, row=row)

        # Loading creditentials for OBSwebsocket
        print("Loading creditentials...")
        with open(args.creds, 'r') as creds:
            creditentials: dict = load(creds)
        ws = obsws(creditentials["host"],
                   creditentials["port"], creditentials["password"])

        delay, future_delay = monotonic(), 2
        write_bubble("Initialisation terminée !",
                     "Initialisation terminée !", future_delay)

        try:
            print("Connecting to OBS...")
            ws.connect()
            # Start with the Patounes source hidden in every handled scene.
            for scene in SCENE_EDITO + SUPPORTED_SCENES:
                ws.call(requests.SetSceneItemRender(
                    scene_name=scene, source=NAME_OF_EMBED_SCENE, render=False))
            print("Opening fluxes...")
            with ExitStack() as stream_stack:
                # One stream per orator, each feeding its own buffer.
                streams = [stream_stack.enter_context(Stream(device=DEVICES[i], callback=partial(
                    callback, buffer=BUFFERS[i], intcode=i))) for i, _ in enumerate(USERS)]
                print("Starting main loop!")
                # NOTE(review): no switching loop or Tk mainloop is started
                # here, so the streams are closed right after this message.
        except KeyboardInterrupt:
            ws.disconnect()
            print("Connexion closed!")
        except ConnectionFailure:
            print(
                "Could not connect to OBS ; please check creditentials file and if OBS is up and running.")
        except WebSocketConnectionClosedException:
            print("Connexion to OBS was prematurely closed ; aborting...")
        except ConnectionRefusedError:
            print("OBS refused connexion to the switcher.")
class Autoswitcher:
    """Periodic driver that shows/hides Patounes and switches OBS scenes.

    The class relies on module-level names set up by the script entry point:
    ``ws``, ``BUFFERS``, ``SCENE_SPEAKER``, ``SCENE_FILL``, ``SCENE_EDITO``,
    ``SUPPORTED_SCENES``, ``NAME_OF_EMBED_SCENE`` and, when no ``master`` is
    given, ``root``.
    """

    def __init__(self, master=None) -> None:
        """Initialise the switcher state.

        Args:
            master: optional Tk widget whose ``after`` method is used to
                reschedule :meth:`switch_loop`. When omitted, the
                module-level ``root`` window is used.
        """
        self.timer = 0
        self.patounes_active = False
        self.old_string = ""
        self.new_string = ""
        # Hold-time bookkeeping handed to and returned by scene_caller.
        # Kept on the instance: as plain names they would be unbound locals
        # inside switch_loop.
        self.delay = monotonic()
        self.future_delay = 2
        self.master = master

    def _render_patounes(self, visible: bool) -> None:
        """Show or hide the Patounes source in every handled scene."""
        for scene in [*SCENE_EDITO, *SUPPORTED_SCENES]:
            ws.call(requests.SetSceneItemRender(
                scene_name=scene, source=NAME_OF_EMBED_SCENE, render=visible))

    def _request_scene(self, scene_name: str, override: bool) -> None:
        """Ask scene_caller for a scene and store the updated hold times."""
        self.delay, self.future_delay = scene_caller(
            ws, self.delay, self.future_delay, scene_name, override)

    def switch_loop(self):
        """Run one 200 ms tick of the switcher and schedule the next one."""
        # random condition to make Patounes appear
        if not self.patounes_active and random() < 0.03 and self.timer > 70.0:
            self.timer = 0  # reset self.timer for showing/hiding
            self.patounes_active = True
            self.old_string = "Initialisation terminée !"
            self.new_string = "Initialisation terminée !"
            # scene_caller reads its own attributes, so mirror the state there.
            scene_caller.patounes_active = True
            scene_caller.old_string = self.old_string
            scene_caller.new_string = self.new_string
            self._render_patounes(True)

        # random condition to make Patounes disappear
        if self.patounes_active and random() < 0.03 and self.timer > 14.0:
            self.timer = 0  # reset self.timer for showing/hiding
            self.patounes_active = False
            scene_caller.patounes_active = False
            self._render_patounes(False)
            write_bubble("Initialisation terminée !",
                         "Initialisation terminée !", self.future_delay)

        # main loop to switch scenes
        name = ws.call(requests.GetCurrentScene()).getName()
        if name in SUPPORTED_SCENES:
            # Snapshot once: the audio callbacks mutate BUFFERS concurrently.
            fill_levels = [len(bf) for bf in BUFFERS]
            fullest = max(fill_levels)
            if fullest > 5:
                target = fill_levels.index(fullest)
                if name in SCENE_SPEAKER:
                    self._request_scene(
                        choice([SCENE_SPEAKER[target], SCENE_FILL[target]]), False)
                else:
                    self._request_scene(SCENE_SPEAKER[target], False)
            else:
                # Nobody is talking enough: pick one of the fill scenes.
                self._request_scene(choice(SCENE_FILL), False)
        elif name in SCENE_EDITO:
            loudness = [sum(bf) for bf in BUFFERS]
            target = loudness.index(max(loudness))
            self._request_scene(SCENE_EDITO[target], True)

        self.timer += 0.2
        # This class is not a Tk widget, so rescheduling goes through one.
        scheduler = self.master if self.master is not None else root
        scheduler.after(200, self.switch_loop)