Skip to content

Commit

Permalink
add inputted data validation to locations API v6 post & put
Browse files Browse the repository at this point in the history
  • Loading branch information
AddisonDunn committed Jan 29, 2024
1 parent cf191cb commit df18983
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 42 deletions.
92 changes: 58 additions & 34 deletions corehq/apps/locations/resources/v0_6.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
from corehq.apps.api.resources.auth import RequirePermissionAuthentication
from corehq.apps.locations.models import SQLLocation, LocationType
from corehq.apps.locations.models import SQLLocation
from corehq.apps.users.models import HqPermissions

from corehq.apps.locations.forms import LocationForm
from corehq.apps.locations.resources import v0_5
from corehq.apps.locations.util import (
validate_site_code, generate_site_code, has_siblings_with_name, get_location_type
)
from corehq.apps.locations.views import LocationFieldsView

from django.utils.translation import gettext as _

from tastypie.exceptions import BadRequest

Expand Down Expand Up @@ -44,61 +51,78 @@ def dehydrate(self, bundle):

def obj_create(self, bundle, **kwargs):
domain = kwargs['domain']
if 'name' not in bundle.data:
raise BadRequest("'name' is a required field.")
if SQLLocation.objects.filter(domain=domain, site_code=bundle.data['site_code']).exists():
raise BadRequest("Location on domain with site code already exists.")
if 'name' not in bundle.data or 'location_type_code' not in bundle.data:
raise BadRequest("'name' and 'location_type_code' are required fields.")
bundle.obj = SQLLocation(domain=domain)
self._update(bundle)
self._update(bundle, domain, is_new_location=True)
return bundle

def obj_update(self, bundle, **kwargs):
try:
bundle.obj = SQLLocation.objects.get(location_id=kwargs['location_id'], domain=kwargs['domain'])
except SQLLocation.DoesNotExist:
raise BadRequest("Could not find location with given ID on the domain.")
self._update(bundle)
raise BadRequest(_("Could not find location with given ID on the domain."))
self._update(bundle, kwargs['domain'], is_new_location=False)
return bundle

def _update(self, bundle):
def _update(self, bundle, domain, is_new_location=False):
data = bundle.data
# Invalid fields that might be common
if data.pop('location_type_name', False):
raise BadRequest('Location type name is not editable.')
raise BadRequest(_('Location type name is not editable.'))
if data.pop('last_modified', False):
raise BadRequest('\"last_modified\" field is not editable.')

easy_field_names = ['name', 'site_code', 'latitude', 'longitude']
for field_name in easy_field_names:
if field_name in data:
setattr(bundle.obj, field_name, data.pop(field_name))
raise BadRequest(_('\"last_modified\" field is not editable.'))

# Other, less "easy" fields
if 'parent_location_id' in data:
parent = self._get_parent_location(data.pop('parent_location_id'))
if not is_new_location and 'location_type_code' not in data:
# Otherwise validation of new parent will effectively be done under 'location_type_code'
self._validate_new_parent(domain, bundle.obj, parent)
bundle.obj.parent = parent
if 'name' in data:
self._validate_unique_among_siblings(bundle.obj, data['name'], bundle.obj.parent)
bundle.obj.name = data.pop('name')
if 'site_code' not in data:
bundle.obj.site_code = generate_site_code(domain, bundle.obj.location_id, bundle.obj.name)
if 'site_code' in data:
site_code = validate_site_code(domain, bundle.obj.location_id, data.pop('site_code'), BadRequest)
bundle.obj.site_code = site_code
if 'location_data' in data:
for key, value in data['location_data'].items():
if not isinstance(key, str) or not isinstance(value, str):
raise BadRequest("Location data keys and values must be strings.")
validator = LocationFieldsView.get_validator(domain)
errors = validator(data['location_data'])
if errors:
raise BadRequest(errors)
setattr(bundle.obj, 'metadata', data.pop('location_data'))
if 'location_type_code' in data:
bundle.obj.location_type = self._get_location_type(data.pop('location_type_code'))
if 'parent_location_id' in data:
bundle.obj.parent = self._get_parent_location(data.pop('parent_location_id'))
if 'location_type_code' in data or is_new_location:
bundle.obj.location_type = get_location_type(domain, bundle.obj, bundle.obj.parent,
data.pop('location_type_code', None),
BadRequest, is_new_location)
if 'latitude' in data:
bundle.obj.latitude = data.pop('latitude')
if 'longitude' in data:
bundle.obj.longitude = data.pop('longitude')

if len(data):
raise BadRequest("Invalid fields were included in request.")
raise BadRequest(_("Invalid fields were included in request."))

bundle.obj.save()

