1. support add_columns in Dataset; 2. support run().to_df(); 3. add demo in df-newinterface.py (mitdbg#78)

* Support add_columns in Dataset. Add a demo in df-newinterface.py

Currently we have to do

records, _ = qr3.run()
outputDf = DataRecord.to_df(records)

I'll try to make qr3.run().to_df() work in another PR.

* ruff check --fix

* Support run().to_df()

Update run() to return a DataRecordCollection, so that it will be easier for us to support more features for run()'s output.

We support to_df() in this change.

I'll send out follow-up commits to update the other demos.
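
For reference, a minimal sketch of the new call pattern, mirroring the df-newinterface.py demo added in this commit (the CSV path and filter text are just the ones used there):

```
import pandas as pd

import palimpzest as pz
from palimpzest.query.processor.config import QueryProcessorConfig

df = pd.read_csv("testdata/enron-tiny.csv")
dataset = pz.Dataset(df).filter("It is an email")

config = QueryProcessorConfig(verbose=True, execution_strategy="pipelined_parallel")

# run() now returns a DataRecordCollection rather than a (records, stats) tuple,
# so the DataFrame conversion chains directly off the result.
output = dataset.run(config)
output_df = output.to_df()
print(output_df)
```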

* ruff check --fix

* fix typo in DataRecordCollection

* Update records.py

* fix tiny bug in the MAB processor.

The code will run into an issue if we don't return any stats for this function in:

```
                            max_quality_record_set = self.pick_highest_quality_output(all_source_record_sets)
                            if (
                                not prev_logical_op_is_filter
                                or (
                                    prev_logical_op_is_filter
                                    and max_quality_record_set.record_op_stats[0].passed_operator
                                )
```

* update record.to_df interface

Update to record.to_df(records: list[DataRecord], project_cols: list[str] | None = None), which is consistent with the other functions in this class.
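
Continuing the sketch above, the class-level form would look roughly like this (the DataRecord import path is an assumption; it is not shown in this diff):

```
# Import path assumed -- adjust to wherever your palimpzest version exposes DataRecord.
from palimpzest.core.elements.records import DataRecord

records = output.data_records  # list[DataRecord] held by the DataRecordCollection
all_cols_df = DataRecord.to_df(records)
subset_df = DataRecord.to_df(records, project_cols=["sender", "subject", "date"])
```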

* Update demo for the new execute() output format

* better way to get the plan from output.run()

* fix getting plan from DataRecordCollection.

People used to get the plan from the streaming processor's execute(), which is not a good practice.

I updated plan_str to plan_stats, and users now need to get the physical plan from the processor.

Consider better ways to provide the executed physical plan to DataRecordCollection, possibly from stats.

* Update df-newinterface.py

* update code based on comments from Matt.

1. add cardinality param in add_columns
2. remove extra testdata files
3. add __iter__ in DataRecordCollection to help iterate over streaming output (see the sketch below).
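
A small sketch of how items 1 and 3 might look in practice; the cardinality keyword placement and the Cardinality enum location are assumptions based on the description above, while the column specs and config come from the df-newinterface.py demo:

```
import pandas as pd

import palimpzest as pz
from palimpzest.query.processor.config import QueryProcessorConfig

df = pd.read_csv("testdata/enron-tiny.csv")
ds = pz.Dataset(df)

# 1. add_columns() now accepts a cardinality parameter (keyword name and enum
#    location assumed here; ONE_TO_MANY is the value used elsewhere in this commit).
ds = ds.add_columns(
    {"sender": ("The email address of the sender", "string"),
     "subject": ("The subject of the email", "string")},
    cardinality=pz.Cardinality.ONE_TO_MANY,
)

# 3. DataRecordCollection implements __iter__, so streaming output can be
#    consumed incrementally; each yielded item carries its own records and stats.
output = ds.run(QueryProcessorConfig(verbose=True, execution_strategy="pipelined_parallel"))
for partial in output:
    for record in partial.data_records:
        print(record)
```
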
chjuncn authored and sivaprasadsudhir committed Jan 28, 2025
1 parent c3cbca9 commit 3275864
Showing 23 changed files with 277 additions and 111 deletions.
15 changes: 8 additions & 7 deletions demos/askem-var.py
@@ -49,9 +49,8 @@ class Variable(Schema):
file_path = "testdata/askem-tiny/"

if run_pz:
# reference, plan, stats = run_workload()
df_input = pd.DataFrame(dict_of_excerpts)
excerpts = Dataset(dataset, schema=Papersnippet)
df_input = pd.DataFrame(list_of_strings)
excerpts = Dataset(df_input, schema=Papersnippet)
output = excerpts.convert(
Variable, desc="A variable used or introduced in the context", cardinality=Cardinality.ONE_TO_MANY
).filter("The value name is 'a'", depends_on="name")
@@ -64,14 +63,16 @@ class Variable(Schema)
execution_strategy="sequential",
optimizer_strategy="pareto",
)

# Option 1: Use QueryProcessorFactory to create a processor and generate a plan
processor = QueryProcessorFactory.create_processor(excerpts, config)
plan = processor.generate_plan(output, policy)
print(processor.plan)

with st.container():
st.write("### Executed plan: \n")
# st.write(" " + str(plan).replace("\n", " \n "))
for idx, op in enumerate(processor.plan.operators):
for idx, op in enumerate(plan.operators):
strop = f"{idx + 1}. {str(op)}"
strop = strop.replace("\n", " \n")
st.write(strop)
@@ -85,7 +86,7 @@ class Variable(Schema)
start_time = time.time()
# for idx, (vars, plan, stats) in enumerate(iterable):
for idx, record in enumerate(input_records):
print(f"idx: {idx}\n vars: {vars}")
print(f"idx: {idx}\n record: {record}")
index = idx
vars = processor.execute_opstream(processor.plan, record)
if idx == len(input_records) - 1:
@@ -130,8 +131,8 @@ class Variable(Schema):
st.write(" **value:** ", var.value, "\n")

# write variables to a json file with readable format
# with open(f"askem-variables-{dataset}.json", "w") as f:
# json.dump(variables, f, indent=4)
with open(f"askem-variables-{dataset}.json", "w") as f:
json.dump(variables, f, indent=4)
vars_df = pd.DataFrame(variables)

# G = nx.DiGraph()
23 changes: 13 additions & 10 deletions demos/bdf-suite.py
@@ -198,7 +198,7 @@ def extract_references(processing_strategy, execution_strategy, optimizer_strate
run_pz = st.button("Run Palimpzest on dataset")

# st.radio("Biofabric Data Integration")
run_pz = False
run_pz = True
dataset = "bdf-usecase3-tiny"

if run_pz:
@@ -220,24 +220,27 @@ def extract_references(processing_strategy, execution_strategy, optimizer_strate
execution_strategy="sequential",
optimizer_strategy="pareto",
)
iterable = output.run(config)

data_record_collection = output.run(config)
references = []
statistics = []

for idx, (reference, plan, stats) in enumerate(iterable):
for idx, record_collection in enumerate(data_record_collection):
record_time = time.time()
stats = record_collection.plan_stats
references = record_collection.data_records
plan = record_collection.executed_plans[0]
statistics.append(stats)

if not idx:
with st.container():
st.write("### Executed plan: \n")
# st.write(" " + str(plan).replace("\n", " \n "))
for idx, op in enumerate(plan.operators):
strop = f"{idx+1}. {str(op)}"
strop = strop.replace("\n", " \n")
st.write(strop)
for ref in reference:
st.write(" " + str(plan).replace("\n", " \n "))
# for idx, op in enumerate(stats.plan_strs[0].operators):
# strop = f"{idx+1}. {str(op)}"
# strop = strop.replace("\n", " \n")
# st.write(strop)
for ref in references:
try:
index = ref.index
except Exception:
23 changes: 15 additions & 8 deletions demos/bdf-usecase3.py
@@ -18,6 +18,7 @@
from palimpzest.datamanager.datamanager import DataDirectory
from palimpzest.policy import MaxQuality, MinCost
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.query.processor.query_processor_factory import QueryProcessorFactory
from palimpzest.sets import Dataset

if not os.environ.get("OPENAI_API_KEY"):
@@ -75,11 +76,15 @@ def run_workload():

tables = []
statistics = []
for table, plan, stats in iterable: # noqa: B007
for data_record_collection in iterable: # noqa: B007
# record_time = time.time()
table = data_record_collection.data_records
stats = data_record_collection.plan_stats
tables += table
statistics.append(stats)

processor = QueryProcessorFactory.create_processor(output, config)
plan = processor.generate_plan(output, policy)
return tables, plan, stats


@@ -115,24 +120,26 @@ def run_workload():
execution_strategy="sequential",
optimizer_strategy="pareto",
)
processor = QueryProcessorFactory.create_processor(output, config)
plan = processor.generate_plan(output, policy)
iterable = output.run(config)

references = []
statistics = []

for idx, (reference, plan, stats) in enumerate(iterable):
for idx, data_record_collection in enumerate(iterable):
record_time = time.time()
references = data_record_collection.data_records
stats = data_record_collection.plan_stats
plan = data_record_collection.executed_plans[0]
statistics.append(stats)

if not idx:
with st.container():
st.write("### Executed plan: \n")
# st.write(" " + str(plan).replace("\n", " \n "))
for idx, op in enumerate(plan.operators):
strop = f"{idx+1}. {str(op)}"
strop = strop.replace("\n", " \n")
st.write(strop)
for ref in reference:
st.write(" " + str(plan).replace("\n", " \n "))

for ref in references:
try:
index = ref.index
except Exception:
44 changes: 22 additions & 22 deletions demos/biofabric-demo-matching.ipynb
@@ -163,9 +163,9 @@
" nocache=True,\n",
" processing_strategy=\"no_sentinel\",\n",
")\n",
"records, plan, stats = output.run(config)\n",
"data_record_collection = output.run(config)\n",
"\n",
"print_tables(records)"
"print_tables(data_record_collection.data_records)"
]
},
{
@@ -229,9 +229,9 @@
" nocache=True,\n",
" processing_strategy=\"no_sentinel\",\n",
")\n",
"tables, plan, stats = patient_tables.run(config)\n",
"data_record_collection = patient_tables.run(config)\n",
"\n",
"for table in tables:\n",
"for table in data_record_collection.data_records:\n",
" header = table.header\n",
" subset_rows = table.rows[:3]\n",
"\n",
@@ -241,7 +241,7 @@
" print(\" | \".join(row)[:100], \"...\")\n",
" print()\n",
"\n",
"print(stats)"
"print(data_record_collection.execution_stats)"
]
},
{
@@ -287,9 +287,9 @@
" processing_strategy=\"no_sentinel\",\n",
" execution_strategy=\"pipelined_parallel\",\n",
")\n",
"tables, plan, stats = patient_tables.run(config)\n",
"data_record_collection = patient_tables.run(config)\n",
"\n",
"for table in tables:\n",
"for table in data_record_collection.data_records:\n",
" header = table.header\n",
" subset_rows = table.rows[:3]\n",
"\n",
@@ -299,7 +299,7 @@
" print(\" | \".join(row)[:100], \"...\")\n",
" print()\n",
"\n",
"print(stats)"
"print(data_record_collection.execution_stats)"
]
},
{
@@ -330,8 +330,8 @@
],
"source": [
"print(\"Chosen plan:\")\n",
"print(plan, \"\\n\")\n",
"print(\"Stats:\", stats)"
"print(data_record_collection.executed_plans, \"\\n\")\n",
"print(\"Stats:\", data_record_collection.execution_stats)"
]
},
{
@@ -606,10 +606,10 @@
" processing_strategy=\"no_sentinel\",\n",
" execution_strategy=\"pipelined_parallel\",\n",
")\n",
"matched_tables, plan, stats = case_data.run(config) \n",
"data_record_collection = case_data.run(config) \n",
"\n",
"output_rows = []\n",
"for output_table in matched_tables:\n",
"for output_table in data_record_collection.data_records:\n",
" output_rows.append(output_table.to_dict()) \n",
"\n",
"output_df = pd.DataFrame(output_rows)\n",
@@ -650,8 +650,8 @@
}
],
"source": [
"print(plan, \"\\n\")\n",
"print(\"Stats:\", stats)"
"print(data_record_collection.executed_plans, \"\\n\")\n",
"print(\"Stats:\", data_record_collection.execution_stats)"
]
},
{
@@ -903,10 +903,10 @@
" processing_strategy=\"no_sentinel\",\n",
" execution_strategy=\"pipelined_parallel\",\n",
")\n",
"matched_tables, plan, stats = case_data.run(config)\n",
"data_record_collection = case_data.run(config)\n",
"\n",
"output_rows = []\n",
"for output_table in matched_tables:\n",
"for output_table in data_record_collection.data_records:\n",
" output_rows.append(output_table.to_dict()) \n",
"\n",
"output_df = pd.DataFrame(output_rows)\n",
@@ -946,8 +946,8 @@
}
],
"source": [
"print(plan, \"\\n\")\n",
"print(\"Stats:\", \"\")"
"print(data_record_collection.executed_plans, \"\\n\")\n",
"print(\"Stats:\", data_record_collection.execution_stats)"
]
},
{
@@ -1128,8 +1128,8 @@
"iterable = case_data.run(config)\n",
"\n",
"output_rows = []\n",
"for matched_tables, plan, stats in iterable: # noqa: B007\n",
" for output_table in matched_tables:\n",
"for data_record_collection in iterable: # noqa: B007\n",
" for output_table in data_record_collection.data_records:\n",
" print(output_table.to_dict().keys())\n",
" output_rows.append(output_table.to_dict()) \n",
"\n",
@@ -1619,8 +1619,8 @@
"iterable = case_data.run(config)\n",
"\n",
"output_rows = []\n",
"for matched_tables, plan, stats in iterable: # noqa: B007\n",
" for output_table in matched_tables:\n",
"for data_record_collection in iterable: # noqa: B007\n",
" for output_table in data_record_collection.data_records:\n",
" print(output_table.to_dict().keys())\n",
" output_rows.append(output_table.to_dict()) \n",
"\n",
8 changes: 4 additions & 4 deletions demos/biofabric-demo.py
@@ -136,11 +136,11 @@ def print_table(output):
processing_strategy="no_sentinel",
execution_strategy=executor,
)
tables, plan, stats = output.run(config)
data_record_collection = output.run(config)

print_table(tables)
print(plan)
print(stats)
print_table(data_record_collection.data_records)
print(data_record_collection.executed_plans)
# print(data_record_collection.execution_stats)

end_time = time.time()
print("Elapsed time:", end_time - start_time)
6 changes: 3 additions & 3 deletions demos/demo_core.py
@@ -192,14 +192,14 @@ def execute_task(task, datasetid, policy, verbose=False, profile=False, processi
execution_strategy=execution_strategy,
optimizer_strategy=optimizer_strategy,
)
records, execution_stats = root_set.run(config)
data_record_collection = root_set.run(config)

if profile:
os.makedirs("profiling-data", exist_ok=True)
with open(stat_path, "w") as f:
json.dump(execution_stats.to_json(), f)
json.dump(data_record_collection.execution_stats.to_json(), f)

return records, execution_stats, cols
return data_record_collection.data_records, data_record_collection.execution_stats, cols

def format_results_table(records: list[DataRecord], cols=None):
"""Format records as a table"""
23 changes: 23 additions & 0 deletions demos/df-newinterface.py
@@ -0,0 +1,23 @@
import pandas as pd

import palimpzest as pz
from palimpzest.query.processor.config import QueryProcessorConfig

df = pd.read_csv("testdata/enron-tiny.csv")
qr2 = pz.Dataset(df)
qr2 = qr2.add_columns({"sender": ("The email address of the sender", "string"),
"subject": ("The subject of the email", "string"),#
"date": ("The date the email was sent", "string")})
qr3 = qr2.filter("It is an email").filter("It has Vacation in the subject")

config = QueryProcessorConfig(
verbose=True,
execution_strategy="pipelined_parallel",
)

output = qr3.run(config)
output_df = output.to_df()
print(output_df)

output_df = output.to_df(project_cols=["sender", "subject", "date"])
print(output_df)
11 changes: 6 additions & 5 deletions demos/image-demo.py
@@ -36,8 +36,9 @@ def build_image_plan(dataset_id):
if __name__ == "__main__":
# parse arguments
start_time = time.time()

parser = argparse.ArgumentParser(description="Run a simple demo")
parser.add_argument("--no-cache", action="store_true", help="Do not use cached results")
parser.add_argument("--no-cache", action="store_true", help="Do not use cached results", default=True)

args = parser.parse_args()
no_cache = args.no_cache
@@ -57,11 +58,11 @@ def build_image_plan(dataset_id):
verbose=True,
processing_strategy="no_sentinel"
)
records, execution_stats = plan.run(config)
data_record_collection = plan.run(config)

print("Obtained records", records)
print("Obtained records", data_record_collection.data_records)
imgs, breeds = [], []
for record in records:
for record in data_record_collection.data_records:
print("Trying to open ", record.filename)
path = os.path.join("testdata/images-tiny/", record.filename)
img = Image.open(path).resize((128, 128))
@@ -78,7 +79,7 @@ def build_image_plan(dataset_id):
with gr.Column():
breed_blocks.append(gr.Textbox(value=breed))

plan_str = list(execution_stats.plan_strs.values())[0]
plan_str = list(data_record_collection.execution_stats.plan_strs.values())[0]
gr.Textbox(value=plan_str, info="Query Plan")

end_time = time.time()