Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
Create job after the upload is done (#3608)
Browse files Browse the repository at this point in the history
* Create job after the upload is done
- Decoupled the job creation from the JobHelper initialization
- Move the job creation after the all the files have been uploaded

* build fix

* isort
  • Loading branch information
chkeita authored Oct 31, 2023
1 parent d50fd48 commit 902557f
Show file tree
Hide file tree
Showing 12 changed files with 75 additions and 71 deletions.
5 changes: 3 additions & 2 deletions src/cli/examples/domato.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,9 @@ def main() -> None:
]

of.logger.info("Creating generic_crash_report task")
job = helper.create_job()
of.tasks.create(
helper.job.job_id,
job.job_id,
TaskType.generic_crash_report,
helper.setup_relative_blob_name(args.target_exe, args.setup_dir),
containers,
Expand Down Expand Up @@ -202,7 +203,7 @@ def main() -> None:
]

of.tasks.create(
helper.job.job_id,
job.job_id,
TaskType.generic_generator,
helper.setup_relative_blob_name(args.target_exe, args.setup_dir),
containers,
Expand Down
6 changes: 4 additions & 2 deletions src/cli/examples/honggfuzz.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ def main() -> None:
]

of.logger.info("Creating generic_crash_report task")

job = helper.create_job()
of.tasks.create(
helper.job.job_id,
job.job_id,
TaskType.generic_crash_report,
helper.setup_relative_blob_name(args.target_exe, args.setup_dir),
containers,
Expand Down Expand Up @@ -133,7 +135,7 @@ def main() -> None:
]

of.tasks.create(
helper.job.job_id,
job.job_id,
TaskType.generic_supervisor,
helper.setup_relative_blob_name(args.target_exe, args.setup_dir),
containers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ def main() -> None:
args.duration,
pool_name=args.pool_name,
target_exe=args.target_exe,
job=job,
)

helper.define_containers(
Expand All @@ -86,8 +85,9 @@ def main() -> None:
]

of.logger.info("Creating generic_analysis task")
job = helper.create_job()
of.tasks.create(
helper.job.job_id,
job.job_id,
TaskType.generic_analysis,
helper.setup_relative_blob_name(args.target_coverage_exe, args.setup_dir),
containers,
Expand All @@ -97,7 +97,7 @@ def main() -> None:
analyzer_options=["{target_exe}", "{output_dir}", "{input}"],
)

print(f"job:{helper.job.json(indent=4)}")
print(f"job:{job.json(indent=4)}")


if __name__ == "__main__":
Expand Down
5 changes: 3 additions & 2 deletions src/cli/examples/llvm-source-coverage/source-coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ def main() -> None:
]

of.logger.info("Creating generic_analysis task")
job = helper.create_job()
of.tasks.create(
helper.job.job_id,
job.job_id,
TaskType.generic_analysis,
helper.setup_relative_blob_name(args.target_coverage_exe, args.setup_dir),
containers,
Expand All @@ -84,7 +85,7 @@ def main() -> None:
analyzer_options=["{target_exe}", "{output_dir}", "{input}"],
)

print(f"job:{helper.job.json(indent=4)}")
print(f"job:{job.json(indent=4)}")


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion src/cli/onefuzz/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def get(self, container: primitives.Container, filename: str) -> bytes:
"""get a file from a container"""
self.logger.debug("getting file from container: %s:%s", container, filename)
client = self._get_client(container)
downloaded = client.download_blob(filename)
downloaded: bytes = client.download_blob(filename)
return downloaded

