Save cleaned data to tsv to make upstream clean up easier #1126

Draft: wants to merge 3 commits into base: main
Changes from 1 commit
Create a list of fields to clean outside of worker
Signed-off-by: Olga Bulat <obulat@gmail.com>
obulat committed Feb 5, 2023
commit c04973776f2ab8311c449da0a5ae18ed0b2488e8
ingestion_server/ingestion_server/cleanup.py (6 changes: 3 additions & 3 deletions)

@@ -207,7 +207,7 @@ def test_tls_supported(cls, url):
         return True


-def _clean_data_worker(rows, temp_table, sources_config, table):
+def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
     log.info("Starting data cleaning worker")
     global_field_to_func = sources_config["*"]["fields"]
     worker_conn = database_connect()
@@ -226,7 +226,7 @@ def _clean_data_worker(rows, temp_table, sources_config, table):
     }

     start_time = time.time()
-    cleaned_values = {field: [] for field in _get_cleanable_fields(table)}
+    cleaned_values = {field: [] for field in all_fields}
     for row in rows:
         # Map fields that need updating to their cleaning functions
         source = row["source"]
@@ -359,7 +359,7 @@ def clean_image_data(table):
             end = job_size * n
             last_end = end
             # Arguments for parallel _clean_data_worker calls
-            jobs.append((batch[start:end], temp_table, source_config, table))
+            jobs.append((batch[start:end], temp_table, source_config, _get_cleanable_fields("image")))
         pool = multiprocessing.Pool(processes=num_workers)
         log.info(f"Starting {len(jobs)} cleaning jobs")
         conn.commit()
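For context, this commit moves the lookup of cleanable fields out of the worker function and into the caller: each pooled worker now receives the field list as an argument instead of deriving it from the table name. The sketch below only illustrates that pattern; the config shape, the helper bodies, and the clean_batch driver are simplified stand-ins and not the actual ingestion_server implementation.

# Minimal sketch of the pattern in this commit (not the real ingestion_server code):
# build the list of cleanable fields in the parent process and hand it to each
# pooled worker, instead of having every worker re-derive it from the table name.
import multiprocessing

# Hypothetical, simplified sources config; the real one lives in cleanup.py.
SOURCES_CONFIG = {
    "*": {"fields": {"url": "add_protocol", "tags": "filter_tags"}},
    "flickr": {"fields": {"license_url": "fix_license_url"}},
}


def _get_cleanable_fields(table: str) -> list[str]:
    # Union of every field that has a cleaning function configured for any source.
    fields: set[str] = set()
    for source_config in SOURCES_CONFIG.values():
        fields.update(source_config["fields"])
    return sorted(fields)


def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
    # The worker no longer calls _get_cleanable_fields itself; it only uses the
    # precomputed list to set up its accumulator (cleaning logic omitted here).
    cleaned_values = {field: [] for field in all_fields}
    for row in rows:
        for field in all_fields:
            if field in row:
                cleaned_values[field].append(row[field])
    return cleaned_values


def clean_batch(batch, temp_table, num_workers=4):
    # The field list is computed once, outside the workers.
    all_fields = _get_cleanable_fields("image")
    job_size = max(1, len(batch) // num_workers)
    jobs = [
        (batch[i : i + job_size], temp_table, SOURCES_CONFIG, all_fields)
        for i in range(0, len(batch), job_size)
    ]
    with multiprocessing.Pool(processes=num_workers) as pool:
        return pool.starmap(_clean_data_worker, jobs)

Either way the field list is produced in the parent process; the diff above calls _get_cleanable_fields("image") while assembling each job tuple, which achieves the same separation.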