def _get_location_type(self, type_code):
try:
return LocationType.objects.get(
code=type_code
)
except LocationType.DoesNotExist:
raise Exception('Could not find location type with the given code.')

def _get_parent_location(self, id):
try:
return SQLLocation.objects.get(location_id=id)
except SQLLocation.DoesNotExist:
raise Exception("Could not find parent location with the given ID.")
raise Exception(_("Could not find parent location with the given ID."))

def _validate_unique_among_siblings(self, location, name, parent):
if has_siblings_with_name(location, name, parent.location_id):
raise BadRequest(
_("Location with same name and parent already exists.")
)

def _validate_new_parent(self, domain, location, parent):
parent_allowed_types = LocationForm.get_allowed_types(domain, parent)
if not parent_allowed_types:
raise BadRequest(_("The selected parent location cannot have child locations!"))
if location.location_type not in parent_allowed_types:
raise BadRequest(_("Parent cannot have children of this location's type."))
self._validate_unique_among_siblings(location, location.name, parent)
78 changes: 70 additions & 8 deletions corehq/apps/locations/tests/test_api_v6.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,30 +151,92 @@ def test_post(self):
key_value_pair in created_location_json['metadata'].items()
for key_value_pair in post_data_location_data.items()))

def test_put_general(self):
def test_successful_put1(self):
put_data = {
"name": "New Denver",
"site_code": "new denver",
"longitude": 33.9012,
"parent_location_id": self.south_park.location_id
"site_code": "new_denver",
"longitude": 33.9012
}
response = self._assert_auth_post_resource(self.single_endpoint(self.location2.location_id),
put_data, method='PUT')
self.assertEqual(response.status_code, 200)

self.location2_updated = SQLLocation.objects.get(location_id=self.location2.location_id)
self.assertEqual(self.location2_updated.name, "New Denver")
self.assertEqual(self.location2_updated.site_code, "new denver")
self.assertEqual(self.location2_updated.site_code, "new_denver")
self.assertEqual(float(self.location2_updated.longitude), 33.9012)
self.assertEqual(self.location2_updated.parent.location_id, self.south_park.location_id)

def test_put_location_type(self):
def test_successful_put2(self):
self.kansas = SQLLocation.objects.create(
domain=self.domain.name,
location_id="4",
name="Kansas",
site_code="kansas",
location_type=self.parent_type
)
put_data = {
"parent_location_id": self.kansas.location_id,
"location_type_code": self.county.code
}
response = self._assert_auth_post_resource(self.single_endpoint(self.location2.location_id),
put_data, method='PUT')
self.assertTrue(response.status_code, 200)
self.assertEqual(response.status_code, 200)

self.location2_updated = SQLLocation.objects.get(location_id=self.location2.location_id)
self.assertEqual(self.location2_updated.location_type.code, self.county.code)

def test_change_location_type_with_children(self):
self.kansas = SQLLocation.objects.create(
domain=self.domain.name,
location_id="4",
name="Kansas",
site_code="kansas",
location_type=self.parent_type
)
put_data = {
"parent_location_id": self.kansas.location_id,
"location_type_code": self.county.code
}
response = self._assert_auth_post_resource(self.single_endpoint(self.location1.location_id),
put_data, method='PUT')
self.assertEqual(response.json(),
{'error': 'You cannot change the location type of a location with children'})
self.assertEqual(response.status_code, 400)

def test_invalid_parent(self):
put_data = {
"parent_location_id": self.south_park.location_id,
}
response = self._assert_auth_post_resource(self.single_endpoint(self.location2.location_id),
put_data, method='PUT')
self.assertEqual(response.json(),
{'error': 'The selected parent location cannot have child locations!'})
self.assertEqual(response.status_code, 400)

def test_name_unique_among_siblings(self):
post_data = post_data = {
"location_type_code": "city",
"name": "Denver",
"parent_location_id": "1",
"site_code": "second_denver"
}
response = self._assert_auth_post_resource(self.list_endpoint, post_data, method='POST')
self.assertEqual(response.status_code, 400)
self.assertEqual(response.json(),
{'error': 'Location with same name and parent already exists.'})

def test_site_code_special_chars(self):
put_data = {
"site_code": "special$char",
}
response = self._assert_auth_post_resource(self.single_endpoint(self.location2.location_id),
put_data, method='PUT')
self.assertEqual(response.status_code, 400)

def test_site_code_unique(self):
put_data = {
"site_code": "south_park",
}
response = self._assert_auth_post_resource(self.single_endpoint(self.location2.location_id),
put_data, method='PUT')
self.assertEqual(response.status_code, 400)

0 comments on commit df18983

Please sign in to comment.