Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Socrata Data Nodes #306

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions knime_extension/geospatial_env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ dependencies:
- pip:
- ipinfo==4.4.3
- pulp==2.7.0
- sodapy==2.2.0
216 changes: 216 additions & 0 deletions knime_extension/src/nodes/opendata.py
Original file line number Diff line number Diff line change
Expand Up @@ -878,3 +878,219 @@ def execute(self, exec_context: knext.ExecutionContext):
crs="EPSG:4326",
)
return knext.Table.from_pandas(gdf)


############################################
# Socrata Search
############################################
@knext.node(
    name="Socrata Search",
    node_type=knext.NodeType.SOURCE,
    icon_path=__NODE_ICON_PATH + "Socrata Search.png",
    category=__category,
    after="",
)
@knext.output_table(
    name="Socrata dataset list",
    description="Socrata dataset list from a wealth of open data resources from governments, non-profits, and NGOs around the world based on the query term.",
)
class SocrataSearchNode:
    """Access open datasets from various well-known data resources and organizations effortlessly using the SODA interface.

    US Centers for Disease Control and Prevention (CDC): CDC data includes information on infectious diseases, chronic conditions, environmental health hazards,
    injury prevention, maternal and child health, immunization coverage, and much more. These datasets are collected through surveillance systems, population surveys,
    epidemiological studies, and collaborative research efforts conducted by the CDC and its partners.

    Data.gov: The official open data platform of the United States government, offering datasets from various U.S. government agencies covering fields such as education,
    healthcare, transportation, and the environment.

    Chicago Data Portal: The open data platform provided by the City of Chicago, offering datasets related to the city, including crime data, transportation data, demographic statistics, and more.

    NYC Open Data: The open data platform provided by the City of New York, offering datasets covering urban planning, public transportation, health, and various other aspects of the city.

    UK Government Data Service: The open data platform provided by the UK government, offering datasets from various governmental bodies covering economics, social issues, the environment, and more.

    World Bank Data: The open data platform provided by the World Bank, offering a wide range of economic, social, and environmental datasets from around the world for research and analysis of global development trends.

    The Socrata Open Data API (SODA) is a powerful tool designed for programmatically accessing a vast array of open data resources from various organizations around the world, including governments, non-profits, and NGOs.
    This node uses the [SODA Consumer API](https://dev.socrata.com/consumers/getting-started.html) to get the dataset list.
    """

    queryitem = knext.StringParameter(
        label="Input searching item",
        description="""Enter search keywords or dataset names to find relevant datasets in the Socrata database.
        This search is not case-sensitive and can include multiple words separated by spaces. """,
        default_value="Massachusetts",
    )

    timeout = knext.IntParameter(
        label="Request timeout in seconds",
        description="The timeout in seconds for the request API.",
        default_value=120,
        min_value=1,
        is_advanced=True,
    )

    def configure(self, configure_context):
        # The output columns depend on the catalog response, so no schema
        # can be announced before execution.
        # TODO Create combined schema
        return None

    def execute(self, exec_context: knext.ExecutionContext):
        """Query the Socrata catalog and return the matching datasets as a table.

        The catalog response's ``results`` list is flattened with
        ``json_normalize``. Columns holding lists are joined into
        comma-separated strings, columns that cannot be converted to a KNIME
        table are dropped, and columns left entirely empty are removed.

        Returns:
            A ``knext.Table`` with one row per dataset found for the query.

        Raises:
            urllib.error.URLError: If the catalog request fails or times out.
        """
        import json
        import logging
        from urllib.parse import quote
        from urllib.request import Request, urlopen

        import pandas as pd
        from pandas import json_normalize

        logger = logging.getLogger(__name__)

        encoded_query_item = quote(self.queryitem)
        request = Request(
            f"https://api.us.socrata.com/api/catalog/v1?q={encoded_query_item}&only=datasets&limit=10000"
        )

        # The context manager closes the HTTP connection even when reading
        # or decoding the body fails.
        with urlopen(request, timeout=self.timeout) as response:
            data = json.loads(response.read())

        # "results" holds one nested dictionary per dataset; flatten them
        # into dotted column names.
        df = json_normalize(data["results"])

        # These two columns are removed up front when present.
        df = df.drop(
            columns=[
                "classification.domain_tags",
                "classification.domain_metadata",
            ],
            errors="ignore",
        )

        # Columns that contain at least one list value.
        list_columns = [
            col for col in df.columns if any(isinstance(item, list) for item in df[col])
        ]

        # Join list values into a single string; a list with non-string
        # elements cannot be joined, so such a column is dropped instead.
        for col in list_columns:
            try:
                df[col] = df[col].apply(
                    lambda x: ", ".join(x) if isinstance(x, list) else x
                )
            except TypeError:
                logger.warning("Dropping column '%s': list values are not joinable.", col)
                df.drop(columns=[col], inplace=True)

        # Probe every column individually and collect those that cannot be
        # converted to a KNIME table. The conversion may fail in several
        # ways, hence the deliberately broad handler.
        drop_columns = []
        for col in df.columns:
            try:
                knext.Table.from_pandas(df[[col]])
            except Exception:
                drop_columns.append(col)

        if drop_columns:
            logger.warning(
                "Dropping columns that cannot be converted to KNIME types: %s",
                ", ".join(map(str, drop_columns)),
            )
            df.drop(columns=drop_columns, inplace=True)

        # Treat "?" and empty strings as missing, then remove columns that
        # carry no information at all.
        df.replace("?", pd.NA, inplace=True)
        df.replace("", pd.NA, inplace=True)
        df.dropna(axis=1, how="all", inplace=True)
        df = df.reset_index(drop=True)
        return knext.Table.from_pandas(df)


