Skip to content

Commit

Permalink
MRG: Merge pull request #40 from aerosense-ai/fix/add-diff-baros-sens…
Browse files Browse the repository at this point in the history
…or-type-to-big-query

Add ability to add sensor types to BigQuery dataset via CLI
  • Loading branch information
cortadocodes authored Feb 16, 2022
2 parents e7d6fd4 + 72eaf93 commit 8923b57
Show file tree
Hide file tree
Showing 11 changed files with 475 additions and 92 deletions.
76 changes: 54 additions & 22 deletions cloud_functions/big_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

from blake3 import blake3
from google.cloud import bigquery
from slugify import slugify

from exceptions import ConfigurationAlreadyExists, InstallationWithSameNameAlreadyExists
from exceptions import (
ConfigurationAlreadyExists,
InstallationWithSameNameAlreadyExists,
SensorTypeWithSameReferenceAlreadyExists,
)


logger = logging.getLogger(__name__)
Expand All @@ -18,6 +21,7 @@
"Mics": "microphone",
"Baros_P": "barometer",
"Baros_T": "barometer_thermometer",
"Diff_Baros": "differential_barometer",
"Acc": "accelerometer",
"Gyro": "gyroscope",
"Mag": "magnetometer",
Expand Down Expand Up @@ -116,18 +120,31 @@ def record_microphone_data_location_and_metadata(

logger.info("Added microphone data location and metadata to BigQuery dataset %r.", self.dataset_id)

def add_sensor_type(self, name, description=None, measuring_unit=None, metadata=None):
def add_sensor_type(self, name, reference, description=None, measuring_unit=None, metadata=None):
"""Add a new sensor type to the BigQuery dataset. The sensor name is slugified on receipt.
:param str name: the name of the new sensor
:param str reference: the reference name for the sensor (usually slugified)
:param str|None description: a description of what the sensor is and does
:param str|None measuring_unit: the unit the sensor measures its relevant quantity in
:param dict|None metadata: any useful metadata about the sensor e.g. sensitivities
:raise ValueError: if the addition fails
:return None:
"""
reference = slugify(name)
metadata = json.dumps(metadata or {})
sensor_type_already_exists = self._get_field_if_exists(
table_name=self.table_names["sensor_type"],
field_name="reference",
comparison_field_name="reference",
value=reference,
)

if sensor_type_already_exists:
raise SensorTypeWithSameReferenceAlreadyExists(
f"A sensor type with the reference {reference!r} already exists."
)

if not isinstance(metadata, str):
metadata = json.dumps(metadata or {})

errors = self.client.insert_rows(
table=self.client.get_table(self.table_names["sensor_type"]),
Expand Down Expand Up @@ -160,15 +177,11 @@ def add_installation(self, reference, turbine_id, blade_id, hardware_version, se
:raise ValueError: if the addition fails
:return None:
"""
installation_already_exists = (
len(
list(
self.client.query(
f"SELECT 1 FROM `{self.table_names['installation']}` WHERE `reference`='{reference}' LIMIT 1"
).result()
)
)
> 0
installation_already_exists = self._get_field_if_exists(
table_name=self.table_names["installation"],
field_name="reference",
comparison_field_name="reference",
value=reference,
)

if installation_already_exists:
Expand Down Expand Up @@ -211,17 +224,17 @@ def add_configuration(self, configuration):
software_configuration_json = json.dumps(configuration)
software_configuration_hash = blake3(software_configuration_json.encode()).hexdigest()

configurations = list(
self.client.query(
f"SELECT id FROM `{self.table_names['configuration']}` WHERE `software_configuration_hash`='{software_configuration_hash}' "
f"LIMIT 1"
).result()
configuration_id = self._get_field_if_exists(
table_name=self.table_names["configuration"],
field_name="id",
comparison_field_name="software_configuration_hash",
value=software_configuration_hash,
)

if len(configurations) > 0:
if configuration_id:
raise ConfigurationAlreadyExists(
f"An identical configuration already exists in the database with UUID {configurations[0].id}.",
configurations[0].id,
f"An identical configuration already exists in the database with UUID {configuration_id}.",
configuration_id,
)

configuration_id = str(uuid.uuid4())
Expand All @@ -246,3 +259,22 @@ def add_configuration(self, configuration):

logger.info("Added configuration %r to BigQuery dataset %r.", configuration_id, self.dataset_id)
return configuration_id

def _get_field_if_exists(self, table_name, field_name, comparison_field_name, value):
"""Get the value of the given field for the row of the given table for which the comparison field has the
given value.
:param str table_name:
:param str field_name:
:param str comparison_field_name:
:param any value:
:return str|None:
"""
result = list(
self.client.query(
f"SELECT {field_name} FROM `{table_name}` WHERE `{comparison_field_name}`='{value}' LIMIT 1"
).result()
)

if result:
return getattr(result[0], field_name)
4 changes: 4 additions & 0 deletions cloud_functions/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ class ConfigurationAlreadyExists(BaseException):

class InstallationWithSameNameAlreadyExists(Exception):
    """Raise when an installation is added under a reference that is already present in the dataset.

    Derives from `Exception` rather than directly from `BaseException` so that generic `except Exception` handlers
    do not let it slip past; it is still a `BaseException` subclass, so existing handlers keep working.
    """


class SensorTypeWithSameReferenceAlreadyExists(Exception):
    """Raise when a sensor type is added under a reference that is already present in the dataset.

    Derives from `Exception` rather than directly from `BaseException` so that generic `except Exception` handlers
    do not let it slip past; it is still a `BaseException` subclass, so existing handlers keep working.
    """
8 changes: 8 additions & 0 deletions cloud_functions/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,11 @@ class CreateInstallationForm(FlaskForm):
sensor_coordinates = StringField("Sensor coordinates", [validators.DataRequired()])
longitude = FloatField("Longitude", [validators.Optional()])
latitude = FloatField("Latitude", [validators.Optional()])


class AddSensorTypeForm(FlaskForm):
    """Form validating the fields submitted when adding a sensor type.

    `name` and `reference` are required, and `reference` must additionally pass `SlugifiedValidator`. The remaining
    fields are optional free-text strings.
    """

    # Required fields.
    name = StringField(label="Name", validators=[validators.DataRequired()])
    reference = StringField(label="Reference", validators=[validators.DataRequired(), SlugifiedValidator()])

    # Optional fields.
    description = StringField(label="Description", validators=[validators.Optional()])
    measuring_unit = StringField(label="Measuring unit", validators=[validators.Optional()])
    metadata = StringField(label="Metadata", validators=[validators.Optional()])
50 changes: 47 additions & 3 deletions cloud_functions/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from octue.log_handlers import apply_log_handler

from big_query import BigQueryDataset
from exceptions import InstallationWithSameNameAlreadyExists
from forms import CreateInstallationForm
from exceptions import InstallationWithSameNameAlreadyExists, SensorTypeWithSameReferenceAlreadyExists
from forms import AddSensorTypeForm, CreateInstallationForm
from window_handler import WindowHandler


Expand Down Expand Up @@ -40,6 +40,50 @@ def clean_and_upload_window(event, context):
window_handler.persist_window(cleaned_window, window_metadata)


def add_sensor_type(request):
    """Add a new sensor type to the BigQuery dataset (entrypoint for the `add-sensor-type` cloud function).

    Responses are `(body, status code)` tuples:

    - 405 if the request method is not POST
    - 400 with one message per invalid field if form validation fails
    - 409 if a sensor type with the same reference already exists
    - 500 if anything else goes wrong while adding the sensor type
    - 200 with the form data on success

    :param request: the incoming HTTP request; only its `method` attribute is read directly here
    :return (dict, int): the response body and the HTTP status code
    """
    # NOTE(review): presumably the form reads the submitted values from the active Flask request - confirm.
    form = AddSensorTypeForm(meta={"csrf": False})

    if request.method != "POST":
        return {"nonFieldErrors": "Method Not Allowed. Try 'POST'."}, 405

    if not form.validate_on_submit():
        logger.error(json.dumps(form.errors))

        # Reduce lists of form field errors to single items.
        for field, error_messages in form.errors.items():
            form.errors[field] = error_messages[0] if error_messages else "Unknown field error"

        return {"fieldErrors": form.errors}, 400

    try:
        dataset = BigQueryDataset(
            project_name=os.environ["DESTINATION_PROJECT_NAME"],
            dataset_name=os.environ["BIG_QUERY_DATASET_NAME"],
        )

        dataset.add_sensor_type(
            reference=form.reference.data,
            name=form.name.data,
            description=form.description.data,
            measuring_unit=form.measuring_unit.data,
            metadata=form.metadata.data,
        )

    except SensorTypeWithSameReferenceAlreadyExists:
        duplicate_message = f"A sensor type with the reference {form.reference.data!r} already exists."
        return {"fieldErrors": {"reference": duplicate_message}}, 409

    except Exception as e:
        logger.exception(e)
        return {"nonFieldErrors": f"An error occurred. Form data was: {form.data}"}, 500

    return form.data, 200


def create_installation(request):
"""Create a new installation in the BigQuery dataset. This is the entrypoint for the `create-installation` cloud
function.
Expand Down Expand Up @@ -83,7 +127,7 @@ def create_installation(request):

return form.data, 200

logger.info(json.dumps(form.errors))
logger.error(json.dumps(form.errors))

# Reduce lists of form field errors to single items.
for field, error_messages in form.errors.items():
Expand Down
17 changes: 17 additions & 0 deletions cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ steps:
- --region=europe-west6
- --set-env-vars=SOURCE_PROJECT_NAME=aerosense-twined,DESTINATION_PROJECT_NAME=aerosense-twined,DESTINATION_BUCKET_NAME=test-data-gateway-processed-data,BIG_QUERY_DATASET_NAME=test_greta
- --timeout=540

- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
id: Deploy create-installation cloud function
args:
Expand All @@ -29,6 +30,22 @@ steps:
- --security-level=secure-always
- --region=europe-west6
- --set-env-vars=DESTINATION_PROJECT_NAME=aerosense-twined,BIG_QUERY_DATASET_NAME=greta

- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
id: Deploy add-sensor-type cloud function
args:
- gcloud
- functions
- deploy
- add-sensor-type
- --source=cloud_functions
- --entry-point=add_sensor_type
- --runtime=python39
- --trigger-http
- --security-level=secure-always
- --region=europe-west6
- --set-env-vars=DESTINATION_PROJECT_NAME=aerosense-twined,BIG_QUERY_DATASET_NAME=greta

- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
id: Deploy ingress-eu cloud function
args:
Expand Down
57 changes: 56 additions & 1 deletion data_gateway/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

SUPERVISORD_PROGRAM_NAME = "AerosenseGateway"
CREATE_INSTALLATION_CLOUD_FUNCTION_URL = "https://europe-west6-aerosense-twined.cloudfunctions.net/create-installation"
ADD_SENSOR_TYPE_CLOUD_FUNCTION_URL = "https://europe-west6-aerosense-twined.cloudfunctions.net/add-sensor-type"

global_cli_context = {}

Expand Down Expand Up @@ -243,7 +244,61 @@ def create_installation(config_file):
if not response.status_code == 200:
raise HTTPError(f"{response.status_code}: {response.text}")

logger.info("Installation created: %r", parameters)
print(f"Installation created: {parameters!r}")


@gateway_cli.command()
@click.argument("name", type=str)
@click.option(
    "--description",
    type=str,
    default=None,
    help="A description of the sensor type.",
)
@click.option(
    "--measuring-unit",
    type=str,
    default=None,
    help="The SI unit that the sensor type measures the relevant quantity in.",
)
@click.option(
    "--metadata",
    type=str,
    default=None,
    help="Other metadata about the sensor type in JSON format.",
)
def add_sensor_type(name, description, measuring_unit, metadata):
    """Add a sensor type to the BigQuery dataset.
    NAME: The name of the sensor type
    """
    # The reference is the slugified name; the user has to confirm it before anything is sent.
    reference = slugify(name)
    prompt = f"Add sensor type with reference {reference!r}? [Y/n]\n"

    # Keep asking until the answer is recognised: "n" aborts, while "y" or an empty answer continues.
    while True:
        answer = input(prompt).upper()

        if answer == "N":
            return

        if answer in {"Y", ""}:
            break

    # Required parameters:
    parameters = {"name": name, "reference": reference}

    # Optional parameters (only sent when a truthy value was given):
    optional_parameters = {"description": description, "measuring_unit": measuring_unit, "metadata": metadata}
    parameters.update({key: value for key, value in optional_parameters.items() if value})

    print("Creating...")

    response = requests.post(url=ADD_SENSOR_TYPE_CLOUD_FUNCTION_URL, json=parameters)

    # Anything other than a 200 from the cloud function is surfaced as an error with the response text.
    if response.status_code != 200:
        raise HTTPError(f"{response.status_code}: {response.text}")

    print(f"New sensor type added: {parameters!r}")


@gateway_cli.command()
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

setup(
name="data_gateway",
version="0.10.1",
version="0.11.0",
install_requires=[
"click>=7.1.2",
"pyserial==3.5",
"python-slugify==5.0.2",
"python-slugify>=5,<6",
"octue==0.10.5",
],
url="https://gitlab.com/windenergie-hsr/aerosense/digital-twin/data-gateway",
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cloud_functions/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def query(self, query):
:param str query:
:return MockQueryResult:
"""
self.query = query
self._query = query
return MockQueryResult(result=self.expected_query_result)


Expand Down
Loading

0 comments on commit 8923b57

Please sign in to comment.