Skip to content

Commit

Permalink
CLI Show Workflow Namespaces Option (#200)
Browse files Browse the repository at this point in the history
* add get namespaces option

* use tab writer to aligned table

* add print namespace obj fn

* add unnest long and column swap fns

Signed-off-by: Matthew F Leader <[email protected]>
---------
  • Loading branch information
mfleader authored Jul 23, 2024
1 parent c284ae0 commit 43c7889
Show file tree
Hide file tree
Showing 8 changed files with 316 additions and 13 deletions.
35 changes: 22 additions & 13 deletions cmd/arcaflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"path/filepath"

"go.arcalot.io/log/v2"
"go.flow.arcalot.io/engine"
"go.flow.arcalot.io/engine/config"
"go.flow.arcalot.io/engine/loadfile"
"go.flow.arcalot.io/engine/workflow"
"gopkg.in/yaml.v3"
"os"
"os/signal"
"path/filepath"
)

// These variables are filled using ldflags during the build process with Goreleaser.
Expand Down Expand Up @@ -58,19 +58,22 @@ func main() {
dir := "."
workflowFile := "workflow.yaml"
printVersion := false
getNamespaces := false

const (
versionUsage = "Print Arcaflow Engine version and exit."
configUsage = "The path to the Arcaflow configuration file to load, if any."
inputUsage = "The path to the workflow input file to load, if any."
contextUsage = "The path to the workflow directory to run from."
workflowUsage = "The path to the workflow file to load."
versionUsage = "Print Arcaflow Engine version and exit."
configUsage = "The path to the Arcaflow configuration file to load, if any."
inputUsage = "The path to the workflow input file to load, if any."
contextUsage = "The path to the workflow directory to run from."
workflowUsage = "The path to the workflow file to load."
getNamespacesUsage = "Show the namespaces available to this workflow."
)
flag.BoolVar(&printVersion, "version", printVersion, versionUsage)
flag.StringVar(&configFile, "config", configFile, configUsage)
flag.StringVar(&input, "input", input, inputUsage)
flag.StringVar(&dir, "context", dir, contextUsage)
flag.StringVar(&workflowFile, "workflow", workflowFile, workflowUsage)
flag.BoolVar(&getNamespaces, "get-namespaces", getNamespaces, getNamespacesUsage)

flag.Usage = func() {
w := flag.CommandLine.Output()
Expand Down Expand Up @@ -183,10 +186,10 @@ func main() {
}
}

os.Exit(runWorkflow(flow, fileCtx, RequiredFileKeyWorkflow, logger, inputData))
os.Exit(runWorkflow(flow, fileCtx, RequiredFileKeyWorkflow, logger, inputData, getNamespaces))
}

