1. support add_columns in Dataset; 2. support run().to_df(); 3. add demo in df-newinterface.py #78

Merged: 13 commits, Jan 28, 2025
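The API change running through every diff below: `Dataset.run()` no longer returns a `(records, plan, stats)` tuple but a single record-collection object. A minimal sketch of the new convention, assembled only from code visible in this PR (the testdata path and config come from demos/df-newinterface.py; the attribute names come from the other demos), not a checked implementation:

import pandas as pd

import palimpzest as pz
from palimpzest.query.processor.config import QueryProcessorConfig

# Build a small pipeline, as demos/df-newinterface.py does below.
df = pd.read_csv("testdata/enron-tiny.csv")
emails = pz.Dataset(df).filter("It is an email")

config = QueryProcessorConfig(verbose=True, execution_strategy="pipelined_parallel")

# Old interface (removed in this PR):
#   records, plan, stats = emails.run(config)
# New interface: one object carrying records, plans, and statistics.
data_record_collection = emails.run(config)
records = data_record_collection.data_records      # the output records
plans = data_record_collection.executed_plans      # the plan(s) that executed
stats = data_record_collection.execution_stats     # aggregate run statistics
print(data_record_collection.to_df())              # or materialize as a DataFrame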
15 changes: 8 additions & 7 deletions demos/askem-var.py
@@ -49,9 +49,8 @@ class Variable(Schema):
file_path = "testdata/askem-tiny/"

if run_pz:
# reference, plan, stats = run_workload()
df_input = pd.DataFrame(dict_of_excerpts)
excerpts = Dataset(dataset, schema=Papersnippet)
df_input = pd.DataFrame(list_of_strings)
excerpts = Dataset(df_input, schema=Papersnippet)
output = excerpts.convert(
Variable, desc="A variable used or introduced in the context", cardinality=Cardinality.ONE_TO_MANY
).filter("The value name is 'a'", depends_on="name")
@@ -64,14 +63,16 @@ class Variable(Schema):
execution_strategy="sequential",
optimizer_strategy="pareto",
)

# Option 1: Use QueryProcessorFactory to create a processor and generate a plan
processor = QueryProcessorFactory.create_processor(excerpts, config)
plan = processor.generate_plan(output, policy)
print(processor.plan)

with st.container():
st.write("### Executed plan: \n")
# st.write(" " + str(plan).replace("\n", " \n "))
for idx, op in enumerate(processor.plan.operators):
for idx, op in enumerate(plan.operators):
strop = f"{idx + 1}. {str(op)}"
strop = strop.replace("\n", " \n")
st.write(strop)
@@ -85,7 +86,7 @@ class Variable(Schema):
start_time = time.time()
# for idx, (vars, plan, stats) in enumerate(iterable):
for idx, record in enumerate(input_records):
print(f"idx: {idx}\n vars: {vars}")
print(f"idx: {idx}\n record: {record}")
index = idx
vars = processor.execute_opstream(processor.plan, record)
if idx == len(input_records) - 1:
@@ -130,8 +131,8 @@ class Variable(Schema):
st.write(" **value:** ", var.value, "\n")

# write variables to a json file with readable format
# with open(f"askem-variables-{dataset}.json", "w") as f:
# json.dump(variables, f, indent=4)
with open(f"askem-variables-{dataset}.json", "w") as f:
json.dump(variables, f, indent=4)
vars_df = pd.DataFrame(variables)

# G = nx.DiGraph()
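The askem-var.py changes above replace the commented-out run_workload() call with the factory pattern. A sketch of that pattern, using only the two calls visible in the diff; `excerpts`, `output`, and `config` are the demo's own objects and are assumed here, as is the choice of policy:

from palimpzest.policy import MaxQuality
from palimpzest.query.processor.query_processor_factory import QueryProcessorFactory

policy = MaxQuality()  # assumption: the demo defines its policy outside this hunk

# Create a processor over the source Dataset, then plan the full query.
processor = QueryProcessorFactory.create_processor(excerpts, config)
plan = processor.generate_plan(output, policy)

# The plan exposes its operators, which the demo renders one per line.
for idx, op in enumerate(plan.operators):
    print(f"{idx + 1}. {op}")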
23 changes: 13 additions & 10 deletions demos/bdf-suite.py
@@ -198,7 +198,7 @@ def extract_references(processing_strategy, execution_strategy, optimizer_strate
run_pz = st.button("Run Palimpzest on dataset")

# st.radio("Biofabric Data Integration")
run_pz = False
run_pz = True
dataset = "bdf-usecase3-tiny"

if run_pz:
@@ -220,24 +220,27 @@ def extract_references(processing_strategy, execution_strategy, optimizer_strate
execution_strategy="sequential",
optimizer_strategy="pareto",
)
iterable = output.run(config)

data_record_collection = output.run(config)
references = []
statistics = []

for idx, (reference, plan, stats) in enumerate(iterable):
for idx, record_collection in enumerate(data_record_collection):
record_time = time.time()
stats = record_collection.plan_stats
references = record_collection.data_records
plan = record_collection.executed_plans[0]
statistics.append(stats)

if not idx:
with st.container():
st.write("### Executed plan: \n")
# st.write(" " + str(plan).replace("\n", " \n "))
for idx, op in enumerate(plan.operators):
strop = f"{idx+1}. {str(op)}"
strop = strop.replace("\n", " \n")
st.write(strop)
for ref in reference:
st.write(" " + str(plan).replace("\n", " \n "))
# for idx, op in enumerate(stats.plan_strs[0].operators):
# strop = f"{idx+1}. {str(op)}"
# strop = strop.replace("\n", " \n")
# st.write(strop)
for ref in references:
try:
index = ref.index
except Exception:
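bdf-suite.py also shows that the object returned by run() is iterable: each element is itself a record collection carrying the records, the per-plan statistics, and the executed plan. A sketch of that loop with the attribute names taken from the diff (`data_records`, `plan_stats`, `executed_plans`); `output` and `config` are the demo's objects and are assumed:

import time

references, statistics = [], []
for idx, record_collection in enumerate(output.run(config)):
    record_time = time.time()                        # per-iteration timestamp, as in the demo
    statistics.append(record_collection.plan_stats)  # stats for this plan
    references = record_collection.data_records      # records produced so far
    plan = record_collection.executed_plans[0]       # the plan that executed
    if not idx:                                      # print the executed plan once
        print(str(plan).replace("\n", " \n "))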
23 changes: 15 additions & 8 deletions demos/bdf-usecase3.py
@@ -18,6 +18,7 @@
from palimpzest.datamanager.datamanager import DataDirectory
from palimpzest.policy import MaxQuality, MinCost
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.query.processor.query_processor_factory import QueryProcessorFactory
from palimpzest.sets import Dataset

if not os.environ.get("OPENAI_API_KEY"):
@@ -75,11 +76,15 @@ def run_workload():

tables = []
statistics = []
for table, plan, stats in iterable: # noqa: B007
for data_record_collection in iterable: # noqa: B007
# record_time = time.time()
table = data_record_collection.data_records
stats = data_record_collection.plan_stats
tables += table
statistics.append(stats)

processor = QueryProcessorFactory.create_processor(output, config)
plan = processor.generate_plan(output, policy)
return tables, plan, stats


@@ -115,24 +120,26 @@ def run_workload():
execution_strategy="sequential",
optimizer_strategy="pareto",
)
processor = QueryProcessorFactory.create_processor(output, config)
plan = processor.generate_plan(output, policy)
iterable = output.run(config)

references = []
statistics = []

for idx, (reference, plan, stats) in enumerate(iterable):
for idx, data_record_collection in enumerate(iterable):
record_time = time.time()
references = data_record_collection.data_records
stats = data_record_collection.plan_stats
plan = data_record_collection.executed_plans[0]
statistics.append(stats)

if not idx:
with st.container():
st.write("### Executed plan: \n")
# st.write(" " + str(plan).replace("\n", " \n "))
for idx, op in enumerate(plan.operators):
strop = f"{idx+1}. {str(op)}"
strop = strop.replace("\n", " \n")
st.write(strop)
for ref in reference:
st.write(" " + str(plan).replace("\n", " \n "))

for ref in references:
try:
index = ref.index
except Exception:
44 changes: 22 additions & 22 deletions demos/biofabric-demo-matching.ipynb
@@ -163,9 +163,9 @@
" nocache=True,\n",
" processing_strategy=\"no_sentinel\",\n",
")\n",
"records, plan, stats = output.run(config)\n",
"data_record_collection = output.run(config)\n",
"\n",
"print_tables(records)"
"print_tables(data_record_collection.data_records)"
]
},
{
@@ -229,9 +229,9 @@
" nocache=True,\n",
" processing_strategy=\"no_sentinel\",\n",
")\n",
"tables, plan, stats = patient_tables.run(config)\n",
"data_record_collection = patient_tables.run(config)\n",
"\n",
"for table in tables:\n",
"for table in data_record_collection.data_records:\n",
" header = table.header\n",
" subset_rows = table.rows[:3]\n",
"\n",
@@ -241,7 +241,7 @@
" print(\" | \".join(row)[:100], \"...\")\n",
" print()\n",
"\n",
"print(stats)"
"print(data_record_collection.execution_stats)"
]
},
{
@@ -287,9 +287,9 @@
" processing_strategy=\"no_sentinel\",\n",
" execution_strategy=\"pipelined_parallel\",\n",
")\n",
"tables, plan, stats = patient_tables.run(config)\n",
"data_record_collection = patient_tables.run(config)\n",
"\n",
"for table in tables:\n",
"for table in data_record_collection.data_records:\n",
" header = table.header\n",
" subset_rows = table.rows[:3]\n",
"\n",
@@ -299,7 +299,7 @@
" print(\" | \".join(row)[:100], \"...\")\n",
" print()\n",
"\n",
"print(stats)"
"print(data_record_collection.execution_stats)"
]
},
{
@@ -330,8 +330,8 @@
],
"source": [
"print(\"Chosen plan:\")\n",
"print(plan, \"\\n\")\n",
"print(\"Stats:\", stats)"
"print(data_record_collection.executed_plans, \"\\n\")\n",
"print(\"Stats:\", data_record_collection.execution_stats)"
]
},
{
@@ -606,10 +606,10 @@
" processing_strategy=\"no_sentinel\",\n",
" execution_strategy=\"pipelined_parallel\",\n",
")\n",
"matched_tables, plan, stats = case_data.run(config) \n",
"data_record_collection = case_data.run(config) \n",
"\n",
"output_rows = []\n",
"for output_table in matched_tables:\n",
"for output_table in data_record_collection.data_records:\n",
" output_rows.append(output_table.to_dict()) \n",
"\n",
"output_df = pd.DataFrame(output_rows)\n",
@@ -650,8 +650,8 @@
}
],
"source": [
"print(plan, \"\\n\")\n",
"print(\"Stats:\", stats)"
"print(data_record_collection.executed_plans, \"\\n\")\n",
"print(\"Stats:\", data_record_collection.execution_stats)"
]
},
{
@@ -903,10 +903,10 @@
" processing_strategy=\"no_sentinel\",\n",
" execution_strategy=\"pipelined_parallel\",\n",
")\n",
"matched_tables, plan, stats = case_data.run(config)\n",
"data_record_collection = case_data.run(config)\n",
"\n",
"output_rows = []\n",
"for output_table in matched_tables:\n",
"for output_table in data_record_collection.data_records:\n",
" output_rows.append(output_table.to_dict()) \n",
"\n",
"output_df = pd.DataFrame(output_rows)\n",
Expand Down Expand Up @@ -946,8 +946,8 @@
}
],
"source": [
"print(plan, \"\\n\")\n",
"print(\"Stats:\", \"\")"
"print(data_record_collection.executed_plans, \"\\n\")\n",
"print(\"Stats:\", data_record_collection.execution_stats)"
]
},
{
@@ -1128,8 +1128,8 @@
"iterable = case_data.run(config)\n",
"\n",
"output_rows = []\n",
"for matched_tables, plan, stats in iterable: # noqa: B007\n",
" for output_table in matched_tables:\n",
"for data_record_collection in iterable: # noqa: B007\n",
" for output_table in data_record_collection.data_records:\n",
" print(output_table.to_dict().keys())\n",
" output_rows.append(output_table.to_dict()) \n",
"\n",
@@ -1619,8 +1619,8 @@
"iterable = case_data.run(config)\n",
"\n",
"output_rows = []\n",
"for matched_tables, plan, stats in iterable: # noqa: B007\n",
" for output_table in matched_tables:\n",
"for data_record_collection in iterable: # noqa: B007\n",
" for output_table in data_record_collection.data_records:\n",
" print(output_table.to_dict().keys())\n",
" output_rows.append(output_table.to_dict()) \n",
"\n",
8 changes: 4 additions & 4 deletions demos/biofabric-demo.py
@@ -136,11 +136,11 @@ def print_table(output):
processing_strategy="no_sentinel",
execution_strategy=executor,
)
tables, plan, stats = output.run(config)
data_record_collection = output.run(config)

print_table(tables)
print(plan)
print(stats)
print_table(data_record_collection.data_records)
print(data_record_collection.executed_plans)
# print(data_record_collection.execution_stats)

end_time = time.time()
print("Elapsed time:", end_time - start_time)
6 changes: 3 additions & 3 deletions demos/demo_core.py
@@ -192,14 +192,14 @@ def execute_task(task, datasetid, policy, verbose=False, profile=False, processi
execution_strategy=execution_strategy,
optimizer_strategy=optimizer_strategy,
)
records, execution_stats = root_set.run(config)
data_record_collection = root_set.run(config)

if profile:
os.makedirs("profiling-data", exist_ok=True)
with open(stat_path, "w") as f:
json.dump(execution_stats.to_json(), f)
json.dump(data_record_collection.execution_stats.to_json(), f)

return records, execution_stats, cols
return data_record_collection.data_records, data_record_collection.execution_stats, cols

def format_results_table(records: list[DataRecord], cols=None):
"""Format records as a table"""
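demo_core.py persists the run's statistics by serializing execution_stats.to_json(). A short sketch of that profiling branch; the output filename below is hypothetical (the demo builds `stat_path` outside this hunk), and `root_set` and `config` are the demo's objects:

import json
import os

data_record_collection = root_set.run(config)

os.makedirs("profiling-data", exist_ok=True)
stat_path = "profiling-data/stats.json"  # hypothetical path; the real demo computes stat_path elsewhere
with open(stat_path, "w") as f:
    json.dump(data_record_collection.execution_stats.to_json(), f)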
23 changes: 23 additions & 0 deletions demos/df-newinterface.py
@@ -0,0 +1,23 @@
import pandas as pd

import palimpzest as pz
from palimpzest.query.processor.config import QueryProcessorConfig

df = pd.read_csv("testdata/enron-tiny.csv")
qr2 = pz.Dataset(df)
qr2 = qr2.add_columns({"sender": ("The email address of the sender", "string"),
                       "subject": ("The subject of the email", "string"),
                       "date": ("The date the email was sent", "string")})
qr3 = qr2.filter("It is an email").filter("It has Vacation in the subject")

config = QueryProcessorConfig(
verbose=True,
execution_strategy="pipelined_parallel",
)

output = qr3.run(config)
output_df = output.to_df()
print(output_df)

output_df = output.to_df(project_cols=["sender", "subject", "date"])
print(output_df)
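Taken together, the new demo exercises all three features from the PR title: Dataset accepts a pandas DataFrame directly, add_columns takes a mapping from new column name to a (natural-language description, type name) pair, and the collection returned by run() converts to a DataFrame via to_df(), optionally projecting with project_cols.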
6 changes: 3 additions & 3 deletions demos/fever-demo.py
@@ -291,7 +291,7 @@ def get_item(self, idx: int, val: bool=False, include_label: bool=False):
verbose=verbose,
allow_code_synth=allow_code_synth
)
records, execution_stats = output.run(config)
data_record_collection = output.run(config)

# create filepaths for records and stats
records_path = (
@@ -306,7 +306,7 @@
)

record_jsons = []
for record in records:
for record in data_record_collection.data_records:
record_dict = record.to_dict()
### field_to_keep = ["claim", "id", "label"]
### record_dict = {k: v for k, v in record_dict.items() if k in fields_to_keep}
@@ -316,6 +316,6 @@
json.dump(record_jsons, f)

# save statistics
execution_stats_dict = execution_stats.to_json()
execution_stats_dict = data_record_collection.execution_stats.to_json()
with open(stats_path, "w") as f:
json.dump(execution_stats_dict, f)