-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patha_mapping.py
199 lines (191 loc) · 8.4 KB
/
a_mapping.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import pyaudiowpatch as pyaudio
import numpy as np
import queue
import threading
import a_shared
import time
import a_openrgb
#######################
# Sample format shared by every stream: the NumPy dtype and the matching
# PyAudio format constant.
np_type = np.float32
pya_type = pyaudio.paFloat32
# Run-state flags: StartStream() maps audio while Start is truthy and
# keeps isRunning set for as long as its streams are open.
Start = False
isRunning = False
# Device descriptors read by StartStream() (fixed parameters).
inputDev = None  # input device
outputDevs = None  # output devices
#######################
def StartStream():
    """Run the audio-mapping worker loop; contains no return path.

    The module-level ``Start`` flag is polled every 100 ms. While it is set,
    one PyAudio output stream is opened for every local device named in
    ``a_shared.Config['devList']`` and one input stream is opened on
    ``inputDev``. Each captured block is queued for every switched-on output
    device; devices that carry an ``IP`` get their processed bytes pushed to
    ``a_shared.to_server`` instead of a local stream. When ``Start`` is
    cleared, the streams are closed and the stopped state is reported through
    ``a_shared.to_GUI`` and ``a_shared.to_server``.

    Fixes compared with the previous revision:
      * ``a_openrgb.RGBQueue`` is really drained at start-up; the old code
        called ``.empty()``, which only reports emptiness.
      * A stream that fails to open no longer breaks the teardown through an
        unbound ``sIn`` or a missing/stale ``Dev['stream']``, and the
        ``PyAudio`` instance created for a failed output is terminated.
    """
    global isRunning, Start, inputDev, outputDevs, CHUNK

    def getTime():
        """Return the local time as ``HH:MM:SS.mmm`` (debug tracing helper)."""
        curTime = time.time()
        ms = int((curTime % 1) * 1000)
        localTime = time.localtime(curTime)
        return f'{time.strftime("%H:%M:%S", localTime)}.{ms:03d}'

    def _drain(q):
        """Discard everything currently waiting in ``q``.

        Assumes ``q`` follows the ``queue.Queue`` API (``get_nowait`` raising
        ``queue.Empty``) -- TODO confirm for ``a_openrgb.RGBQueue``.
        """
        try:
            while True:
                q.get_nowait()
        except queue.Empty:
            pass

    # Output processing (resampling, channel assignment).
    def OutputProcesse(devName, indata, CHUNKFix, CH_num):
        """Resample one input block and route its channels for ``devName``.

        ``a_shared.Config[devName]['channels']`` supplies one number per
        output channel: its integer part selects the input channel and its
        fractional part times ten is the gain (anything above 0.99 becomes 1).

        Args:
            devName: key into ``a_shared.Config``.
            indata: 2-D array indexed ``[frame, input_channel]``.
            CHUNKFix: number of frames to produce.
            CH_num: number of output channels to produce.

        Returns:
            A ``(CHUNKFix, CH_num)`` array of ``np_type``; all zeros when
            ``indata`` is empty.
        """
        outdata = np.zeros((CHUNKFix, CH_num), dtype=np_type)
        channelSet = a_shared.Config[devName]['channels']
        if indata.size == 0:
            return outdata
        ratio = CHUNKFix / CHUNK
        for outCh, inCh_Vol in enumerate(channelSet):
            inCh = int(inCh_Vol)
            vol = (inCh_Vol % 1) * 10
            vol = 1 if vol > 0.99 else vol
            if ratio == 1:  # same rate
                outdata[:, outCh] = indata[:, inCh] * vol
            elif ratio == 1 / 2:  # half rate: keep every second frame
                outdata[:, outCh] = indata[::2, inCh] * vol
            elif ratio == 2:  # double rate: repeat every frame
                outdata[:, outCh] = np.repeat(indata[:, inCh], 2) * vol
            else:  # any other ratio: linear interpolation
                indices = np.linspace(0, CHUNK, CHUNKFix + 1)
                outdata[:, outCh] = np.interp(
                    indices[:-1], np.arange(CHUNK), indata[:, inCh]) * vol
        return outdata

    # Input processing.
    def callback_input(inCh):
        """Build the input-stream callback for a source with ``inCh`` channels."""
        def callback_A(in_data, frame_count, time_info, status):
            indata = np.frombuffer(in_data, dtype=np_type).reshape(-1, inCh)
            for devName, Dev in outputDevs.items():
                if not Dev['switch']:
                    continue
                IP = Dev['IP']
                Queue = Dev['queue']
                Queue.put(indata)
                if not IP:
                    continue
                # Network output device: this callback is also the consumer.
                CH_num = Dev['maxOutputChannels']
                delay = round(a_shared.Config[devName]['delay'] / Frametime)
                Qsize = Queue.qsize()
                if Qsize > delay:
                    # Drop the oldest blocks so latency stays near the
                    # configured delay.
                    # debug: print(f'[Time] {getTime()} Qsize:{Qsize}')
                    while Queue.qsize() > delay + 1:
                        Queue.get_nowait()
                    outdata_bytes = OutputProcesse(
                        devName, Queue.get(), CHUNK, CH_num).tobytes()
                    a_shared.to_server.put([IP, False, outdata_bytes])
                # else: still filling up to the configured delay.
            if a_openrgb.Start:
                a_openrgb.RGBQueue.put(indata)
            return (in_data, pyaudio.paContinue)
        return callback_A

    # Local output device.
    def callback_output(outdevName, Queue, CH_num, CHUNKFix):
        """Build the output-stream callback that plays blocks from ``Queue``."""
        def callback_B(in_data, frame_count, time_info, status):
            Qsize = Queue.qsize()
            delay = int(a_shared.Config[outdevName]['delay'] / Frametime)
            if Qsize < delay:
                # Not enough buffered yet: play silence.
                # debug: print(f'[Time] {getTime()} Qsize:{Qsize} wait')
                outdata = np.zeros((CHUNKFix, CH_num), dtype=np_type)
            else:
                # Drop the oldest blocks so latency stays near the
                # configured delay.
                while Queue.qsize() > delay + 2:
                    Queue.get_nowait()
                # Blocks while the queue is empty; the teardown below queues
                # one silent block per switched-on device before stopping.
                indata = Queue.get()
                outdata = OutputProcesse(outdevName, indata, CHUNKFix, CH_num)
            return (outdata, pyaudio.paContinue)
        return callback_B

    # Report the queued delay.
    def queueDelay():
        """Send each switched-on device's queued delay (in ms) to the GUI."""
        for devName, Dev in outputDevs.items():
            if Dev['switch']:
                Qsize = Dev['queue'].qsize()
                a_shared.to_GUI.put(
                    [5, [devName, f'{Qsize * Frametime:.1f}ms']])
                if Qsize > 200:  # too much delay: ask for a rescan
                    a_shared.to_GUI.put([3, None])

    # Send state.
    def sendState():
        """Queue a state message for every output device that has an IP."""
        a_shared.Header.volume = -1
        for devName in outputDevs:
            IP = outputDevs[devName].get('IP', False)
            if IP:
                a_shared.to_server.put([IP, True, None])

    while True:
        if Start:
            isRunning = True
            a_shared.to_GUI.put([1, isRunning])  # running state
            a_shared.to_GUI.put([2, 'Start mapping'])
            # Input channel count and sample rate.
            InputChannel = inputDev['maxInputChannels']
            InputRate = int(inputDev['defaultSampleRate'])
            # Set up the output streams.
            Resample = False
            Framerate = 100  # divides 96/48/44.1 kHz evenly
            Frametime = 1000 / Framerate  # ms
            CHUNK = round(InputRate / Framerate)
            for devName in a_shared.Config['devList']:
                Dev = outputDevs[devName]
                writeQueue = queue.Queue()
                chNum = Dev['maxOutputChannels']
                OutputRate = int(Dev['defaultSampleRate'])
                RateScale = OutputRate / InputRate
                CHUNKFix = round(CHUNK * RateScale)
                if RateScale not in {1, 2}:
                    Resample = True
                if not Dev['IP']:  # local device
                    po = None
                    try:
                        po = pyaudio.PyAudio()
                        stream = po.open(
                            format=pya_type,
                            channels=chNum,
                            rate=OutputRate,
                            output=True,
                            output_device_index=Dev['index'],
                            frames_per_buffer=CHUNKFix,
                            stream_callback=callback_output(
                                devName, writeQueue, chNum, CHUNKFix))
                    except Exception as error:
                        print(f'start {devName} error:{error}')
                        if po is not None:
                            po.terminate()
                        # Record the failure so the teardown skips this
                        # device instead of hitting a missing or stale handle.
                        Dev['pyaudio'] = None
                        Dev['stream'] = None
                    else:
                        Dev['pyaudio'] = po
                        Dev['stream'] = stream
                Dev['queue'] = writeQueue
            # Set up the input stream.
            pIn = pyaudio.PyAudio()
            sIn = None  # stays None when the open below fails
            try:
                sIn = pIn.open(
                    format=pya_type,
                    channels=InputChannel,
                    rate=InputRate,
                    input=True,
                    input_device_index=inputDev['index'],
                    frames_per_buffer=CHUNK,
                    stream_callback=callback_input(InputChannel))
            except Exception as error:
                print(f'start error:{error}')
            a_shared.Header.sampleRate = InputRate
            a_shared.Header.channels = InputChannel
            a_shared.Header.blockSize = CHUNK
            a_shared.Header.startStop = True
            # Discard blocks left over from an earlier run.
            _drain(a_openrgb.RGBQueue)
            sendState()
            Resample_msg = ''
            if Resample:
                Resample_msg = ',Resampling!'
            print("[INFO] 啟動映射")
            a_shared.to_GUI.put(
                [0, f'幀長度:{CHUNK}Hz({Frametime:.1f}ms){Resample_msg}'])
            # Wait for the stop request, reporting queue delay periodically.
            timer = 0
            while Start:
                if timer > 2:  # seconds
                    timer = 0
                    threading.Thread(target=queueDelay, daemon=True).start()
                time.sleep(0.1)
                timer += 0.1
            # Teardown. Outputs are stopped before the input stream, as in
            # the original ordering.
            for Dev in outputDevs.values():
                if not Dev['switch']:
                    continue
                # One silent block releases a callback waiting on its queue.
                Dev['queue'].put(
                    np.zeros((CHUNK, InputChannel), dtype=np_type))
                if not Dev['IP']:  # local device
                    stream = Dev.get('stream')
                    if stream is not None:
                        stream.stop_stream()
                        stream.close()
                        Dev['stream'] = None  # never reuse a closed handle
                    po = Dev.get('pyaudio')
                    if po is not None:
                        po.terminate()
                        Dev['pyaudio'] = None
            if sIn is not None:
                sIn.stop_stream()
                sIn.close()
            pIn.terminate()
            isRunning = False
            a_shared.to_GUI.put([0, ''])  # clear the status text
            a_shared.to_GUI.put([1, isRunning])  # running state
            a_shared.to_GUI.put([2, 'Stop mapping'])
            a_shared.Header.startStop = False
            sendState()
        time.sleep(0.1)