func runWorkflow(flow engine.WorkflowEngine, fileCtx loadfile.FileCache, workflowFile string, logger log.Logger, inputData []byte) int {
func runWorkflow(flow engine.WorkflowEngine, fileCtx loadfile.FileCache, workflowFile string, logger log.Logger, inputData []byte, getNamespaces bool) int {
ctx, cancel := context.WithCancel(context.Background())
ctrlC := make(chan os.Signal, 4) // We expect up to three ctrl-C inputs. Plus one extra to buffer in case.
signal.Notify(ctrlC, os.Interrupt)
Expand All @@ -197,13 +200,18 @@ func runWorkflow(flow engine.WorkflowEngine, fileCtx loadfile.FileCache, workflo
cancel()
}()

workflow, err := flow.Parse(fileCtx, workflowFile)
workFlowObj, err := flow.Parse(fileCtx, workflowFile)
if err != nil {
logger.Errorf("Invalid workflow (%v)", err)
return ExitCodeInvalidData
}

outputID, outputData, outputError, err := workflow.Run(ctx, inputData)
if getNamespaces {
workflow.PrintObjectNamespaceTable(os.Stdout, workFlowObj.Namespaces(), logger)
return ExitCodeOK
}

outputID, outputData, outputError, err := workFlowObj.Run(ctx, inputData)
if err != nil {
logger.Errorf("Workflow execution failed (%v)", err)
return ExitCodeWorkflowFailed
Expand All @@ -222,6 +230,7 @@ func runWorkflow(flow engine.WorkflowEngine, fileCtx loadfile.FileCache, workflo
if outputError {
return ExitCodeWorkflowErrorOutput
}

return ExitCodeOK
}

Expand Down
5 changes: 5 additions & 0 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Workflow interface {
InputSchema() schema.Scope
// Outputs returns the list of possible outputs and their schema for the workflow.
Outputs() map[string]schema.StepOutput
Namespaces() map[string]map[string]*schema.ObjectSchema
}

type workflowEngine struct {
Expand Down Expand Up @@ -234,3 +235,7 @@ func (e engineWorkflow) Outputs() map[string]schema.StepOutput {
}
return outputs
}

func (e engineWorkflow) Namespaces() map[string]map[string]*schema.ObjectSchema {
return e.workflow.Namespaces()
}
53 changes: 53 additions & 0 deletions internal/tablefmt/tablefmt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Package tablefmt provides functions to create tabular data where
// 1. Each variable is a column; each column is a variable.
// 2. Each observation is a row; each row is an observation.
// 3. Each value is a cell; each cell is a single value.
//
// Its behavior is inspired by the R package tidyr.
// https://tidyr.tidyverse.org/index.html
package tablefmt

import "sort"

// UnnestLongerSorted turns each element of a list-group
// into a row. Each key in the map represents a group and
// each group is associated with a list of values.
func UnnestLongerSorted(twoColDf map[string][]string) [][]string {
df := [][]string{}
groupNames := []string{}
for name := range twoColDf {
groupNames = append(groupNames, name)
}
sort.Strings(groupNames)
for _, name := range groupNames {
groupRows := twoColDf[name]
sort.Strings(groupRows)
for _, rowValue := range groupRows {
df = append(df, []string{name, rowValue})
}
}
return df
}

// SwapColumns swaps the row values between the first and second column, if
// the row has a length of two.
func SwapColumns(df [][]string) [][]string {
for k := range df {
if len(df[k]) == 2 {
df[k][0], df[k][1] = df[k][1], df[k][0]
}
}
return df
}

// ExtractGroupedLists transforms a map of maps into a map of strings. The keys
// in the nested map become a list of values.
func ExtractGroupedLists[T any](data map[string]map[string]T) map[string][]string {
groupLists := map[string][]string{}
for namespace, objects := range data {
for objName := range objects {
groupLists[namespace] = append(groupLists[namespace], objName)
}
}
return groupLists
}
122 changes: 122 additions & 0 deletions internal/tablefmt/tablefmt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package tablefmt_test

import (
"go.arcalot.io/assert"
"go.flow.arcalot.io/engine/internal/tablefmt"
"sort"
"testing"
)

func TestUnnestLongerSortedHappy(t *testing.T) {
astromech := []string{"q7", "bb", "r2", "r4"}
protocol := []string{"3po", "000", "chatty"}
battle := []string{"b1", "ig"}
probe := []string{"cobra"}
astromechGroup := "astromech"
protocolGroup := "protocol"
battleGroup := "battle"
probeGroup := "probe"
astromechSorted := make([]string, len(astromech))
protocolSorted := make([]string, len(protocol))
battleSorted := make([]string, len(battle))
probeSorted := make([]string, len(probe))
sort.Strings(astromechSorted)
sort.Strings(battleSorted)
sort.Strings(probeSorted)
sort.Strings(protocolSorted)
expOut := [][]string{
{"astromech", "bb"},
{"astromech", "q7"},
{"astromech", "r2"},
{"astromech", "r4"},
{"battle", "b1"},
{"battle", "ig"},
{"probe", "cobra"},
{"protocol", "000"},
{"protocol", "3po"},
{"protocol", "chatty"},
}
input := map[string][]string{
protocolGroup: protocol,
astromechGroup: astromech,
battleGroup: battle,
probeGroup: probe,
}
assert.Equals(t, tablefmt.UnnestLongerSorted(input), expOut)
}

func TestUnnestLongerSortedEmpty(t *testing.T) {
input := map[string][]string{}
assert.Equals(t, tablefmt.UnnestLongerSorted(input), [][]string{})
}

func TestUnnestLongerSortedEmptyGroup(t *testing.T) {
input := map[string][]string{
"G": {"g"},
"D": {},
}
expOut := [][]string{
{"G", "g"},
}
assert.Equals(t, tablefmt.UnnestLongerSorted(input), expOut)
}

func TestSwapColumnsHappy(t *testing.T) {
testData := map[string]struct {
input [][]string
expected [][]string
}{
"happy": {
input: [][]string{{"fruit", "tomato"},
{"fruit", "cucumber"},
{"veggie", "spinach"},
{"veggie", "carrot"},
},
expected: [][]string{
{"tomato", "fruit"},
{"cucumber", "fruit"},
{"spinach", "veggie"},
{"carrot", "veggie"},
},
},
"row_with_zero_len": {
input: [][]string{
{"a", "b"},
{},
{"a", "c"},
},
expected: [][]string{
{"b", "a"},
{},
{"c", "a"},
},
},
"row_with_one_len": {
input: [][]string{
{"a", "b"},
{"z"},
{"a", "c"},
},
expected: [][]string{
{"b", "a"},
{"z"},
{"c", "a"},
},
},
"row_with_three_len": {
input: [][]string{
{"a", "b"},
{"x", "y", "z"},
{"a", "c"},
},
expected: [][]string{
{"b", "a"},
{"x", "y", "z"},
{"c", "a"},
},
},
}
for _, tc := range testData {
assert.Equals(t, tablefmt.SwapColumns(tc.input), tc.expected)
}
}
43 changes: 43 additions & 0 deletions internal/tableprinter/tableprinter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Package tableprinter provides behavior to write tabular data to a given
// destination.
package tableprinter

import (
"fmt"
"io"
"strings"
"text/tabwriter"
)

const (
tabwriterMinWidth = 6
tabwriterWidth = 4
tabwriterPadding = 3
tabwriterPadChar = ' '
tabwriterFlags = tabwriter.FilterHTML
)

// newTabWriter returns a tabwriter that transforms tabbed columns into aligned
// text.
func newTabWriter(output io.Writer) *tabwriter.Writer {
return tabwriter.NewWriter(output, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags)
}

// PrintTwoColumnTable uses a list of two item records (rows) to write a two
// column table with headers to a given output destination.
func PrintTwoColumnTable(output io.Writer, headers []string, rows [][]string) {
w := newTabWriter(output)

// column headers are at the top, so they are written first
for _, col := range headers {
_, _ = fmt.Fprint(w, strings.ToUpper(col), "\t")
}
_, _ = fmt.Fprintln(w)

// rows form the body of the table
for _, row := range rows {
_, _ = fmt.Fprintln(w, row[0], "\t", row[1])
}

_ = w.Flush()
}
26 changes: 26 additions & 0 deletions internal/tableprinter/tableprinter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package tableprinter_test

import (
"bytes"
"go.arcalot.io/assert"
"go.flow.arcalot.io/engine/internal/tableprinter"
"testing"
)

const basicTwoColTable = `FUNCTION MODEL
a 1
b 2
c 3
`

func TestPrintTwoColumnTable(t *testing.T) {
buf := bytes.NewBuffer(nil)
headers := []string{"function", "model"}
rows := [][]string{
{"a", "1"},
{"b", "2"},
{"c", "3"},
}
tableprinter.PrintTwoColumnTable(buf, headers, rows)
assert.Equals(t, buf.String(), basicTwoColTable)
}
16 changes: 16 additions & 0 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ package workflow
import (
"context"
"fmt"
"go.flow.arcalot.io/engine/internal/tablefmt"
"go.flow.arcalot.io/engine/internal/tableprinter"
"io"
"reflect"
"sync"
"time"
Expand Down Expand Up @@ -568,3 +571,16 @@ func (s stageChangeHandler) OnStepComplete(
) {
s.onStepComplete(step, previousStage, previousStageOutputID, previousStageOutput, wg)
}

// PrintObjectNamespaceTable constructs and writes a tidy table of workflow
// Objects and their namespaces to the given output destination.
func PrintObjectNamespaceTable(output io.Writer, allNamespaces map[string]map[string]*schema.ObjectSchema, logger log.Logger) {
if len(allNamespaces) == 0 {
logger.Warningf("No namespaces found in workflow")
return
}
groupLists := tablefmt.ExtractGroupedLists[*schema.ObjectSchema](allNamespaces)
df := tablefmt.UnnestLongerSorted(groupLists)
df = tablefmt.SwapColumns(df)
tableprinter.PrintTwoColumnTable(output, []string{"object", "namespace"}, df)
}
Loading

0 comments on commit 43c7889

Please sign in to comment.