-
Notifications
You must be signed in to change notification settings - Fork 3
/
cpuWola.py
127 lines (105 loc) · 4.71 KB
/
cpuWola.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import numpy as np
import ctypes as ct
import os
from intelHelpers import include_ipp
# Append this module's own directory to PATH so that native libraries placed
# next to this file can be located by the OS loader.
os.environ["PATH"] = os.pathsep.join(
    (os.environ["PATH"], os.path.dirname(os.path.realpath(__file__)))
)

# Set up the Intel IPP runtime through the project helper. A KeyError from the
# helper is reported as an ImportError so that importing this module fails
# with a clear message (presumably the helper reads the IPPROOT environment
# variable -- confirm against intelHelpers.include_ipp).
try:
    include_ipp()
    # Manual alternative, kept for reference:
    # os.add_dll_directory(os.path.join(
    #     os.environ["IPPROOT"], "redist", "intel64"))
except KeyError:
    raise ImportError("Couldn't find IPPROOT env var.")
def cpu_threaded_wola(y, f_tap, fftlen, Dec, NUM_THREADS=4):
    """
    Run the native ``cpuWola`` routine from ``cpuWolaDll`` on ``y``.

    This is the IPP-FFT-backed implementation with a selectable thread count;
    prefer it over the older (commented-out) variants in this file.

    Parameters
    ----------
    y : 1-D C-contiguous ``np.complex64`` array
        Input samples. Only the first ``(len(y) // Dec) * Dec`` samples
        contribute to whole output frames.
    f_tap : 1-D C-contiguous ``np.float32`` array
        Filter taps; ``len(f_tap)`` must be a multiple of ``fftlen``.
    fftlen : int
        FFT length (number of output columns).
    Dec : int
        Decimation factor. Must satisfy ``Dec == fftlen`` or
        ``Dec * 2 == fftlen`` (the only cases with phase correction).
    NUM_THREADS : int, optional
        Thread count forwarded to the native routine.

    Returns
    -------
    (out, retcode) : tuple
        ``out`` is a ``complex64`` array of shape ``(len(y) // Dec, fftlen)``
        (channels in columns) and ``retcode`` is the int32 value returned by
        the native ``cpuWola`` call.
    1 : int
        Returned *instead of the tuple* when parameter validation fails
        (a message is printed). Kept for backward compatibility; callers
        must check for this before unpacking.
    """
    if len(f_tap) % fftlen != 0:
        print("Filter taps length must be factor multiple of fft length!")
        return 1

    # This check also guarantees that fftlen is a multiple of Dec, so the
    # separate "FFT length must be multiple of Decimation Factor" test that
    # older versions carried is not needed.
    if Dec * 2 != fftlen and Dec != fftlen:
        print(Dec)
        print(fftlen)
        print(
            "PHASE CORRECTION ONLY IMPLEMENTED FOR DECIMATION = FFT LENGTH OR DECIMATION * 2 = FFT LENGTH!"
        )
        return 1

    # The shared library is expected to sit next to this source file.
    lib = np.ctypeslib.load_library(
        "cpuWolaDll", loader_path=os.path.dirname(os.path.realpath(__file__))
    )

    # ctypes will reject arguments whose dtype / ndim / contiguity differ.
    complex64_vec = np.ctypeslib.ndpointer(
        dtype=np.complex64, ndim=1, flags="CONTIGUOUS"
    )
    float32_vec = np.ctypeslib.ndpointer(
        dtype=np.float32, ndim=1, flags="CONTIGUOUS"
    )
    lib.cpuWola.restype = ct.c_int32
    lib.cpuWola.argtypes = [
        complex64_vec,  # y
        float32_vec,    # f_tap
        ct.c_int32,     # fftlen
        ct.c_int32,     # Dec
        ct.c_int32,     # number of output frames
        ct.c_int32,     # len(f_tap)
        complex64_vec,  # out
        ct.c_int32,     # NUM_THREADS
    ]

    # Compute the frame count once, with integer floor division, and use the
    # same value for the buffer size, the DLL argument and the final reshape.
    # Sizing the buffer as int(len(y) / Dec * fftlen) disagreed with the
    # reshape whenever len(y) was not a multiple of Dec and raised ValueError.
    num_frames = int(len(y) // Dec)
    out = np.empty(num_frames * fftlen, dtype=np.complex64)

    retcode = lib.cpuWola(
        y, f_tap, fftlen, Dec, num_frames, len(f_tap), out, NUM_THREADS
    )

    # One row per output frame, channels in columns.
    return out.reshape((num_frames, fftlen)), retcode
# def cpu_threaded_wola_32fc(y, f_tap, fftlen, Dec):
# if (len(f_tap) % fftlen != 0):
# print("Filter taps length must be factor multiple of fft length!")
# return 1
# if (np.mod(fftlen,Dec)!=0):
# print("FFT length must be multiple of Decimation Factor!")
# return 1
# # if (Dec != fftlen and Dec != fftlen*2):
# # print(Dec)
# # print(fftlen)
# # print("PHASE CORRECTION ONLY IMPLEMENTED FOR DECIMATION = FFT LENGTH OR DECIMATION * 2 = FFT LENGTH!")
# # return 1
# _libmc = np.ctypeslib.load_library('cpuWolaDll_32fc', loader_path=os.path.dirname(os.path.realpath(__file__)))
# array_1d_complex = np.ctypeslib.ndpointer(dtype=np.complex64, ndim = 1, flags = 'CONTIGUOUS')
# array_1d_single = np.ctypeslib.ndpointer(dtype=np.float32, ndim = 1, flags = 'CONTIGUOUS')
# _libmc.cpuWola.restype = ct.c_int32
# _libmc.cpuWola.argtypes = [array_1d_complex, array_1d_single, ct.c_int32, ct.c_int32, ct.c_int32, ct.c_int32, array_1d_complex]
# siglen = len(y)
# out = np.empty(int(siglen/Dec * fftlen), dtype = np.complex64) # make the output
# retcode = _libmc.cpuWola(y, f_tap, fftlen, Dec, int(siglen/Dec), len(f_tap), out) # run the dll function
# out = out.reshape((int(siglen/Dec),fftlen)) # reshape to channels in columns
# return out, retcode
# def cpu_threaded_wola_choosethreads(y, f_tap, fftlen, Dec, threads):
# if (len(f_tap) % fftlen != 0):
# print("Filter taps length must be factor multiple of fft length!")
# return 1
# if (np.mod(fftlen,Dec)!=0):
# print("FFT length must be multiple of Decimation Factor!")
# return 1
# # if (Dec != fftlen and Dec != fftlen*2):
# # print(Dec)
# # print(fftlen)
# # print("PHASE CORRECTION ONLY IMPLEMENTED FOR DECIMATION = FFT LENGTH OR DECIMATION * 2 = FFT LENGTH!")
# # return 1
# _libmc = np.ctypeslib.load_library('cpuWolaDll_choosethreads','.')
# array_1d_complexdouble = np.ctypeslib.ndpointer(dtype=np.complex128, ndim = 1, flags = 'CONTIGUOUS')
# array_1d_double = np.ctypeslib.ndpointer(dtype=np.float64, ndim = 1, flags = 'CONTIGUOUS')
# _libmc.cpuWola.restype = ct.c_int32
# _libmc.cpuWola.argtypes = [array_1d_complexdouble, array_1d_double, ct.c_int32, ct.c_int32, ct.c_int32, ct.c_int32, array_1d_complexdouble,ct.c_int32]
# siglen = len(y)
# out = np.empty(int(siglen/Dec * fftlen), dtype = np.complex128) # make the output
# retcode = _libmc.cpuWola(y, f_tap, fftlen, Dec, int(siglen/Dec), len(f_tap), out, threads) # run the dll function
# out = out.reshape((int(siglen/Dec),fftlen)) # reshape to channels in columns
# return out, retcode