AQP is a highly modular pipeline designed to enable the testing and comparison of different objective speech and audio quality metrics (e.g. ViSQOL, PESQ, Warp-Q, etc.). AQP allows researchers to test and compare objective quality metrics helping to improve robustness, reproducibility and development speed.
Earlier speech/audio quality models were developed in either C/C++ for speed or MATLAB for research prototyping and were based on monolithic codebases that were difficult to adapt or extend. Models such as NISQA , CDPAM ,SESQA and WARP-Q have been developed using standard python libraries. Python has matured and become widely adopted in both research and industry deployment with packages and libraries available to implement many standard audio signal processing and machine learning algorithms as well as data wrangling and visualisation.
With the change to using Python for research of this nature in mind, AQP was designed to be modular and easy to use for almost anyone. Using Python allows for quick testing and integration of new functionality to the pipeline. The core architecture of AQP is built up around a Directed Acyclic Graph (DAG) data structure. Each node in the graph encapsulates some logic, e.g. loading a signal, scaling a signal, creating a spectrogram, etc.
Alongside the DAG, some concepts from the component-based design pattern is used to form the core functionality of the pipeline. A result dictionary is declared before the pipeline is executed. This dictionary is passed to each node and it's contents are either retrieved to use as input to the encapsulated logic and/or updated with the results of executing the node.
Due to this repository being included in the submission, detailed in the next section, this repository will be kept as is. However, the repository and any future commits will be available here https://github.com/QxLabIreland/AQP
The AQP paper has been accepted to MMSys22, Athlone, Ireland. Citation will be uploaded afterwards
The first step to running AQP is obviously to clone the repository to a directory of your choosing.
AQP relies on several Python pacakges. The information about these packages is found in the requirements.txt
file and they can all be installed with a single command.
pip install -r requirements.txt
If you are a person using a Mac or Unix based operating system and want to run the scripts located in the AQP/scripts
directory, the subversion
package is required. This is just to pull specific datasets from the https://github.com/QxLabIreland/datasets repository.
On Mac
brew install subversion
On Ubuntu/Debian
apt-get install subversion
For more Unix based install instructions go to https://subversion.apache.org/packages.html .
There is also information there for Windows users, but likely the easiest method is to just download the full dataset repository.
To make full use of the pipelines visualization capabilities, it is recommended to install Graphviz. Graph visualization is a way of representing structural information as diagrams of abstract graphs and networks. Instructions on how to install Graphviz can be found here.
To get a quick idea of how the pipeline works and can be used to run an experiment there is a quickstart example that is designed to get the pipeline working in a few easy steps.
The example uses a subset(the first 10 entries) of the Genspeech dataset. The full Genspeech and TCDVoip datasets are available at https://github.com/QxLabIreland/datasets. The example configuration of the pipeline is the one used in the case study of the AQP paper, which compared two variations of the Warp-Q metric vs PESQ.
Note: These steps assume you have the subversion package installed and have access to the wget command.
- From the terminal,
cd
into thescripts
directory (AQP/scripts
) - Execute the
run_example.sh
script using./run_example.sh
For those who don't have access to subversion and wget.
-
Download the
datasets
repository from the link given above. -
Move/copy the
quickstart_genspeech
directory and thequickstart_genspeech.csv
file located underdatasets/genspeech
to the resources folder of AQP. Resulting file structure should look likeAQP/resources/quickstart_genspeech/
andAQP/resources/quickstart_genspeech.csv
-
From the root directory of AQP, execute the following
python scripts/prepend_files.py --prepend_with "resources/" --dataset "resources/quickstart_genspeech.csv" python pipeline.py --root_node_id "Load DF" --graph_config_path "config/example.json" --plot_graph
After executing the quickstart example through either method described above the figure below should be produced.
It is also easy to extend the above example to recreate the case study performed in the AQP paper. On Mac/Unix, execute the genspeech.sh
script from inside the scripts directory to download the entire Genspeech dataset and csv file. Then from the root directory execute the following commands:
python scripts/prepend_files.py --prepend_with "resources/" --dataset "resources/genspeech.csv"
python pipeline.py --root_node_id "Load DF" --graph_config_path "config/warpq_pesq_dataset.json" --plot_graph
The figure below shows the expected output after executing the above commands and allowing the pipeline to run on the full Genspeech dataset.
There are several arguments which can be passed to AQP, some are required, others optional.
--root_node_id
: This is the name/key of the root node of the graph. This will be the first node executed and from there it's children will be executed and so on.--graph_config_path
: Path to the graph definition to use.
The other options are:
--plot_graph
: Flag to create a diagram of the pipeline, default is False.--output_dir
: Path to a directory for any files produced, default isresults/
--graph_output_file
: Path to file to store the created DOT file, do NOT include the file extension (this is done automatically), default is "results/graph". This gets expanded to, for example, "results/graph.dot".--validate:
Signals that the pipeline should just be validated and optionally graphed. Pipeline will not run if this is set toTrue
.--debug
: Enables debug logging.--version
: Prints the version.
Forewarning: Some of the configurations described below use nodes that are implemented for the ViSQOL quality metric. Some code for ViSQOL still exists on the main branch, under qualitymetrics/visqol
, however the nodes used for ViSQOL have been removed temporarily and are located on the visqol_dev
branch. This was done due to some bugs being present. They will be added back into the main branch as soon as these bugs have been fixed.
The Visqol branch has plenty of examples of how nodes can be created and used to encapsulate different bits of functionality.
In future we plan on adding more quality metrics, datasets and core nodes to the platform. Stay tuned folks!
Nodes are used to encapsulate some unique logic/functionality, but there is common properties/functionality to all nodes. These are defined in the abstract Node class (found in nodes/node.py). Node must be contained within the nodes
directory. Any node placed in this directory will be detected by the pipeline and will be available for use.
class Node(object):
def __init__(self, id_: str, output_key: str, draw_options: dict, **kwargs):
self.id_ = id_
self.output_key = output_key
self.draw_options = draw_options
self.type = '__node__'
def execute(self, result: dict, **kwargs):
LOGGER.info(f'Executing node {self.id_} | type={self.type_}')
There are currently four class which inherit/implement the Node base class, the AQPNode, ViSQOLNode and PESQNode and WarpQNode. The main reason for these classes is related to outputing the pipeline in .dot format for GraphViz. Each node class can have a dictionary called draw_options passed to it and it is used to control how the node will be drawn in the output image. The three Node implementations simply add specific draw options for nodes belonging to different use cases. More on this in the Drawing the Pipeline section. All other node classes implement one of the three previously mentioned classes.
The execute function is main function of the pipeline, it is common to all nodes and is how data moves through the pipeline. The execute function takes in the result dictionary as an argument and should return the same dictionary in the majority of cases(more details on this in the Advanced Nodes section)
Adding your own node is quick and easy to do. All it requires is to implement one of AQPNode, ViSQOLNode, PESQNode, WarpQNode or your own base node that you've created. Then the __init__
function and the execute
function must be implemented.
Example
from .node import AQPNode
class MyCustomNode(AQPNode):
# Note that output_key and draw_options are optional, so they do not have to be used if not needed.
def __init__(self, id_: str, output_key: str, param_one: str, param_two: int, draw_options: dict=None, **kwargs):
super().__init__(id_, output_key=output_key, draw_options=draw_options, **kwargs)
self.param_one = param_one
self.param_two = param_two
# You MUST declare self.type_, this is used to logging purpose and just describe the name of the node
self.type_ = 'MyCustomNode'
def execute(self, result: dict, **kwargs):
super().execute(result, **kwargs)
output_value = result[self.param_one] * self.param_two
result[self.output_key] = output_value
return result
A real implemented node
import math
import numpy as np
from .node import AQPNode
class ScaleSignalsNode(AQPNode):
"""Node used to scale two signals to the same Sound Pressure Level."""
def __init__(self, id_: str,
ref_sig_key: str='reference_signal',
deg_sig_key: str='degraded_signal',
draw_options: dict=None, **kwargs):
"""Initialize a ScaleSignalsNode.
Parameters
----------
ref_sig_key : str, optional
Key to retrieve the reference audio signal. The default is 'reference_signal'.
deg_sig_key : str, optional
Key to retrieve the degraded audio signal. The default is 'degraded_signal'.
"""
super().__init__(id_, draw_options=draw_options)
self.ref_sig_key = ref_sig_key
self.deg_sig_key = deg_sig_key
self.type_ = 'ScaleSignalNode'
def execute(self, result: dict, **kwargs):
"""Execute the ScaleSignalNode and update the degraded signal key with the scaled signal."""
super().execute(result)
required_reference_spl = ScaleSignalsNode._calculate_SPL(result[self.ref_sig_key])
required_degraded_spl = ScaleSignalsNode._calculate_SPL(result[self.deg_sig_key])
result[self.deg_sig_key] *= (10 ** ((required_reference_spl - required_degraded_spl) / 20))
return result
@classmethod
def _calculate_SPL(cls, signal: np.ndarray) -> float:
return 20 * math.log10(math.sqrt(np.mean(np.square(signal))) / 20e-6)
The pipeline is defined as a collection of connected nodes. This information is described through a JSON configuration file. This file is passed to the program on startup as a command-line argument, --graph_config_path
. Each entry in the JSON file is used to describe some node in the pipeline. The key for an entry is used as a unique id for each node and is how nodes are connected together. The values associated with the key are then used as the construction parameters for a node, with some extra parameters, such as children
and type
being used to facilitate the creation of connections between nodes and instructing the pipeline in which type of node to create.
{
"load_ref_signal": {
"type": "LoadSignalNode",
"children": ["load_deg_signal"],
"output_key": "reference_signal",
"signal_path": "resources/reference/ref.wav",
"file_name_key": "reference_file"
},
"load_deg_signal": {
"type": "LoadSignalNode",
"children": ["scale_signal"],
"output_key": "degraded_signal",
"signal_path": "resources/degraded/deg.wav",
"file_name_key": "degraded_file"
},
"scale_signal": {
"type": "ScaleSignalsNode"
}
}
The above configuration results in a graph that looks like the image below.
The LoopNode is used to loop over some iterable entry in the result dictionary. When creating a LoopNode a definition is provided of all the nodes which it should loop over using each entry in the iterable object. Each iteration, a copy of the original dictionary is used (so as to avoid key conflicts), and this copy is then assigned to a results dictionary. The output of the loop node is this results dictionary, it gets assigned to the main result dictionary. The execute function of the LoopNode is shown below.
def execute(self, result: dict, **kwargs):
super().execute(result)
results = {}
for i in result[self.iterable_key]:
LOGGER.info("Running on iterable entry: %s", i)
result_copy = {k: result[k] for k in result if k not in self.key_blacklist}
result_copy['iterator_item'] = i
graphutils.run_node(self.execution_node, result_copy)
results[i] = result_copy
result[self.output_key] = results
return result
When creating the copy of the result dictionary, an additional list can be used to blacklist specified keys from appearing in the copy. This is useful when running quality metric configurations against each other, e.g. ViSQOL with a Mel Spectrogram against ViSQOL with a Gammatone Spectrogram. It prevents the output of each configuration being present when looping over another set of nodes. Without this, it is possible that the result dictionary ends up storing most of itself at the output key of a loop node. An example of using the blacklist is seen later in a more advanced graph config.
The EncapsulationNode is mostly a utility node that can store a pipeline definition. Like the LoopNode, it also receives a sub-graph definition during construction and upon calling it's execute function it call each node contained within that sub-graph. This functionality is useful as it can be used to shorten graph configuration files, as well as reuse the same definition without having to redefine the graph again.
def execute(self, result: dict, **kwargs):
super().execute(result, **kwargs)
graphutils.run_node(self.execution_node, result, **kwargs)
return result
An EncapsulationNode can be created in one of two ways:
- Defining the sub-graph in the EncapsulationNode entry or by
- Providing a path to a JSON file containing the sub-graph definition.
An id of the starting node in the sub-graph must be provided in both cases.
{
"load_ref": {
"children": ["load_deg"],
"type": "LoadSignalNode",
"signal_path": "resources/reference/ref.wav",
"file_name_key": "reference_file",
"output_key": "reference_signal"
},
"load_deg": {
"children": ["ViSQOL_Mel"],
"type": "LoadSignalNode",
"signal_path": "resources/degraded/deg.wav",
"file_name_key": "degraded_file",
"output_key": "degraded_signal"
},
"ViSQOL_Mel": {
"children": ["output"],
"type": "EncapsulationNode",
"start_node": "visqol_args",
"path_to_node_config": "config/visqol/graphs/default_visqol_mel.json"
},
"output": {
"type": "IdentityNode"
}
}
The JSON below is the contents of config/visqol/graphs/default_visqol_mel.json
(found under the visqol_dev branch
or feature_dev_two
for now)
{
"visqol_args": {
"output_key": "visqol_args",
"children": ["channel_loop"],
"type": "VisqolStructuresNode",
"config_file_path": "config/visqol/structures/default_visqol_mel_structures.json"
},
"channel_loop": {
"output_key": "vnsims_mel",
"type": "LoopNode",
"iterable_key": "active_channels",
"key_blacklist": ["wav_files", "vnsims_gamma", "vnsims_mel", "vnsims_goertzel"],
"start_node": "extract_channels",
"node_data": {
"extract_channels": {
"children": ["scale_signals"],
"type": "ViSQOLChannelNode"
},
"scale_signals": {
"children": ["ref_spect"],
"type": "ScaleSignalsNode"
},
"ref_spect": {
"children": ["deg_spect"],
"output_key": "reference_spect",
"type": "SpectrogramNode",
"signal_key": "reference_signal",
"file_name_key": "reference_file",
"save_spectrogram": true,
"output_dir": "spectrograms/mel/"
},
"deg_spect": {
"children": ["floor_spect"],
"output_key": "degraded_spect",
"type": "SpectrogramNode",
"signal_key": "degraded_signal",
"file_name_key": "degraded_file",
"save_spectrogram": true,
"output_dir": "spectrograms/mel/"
},
"floor_spect": {
"children": ["reference_patches"],
"type": "FloorSpectrogramsNode"
},
"reference_patches": {
"children": ["vad"],
"type": "ReferencePatchNode"
},
"vad": {
"children": ["patch_alignment"],
"type": "VADNode"
},
"patch_alignment": {
"children": ["patch_similarity"],
"type": "PatchAlignmentNode"
},
"patch_similarity": {
"children": [],
"type": "PatchSimilarityNode"
}
}
}
}
The SinkNode is a relatively simple but useful node. It's purpose is prevent the execution of the nodes below it in the pipeline until it has received a set number of results from different branches. Once it has seen the expected number of results, the result is passed to the child nodes of the SinkNode.
When executing the pipeline, each node has it's execute function called. This normally returns the result dictionary and execution continues. In this case, each of the children of the current node being executed are added to a Stack (pipeline operates using a modified Depth First Traversal). However, if the return value is None, then the children of the current node don't get added to the Stack and instead, execution continues using the next node on the same level of the graph as the current node.
The SinkNode allows for the collection of different results before they are passed to an output node for a node which relies on having multiple different results.
def execute(self, result: dict, **kwargs):
super().execute(result, **kwargs)
self.counter += 1
return result if self.counter == self.num_expected_results else None
The TransformNode contains several transformation function which can be used to operate on some data contained within the result dictionary. These functions/transforms could have been encapsulated into their own nodes, but they're short and having each of them be defined separately would bloat the nodes directory further. The transforms so far, are designed around taking some value(s) from the result dictionary and creating some new value or remove a layer of nesting etc. So far there are three transforms used:
df_columns_to_tuples
: when a dataset is loaded from a csv file the reference and degraded files are in different columns, this transform makes a single list of tuples from these columns.tuple_to_top_level
: When iterating (using a LoopNode) over the list of tuples described above, the individual parts need to be extracted so as to load the reference and degraded signal. This transform retrieves the current tuple, and assigns each value back to the dictionary.update_df
: After running a quality metric on a signal, the results for that signal needs to be stored somewhere, so as to be to graph them later on. This needs to happen per signal tested. This transform locates the correct row in the loaded dataframe based off of the reference file and updates a (new) column with the results of the signal.
Each function available in the TransformNode takes it's own unique arguments. These arguments should be provided in the definition of the TransformNode using the function_args
field. These values get upacked during execution for use.
Example
{
"type ": "TransformNode",
"transform_name": "tuple_to_top_level",
"target_key": "iterator_item",
"function_args": {
"reference_file_key": "reference",
"degraded_file_key": "degraded"
}
}
{
"Load DF": {
"type": "LoadCSVAsDFNode",
"children": ["Wrangle Data"],
"output_key": "dataframe",
"path_to_csv": "resources/quickstart_genspeech.csv"
},
"Wrangle Data": {
"type": "TransformNode",
"children": ["Add sr"],
"transform_name": "df_columns_to_tuples",
"target_key": "dataframe",
"output_key": "wav_files",
"function_args": {
"col_one": "Ref_Wave",
"col_two": "Test_Wave"
}
},
"Add sr": {
"type": "VariableNode",
"children": ["DF Loop"],
"output_key": "sr",
"variable_value": 16000
},
"DF Loop": {
"type": "LoopNode",
"children": ["DF to csv"],
"output_key": "dataset_output",
"iterable_key": "wav_files",
"start_node": "Tuple Transform",
"key_blacklist": [],
"node_data": {
"Tuple Transform": {
"type": "TransformNode",
"children": ["Load Ref"],
"transform_name": "tuple_to_top_level",
"target_key": "iterator_item",
"function_args": {
"reference_file_key": "reference",
"degraded_file_key": "degraded"
}
},
"Load Ref": {
"type": "LoadSignalNode",
"children": ["Load Test"],
"target_sample_rate": 16000,
"file_name_key": "reference_file",
"signal_key": "reference",
"output_key": "reference_signal"
},
"Load Test": {
"type": "LoadSignalNode",
"children": ["VAD", "PESQ"],
"target_sample_rate": 16000,
"signal_key": "degraded",
"output_key": "degraded_signal",
"file_name_key": "degraded_file"
},
"VAD": {
"type": "WarpQVADNode",
"children": ["MFCC", "Mel"],
"ref_sig_key": "reference_signal",
"deg_sig_key": "degraded_signal"
},
"MFCC": {
"type": "MFCCNode",
"children": ["MFCC SDTW"],
"ref_sig_key": "reference_signal",
"deg_sig_key": "degraded_signal"
},
"Mel": {
"type": "MelNode",
"children": ["Mel SDTW"],
"ref_sig_key": "reference_signal",
"deg_sig_key": "degraded_signal"
},
"Mel SDTW": {
"type": "WarpQSDTWNode",
"children": ["Update DF Mel"],
"output_key": "warp_q_mel",
"mfcc_ref_key": "mfcc_ref",
"mfcc_coded_patch_key": "mfcc_coded_patch"
},
"MFCC SDTW": {
"type": "WarpQSDTWNode",
"children": ["Update DF MFCC"],
"output_key": "warp_q_mfcc",
"mfcc_ref_key": "mfcc_ref",
"mfcc_coded_patch_key": "mfcc_coded_patch"
},
"Update DF Mel": {
"type": "TransformNode",
"transform_name": "update_df",
"target_key": "dataframe",
"function_args": {
"key": "warp_q_mel",
"col_name": "Ref_Wave"
}
},
"Update DF MFCC": {
"type": "TransformNode",
"transform_name": "update_df",
"target_key": "dataframe",
"function_args": {
"key": "warp_q_mfcc",
"col_name": "Ref_Wave"
}
},
"PESQ": {
"type": "EncapsulationNode",
"children": ["Update DF PESQ"],
"start_node": "pesq_alignment",
"node_data": {
"pesq_alignment": {
"type": "AlignmentNode",
"children": ["pesq"]
},
"pesq": {
"type": "PyPESQNode",
"output_key": "pesq"
}
}
},
"Update DF PESQ": {
"type": "TransformNode",
"transform_name": "update_df",
"target_key": "dataframe",
"function_args": {
"key": "pesq",
"col_name": "Ref_Wave"
}
}
}
},
"DF to csv": {
"type": "TransformNode",
"children": ["Graph Output"],
"transform_name": "to_csv",
"target_key": "dataframe",
"function_args": {
"output_file_name": "results/RESULTS.csv"
}
},
"Graph Output": {
"type": "GraphNode",
"df_key": "dataframe",
"x_data_key": "MOS",
"y_data_keys": ["warp_q_mel", "warp_q_mfcc", "pesq"],
"y_labels": ["WARP-Q Distance", "WARP-Q Distance", "Predicted MOS"],
"titles": ["WARP-Q Mel", "WARP-Q MFCC", "PESQ"]
}
}
It is possible to produce a visual output of the pipeline. This visualization shows the connections between nodes and clusters of nodes that are related due to a NestedNode. These clusters are shown by boxes drawn around nodes that belong to a NestedNode. The first node in these clusters is the NestedNode itself, then it contains the nodes connected to the execution_node of the NestedNode.
The visualization is produced by means of the DOT language. The DOT file produced by the pipeline can be modifying to add any additional drawing information. If Graphviz is installed on your machine and the dot
command is available in the terminal then after the DOT file is created this command is invoked and a PNG and SVG version of the DOT file is created.
The draw_options passed to a given node are used here to produce the desired colours/shapes/etc. in the visualization. Further, global DOT attributes such as "rankdir" can be specified(as it was to produce the images below) in the config/dot_config.json
file. All entries in this JSON file will be added to the top of the DOT output.
The top image is the output produced from running the scripts/run_example.sh
script, which uses the config/example.json
configuration. The below image is the output of the previous visualization implementation.
The large pipeline visualization below is generated from the config/colour_demo.json
configuration (currently found in the feature_dev_two
branch). It contains nodes from each of the node superclasses, ViSQOL, WARP-Q, PESQ, Nested and the default AQP. It's purpose is to show off all the different colours, as well as serve as another example of how the pipeline can be put together.