Merge pull request #29 from fact-project/forSPEonMCs
Add a processing mode to write output directly to a given folder
maxnoe authored May 23, 2017
2 parents 8b7646c + f752da5 commit 11f2d8f
Showing 7 changed files with 149 additions and 55 deletions.
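At its core, the change lets make_jobs swap the runner module each job uses. A condensed sketch of that dispatch (module names as in the diff below; pick_runner is a hypothetical helper for illustration):

from erna import stream_runner as stream_runner_std
from erna import stream_runner_local_output as stream_runner_local


def pick_runner(output_path=None):
    '''Hypothetical helper: pick the runner module a job should use.'''
    if output_path:
        # --local_output: each job writes its result file directly to disk
        return stream_runner_local
    # default: results are gathered from all nodes in the mother process
    return stream_runner_std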
2 changes: 1 addition & 1 deletion erna/scripts/process_fact_data.py
@@ -22,7 +22,7 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, v
for num, df in df_mapping.groupby("bunch_index"):
df=df.copy()
df["bunch_index"] = num
job = Job(stream_runner.run, [jar, xml, df, num, db_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
job = Job(stream_runner.run, [jar, xml, df, db_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
jobs.append(job)

return jobs
63 changes: 55 additions & 8 deletions erna/scripts/process_fact_mc.py
Expand Up @@ -5,7 +5,8 @@
from os import path

import erna
from erna import stream_runner
from erna import stream_runner as stream_runner_std
from erna import stream_runner_local_output as stream_runner_local

import gridmap
from gridmap import Job
@@ -15,15 +16,38 @@
logger = logging.getLogger(__name__)


def make_jobs(jar, xml, data_paths, drs_paths, engine, queue, vmem, num_jobs, walltime):
def make_jobs(jar, xml, data_paths, drs_paths,
engine, queue, vmem, num_jobs, walltime, output_path=None):
jobs = []
# create job objects

data_partitions = np.array_split(data_paths, num_jobs)
drs_partitions = np.array_split(drs_paths, num_jobs)
if output_path:
logger.info("Using stream runner für local output")
else:
logger.debug("Using std stream runner gathering output from all nodes")

for num, (data, drs) in enumerate(zip(data_partitions, drs_partitions)):
df = pd.DataFrame({'data_path':data, 'drs_path':drs})
job = Job(stream_runner.run, [jar, xml, df, num], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
jobs.append(job)
df = pd.DataFrame({'data_path': data, 'drs_path': drs})
if output_path:
file_name, _ = path.splitext(path.basename(output_path))
file_name += "_{}.json".format(num)
out_path = path.dirname(output_path)
run = [jar, xml, df, path.join(out_path, file_name)]
stream_runner = stream_runner_local
else:
run = [jar, xml, df]
stream_runner = stream_runner_std

jobs.append(
Job(stream_runner.run,
run,
queue=queue,
walltime=walltime,
engine=engine,
mem_free='{}mb'.format(vmem)
)
)

avg_num_files = np.mean([len(part) for part in data_partitions])
logger.info("Created {} jobs with {} files each.".format(len(jobs), avg_num_files))
@@ -44,7 +68,14 @@ def make_jobs(jar, xml, data_paths, drs_paths, engine, queue, vmem, num_jobs, w
@click.option("--log_level", type=click.Choice(['INFO', 'DEBUG', 'WARN']), help='increase output verbosity', default='INFO')
@click.option('--port', help='The port through which to communicate with the JobMonitor', default=12856, type=int)
@click.option('--local', default=False, is_flag=True, help='Flag indicating whether jobs should be executed locally.', show_default=True)
def main( jar, xml, out, mc_path, queue, walltime, engine, num_jobs, vmem, log_level, port, local):
@click.option('--local_output', default=False, is_flag=True,
help='Flag indicating whether jobs write their output locally '
+ 'to disk without gathering everything in the mother '
+ 'process. In this case the output file only contains a '
+ 'summary of the processed jobs. The data output will be '
+ 'in separate files.',
show_default=True)
def main( jar, xml, out, mc_path, queue, walltime, engine, num_jobs, vmem, log_level, port, local, local_output):
'''
Script to execute fact-tools on Monte Carlo files. Use the MC_PATH argument to specify the folders containing the MC files.
'''
@@ -59,7 +90,12 @@ def main( jar, xml, out, mc_path, queue, walltime, engine, num_jobs, vmem, log_l
logging.captureWarnings(True)
logging.basicConfig(format=('%(asctime)s - %(name)s - %(levelname)s - ' + '%(message)s'), level=level)

if local_output:
name, _ = path.splitext(path.basename(out))
local_output_dir = path.join(path.dirname(out), name)
erna.ensure_output(local_output_dir)
erna.ensure_output(out)

jarpath = path.abspath(jar)
xmlpath = path.abspath(xml)
drspath = erna.mc_drs_file()
@@ -87,7 +123,18 @@ def main( jar, xml, out, mc_path, queue, walltime, engine, num_jobs, vmem, log_l
mc_paths_array = np.array(files)
drs_paths_array = np.repeat(np.array(drspath), len(mc_paths_array))

job_list = make_jobs(jarpath, xmlpath, mc_paths_array, drs_paths_array, engine, queue, vmem, num_jobs, walltime)
if local_output:
job_list = make_jobs(
jarpath, xmlpath, mc_paths_array,
drs_paths_array, engine, queue,
vmem, num_jobs, walltime, output_path=local_output_dir
)
else:
job_list = make_jobs(
jarpath, xmlpath, mc_paths_array,
drs_paths_array, engine, queue,
vmem, num_jobs, walltime
)

job_outputs = gridmap.process_jobs(job_list, max_processes=num_jobs, local=local)
erna.collect_output(job_outputs, out)
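To make the output naming above concrete, here is a short trace of the path logic, assuming a hypothetical --out value of /data/analysis.json (all paths illustrative):

from os import path

out = '/data/analysis.json'                              # hypothetical --out value
name, _ = path.splitext(path.basename(out))              # 'analysis'
local_output_dir = path.join(path.dirname(out), name)    # '/data/analysis'

# make_jobs receives output_path=local_output_dir; for job number num:
num = 0
file_name, _ = path.splitext(path.basename(local_output_dir))  # 'analysis'
file_name += '_{}.json'.format(num)                      # 'analysis_0.json'
job_output = path.join(path.dirname(local_output_dir), file_name)
print(job_output)                                        # /data/analysis_0.json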
2 changes: 1 addition & 1 deletion erna/scripts/process_fact_run_list.py
@@ -21,7 +21,7 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue,
for num, indices in enumerate(split_indices):
df = df_mapping[indices.min(): indices.max()]

job = Job(stream_runner.run, [jar, xml, df, num, db_path],
job = Job(stream_runner.run, [jar, xml, df, db_path],
queue=queue, walltime=walltime, engine=engine,
mem_free='{}mb'.format(vmem))
jobs.append(job)
57 changes: 12 additions & 45 deletions erna/stream_runner.py
@@ -5,60 +5,27 @@
import logging
import tempfile
from erna import ft_json_to_df
from erna.utils import (
assemble_facttools_call,
check_environment_on_node
)

def run(jar, xml, df, num, db_path=None):

def run(jar, xml, input_files_df, db_path=None):
'''
This is what will be executed on the cluster
'''
logger = logging.getLogger(__name__)
logger.info("stream runner has been started.")
# if 'DEFAULT_TEMP_DIR' not in os.environ:
# logger.error("No scratch directory given via environment variable DEFAULT_TEMP_DIR. Aborting")
# return "No default temp dir"
#
# output_directory = os.environ['DEFAULT_TEMP_DIR']
#
# if not os.path.isdir(output_directory):
# logger.warn("Output directory {} does not exist. Trying to create it.".format(output_directory))
# try:
# os.mkdir(output_directory, mode=0o755)
# except OSError: # Python >2.5
# pass
# #check if that worked or not
# if not os.access(output_directory, os.W_OK | os.X_OK) :
# logger.error("Cannot write to directory given DEFAULT_TEMP_DIR {} ".format(output_directory))
# return "Cannot write to dir"


with tempfile.TemporaryDirectory() as output_directory:
name, _ = os.path.splitext(os.path.basename(xml))
input_filename = "input_{}_{}.json".format(name ,num)
output_filename = "output_{}_{}.json".format(name, num)

input_path = os.path.join(output_directory, input_filename)
output_path = os.path.join(output_directory, output_filename)
input_path = os.path.join(output_directory, "input.json")
output_path = os.path.join(output_directory, "output.json")

# logger.info("Writing {} entries to json file {}".format(len(df), filename))
df.to_json(input_path, orient='records', date_format='epoch' )
call = [
'java',
'-XX:MaxHeapSize=1024m',
'-XX:InitialHeapSize=512m',
'-XX:CompressedClassSpaceSize=64m',
'-XX:MaxMetaspaceSize=128m',
'-XX:+UseConcMarkSweepGC',
'-XX:+UseParNewGC',
'-jar',
jar,
xml,
'-Dinput=file:{}'.format(input_path),
'-Doutput=file:{}'.format(output_path),
'-Ddb=file:{}'.format(db_path),
]
input_files_df.to_json(input_path, orient='records', date_format='epoch')
call = assemble_facttools_call(jar, xml, input_path, output_path, db_path)

subprocess.check_call(['which', 'java'])
subprocess.check_call(['free', '-m'])
subprocess.check_call(['java', '-Xmx512m', '-version'])
check_environment_on_node()

logger.info("Calling fact-tools with call: {}".format(call))
try:
@@ -71,5 +38,5 @@ def run(jar, xml, df, num, db_path=None):
else:
return "fact-tools error"

#try to read nans else return empty frame
# try to read nans else return empty frame
return ft_json_to_df(output_path)
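For reference, a minimal sketch of invoking the refactored runner directly; the jar, xml, and data paths are hypothetical, and in production gridmap calls run on a worker node:

import pandas as pd
from erna import stream_runner

df = pd.DataFrame({
    'data_path': ['/mc/ceres_000001.fits.gz'],        # hypothetical input file
    'drs_path': ['/mc/mc_drs_constants.drs.fits.gz'],
})
result = stream_runner.run('/opt/fact-tools.jar', '/opt/analysis.xml', df)
# On success, result is the DataFrame parsed from the fact-tools JSON output;
# on a CalledProcessError it is the string 'fact-tools error'.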
48 changes: 48 additions & 0 deletions erna/stream_runner_local_output.py
@@ -0,0 +1,48 @@
import subprocess
import pandas as pd
import os
import json
import logging
import tempfile
from shutil import copyfile
from erna import ft_json_to_df
from erna.utils import (
assemble_facttools_call,
check_environment_on_node
)


def run(jar, xml, input_files_df, output_path, db_path=None):
'''
This is a version of erna's stream runner that will be executed on the cluster,
but writes its results directly to disk without sending them
back via ZeroMQ
'''
logger = logging.getLogger(__name__)
logger.info("stream runner has been started.")

with tempfile.TemporaryDirectory() as output_directory:
input_path = os.path.join(output_directory, "input.json")
tmp_output_path = os.path.join(output_directory, "output.json")

input_files_df.to_json(input_path, orient='records', date_format='epoch')
call = assemble_facttools_call(jar, xml, input_path, tmp_output_path, db_path)

check_environment_on_node()

logger.info("Calling fact-tools with call: {}".format(call))
try:
subprocess.check_call(call)
except subprocess.CalledProcessError as e:
logger.error("Fact tools returned an error:")
logger.error(e)
return "fact-tools error"

if not os.path.exists(tmp_output_path):
logger.error("Not output generated, returning no results")
return "fact-tools generated no output"

copyfile(tmp_output_path, output_path)
input_files_df['output_path'] = output_path

return input_files_df
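A matching sketch for the new local-output runner, again with hypothetical paths:

import pandas as pd
from erna import stream_runner_local_output

df = pd.DataFrame({
    'data_path': ['/mc/ceres_000001.fits.gz'],        # hypothetical input file
    'drs_path': ['/mc/mc_drs_constants.drs.fits.gz'],
})
summary = stream_runner_local_output.run(
    '/opt/fact-tools.jar', '/opt/analysis.xml', df, '/data/analysis_0.json',
)
# Instead of the parsed events, summary is the input DataFrame with an added
# 'output_path' column; the fact-tools JSON itself is copied to the given path.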
31 changes: 31 additions & 0 deletions erna/utils.py
@@ -4,6 +4,7 @@
from sqlalchemy import create_engine
import grp
import pwd
import subprocess
from datetime import date

log = logging.getLogger(__name__)
@@ -62,3 +63,33 @@ def night_int_to_date(night):
def date_to_night_int(night):
''' convert a date or datetime instance to the crazy FACT int '''
return 10000 * night.year + 100 * night.month + night.day


def assemble_facttools_call(jar, xml, input_path, output_path, db_path=None):
''' Assemble the call for fact-tools with the given combination
of jar, xml, input_path and output_path. The db_path is optional
for the case where a db file is needed
'''
call = [
'java',
'-XX:MaxHeapSize=1024m',
'-XX:InitialHeapSize=512m',
'-XX:CompressedClassSpaceSize=64m',
'-XX:MaxMetaspaceSize=128m',
'-XX:+UseConcMarkSweepGC',
'-XX:+UseParNewGC',
'-jar',
jar,
xml,
'-Dinput=file:{}'.format(input_path),
'-Doutput=file:{}'.format(output_path),
'-Ddb=file:{}'.format(db_path),
]
return call


def check_environment_on_node():
''' Check memory, java executable and version '''
subprocess.check_call(['which', 'java'])
subprocess.check_call(['free', '-m'])
subprocess.check_call(['java', '-Xmx512m', '-version'])
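As a quick sanity check of the helper above, a hypothetical call and the list it assembles (paths illustrative):

from erna.utils import assemble_facttools_call

call = assemble_facttools_call(
    '/opt/fact-tools.jar', '/opt/analysis.xml',
    '/tmp/job/input.json', '/tmp/job/output.json',
    db_path='/opt/fact.sqlite',
)
# call == ['java', '-XX:MaxHeapSize=1024m', '-XX:InitialHeapSize=512m',
#          '-XX:CompressedClassSpaceSize=64m', '-XX:MaxMetaspaceSize=128m',
#          '-XX:+UseConcMarkSweepGC', '-XX:+UseParNewGC', '-jar',
#          '/opt/fact-tools.jar', '/opt/analysis.xml',
#          '-Dinput=file:/tmp/job/input.json',
#          '-Doutput=file:/tmp/job/output.json',
#          '-Ddb=file:/opt/fact.sqlite']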
1 change: 1 addition & 0 deletions setup.py
@@ -48,6 +48,7 @@
'process_fact_data = erna.scripts.process_fact_data:main',
'process_fact_data_qsub = erna.scripts.process_fact_data_qsub:main',
'process_fact_mc = erna.scripts.process_fact_mc:main',
'process_fact_mc_local_output = erna.scripts.process_fact_mc_local_output:main',
'fetch_fact_runs = erna.scripts.fetch_fact_runs:main',
'process_fact_run_list = erna.scripts.process_fact_run_list:main',
'read_aux_files_to_sqlite = erna.scripts.read_aux_files_to_sqlite:main',