diff --git a/README.md b/README.md
index de79472..845b56c 100644
--- a/README.md
+++ b/README.md
@@ -182,6 +182,7 @@ takes one mandatory parameter: the language the text is in.

 ```python
 from luiginlp.engine import Task
+from luiginlp.util import replaceextension
 from luigi import Parameter

 class Ucto_txt2tok(Task):
@@ -352,7 +353,7 @@ class Ucto(StandardWorkflowComponent):
         return ucto
 ```

-Dynamic workflows aka Inception
+Dynamic dependencies aka Inception
 ---------------------------------

 Workflows are static in the sense that based on the format of the input file
@@ -365,13 +366,97 @@ At times, however, more dynamic workflows are needed. In such cases, the common
 theme is that input data has to be inspected and decisions made accordingly.
 The **only** stage at which input files can be inspected is in a task's
 ``run()`` method. Fortunately, there are facilities here to implement more
-dynamic workflows, a task's ``run()`` method is allowd to **yield** (in the
-Python sense) a list of other tasks that it depends on.
+dynamic dependencies: a task's ``run()`` method is allowed to **yield** (in the
+Python sense of the word) a list of other tasks that it depends on.

 A good example would be if we create a new tokenisation component that does not
-just take an input file, but takes a directory containing input files.
+just take an input file, but takes a **directory** containing input files and
+produces a directory of output files. The proper way to implement this is to
+reuse the component that operates on the individual files (i.e. our ``Ucto``
+component).
Consider the following task and component:
+
+```python
+import glob
+from luiginlp.engine import Task, StandardWorkflowComponent, TargetInfo, InputFormat
+from luiginlp.util import replaceextension
+from luigi import Parameter
+
+class Ucto_txtdir2tokdir(Task):
+    language = Parameter()
+
+    in_txtdir = None
+
+    def out_tokdir(self):
+        return TargetInfo(self, replaceextension(self.in_txtdir().path, '.txtdir','.tokdir'))
+
+    def run(self):
+        #setup the output directory
+        # this creates the directory and also moves it out of the way again when failures occur in this task
+        self.setup_output_dir(self.out_tokdir().path)
+
+        #gather input files
+        inputfiles = [ filename for filename in glob.glob(self.in_txtdir().path + '/*.txt') ]
+
+        #inception aka dynamic dependencies: we yield a list of components which could not have been predicted statically
+        #in this case we run the Ucto component for each input file in the directory
+        yield [ Ucto(inputfile=inputfile,outputdir=self.out_tokdir().path,language=self.language) for inputfile in inputfiles ]
+
+
+class Ucto_collection(StandardWorkflowComponent):
+    def accepts(self):
+        return (
+            InputFormat(self, format_id='txtdir',extension='txtdir', directory=True),
+        )
+
+    def autosetup(self):
+        return Ucto_txtdir2tokdir
+
+```
+
+The magic happens in the task's ``run()`` method, as that is the only stage
+where we can examine the contents of any input files, in this case: the contents
+of the input directory. First we set up the output directory with a call to
+``self.setup_output_dir()``. This creates the directory if it doesn't exist
+yet, but also makes sure the directory is stashed away when the task fails,
+ensuring you can always rerun the pipeline if it happens to break off (in
+technical terms, this preserves idempotency).
+
+Next, we construct a list of all the txt files in the directory. We use this
+list to yield a list of components to run, one component for each input file.
+Now, when the task's ``run()`` method is called, a series of components will be
+scheduled and run **in parallel** (up to the number of workers).
+
+Note that we added an ``outputdir`` parameter to the Ucto component which we
+hadn't implemented yet. This is necessary to ensure all individual output files
+end up in the directory that groups our output. The Ucto component should
+simply pass this parameter on to the ``Ucto_txt2tok`` task, and there we
+implement it as an optional parameter as follows:
+
+```python
+class Ucto_txt2tok(Task):
+
+    outputdir = Parameter(default="")
+
+    #Define an output slot, output slots are methods that start with out_ (os.path needs 'import os' at module level)
+    def out_tok(self):
+        if self.outputdir:
+            return TargetInfo(self, os.path.join(self.outputdir, os.path.basename(replaceextension(self.in_txt().path, '.txt','.tok'))))
+        else:
+            return TargetInfo(self, replaceextension(self.in_txt().path, '.txt','.tok'))
+```
+
+Assuming you have a collection of text files in a directory ``corpus.txtdir/``,
+you can now invoke LuigiNLP as follows and end up with a ``corpus.tokdir/``
+directory with tokenised output texts:
+
+    $ luiginlp Ucto_collection --module mymodule --inputfile corpus.txtdir --language en --workers 4
+
+Note the ``--workers`` parameter, which is the generic way to tell LuigiNLP how
+many workers may run in parallel. You will want to explicitly set this to a
+value that approximates the number of free CPU cores as the default value is
+one (no parallelisation).
+

-(...TODO... finish this section)

 Plans/TODO
 -------------
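
Note on the path handling in this change: the ``out_tokdir()`` and ``out_tok()`` slots above decide where each tokenised file ends up. The sketch below reproduces only that mapping using the standard library, so it can be checked without LuigiNLP installed. ``replace_ext`` is a hypothetical stand-in for ``luiginlp.util.replaceextension`` (its behaviour here is an assumption, not the library's implementation), and ``out_tok_path`` and ``plan_outputs`` are illustrative names that do not exist in LuigiNLP.

```python
import os


def replace_ext(path, old_ext, new_ext):
    """Hypothetical stand-in for luiginlp.util.replaceextension (assumed behaviour):
    swap old_ext for new_ext when the path ends with it, otherwise append new_ext."""
    if path.endswith(old_ext):
        return path[:-len(old_ext)] + new_ext
    return path + new_ext


def out_tok_path(in_txt_path, outputdir=""):
    """Mirror the out_tok() logic: write next to the input file unless an
    output directory is given, in which case only the base name is kept."""
    tok_path = replace_ext(in_txt_path, ".txt", ".tok")
    if outputdir:
        return os.path.join(outputdir, os.path.basename(tok_path))
    return tok_path


def plan_outputs(txtdir, filenames):
    """Map each .txt file name in txtdir to its target inside the matching .tokdir."""
    tokdir = replace_ext(txtdir, ".txtdir", ".tokdir")
    return {
        name: out_tok_path(os.path.join(txtdir, name), outputdir=tokdir)
        for name in filenames
        if name.endswith(".txt")
    }


if __name__ == "__main__":
    plan = plan_outputs("corpus.txtdir", ["a.txt", "b.txt", "notes.md"])
    for name, target in plan.items():
        print(name, "->", target)
```

Because only the base name is kept when ``outputdir`` is set, two input files with the same name in different directories would map to the same target; that cannot happen with the flat ``*.txt`` glob used in the task above, but it is worth keeping in mind if the glob is ever made recursive.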