def download(
Expand Down
41 changes: 20 additions & 21 deletions src/cli/onefuzz/templates/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def __init__(
pool_name: Optional[str] = None,
target_exe: File,
platform: Optional[OS] = None,
job: Optional[Job] = None,
):
self.onefuzz = onefuzz
self.logger = logger
Expand All @@ -86,15 +85,16 @@ def __init__(
self.wait_for_stopped: bool = False
self.containers: Dict[ContainerType, ContainerTemplate] = {}
self.tags: Dict[str, str] = {"project": project, "name": name, "build": build}
if job is None:
self.onefuzz.versions.check()
self.logger.info("creating job (runtime: %d hours)", duration)
self.job = self.onefuzz.jobs.create(
self.project, self.name, self.build, duration=duration
)
self.logger.info("created job: %s" % self.job.job_id)
else:
self.job = job
self.duration = duration

def create_job(self) -> Job:
self.onefuzz.versions.check()
self.logger.info("creating job (runtime: %d hours)", self.duration)
job = self.onefuzz.jobs.create(
self.project, self.name, self.build, duration=self.duration
)
self.logger.info("created job: %s" % job)
return job

def add_existing_container(
self, container_type: ContainerType, container: Container
Expand Down Expand Up @@ -259,14 +259,13 @@ def wait_on(
}
self.wait_for_running = wait_for_running

def check_current_job(self) -> Job:
job = self.onefuzz.jobs.get(self.job.job_id)
def check_current_job(self, job: Job) -> Job:
if job.state in JobState.shutting_down():
raise StoppedEarly("job unexpectedly stopped early")

errors = []
for task in self.onefuzz.tasks.list(
job_id=self.job.job_id, state=TaskState.shutting_down()
job_id=job.job_id, state=TaskState.shutting_down()
):
if task.error:
errors.append("%s: %s" % (task.config.task.type, task.error))
Expand All @@ -277,21 +276,21 @@ def check_current_job(self) -> Job:
raise StoppedEarly("tasks stopped unexpectedly.\n%s" % "\n".join(errors))
return job

def get_waiting(self) -> List[str]:
tasks = self.onefuzz.tasks.list(job_id=self.job.job_id)
def get_waiting(self, job: Job) -> List[str]:
tasks = self.onefuzz.tasks.list(job_id=job.job_id)
waiting = [
"%s:%s" % (x.config.task.type.name, x.state.name)
for x in tasks
if x.state not in TaskState.has_started()
]
return waiting

def is_running(self) -> Tuple[bool, str, Any]:
waiting = self.get_waiting()
def is_running(self, job: Job) -> Tuple[bool, str, Any]:
waiting = self.get_waiting(job)
return (not waiting, "waiting on: %s" % ", ".join(sorted(waiting)), None)

def has_files(self) -> Tuple[bool, str, Any]:
self.check_current_job()
def has_files(self, job: Job) -> Tuple[bool, str, Any]:
self.check_current_job(job)

new = {
x: len(self.onefuzz.containers.files.list(x).files)
Expand All @@ -307,8 +306,8 @@ def has_files(self) -> Tuple[bool, str, Any]:
None,
)

def wait(self) -> None:
JobMonitor(self.onefuzz, self.job).wait(
def wait(self, job: Job) -> None:
JobMonitor(self.onefuzz, job).wait(
wait_for_running=self.wait_for_running,
wait_for_files=self.to_monitor,
wait_for_stopped=self.wait_for_stopped,
Expand Down
9 changes: 5 additions & 4 deletions src/cli/onefuzz/templates/afl.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ def basic(
)

self.logger.info("creating afl fuzz task")
job = helper.create_job()
fuzzer_task = self.onefuzz.tasks.create(
helper.job.job_id,
job.job_id,
TaskType.generic_supervisor,
target_exe_blob_name,
containers,
Expand Down Expand Up @@ -190,7 +191,7 @@ def basic(

self.logger.info("creating generic_crash_report task")
self.onefuzz.tasks.create(
helper.job.job_id,
job.job_id,
TaskType.generic_crash_report,
target_exe_blob_name,
report_containers,
Expand All @@ -206,5 +207,5 @@ def basic(
)

self.logger.info("done creating tasks")
helper.wait()
return helper.job
helper.wait(job)
return job
36 changes: 21 additions & 15 deletions src/cli/onefuzz/templates/libfuzzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,9 @@ def basic(
ContainerType.extra_output, extra_output_container
)

job = helper.create_job()
self._create_tasks(
job=helper.job,
job=job,
containers=helper.container_names(),
pool_name=pool_name,
target_exe=target_exe_blob_name,
Expand Down Expand Up @@ -518,8 +519,8 @@ def basic(
)

self.logger.info("done creating tasks")
helper.wait()
return helper.job
helper.wait(job)
return job

def merge(
self,
Expand Down Expand Up @@ -648,8 +649,9 @@ def merge(
merge_containers.append((ContainerType.inputs, existing_container))

self.logger.info("creating libfuzzer_merge task")
job = helper.create_job()
self.onefuzz.tasks.create(
helper.job.job_id,
job.job_id,
TaskType.libfuzzer_merge,
target_exe_blob_name,
merge_containers,
Expand All @@ -670,8 +672,8 @@ def merge(
)

self.logger.info("done creating tasks")
helper.wait()
return helper.job
helper.wait(job)
return job

def dotnet(
self,
Expand Down Expand Up @@ -810,8 +812,10 @@ def dotnet(

helper.wait_on(wait_for_files, wait_for_running)

job = helper.create_job()

fuzzer_task = self.onefuzz.tasks.create(
helper.job.job_id,
job.job_id,
TaskType.libfuzzer_dotnet_fuzz,
target_dll_blob_name, # Not used
fuzzer_containers,
Expand Down Expand Up @@ -870,7 +874,7 @@ def dotnet(

self.logger.info("creating `dotnet_coverage` task")
self.onefuzz.tasks.create(
helper.job.job_id,
job.job_id,
TaskType.dotnet_coverage,
libfuzzer_dotnet_loader_dll,
coverage_containers,
Expand Down Expand Up @@ -911,7 +915,7 @@ def dotnet(

self.logger.info("creating `dotnet_crash_report` task")
self.onefuzz.tasks.create(
helper.job.job_id,
job.job_id,
TaskType.dotnet_crash_report,
libfuzzer_dotnet_loader_dll,
report_containers,
Expand All @@ -932,8 +936,8 @@ def dotnet(
)

self.logger.info("done creating tasks")
helper.wait()
return helper.job
helper.wait(job)
return job

def qemu_user(
self,
Expand Down Expand Up @@ -1121,8 +1125,10 @@ def qemu_user(
libfuzzer_fuzz_target_options += fuzzing_target_options

self.logger.info("creating libfuzzer_fuzz task")

job = helper.create_job()
fuzzer_task = self.onefuzz.tasks.create(
helper.job.job_id,
job.job_id,
TaskType.libfuzzer_fuzz,
wrapper_name,
fuzzer_containers,
Expand Down Expand Up @@ -1164,7 +1170,7 @@ def qemu_user(

self.logger.info("creating libfuzzer_crash_report task")
self.onefuzz.tasks.create(
helper.job.job_id,
job.job_id,
TaskType.libfuzzer_crash_report,
wrapper_name,
report_containers,
Expand All @@ -1187,5 +1193,5 @@ def qemu_user(
)

self.logger.info("done creating tasks")
helper.wait()
return helper.job
helper.wait(job)
return job
10 changes: 4 additions & 6 deletions src/cli/onefuzz/templates/ossfuzz.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ def libfuzzer(
)
base_helper.platform = platform

helpers = []
job = base_helper.create_job()

for fuzzer in [File(x) for x in fuzzers]:
fuzzer_name = fuzzer.replace(".exe", "").replace("_fuzzer", "")
self.logger.info("creating tasks for %s", fuzzer)
Expand All @@ -197,14 +198,12 @@ def libfuzzer(
fuzzer_name,
build,
duration,
job=base_helper.job,
pool_name=pool_name,
target_exe=fuzzer,
)
helper.platform = platform
helper.add_tags(tags)
helper.platform = base_helper.platform
helper.job = base_helper.job
helper.define_containers(
ContainerType.setup,
ContainerType.inputs,
Expand Down Expand Up @@ -247,7 +246,7 @@ def libfuzzer(
fuzzer_blob_name = helper.setup_relative_blob_name(fuzzer, None)

self.onefuzz.template.libfuzzer._create_tasks(
job=base_helper.job,
job=job,
containers=helper.container_names(),
pool_name=pool_name,
target_exe=fuzzer_blob_name,
Expand All @@ -260,5 +259,4 @@ def libfuzzer(
ensemble_sync_delay=ensemble_sync_delay,
min_available_memory_mb=min_available_memory_mb,
)
helpers.append(helper)
base_helper.wait()
base_helper.wait(job)
Loading

0 comments on commit 902557f

Please sign in to comment.