Infer and apply schema from query results (#22)
* Infer and apply schema from query results

- Pull query results in application/sparql-results+json format
- Map metadata to Table Schema
- Apply schema to query results (table and dataframe)
- Expose query result schema
- Extra: Keep legacy ~/.data.world post-migration in case it's used by R SDK

TODO: Define more deterministic fallback behavior for cases where the schema cannot be reliably inferred from SPARQL query results.
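
For illustration, the kind of mapping described above can be sketched as follows (a minimal, hypothetical sketch — `infer_schema` and the datatype table are illustrative names, not the ones used in this commit):

```python
# Hypothetical sketch of inferring a Table Schema from a SPARQL JSON result.
# The XSD-to-Table-Schema mapping below is illustrative, not exhaustive.
XSD_TO_TABLE_SCHEMA = {
    'http://www.w3.org/2001/XMLSchema#integer': 'integer',
    'http://www.w3.org/2001/XMLSchema#decimal': 'number',
    'http://www.w3.org/2001/XMLSchema#boolean': 'boolean',
}

def infer_schema(sparql_json):
    """Map head.vars and binding datatypes to Table Schema fields."""
    fields = []
    for var in sparql_json['head']['vars']:
        # Use the first binding that carries a datatype for this variable
        datatype = next((row[var].get('datatype')
                         for row in sparql_json['results']['bindings']
                         if var in row), None)
        fields.append({'name': var,
                       'type': XSD_TO_TABLE_SCHEMA.get(datatype, 'string')})
    return {'fields': fields}
```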
rflprr authored Mar 30, 2017
1 parent 59c7f95 commit b25557b
Showing 35 changed files with 2,828 additions and 203 deletions.
66 changes: 40 additions & 26 deletions README.md
@@ -30,7 +30,7 @@ It will download a given dataset's [datapackage](http://specs.frictionlessdata.i
and store it under `~/.dw/cache`. When used subsequently, `load_dataset()` will use the copy stored on disk and will
work offline, unless it's called with `force_update=True`.
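
For instance, to bypass the cache and force a fresh download (same dataset as in the examples below):

```python
import datadotworld as dw

# Ignore any copy under ~/.dw/cache and re-download the dataset
intro_dataset = dw.load_dataset('jonloyens/an-intro-to-dataworld-dataset',
                                force_update=True)
```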

Once loaded, a dataset (data and metadata) can be conveniently access via the object returned by `load_dataset()`.
Once loaded, a dataset (data and metadata) can be conveniently accessed via the object returned by `load_dataset()`.

Start by importing the `datadotworld` module:
```python
import datadotworld as dw
```

@@ -45,7 +45,7 @@ intro_dataset = dw.load_dataset('jonloyens/an-intro-to-dataworld-dataset')

Dataset objects allow access to data via three different properties: `raw_data`, `tables` and `dataframes`.
Each of these properties is a mapping (dict) whose values are of type `bytes`, `list` and `pandas.DataFrame`,
respectively. Values are lazy loaded and cached once loaded. Their keys are the names of the resources (files)
respectively. Values are lazy loaded and cached once loaded. Their keys are the names of the files
contained in the dataset.

For example:
@@ -65,36 +65,39 @@
```python
>>> stats_table = intro_dataset.tables['datadotworldbballstats']
>>> stats_table[0]
OrderedDict([('Name', 'Jon'), ('PointsPerGame', Decimal('20.4')), ('AssistsPerGame', Decimal('1.3'))])
OrderedDict([('Name', 'Jon'),
('PointsPerGame', Decimal('20.4')),
('AssistsPerGame', Decimal('1.3'))])
```

You can also review the metadata associated with a file or with the entire dataset using the `describe` function.
For example:
```python
>>> intro_dataset.describe()
{
'name': 'jonloyens_an-intro-to-dataworld-dataset',
'homepage': 'https://data.world/jonloyens/an-intro-to-dataworld-dataset',
'resources': [
{'path': 'data/ChangeLog.csv', 'name': 'changelog', 'format': 'csv'},
{'path': 'data/DataDotWorldBBallStats.csv', 'name': 'datadotworldbballstats', 'format': 'csv'},
{'path': 'data/DataDotWorldBBallTeam.csv', 'name': 'datadotworldbballteam', 'format': 'csv'}
]
}
{'homepage': 'https://data.world/jonloyens/an-intro-to-dataworld-dataset',
'name': 'jonloyens_an-intro-to-dataworld-dataset',
'resources': [{'format': 'csv',
'name': 'changelog',
'path': 'data/ChangeLog.csv'},
{'format': 'csv',
'name': 'datadotworldbballstats',
'path': 'data/DataDotWorldBBallStats.csv'},
{'format': 'csv',
'name': 'datadotworldbballteam',
'path': 'data/DataDotWorldBBallTeam.csv'}]}


>>> intro_dataset.describe('datadotworldbballstats')
{
'path': 'data/DataDotWorldBBallStats.csv',
'name': 'datadotworldbballstats',
'format': 'csv',
'schema': {
'fields': [
{'name': 'Name', 'type': 'string', 'title': 'Name'},
{'name': 'PointsPerGame', 'type': 'number', 'title': 'PointsPerGame'},
{'name': 'AssistsPerGame', 'type': 'number', 'title': 'AssistsPerGame'}
]
}
}
{'format': 'csv',
'name': 'datadotworldbballstats',
'path': 'data/DataDotWorldBBallStats.csv',
'schema': {'fields': [{'name': 'Name', 'title': 'Name', 'type': 'string'},
{'name': 'PointsPerGame',
'title': 'PointsPerGame',
'type': 'number'},
{'name': 'AssistsPerGame',
'title': 'AssistsPerGame',
'type': 'number'}]}}
```

### Query a dataset
@@ -107,7 +110,7 @@ For example:
results = dw.query('jonloyens/an-intro-to-dataworld-dataset', 'SELECT * FROM DataDotWorldBBallStats')
```

Query result objects allow access to the data via `raw_data`, `table` and `dataframe` properties, of type `str`, `list`
Query result objects allow access to the data via `raw_data`, `table` and `dataframe` properties, of type `json`, `list`
and `pandas.DataFrame`, respectively.

For example:
@@ -128,12 +131,23 @@ Tables are lists of rows, each represented by a mapping (dict) of column names to their respective values.
For example:
```python
>>> results.table[0]
OrderedDict([('Name', 'Jon'), ('PointsPerGame', '20.4'), ('AssistsPerGame', '1.3')])
OrderedDict([('Name', 'Jon'),
('PointsPerGame', Decimal('20.4')),
('AssistsPerGame', Decimal('1.3'))])
```

To query using `SPARQL`, invoke `query()` with `query_type='sparql'`; otherwise, the query is assumed
to be a `SQL` query.
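
For example (the query itself is illustrative; any valid SPARQL against the dataset works):

```python
results = dw.query('jonloyens/an-intro-to-dataworld-dataset',
                   'SELECT * WHERE { ?s ?p ?o . } LIMIT 10',
                   query_type='sparql')
```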

Just like in the dataset case, you can view the metadata associated with a query result using the `describe()` function.
For example:
```python
>>> results.describe()
{'fields': [{'name': 'Name', 'type': 'string'},
{'name': 'PointsPerGame', 'type': 'number'},
{'name': 'AssistsPerGame', 'type': 'number'}]}
```

### Create and update datasets

To create and update datasets, start by calling the `api_client` function.
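
For example (a minimal sketch: `api_client()` appears in this diff, but the `update_dataset` call and its arguments are assumed here for illustration):

```python
import datadotworld as dw

client = dw.api_client()
# Assumed method and arguments, shown for illustration only
client.update_dataset('jonloyens/an-intro-to-dataworld-dataset',
                      description='Updated description')
```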
2 changes: 1 addition & 1 deletion datadotworld/__init__.py
@@ -22,4 +22,4 @@

from datadotworld.datadotworld import load_dataset, query, api_client

__version__ = '1.0.0-beta.4'
__version__ = '1.0.0-beta.5'
74 changes: 52 additions & 22 deletions datadotworld/config.py
@@ -45,42 +45,47 @@ class Config(object):

def __init__(self, profile='default', **kwargs):
# Overrides, for testing
config_file_path = path.expanduser(
self._config_file_path = path.expanduser(
kwargs.get('config_file_path', '~/.dw/config'))
legacy_file_path = path.expanduser(
kwargs.get('legacy_file_path', '~/.data.world'))

if not path.isdir(path.dirname(config_file_path)):
os.makedirs(path.dirname(config_file_path))
if not path.isdir(path.dirname(self._config_file_path)):
os.makedirs(path.dirname(self._config_file_path))

config_parser = (configparser.ConfigParser()
if six.PY3 else configparser.SafeConfigParser())
if path.isfile(config_file_path):
config_parser.read_file(open(config_file_path))
self._config_parser = (configparser.ConfigParser()
if six.PY3 else configparser.SafeConfigParser())

if path.isfile(self._config_file_path):
self._config_parser.read_file(open(self._config_file_path))
if self.__migrate_invalid_defaults(self._config_parser) > 0:
self.save()
elif path.isfile(legacy_file_path):
config_parser = self.__migrate_config(legacy_file_path,
config_file_path)
self._config_parser = self.__migrate_config(legacy_file_path,
self._config_file_path)
self.save()

self._config_file_path = config_file_path
self._config_parser = config_parser
self._profile = profile
self._section = (profile
if profile.lower() != configparser.DEFAULTSECT.lower()
else configparser.DEFAULTSECT)

self.tmp_dir = path.expanduser(tempfile.gettempdir())

self.cache_dir = path.expanduser('~/.dw/cache')
if not path.isdir(path.dirname(self.cache_dir)):
os.makedirs(path.dirname(self.cache_dir))

@property
def auth_token(self):
self.__validate_config()
return self._config_parser.get(self._profile, 'auth_token')
return self._config_parser.get(self._section, 'auth_token')

@auth_token.setter
def auth_token(self, auth_token):
if not self._config_parser.has_section(self._profile):
self._config_parser.add_section(self._profile)
self._config_parser.set(self._profile, 'auth_token', auth_token)
if (self._section != configparser.DEFAULTSECT and
not self._config_parser.has_section(self._section)):
self._config_parser.add_section(self._section)
self._config_parser.set(self._section, 'auth_token', auth_token)

def save(self):
"""Persist config changes"""
@@ -93,7 +98,7 @@ def __validate_config(self):
'Configuration file not found at {}.'
'To fix this issue, run dw configure'.format(
self._config_file_path))
if not self._config_parser.has_option(self._profile, 'auth_token'):
if not self._config_parser.has_option(self._section, 'auth_token'):
raise RuntimeError(
'The {0} profile is not properly configured. '
'To fix this issue, run dw -p {0} configure'.format(
@@ -103,15 +108,40 @@ def __validate_config(self):
def __migrate_config(legacy_file_path, target_file_path):
config_parser = configparser.ConfigParser()

with open(legacy_file_path, 'r') as legacy, open(target_file_path,
'w') as target:
with open(legacy_file_path, 'r') as legacy:
regex = re.compile(r"^token\s*=\s*(\S.*)$")
token = next(iter(
[regex.match(line.strip()).group(1) for line in legacy if
regex.match(line)]),
None)
config_parser['default'] = {'auth_token': token}
config_parser.write(target)
config_parser[configparser.DEFAULTSECT] = {'auth_token': token}

# Leave the legacy file in place, in case the R SDK still needs it
# os.remove(legacy_file_path)

os.remove(legacy_file_path)
return config_parser

@staticmethod
def __migrate_invalid_defaults(config_parser):
# Fixes an issue caused by referring to the default section in the
# config file as 'default' instead of configparser.DEFAULTSECT,
# which may result in 'ValueError: Invalid section name: default'
# https://github.com/datadotworld/data.world-py/issues/18
invalid_defaults = []
for section in config_parser.sections():
# Doesn't include DEFAULTSECT, but checking nonetheless
if (section != configparser.DEFAULTSECT and
section.lower() == configparser.DEFAULTSECT.lower()):
invalid_defaults.append(section)

if len(invalid_defaults) == 1:
old_default = invalid_defaults[0]
config_parser[configparser.DEFAULTSECT] = {
option: config_parser.get(old_default, option)
for option in config_parser.options(old_default)}

for section in invalid_defaults:
config_parser.remove_section(section)

return len(invalid_defaults)
7 changes: 4 additions & 3 deletions datadotworld/datadotworld.py
@@ -92,13 +92,14 @@ def query(self, dataset_key, query, query_type="sql"):
query_type, owner_id, dataset_id)
headers = {
'User-Agent': _user_agent(),
'Accept': 'text/csv',
'Accept': 'application/sparql-results+json',
'Authorization': 'Bearer {0}'.format(self._config.auth_token)
}
response = requests.get(url, params=params, headers=headers)
if response.status_code == 200:
return QueryResults(response.text)
raise RuntimeError('Error executing query: {}'.format(response.text))
return QueryResults(response.json())
raise RuntimeError(
'Error executing query: {}'.format(response.content))
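
For reference, an abridged example of the shape `response.json()` returns under the new `Accept` header (per the W3C SPARQL 1.1 Query Results JSON format; the values below are made up):

```python
# Illustrative payload following the W3C SPARQL JSON results layout
payload = {
    'head': {'vars': ['Name', 'PointsPerGame']},
    'results': {'bindings': [
        {'Name': {'type': 'literal', 'value': 'Jon'},
         'PointsPerGame': {
             'type': 'literal',
             'datatype': 'http://www.w3.org/2001/XMLSchema#decimal',
             'value': '20.4'}}
    ]}
}
```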

def load_dataset(self, dataset_key, force_update=False):
"""
17 changes: 12 additions & 5 deletions datadotworld/models/dataset.py
@@ -25,9 +25,9 @@

import datapackage
import six
from datadotworld.models.util import (sanitize_table_schema,
align_table_fields,
patch_jsontableschema_pandas)
from datadotworld.models.table_schema import (sanitize_resource_schema,
order_columns_in_row,
patch_jsontableschema_pandas)
from datadotworld.util import LazyLoadedDict, memoized
from datapackage.resource import TabularResource
from jsontableschema.exceptions import SchemaValidationError
@@ -73,7 +73,7 @@ def __init__(self, descriptor_file):
# Index resources by name
self.__resources = {r.descriptor['name']: r
for r in self._datapackage.resources}
self.__tabular_resources = {k: sanitize_table_schema(r)
self.__tabular_resources = {k: sanitize_resource_schema(r)
for (k, r) in self.__resources.items()
if type(r) is TabularResource}
self.__invalid_schemas = [] # Resource names with invalid schemas
@@ -111,6 +111,7 @@ def describe(self, resource=None):
``resource`` is specified in the call.
"""
if resource is None:
# Show simpler descriptor, omitting schema definitions
simple_descriptor = copy.deepcopy(self._datapackage.descriptor)
for resource in simple_descriptor['resources']:
resource.pop('schema', None)
@@ -130,6 +131,7 @@ def _load_raw_data(self, resource_name):

@memoized(key_mapper=lambda self, resource_name: resource_name)
def _load_table(self, resource_name):
"""Build table structure from resource data"""
tabular_resource = self.__tabular_resources[resource_name]

try:
@@ -142,7 +144,7 @@ def _load_table(self, resource_name):
elif len(tabular_resource.data) > 0:
fields = tabular_resource.data[0].keys()

return [align_table_fields(fields, row) for row in
return [order_columns_in_row(fields, row) for row in
tabular_resource.data]
except (SchemaValidationError, ValueError, TypeError) as e:
warnings.warn(
@@ -159,6 +161,11 @@ def _load_table(self, resource_name):

@memoized(key_mapper=lambda self, resource_name: resource_name)
def _load_dataframe(self, resource_name):
"""Build pandas.DataFrame from resource data
Lazy-load optional dependencies so that users can use the package
without installing pandas if they so wish.
"""
self.__initialize_storage()

rows = self.tables[resource_name]