
Commit

post-stacking cast function
cklunch committed Oct 31, 2024
1 parent a076f61 commit 7447e90
Showing 4 changed files with 207 additions and 38 deletions.
Binary file modified dist/neonutilities-1.0.1-py3-none-any.whl
Binary file modified dist/neonutilities-1.0.1.tar.gz
176 changes: 170 additions & 6 deletions src/neonutilities/read_table_neon.py
@@ -2,12 +2,63 @@
# -*- coding: utf-8 -*-

import pandas as pd
import numpy as np
import pyarrow as pa
from pyarrow import dataset
import logging
-from .unzip_and_stack import get_variables
logging.basicConfig(level=logging.INFO, format='%(message)s')


def get_variables(v):
"""
Get correct data types for each field in a data table
Parameters
--------
v: A pandas table containing variable definitions
Return
--------
A pyarrow schema for data types based on the variables file
Created on Wed Apr 17 2024
@author: Zachary Nickerson
"""

# function assumes variables are loaded as a pandas data frame.

# create pyarrow schema by translating NEON data types to pyarrow types
for i in range(0, len(v)):
nm = v.fieldName[i]
typ = pa.string()
if v.dataType[i] == "real":
typ = pa.float64()
if v.dataType[i] in ["integer", "unsigned integer", "signed integer"]:
typ = pa.int64()
if v.dataType[i] in ["string", "uri"]:
typ = pa.string()
if v.dataType[i] == "dateTime":
if v.pubFormat[i] in ["yyyy-MM-dd'T'HH:mm:ss'Z'(floor)", "yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd'T'HH:mm:ss'Z'(round)"]:
typ = pa.timestamp("s", tz="UTC")
else:
if v.pubFormat[i] in ["yyyy-MM-dd(floor)", "yyyy-MM-dd"]:
typ = pa.date64()
else:
if v.pubFormat[i] in ["yyyy(floor)", "yyyy(round)"]:
typ = pa.int64()
else:
typ = pa.string()
if i==0:
vschema = pa.schema([(nm, typ)])
else:
nfield = pa.field(nm, typ)
vschema = vschema.append(nfield)

return vschema
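
# --- Illustrative usage sketch (not part of this commit) ---
# A minimal example of how the schema builder above might be called, assuming a
# toy variables table with hypothetical field names. Only the three columns that
# get_variables() actually reads (fieldName, dataType, pubFormat) are included.
import pandas as pd

toy_vars = pd.DataFrame({
    "fieldName": ["siteID", "meanTemp", "startDateTime"],
    "dataType": ["string", "real", "dateTime"],
    "pubFormat": ["asIs", "asIs", "yyyy-MM-dd'T'HH:mm:ss'Z'(floor)"],
})
toy_schema = get_variables(toy_vars)
# Expected mapping: siteID -> string, meanTemp -> double (float64),
# startDateTime -> timestamp[s, tz=UTC]
print(toy_schema)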


def read_table_neon(data_file,
var_file
):
@@ -100,16 +151,129 @@ def date_convert(dates):
"""

    try:
-        dout = pd.to_datetime(dates, format="%Y-%m-%dT%H:%M:%S", utc=True)
+        dout = pd.to_datetime(dates, format="ISO8601", utc=True)
    except Exception:
        try:
-            dout = pd.to_datetime(dates, format="%Y-%m-%dT%H:%M", utc=True)
+            dout = pd.to_datetime(dates, format="%Y-%m-%dT%H:%M:%S", utc=True)
        except Exception:
            try:
-                dout = pd.to_datetime(dates, format="%Y-%m-%dT%H", utc=True)
+                dout = pd.to_datetime(dates, format="%Y-%m-%dT%H:%M", utc=True)
            except Exception:
                try:
-                    dout = pd.to_datetime(dates, format="%Y-%m-%d", utc=True)
+                    dout = pd.to_datetime(dates, format="%Y-%m-%dT%H", utc=True)
                except Exception:
-                    dout = dates
+                    try:
+                        dout = pd.to_datetime(dates, format="%Y-%m-%d", utc=True)
+                    except Exception:
+                        dout = dates
    return dout
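
# --- Illustrative usage sketch (not part of this commit) ---
# date_convert() now tries pandas' ISO8601 parser first (available in pandas >= 2.0)
# and then falls back through progressively coarser explicit formats; values that
# match none of them are returned unchanged. The example inputs are hypothetical.
import pandas as pd

print(date_convert(pd.Series(["2024-10-31T12:30:00Z"])))  # parsed as UTC timestamps
print(date_convert(pd.Series(["2024-10-31"])))            # date-only strings also parse
print(date_convert(pd.Series(["10/31/2024"])))            # no format matches; returned as-is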


def get_variables_pandas(v):
"""
Get correct data types for each field in a data table
Parameters
--------
v: A pandas table containing variable definitions
Return
--------
A dictionary of field names and pandas data types based on the variables file
Created on Oct 31 2024
@author: Claire Lunch
"""

dtdict = {}
vf = v["fieldName"]
for i in range(0, len(vf)):
nm = vf[i]
if v["dataType"][i] == "real":
typ = "Float64"
if v["dataType"][i] in ["integer", "unsigned integer", "signed integer"]:
typ = "Int64"
if v["dataType"][i] in ["string", "uri"]:
typ = "string"
if v["dataType"][i] == "dateTime":
typ = "datetime64[ns, UTC]"
dtdict[nm] = typ

return dtdict
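
# --- Illustrative usage sketch (not part of this commit) ---
# get_variables_pandas() maps NEON dataType values to nullable pandas dtypes.
# The variables table below is a hypothetical stand-in for a NEON variables file.
import pandas as pd

toy_vars = pd.DataFrame({
    "fieldName": ["plotID", "individualCount", "collectDate"],
    "dataType": ["string", "integer", "dateTime"],
})
print(get_variables_pandas(toy_vars))
# {'plotID': 'string', 'individualCount': 'Int64', 'collectDate': 'datetime64[ns, UTC]'}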


def cast_table_neon(data_table,
var_table
):
"""
Cast a NEON data table to the correct data types for each variable, if possible.
Parameters
--------
data_table: NEON data table as a pandas table.
var_table: NEON variables file as a pandas table.
Return
--------
A data frame of a NEON data table, with column classes assigned by data type.
Example
--------
>>> dattab = cast_table_neon(data_table=brd_perpoint,
var_table=variables_10003)
Created on Oct 30 2024
@author: Claire Lunch
"""

# Check inputs formatting
if not isinstance(data_table, pd.DataFrame):
logging.info("Data table input is not a pandas data frame.")
return None

if not isinstance(var_table, pd.DataFrame):
logging.info("Variables table input is not a pandas data frame.")
return None

# Check this is a valid variables file
if any(x in ['category', 'system', 'stat'] for x in list(var_table.columns)):
logging.info('var_table appears to match DP4.00200.001. Data wrangling for surface-atmosphere exchange data is currently only available in the R package version of neonUtilities.')
return None
else:
if not any(x in ['table', 'fieldName', 'dataType'] for x in list(var_table.columns)):
logging.info('var_table is not a variables file, or is missing critical values.')
return None

# get data types
vdt = get_variables_pandas(var_table)

# get field names from the data table
tabcols = list(data_table.columns)
cast_table = data_table

# iterate over columns and try to cast each
for i in tabcols:
if i not in vdt.keys():
continue
else:
if vdt[i] in ["Float64", "Int64"]:
try:
dtemp = cast_table[i].replace(r'^\s*$', np.nan, regex=True)
cast_table[i] = dtemp.astype(vdt[i])
except Exception:
logging.info(f"Field {i} could not be cast to type {vdt[i]}. Data read as string type.")
cast_table[i] = data_table[i]
continue
if vdt[i]=="datetime64[ns, UTC]" and not i=="publicationDate":
try:
cast_table[i] = date_convert(data_table[i])
except Exception:
logging.info(f"Field {i} could not be cast to type {vdt[i]}. Data read as string type.")
cast_table[i] = data_table[i]
continue

return cast_table
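
# --- Illustrative usage sketch (not part of this commit) ---
# Casting a small, hypothetical stacked table with cast_table_neon(). The docstring
# example above uses real NEON objects (brd_perpoint, variables_10003); this sketch
# substitutes toy frames so it can run standalone.
import pandas as pd

toy_data = pd.DataFrame({
    "plotID": ["ABBY_001", "ABBY_002"],
    "meanTemp": ["12.5", ""],  # blank strings are converted to NA before casting
    "collectDate": ["2024-06-01T12:00:00Z", "2024-06-02T12:00:00Z"],
})
toy_vars = pd.DataFrame({
    "table": ["toy"] * 3,
    "fieldName": ["plotID", "meanTemp", "collectDate"],
    "dataType": ["string", "real", "dateTime"],
})
cast = cast_table_neon(toy_data, toy_vars)
print(cast.dtypes)
# collectDate becomes datetime64[ns, UTC]; meanTemp becomes Float64 if the cast
# succeeds, otherwise it is left as string and a message is logged.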
69 changes: 37 additions & 32 deletions src/neonutilities/unzip_and_stack.py
@@ -16,6 +16,7 @@
from .get_issue_log import get_issue_log
from .citation import get_citation
from .helper_mods.api_helpers import readme_url
from .read_table_neon import get_variables, cast_table_neon
from . import __resources__
import logging
logging.basicConfig(level=logging.INFO, format='%(message)s')
@@ -164,51 +165,31 @@ def get_recent_publication(filepaths):
return recent_files


-def get_variables(v):
+def string_schema(v):
    """
-    Get correct data types
+    Assign all variables to string. Fallback option if table read with true schema fails
    Parameters
    --------
-    v: A file that contains variable definition
+    v: A pandas table containing variable definitions
    Return
    --------
-    A pyarrow schema for data types based on the variables file
+    A pyarrow schema for all string data types based on the variable names
-    Created on Wed Apr 17 2024
+    Created on Oct 29 2024
-    @author: Zachary Nickerson
+    @author: Claire Lunch
    """

-    # function assumes variables are loaded as a pandas data frame.
-
-    # create pyarrow schema by translating NEON data types to pyarrow types
    for i in range(0, len(v)):
        nm = v.fieldName[i]
-        typ = pa.string()
-        if v.dataType[i] == "real":
-            typ = pa.float64()
-        if v.dataType[i] in ["integer", "unsigned integer", "signed integer"]:
-            typ = pa.int64()
-        if v.dataType[i] in ["string", "uri"]:
-            typ = pa.string()
-        if v.dataType[i] == "dateTime":
-            if v.pubFormat[i] in ["yyyy-MM-dd'T'HH:mm:ss'Z'(floor)", "yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd'T'HH:mm:ss'Z'(round)"]:
-                typ = pa.timestamp("s", tz="UTC")
-            else:
-                if v.pubFormat[i] in ["yyyy-MM-dd(floor)", "yyyy-MM-dd"]:
-                    typ = pa.date64()
-                else:
-                    if v.pubFormat[i] in ["yyyy(floor)", "yyyy(round)"]:
-                        typ = pa.int64()
-                    else:
-                        typ = pa.string()
-        if i==0:
-            vschema = pa.schema([(nm, typ)])
+        if i == 0:
+            vschema = pa.schema([(nm, pa.string())])
        else:
-            nfield = pa.field(nm, typ)
+            nfield = pa.field(nm, pa.string())
            vschema = vschema.append(nfield)

    return vschema
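
# --- Illustrative usage sketch (not part of this commit) ---
# string_schema() builds an all-string pyarrow schema from the same variables
# table, used below as a fallback when reading with the typed schema fails.
# The field names are hypothetical.
import pandas as pd

toy_vars = pd.DataFrame({"fieldName": ["siteID", "meanTemp", "collectDate"]})
print(string_schema(toy_vars))
# siteID: string, meanTemp: string, collectDate: string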
@@ -838,15 +819,39 @@ def stack_data_files_parallel(folder,
repl="", string=b) for b in tablepaths]
dat = dataset.dataset(source=tablebuckets, filesystem=gcs,
format="csv", schema=tableschema)

else:
dat = dataset.dataset(source=tablepaths,
format="csv", schema=tableschema)

cols = tableschema.names
cols.append("__filename")
-dattab = dat.to_table(columns=cols)

# attempt to stack to table. if it fails, stack as all string fields and warn
stringset = False
try:
dattab = dat.to_table(columns=cols)
except Exception:
try:
stringschema = string_schema(tablepkgvar)
if cloud_mode:
dat = dataset.dataset(source=tablebuckets, filesystem=gcs,
format="csv", schema=stringschema)
else:
dat = dataset.dataset(source=tablepaths,
format="csv", schema=stringschema)
dattab = dat.to_table(columns=cols)
logging.info(f"Table {j} schema did not match data; all variable types set to string. Data type casting will be attempted after stacking step.")
stringset = True
except Exception:
logging.info(f"Failed to stack table {j}. Check input data and variables file.")
continue

pdat = dattab.to_pandas()
if stringset:
try:
pdat = cast_table_neon(pdat, tablepkgvar)
except Exception:
logging.info(f"Data type casting failed for table {j}. Variable types set to string.")

# append publication date
pubr = re.compile("20[0-9]{6}T[0-9]{6}Z")
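# --- Illustrative sketch (not part of this commit) ---
# The read-fallback-cast pattern used in stack_data_files_parallel() above, reduced
# to a single helper over hypothetical local CSV paths. It assumes the typed schema
# from get_variables(), the all-string fallback from string_schema(), and the
# post-stacking cast from cast_table_neon().
from pyarrow import dataset

def read_with_fallback(paths, var_table):
    """Try the typed schema first; on failure, read as strings and cast afterwards."""
    try:
        dat = dataset.dataset(source=paths, format="csv",
                              schema=get_variables(var_table))
        return dat.to_table().to_pandas()
    except Exception:
        dat = dataset.dataset(source=paths, format="csv",
                              schema=string_schema(var_table))
        pdat = dat.to_table().to_pandas()
        return cast_table_neon(pdat, var_table)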
