From efb723d03234af8cfdbc4b73422e0ce0704c093c Mon Sep 17 00:00:00 2001 From: thegreatcodeholio Date: Mon, 13 May 2024 01:58:04 -0400 Subject: [PATCH] updated frequency extraction to make start and end time of tones to be within 200ms precision. updated tone_detection to include alternations and total length for hi low tones, tone a/b lengths for two tone. --- detect_test.py | 2 +- pyproject.toml | 2 +- .../frequency_extraction.py | 63 +++--- src/icad_tone_detection/tone_detection.py | 196 ++---------------- 4 files changed, 56 insertions(+), 207 deletions(-) diff --git a/detect_test.py b/detect_test.py index b2b023b..23c33f1 100644 --- a/detect_test.py +++ b/detect_test.py @@ -9,7 +9,7 @@ print("Requires a audio path provided. Either file path, or URL.") exit(0) -detect_result = tone_detect(audio_path, time_resolution_ms=50, debug=True) +detect_result = tone_detect(audio_path, matching_threshold=1.5, time_resolution_ms=50, debug=True) if len(detect_result.two_tone_result) == 0 and len(detect_result.long_result) == 0 and len(detect_result.hi_low_result) == 0: print("No tones") diff --git a/pyproject.toml b/pyproject.toml index 71684dd..aac882b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "icad_tone_detection" -version = "0.8" +version = "0.9" authors = [ {name = "TheGreatCodeholio", email = "ian@icarey.net"}, ] diff --git a/src/icad_tone_detection/frequency_extraction.py b/src/icad_tone_detection/frequency_extraction.py index 60ad9b8..20f5b92 100644 --- a/src/icad_tone_detection/frequency_extraction.py +++ b/src/icad_tone_detection/frequency_extraction.py @@ -43,14 +43,13 @@ def get_audio_frequencies(self): """ try: window = 'hann' - n_fft = 2048 # Number of FFT points + n_fft = 2048 # Number of FFT points, smaller values may be considered if finer time resolution is needed # Calculate hop_length based on the desired time resolution - hop_length = int(self.frame_rate * self.time_resolution_ms / 1000) + hop_length = max(1, int(self.frame_rate * self.time_resolution_ms / 1000)) # Perform the STFT - frequencies, _, zxx = stft(self.samples, self.frame_rate, window=window, nperseg=n_fft, - noverlap=n_fft - hop_length) + frequencies, time_samples, zxx = stft(self.samples, self.frame_rate, window=window, nperseg=n_fft, noverlap=n_fft - hop_length) amplitude = np.abs(zxx) # Get the magnitude of the STFT coefficients # Convert amplitude to decibels @@ -59,7 +58,7 @@ def get_audio_frequencies(self): # Detect the frequency with the highest amplitude at each time step detected_frequencies = frequencies[np.argmax(amplitude_db, axis=0)] - matching_frequencies = self.match_frequencies(detected_frequencies.tolist()) + matching_frequencies = self.match_frequencies(detected_frequencies.tolist(), time_samples) return matching_frequencies @@ -67,6 +66,21 @@ def get_audio_frequencies(self): print(f"Error extracting frequencies: {e}") return None + def dynamic_threshold(self, frequencies, index): + """ + Calculates a dynamic threshold based on the frequency changes. + """ + base_frequency = frequencies[max(0, index - 1)] + return base_frequency * self.matching_threshold / 100 + + def calculate_times(self, start_index, end_index, time_samples): + """ + Calculates accurate start and end times for frequency matches. + """ + start_time = round(time_samples[start_index], 3) + end_time = round(time_samples[end_index - 1], 3) + return start_time, end_time + @staticmethod def amplitude_to_decibels(amplitude, reference_value): """ @@ -83,17 +97,19 @@ def amplitude_to_decibels(amplitude, reference_value): reference_value = np.maximum(reference_value, 1e-20) return 20 * np.log10(np.maximum(amplitude, 1e-20) / reference_value) - def match_frequencies(self, detected_frequencies): + def match_frequencies(self, detected_frequencies, time_samples): """ - Identifies and groups matching frequencies from a list of detected frequencies based on the matching threshold. - Each group's start time, end time, and the matching frequencies are returned. + Identifies and groups matching frequencies from a list of detected frequencies based on the matching threshold. + Each group's start time, end time, and the matching frequencies are returned. - Parameters: - detected_frequencies (list of float): The detected frequencies from the audio sample. + Parameters: + detected_frequencies (list of float): The detected frequencies from the audio sample. + time_samples (np.array): Array of times corresponding to each frequency sample. - Returns: - list of tuples: Each tuple contains the start time, end time, and a list of matching frequencies. + Returns: + list of tuples: Each tuple contains the start time, end time, and a list of matching frequencies. """ + if not detected_frequencies: return [] @@ -101,29 +117,26 @@ def match_frequencies(self, detected_frequencies): frequencies = [round(f, 1) for f in detected_frequencies] matching_frequencies = [] current_match = [frequencies[0]] - start_index = 0 # Initialize start index for the first frequency + start_index = 0 for i in range(1, len(frequencies)): - threshold = frequencies[i - 1] * self.matching_threshold / 100 + threshold = self.dynamic_threshold(frequencies, i) if abs(frequencies[i] - frequencies[i - 1]) <= threshold: current_match.append(frequencies[i]) else: if len(current_match) >= 2: - # Calculate start and end times for the current match - start_time = round(start_index * self.duration_seconds / len(frequencies), 3) - end_time = round(i * self.duration_seconds / len(frequencies), 3) - matching_frequencies.append((start_time, end_time, current_match)) + start_time, end_time = self.calculate_times(start_index, i, time_samples) + freq_length = round((end_time - start_time) + .1, 2) + matching_frequencies.append((start_time, end_time, freq_length, current_match)) current_match = [frequencies[i]] - start_index = i # Update start index for the new match + start_index = i - # Handle the last group of matching frequencies if len(current_match) >= 2: - start_time = round(start_index * self.duration_seconds / len(frequencies), 3) - end_time = round(len(frequencies) * self.duration_seconds / len( - frequencies), 3) # End time for the last frequency - matching_frequencies.append((start_time, end_time, current_match)) + start_time, end_time = self.calculate_times(start_index, len(frequencies), time_samples) + freq_length = round((end_time - start_time) + .1, 2) + matching_frequencies.append((start_time, end_time, freq_length, current_match)) return matching_frequencies except Exception as e: print(f"Error matching frequencies: {e}") - return [] + return [] \ No newline at end of file diff --git a/src/icad_tone_detection/tone_detection.py b/src/icad_tone_detection/tone_detection.py index ffcdc99..ea9943e 100644 --- a/src/icad_tone_detection/tone_detection.py +++ b/src/icad_tone_detection/tone_detection.py @@ -1,27 +1,4 @@ -# def detect_quickcall(frequency_matches): -# qc2_matches = [] -# tone_id = 0 -# last_set = None -# if not frequency_matches or len(frequency_matches) < 1: -# return qc2_matches -# for x in frequency_matches: -# if last_set is None and len(x[2]) >= 8 and 0 not in x[2] and 0.0 not in x[2]: -# last_set = x -# else: -# if len(x[2]) >= 8 and 0 not in x[2] and 0.0 not in x[2]: -# if len(last_set[2]) <= 12 and len(x[2]) >= 28: -# tone_data = {"tone_id": f'qc_{tone_id + 1}', "detected": [last_set[2][0], x[2][0]], -# "start": last_set[0], "end": x[1]} -# tone_id += 1 -# qc2_matches.append(tone_data) -# last_set = x -# else: -# last_set = x -# -# return qc2_matches - - -def detect_two_tone(frequency_matches, min_tone_a_length=0.8, min_tone_b_length=2.8): +def detect_two_tone(frequency_matches, min_tone_a_length=0.7, min_tone_b_length=2.7): two_tone_matches = [] tone_id = 0 last_set = None @@ -29,7 +6,7 @@ def detect_two_tone(frequency_matches, min_tone_a_length=0.8, min_tone_b_length= return two_tone_matches for current_set in frequency_matches: - if all(f > 0 for f in current_set[2]): # Ensure frequencies are non-zero + if all(f > 0 for f in current_set[3]): # Ensure frequencies are non-zero current_duration = current_set[1] - current_set[0] # Calculate the duration of the current tone if last_set is None: @@ -40,7 +17,9 @@ def detect_two_tone(frequency_matches, min_tone_a_length=0.8, min_tone_b_length= if last_duration >= min_tone_a_length and current_duration >= min_tone_b_length: tone_data = { "tone_id": f'qc_{tone_id + 1}', - "detected": [last_set[2][0], current_set[2][0]], # Frequency values of A and B tones + "detected": [last_set[3][0], current_set[3][0]], # Frequency values of A and B tones + "tone_a_length": last_set[2], + "tone_b_length": current_set[2], "start": last_set[0], # Start time of tone A "end": current_set[1] # End time of tone B } @@ -60,8 +39,7 @@ def detect_long_tones(frequency_matches, detected_quickcall, min_duration=2.0): for quickcall in detected_quickcall: excluded_frequencies.update(quickcall["detected"][:2]) - for start, end, frequencies in frequency_matches: - duration = end - start + for start, end, duration, frequencies in frequency_matches: if not frequencies: continue @@ -85,150 +63,6 @@ def detect_long_tones(frequency_matches, detected_quickcall, min_duration=2.0): return long_tone_matches -# def extract_warble_tones(frequency_matches, interval_length, min_alternations): -# """ -# Extract sequences of alternating tones (warble tones) from detected tones, -# following the specified process. -# -# Parameters: -# detected_tones (list of tuples): Detected tones with start time, end time, and frequencies. -# interval_length (float): The maximum allowed interval in seconds between alternating tones. -# min_alternations (int): The minimum number of alternations for a sequence to be considered valid. -# -# Returns: -# list of dicts: Extracted warble tones with details including start, end, and tones. -# """ -# sequences = [] -# id_index = 1 -# current_index = 0 -# new_sequence = [] -# # Iterate over frequency matches -# for index, group in enumerate(frequency_matches): -# if index + 3 >= len(frequency_matches): -# if len(new_sequence) >= min_alternations: -# new_detection = { -# "hl_id": id_index, -# "tones": [new_sequence[0][2][0], new_sequence[1][2][0]], -# "start": new_sequence[0][0], -# "end": new_sequence[-1][0] -# } -# sequences.append(new_detection) -# new_sequence.clear() -# id_index += 1 -# break -# -# if index != current_index: -# continue -# -# group_a = frequency_matches[index] -# group_b = frequency_matches[index + 1] -# group_c = frequency_matches[index + 2] -# group_d = frequency_matches[index + 3] -# -# group_a_tone = group_a[2][0] -# group_b_tone = group_b[2][0] -# group_c_tone = group_c[2][0] -# group_d_tone = group_d[2][0] -# -# if 0 in (group_a_tone, group_b_tone, group_c_tone, group_d_tone) or 0.0 in ( -# group_a_tone, group_b_tone, group_c_tone, group_d_tone): -# current_index += 1 -# if len(new_sequence) >= min_alternations: -# new_detection = { -# "hl_id": id_index, -# "tones": [new_sequence[0][2][0], new_sequence[1][2][0]], -# "start": new_sequence[0][0], -# "end": new_sequence[-1][0] -# } -# sequences.append(new_detection) -# new_sequence.clear() -# id_index += 1 -# continue -# -# if group_a_tone == group_b_tone or group_c_tone == group_d_tone: -# current_index += 1 -# if len(new_sequence) >= min_alternations: -# new_detection = { -# "hl_id": id_index, -# "tones": [new_sequence[0][2][0], new_sequence[1][2][0]], -# "start": new_sequence[0][0], -# "end": new_sequence[-1][0] -# } -# sequences.append(new_detection) -# new_sequence.clear() -# id_index += 1 -# continue -# -# # if any group has less 2 frequencies continue it can't be a warble tone. -# if len(group_a[2]) < 2 or len(group_b[2]) < 2 or len(group_c[2]) < 2 or len(group_d[2]) < 2: -# current_index += 1 -# if len(new_sequence) >= min_alternations: -# new_detection = { -# "hl_id": id_index, -# "tones": [new_sequence[0][2][0], new_sequence[1][2][0]], -# "start": new_sequence[0][0], -# "end": new_sequence[-1][0] -# } -# sequences.append(new_detection) -# new_sequence.clear() -# id_index += 1 -# continue -# -# # check to see if our groups occurred within the threshold interval length threshold -# if not (group_b[0] - group_a[1] <= interval_length) and (group_c[0] - group_b[1] <= interval_length) and (group_d[0] - group_c[1] <= interval_length): -# current_index += 1 -# if len(new_sequence) >= min_alternations: -# new_detection = { -# "hl_id": id_index, -# "tones": [new_sequence[0][2][0], new_sequence[1][2][0]], -# "start": new_sequence[0][0], -# "end": new_sequence[-1][0] -# } -# sequences.append(new_detection) -# new_sequence.clear() -# id_index += 1 -# continue -# -# # if group a and c don't match continue -# if not within_tolerance(group_a_tone, group_c_tone): -# current_index += 1 -# if len(new_sequence) >= min_alternations: -# new_detection = { -# "hl_id": id_index, -# "tones": [new_sequence[0][2][0], new_sequence[1][2][0]], -# "start": new_sequence[0][0], -# "end": new_sequence[-1][0] -# } -# sequences.append(new_detection) -# new_sequence.clear() -# id_index += 1 -# continue -# -# # if group b and d don't match continue -# if not within_tolerance(group_b_tone, group_d_tone): -# current_index += 1 -# if len(new_sequence) >= min_alternations: -# new_detection = { -# "hl_id": id_index, -# "tones": [new_sequence[0][2][0], new_sequence[1][2][0]], -# "start": new_sequence[0][0], -# "end": new_sequence[-1][0] -# } -# sequences.append(new_detection) -# new_sequence.clear() -# id_index += 1 -# continue -# -# # we have a potential match -# new_sequence.append(group_a) -# new_sequence.append(group_b) -# new_sequence.append(group_c) -# new_sequence.append(group_d) -# -# current_index += 4 -# -# return sequences - def within_tolerance(frequency1, frequency2, tolerance=0.02): """ Check if two frequencies are within a specified tolerance percentage. @@ -271,14 +105,14 @@ def detect_warble_tones(frequency_matches, interval_length, min_alternations): while i < len(frequency_matches): group = frequency_matches[i] # Ensure there are frequencies to evaluate - if not group[2]: + if not group[3]: i += 1 break - freq = group[2][0] + freq = group[3][0] # Skip groups with invalid frequencies or not enough tones - if freq <= 0 or len(group[2]) < 2: + if freq <= 0 or len(group[3]) < 2: i += 1 break @@ -286,14 +120,14 @@ def detect_warble_tones(frequency_matches, interval_length, min_alternations): current_sequence.append(group) else: group_a = current_sequence[-1] - tone_a = group_a[2][0] + tone_a = group_a[3][0] tone_b = freq # Check for the next group, if it exists if i + 1 < len(frequency_matches): group_c = frequency_matches[i + 1] - tone_c = group_c[2][0] + tone_c = group_c[3][0] # Ensure current tone does not match the adjacent tones directly if tone_a == tone_b or tone_b == tone_c: @@ -302,7 +136,7 @@ def detect_warble_tones(frequency_matches, interval_length, min_alternations): # Check for alternation with a tolerance if len(current_sequence) >= 2: - previous_alt_tone = current_sequence[-2][2][0] + previous_alt_tone = current_sequence[-2][3][0] if within_tolerance(tone_b, previous_alt_tone) and group[0] - group_a[1] < interval_length: current_sequence.append(group) else: @@ -321,9 +155,11 @@ def detect_warble_tones(frequency_matches, interval_length, min_alternations): if len(current_sequence) >= min_alternations * 2: sequences.append({ "tone_id": f"hl_{id_index}", - "detected": [current_sequence[0][2][0], current_sequence[1][2][0]], + "detected": [current_sequence[0][3][0], current_sequence[1][3][0]], "start": current_sequence[0][0], - "end": current_sequence[-1][1] + "end": current_sequence[-1][1], + "length": round(current_sequence[-1][1] - current_sequence[0][0], 2), + "alternations": len(current_sequence) // 2 }) id_index += 1