############################################
# Socrata Data Query
############################################
@knext.node(
    name="Socrata Data Query",
    node_type=knext.NodeType.SOURCE,
    icon_path=__NODE_ICON_PATH + "Socrata Data Query.png",
    category=__category,
    after="",
)
@knext.output_table(
    name="Socrata dataset",
    description="Socrata dataset based on search keywords",
)
class SocrataDataNode:
    """Retrieve the open data category via Socrata API.

    US Centers for Disease Control and Prevention (CDC): CDC data includes information on infectious diseases, chronic conditions, environmental health hazards,
    injury prevention, maternal and child health, immunization coverage, and much more. These datasets are collected through surveillance systems, population surveys,
    epidemiological studies, and collaborative research efforts conducted by the CDC and its partners.

    Data.gov: The official open data platform of the United States government, offering datasets from various U.S. government agencies covering fields such as education,
    healthcare, transportation, and the environment.

    Chicago Data Portal: The open data platform provided by the City of Chicago, offering datasets related to the city, including crime data, transportation data, demographic statistics, and more.

    NYC Open Data: The open data platform provided by the City of New York, offering datasets covering urban planning, public transportation, health, and various other aspects of the city.

    UK Government Data Service: The open data platform provided by the UK government, offering datasets from various governmental bodies covering economics, social issues, the environment, and more.

    World Bank Data: The open data platform provided by the World Bank, offering a wide range of economic, social, and environmental datasets from around the world for research and analysis of global development trends.

    The Socrata Open Data API (SODA) is a powerful tool designed for programmatically accessing a vast array of open data resources from various organizations around the world, including governments, non-profits, and NGOs.
    This node uses the [SODA Consumer API](https://dev.socrata.com/consumers/getting-started.html) to get the dataset from a dataset list generated by Socrata Search Node.

    For instance, this dataset [Incidence Rate Of Breast Cancer](https://opendata.utah.gov/Health/Incidence-Rate-Of-Breast-Cancer-Per-100-000-All-St/q22t-rbk9) has a resource_id of "q22t-rbk9" and a metadata domain of "opendata.utah.gov".
    They can be found in the link under API, "https://opendata.utah.gov/resource/q22t-rbk9.json". Both items are used for data retrieval.
    """

    metadata_domain = knext.StringParameter(
        label="Metadata domain",
        description="""The value in the column metadata.domain of a table generated by a Socrata Search node. """,
        default_value="",
    )

    resource_id = knext.StringParameter(
        label="Resource ID",
        description="""The value in the column resource.id of a table generated by a Socrata Search node. """,
        default_value="",
    )

    timeout = knext.IntParameter(
        label="Request timeout in seconds",
        description="The timeout in seconds for the request API.",
        default_value=120,
        min_value=1,
        is_advanced=True,
    )

    def configure(self, configure_context):
        # The output columns depend on the downloaded dataset, so no schema
        # can be announced before execution.
        # TODO Create combined schema
        return None

    def execute(self, exec_context: knext.ExecutionContext):
        """Download the configured dataset page by page and return it as a table.

        Returns:
            A ``knext.Table`` built from all records returned by the API.

        Raises:
            ValueError: If the metadata domain or the resource ID is empty.
        """
        import pandas as pd
        from sodapy import Socrata

        domain = self.metadata_domain.strip()
        dataset_id = self.resource_id.strip()
        if not domain or not dataset_id:
            raise ValueError(
                "Both the metadata domain and the resource ID must be provided."
            )

        # Number of rows requested per call.
        page_size = 100000
        all_results = []

        # Unauthenticated client: only works with public datasets. 'None'
        # stands in for the application token, and no username or password
        # is supplied. Non-public datasets would need credentials here.
        client = Socrata(domain, None)
        client.timeout = self.timeout
        try:
            offset = 0
            while True:
                # Socrata does not guarantee a stable row order between
                # requests, so order by the system field ":id" to keep the
                # pages from overlapping or skipping rows.
                results = client.get(
                    dataset_id, limit=page_size, offset=offset, order=":id"
                )
                # An empty page means the previous one was the last.
                if not results:
                    break
                all_results.extend(results)
                offset += page_size
        finally:
            # Release the underlying HTTP session even if a request fails.
            client.close()

        # sodapy returns each page as a list of dictionaries.
        results_df = pd.DataFrame.from_records(all_results)

        return knext.Table.from_pandas(results_df)