-
Notifications
You must be signed in to change notification settings - Fork 2
/
prefect-pipeline.py
110 lines (90 loc) · 3.09 KB
/
prefect-pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#
# This is a toy example of a prefect.io pipeline for
# processing SSPs using the ssp.py, oscalize.py and
# component_report.py command-line scripts.
#
import shlex
import tempfile
from pathlib import Path

import click
from prefect import Flow
from prefect import Parameter
from prefect import task
from prefect.tasks.shell import ShellTask
from prefect.utilities.edges import unmapped
@task
def discover_ssps(src_dir):
    """Find the SSP files to process.

    Args:
        src_dir: Directory whose direct children are searched for
            ``*.jsonl`` files.

    Returns:
        A sorted list of ``pathlib.Path`` objects, one per matching file.
    """
    # Path.glob yields entries in filesystem order, which is not guaranteed
    # to be stable; sorting keeps the order of the mapped tasks (and of the
    # files later handed to the combine step) the same on every run.
    return sorted(Path(src_dir).glob("*.jsonl"))
@task
def match_ssp(ssp, component_spec):
    """Run the ``ssp.py`` *match* command on a single SSP file.

    Args:
        ssp: Path of the SSP file, read with the ``json-l`` reader.
        component_spec: Path of the component specification file passed
            via ``--components``.

    Returns:
        The lines returned by the shell task, joined with newlines.
    """
    # Quote both paths so that spaces or shell metacharacters in a file
    # name cannot split or alter the command line.
    command = "python ssp.py --reader json-l match --components {} {}".format(
        shlex.quote(str(component_spec)), shlex.quote(str(ssp))
    )
    output = ShellTask(command=command, return_all=True).run()
    return "\n".join(output)
@task
def combine(components):
    """Merge the per-SSP match results with ``ssp.py combine``.

    Each entry of *components* is written to its own temporary ``.json``
    file, and the file names are passed to the command as arguments.

    Args:
        components: Iterable of strings, one per matched SSP.

    Returns:
        The lines returned by the shell task, joined with newlines.
    """
    paths = []
    try:
        for component in components:
            # delete=False keeps the file on disk after the handle is
            # closed, so the child process can open it by name.
            with tempfile.NamedTemporaryFile(
                mode="w", delete=False, suffix=".json"
            ) as component_file:
                # Record the name first so the file is cleaned up even if
                # the write below fails.
                paths.append(component_file.name)
                component_file.write(component)
        paths_arg = " ".join(shlex.quote(path) for path in paths)
        command = "python ssp.py combine {}".format(paths_arg)
        return "\n".join(ShellTask(command=command, return_all=True).run())
    finally:
        # The temporary files are only needed while the command runs;
        # remove them so repeated runs do not litter the temp directory.
        for path in paths:
            try:
                Path(path).unlink()
            except FileNotFoundError:
                pass
@task
def oscalize(combined, title):
    """Run ``oscalize.py`` on the combined component data.

    Args:
        combined: String content written to a temporary ``.json`` file that
            is handed to the command.
        title: Value passed to the command's ``--title`` option.

    Returns:
        The lines returned by the shell task, joined with newlines.
    """
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json") as combined_file:
        combined_file.write(combined)
        # The child process opens the file by name, so buffered data must
        # reach the disk first; without the flush it may read an empty or
        # truncated file.
        combined_file.flush()
        # Quote the arguments: a title containing spaces would otherwise be
        # split into several shell words.
        command = "python oscalize.py --title {} {}".format(
            shlex.quote(str(title)), shlex.quote(combined_file.name)
        )
        # Run inside the ``with`` block: the temporary file is removed as
        # soon as the block exits.
        return "\n".join(ShellTask(command=command, return_all=True).run())
@task
def markdown(combined, dest_dir):
    """Run ``component_report.py`` on the combined component data.

    Args:
        combined: String content written to a temporary ``.json`` file that
            is handed to the command.
        dest_dir: Output directory; the command is given its ``markdown``
            subdirectory as the second argument.

    Returns:
        The lines returned by the shell task, joined with newlines.
    """
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json") as combined_file:
        combined_file.write(combined)
        # The child process opens the file by name, so buffered data must
        # reach the disk first; without the flush it may read an empty or
        # truncated file.
        combined_file.flush()
        # Quote both paths so that spaces in a directory name do not split
        # the command line.
        command = "python component_report.py {} {}".format(
            shlex.quote(combined_file.name),
            shlex.quote(str(Path(dest_dir) / "markdown")),
        )
        # Run inside the ``with`` block: the temporary file is removed as
        # soon as the block exits.
        return "\n".join(ShellTask(command=command, return_all=True).run())
@task
def write_file(s, path):
    """Write the string *s* to *path*, replacing any existing file.

    Args:
        s: Text to write.
        path: Destination file path; missing parent directories are created.
    """
    target = Path(path)
    # The destination directory accepted by the command line is not required
    # to exist beforehand, so create it instead of failing on open().
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w") as f:
        f.write(s)
@click.command()
@click.option("--title", default="Components")
@click.argument(
    "component_spec_file",
    type=click.Path(exists=True, dir_okay=False, readable=True),
    required=True,
)
@click.argument(
    "src_dir",
    type=click.Path(exists=True, dir_okay=True, file_okay=False, readable=True),
    required=True,
)
@click.argument(
    "dest_dir",
    type=click.Path(dir_okay=True, file_okay=False, writable=True),
    required=True,
)
def main(component_spec_file, src_dir, dest_dir, title):
    """
    Runs a parameterized pipeline that processes all the SSP's in the
    SRC_DIR and places output files in the DEST_DIR. The COMPONENT_SPEC_FILE
    is supplied to the *match* phase.
    """
    # NOTE: the docstring above doubles as the command's --help text.
    with Flow("SSP Pipeline") as flow:
        src = Parameter("src")
        dest = Parameter("dest")
        component_spec = Parameter("component_spec")

        # Fan out: one match task per discovered SSP file, all sharing the
        # same (unmapped) component specification.
        ssps = discover_ssps(src)
        components = match_ssp.map(ssps, unmapped(component_spec))

        # Fan in: merge the match results, then derive both outputs from
        # the combined data.
        combined = combine(components)
        oscal = oscalize(combined, title)
        write_file(oscal, Path(dest_dir) / "oscal.json")
        markdown(combined, dest)

    state = flow.run(component_spec=component_spec_file, src=src_dir, dest=dest_dir)

    # flow.run() reports task failures through the returned state rather
    # than by raising; surface them so the command exits non-zero.
    if state.is_failed():
        raise click.ClickException("SSP pipeline failed: {}".format(state.message))
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()