Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLI Show Workflow Namespaces Option #200

Merged
merged 30 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions cmd/arcaflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"path/filepath"

"go.arcalot.io/log/v2"
"go.flow.arcalot.io/engine"
"go.flow.arcalot.io/engine/config"
"go.flow.arcalot.io/engine/loadfile"
"go.flow.arcalot.io/engine/workflow"
"gopkg.in/yaml.v3"
"os"
"os/signal"
"path/filepath"
)

// These variables are filled using ldflags during the build process with Goreleaser.
Expand Down Expand Up @@ -58,19 +58,22 @@ func main() {
dir := "."
workflowFile := "workflow.yaml"
printVersion := false
getNamespaces := false

const (
versionUsage = "Print Arcaflow Engine version and exit."
configUsage = "The path to the Arcaflow configuration file to load, if any."
inputUsage = "The path to the workflow input file to load, if any."
contextUsage = "The path to the workflow directory to run from."
workflowUsage = "The path to the workflow file to load."
versionUsage = "Print Arcaflow Engine version and exit."
configUsage = "The path to the Arcaflow configuration file to load, if any."
inputUsage = "The path to the workflow input file to load, if any."
contextUsage = "The path to the workflow directory to run from."
workflowUsage = "The path to the workflow file to load."
getNamespacesUsage = "Show the namespaces available to this workflow."
dbutenhof marked this conversation as resolved.
Show resolved Hide resolved
)
flag.BoolVar(&printVersion, "version", printVersion, versionUsage)
flag.StringVar(&configFile, "config", configFile, configUsage)
flag.StringVar(&input, "input", input, inputUsage)
flag.StringVar(&dir, "context", dir, contextUsage)
flag.StringVar(&workflowFile, "workflow", workflowFile, workflowUsage)
flag.BoolVar(&getNamespaces, "get-namespaces", getNamespaces, getNamespacesUsage)

flag.Usage = func() {
w := flag.CommandLine.Output()
Expand Down Expand Up @@ -183,10 +186,10 @@ func main() {
}
}

os.Exit(runWorkflow(flow, fileCtx, RequiredFileKeyWorkflow, logger, inputData))
os.Exit(runWorkflow(flow, fileCtx, RequiredFileKeyWorkflow, logger, inputData, getNamespaces))
}

func runWorkflow(flow engine.WorkflowEngine, fileCtx loadfile.FileCache, workflowFile string, logger log.Logger, inputData []byte) int {
func runWorkflow(flow engine.WorkflowEngine, fileCtx loadfile.FileCache, workflowFile string, logger log.Logger, inputData []byte, getNamespaces bool) int {
ctx, cancel := context.WithCancel(context.Background())
ctrlC := make(chan os.Signal, 4) // We expect up to three ctrl-C inputs. Plus one extra to buffer in case.
signal.Notify(ctrlC, os.Interrupt)
Expand All @@ -197,13 +200,18 @@ func runWorkflow(flow engine.WorkflowEngine, fileCtx loadfile.FileCache, workflo
cancel()
}()

workflow, err := flow.Parse(fileCtx, workflowFile)
workFlowObj, err := flow.Parse(fileCtx, workflowFile)
if err != nil {
logger.Errorf("Invalid workflow (%v)", err)
return ExitCodeInvalidData
}

outputID, outputData, outputError, err := workflow.Run(ctx, inputData)
if getNamespaces {
workflow.PrintObjectNamespaceTable(os.Stdout, workFlowObj.Namespaces(), logger)
return ExitCodeOK
}

outputID, outputData, outputError, err := workFlowObj.Run(ctx, inputData)
if err != nil {
logger.Errorf("Workflow execution failed (%v)", err)
return ExitCodeWorkflowFailed
Expand All @@ -222,6 +230,7 @@ func runWorkflow(flow engine.WorkflowEngine, fileCtx loadfile.FileCache, workflo
if outputError {
return ExitCodeWorkflowErrorOutput
}

return ExitCodeOK
}

Expand Down
5 changes: 5 additions & 0 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Workflow interface {
InputSchema() schema.Scope
// Outputs returns the list of possible outputs and their schema for the workflow.
Outputs() map[string]schema.StepOutput
Namespaces() map[string]map[string]*schema.ObjectSchema
}

type workflowEngine struct {
Expand Down Expand Up @@ -234,3 +235,7 @@ func (e engineWorkflow) Outputs() map[string]schema.StepOutput {
}
return outputs
}

func (e engineWorkflow) Namespaces() map[string]map[string]*schema.ObjectSchema {
return e.workflow.Namespaces()
}
53 changes: 53 additions & 0 deletions internal/tablefmt/tablefmt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Package tablefmt provides functions to create tabular data where
// 1. Each variable is a column; each column is a variable.
// 2. Each observation is a row; each row is an observation.
// 3. Each value is a cell; each cell is a single value.
//
// Its behavior is inspired by the R package tidyr.
// https://tidyr.tidyverse.org/index.html
package tablefmt

import "sort"

// UnnestLongerSorted turns each element of a list-group
// into a row. Each key in the map represents a group and
// each group is associated with a list of values.
func UnnestLongerSorted(twoColDf map[string][]string) [][]string {
df := [][]string{}
groupNames := []string{}
for name := range twoColDf {
groupNames = append(groupNames, name)
}
sort.Strings(groupNames)
for _, name := range groupNames {
groupRows := twoColDf[name]
sort.Strings(groupRows)
for _, rowValue := range groupRows {
df = append(df, []string{name, rowValue})
}
}
return df
}

// SwapColumns swaps the row values between the first and second column, if
// the row has a length of two.
func SwapColumns(df [][]string) [][]string {
for k := range df {
if len(df[k]) == 2 {
df[k][0], df[k][1] = df[k][1], df[k][0]
}
}
return df
}

// ExtractGroupedLists transforms a map of maps into a map of strings. The keys
// in the nested map become a list of values.
func ExtractGroupedLists[T any](data map[string]map[string]T) map[string][]string {
groupLists := map[string][]string{}
for namespace, objects := range data {
for objName := range objects {
groupLists[namespace] = append(groupLists[namespace], objName)
}
}
return groupLists
}
122 changes: 122 additions & 0 deletions internal/tablefmt/tablefmt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package tablefmt_test

import (
"go.arcalot.io/assert"
"go.flow.arcalot.io/engine/internal/tablefmt"
"sort"
"testing"
)

func TestUnnestLongerSortedHappy(t *testing.T) {
astromech := []string{"q7", "bb", "r2", "r4"}
protocol := []string{"3po", "000", "chatty"}
battle := []string{"b1", "ig"}
probe := []string{"cobra"}
astromechGroup := "astromech"
protocolGroup := "protocol"
battleGroup := "battle"
probeGroup := "probe"
astromechSorted := make([]string, len(astromech))
protocolSorted := make([]string, len(protocol))
battleSorted := make([]string, len(battle))
probeSorted := make([]string, len(probe))
sort.Strings(astromechSorted)
sort.Strings(battleSorted)
sort.Strings(probeSorted)
sort.Strings(protocolSorted)
expOut := [][]string{
{"astromech", "bb"},
{"astromech", "q7"},
{"astromech", "r2"},
{"astromech", "r4"},
{"battle", "b1"},
{"battle", "ig"},
{"probe", "cobra"},
{"protocol", "000"},
{"protocol", "3po"},
{"protocol", "chatty"},
}
input := map[string][]string{
protocolGroup: protocol,
astromechGroup: astromech,
battleGroup: battle,
probeGroup: probe,
}
assert.Equals(t, tablefmt.UnnestLongerSorted(input), expOut)
}

func TestUnnestLongerSortedEmpty(t *testing.T) {
input := map[string][]string{}
assert.Equals(t, tablefmt.UnnestLongerSorted(input), [][]string{})
}

func TestUnnestLongerSortedEmptyGroup(t *testing.T) {
input := map[string][]string{
"G": {"g"},
"D": {},
}
expOut := [][]string{
{"G", "g"},
}
assert.Equals(t, tablefmt.UnnestLongerSorted(input), expOut)
}

func TestSwapColumnsHappy(t *testing.T) {
testData := map[string]struct {
input [][]string
expected [][]string
}{
"happy": {
input: [][]string{{"fruit", "tomato"},
{"fruit", "cucumber"},
{"veggie", "spinach"},
{"veggie", "carrot"},
},
expected: [][]string{
{"tomato", "fruit"},
{"cucumber", "fruit"},
{"spinach", "veggie"},
{"carrot", "veggie"},
},
},
"row_with_zero_len": {
input: [][]string{
{"a", "b"},
{},
{"a", "c"},
},
expected: [][]string{
{"b", "a"},
{},
{"c", "a"},
},
},
"row_with_one_len": {
input: [][]string{
{"a", "b"},
{"z"},
{"a", "c"},
},
expected: [][]string{
{"b", "a"},
{"z"},
{"c", "a"},
},
},
"row_with_three_len": {
input: [][]string{
{"a", "b"},
{"x", "y", "z"},
{"a", "c"},
},
expected: [][]string{
{"b", "a"},
{"x", "y", "z"},
{"c", "a"},
},
},
}
for _, tc := range testData {
assert.Equals(t, tablefmt.SwapColumns(tc.input), tc.expected)
}
}
43 changes: 43 additions & 0 deletions internal/tableprinter/tableprinter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Package tableprinter provides behavior to write tabular data to a given
// destination.
package tableprinter

import (
"fmt"
"io"
"strings"
"text/tabwriter"
)

const (
tabwriterMinWidth = 6
tabwriterWidth = 4
tabwriterPadding = 3
tabwriterPadChar = ' '
tabwriterFlags = tabwriter.FilterHTML
)

// newTabWriter returns a tabwriter that transforms tabbed columns into aligned
// text.
func newTabWriter(output io.Writer) *tabwriter.Writer {
return tabwriter.NewWriter(output, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags)
}

// PrintTwoColumnTable uses a list of two item records (rows) to write a two
// column table with headers to a given output destination.
func PrintTwoColumnTable(output io.Writer, headers []string, rows [][]string) {
w := newTabWriter(output)

// column headers are at the top, so they are written first
for _, col := range headers {
_, _ = fmt.Fprint(w, strings.ToUpper(col), "\t")
}
_, _ = fmt.Fprintln(w)

// rows form the body of the table
for _, row := range rows {
_, _ = fmt.Fprintln(w, row[0], "\t", row[1])
dbutenhof marked this conversation as resolved.
Show resolved Hide resolved
}

_ = w.Flush()
}
26 changes: 26 additions & 0 deletions internal/tableprinter/tableprinter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package tableprinter_test

import (
"bytes"
"go.arcalot.io/assert"
"go.flow.arcalot.io/engine/internal/tableprinter"
"testing"
)

const basicTwoColTable = `FUNCTION MODEL
a 1
b 2
c 3
`

func TestPrintTwoColumnTable(t *testing.T) {
buf := bytes.NewBuffer(nil)
headers := []string{"function", "model"}
rows := [][]string{
{"a", "1"},
{"b", "2"},
{"c", "3"},
}
tableprinter.PrintTwoColumnTable(buf, headers, rows)
assert.Equals(t, buf.String(), basicTwoColTable)
}
16 changes: 16 additions & 0 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ package workflow
import (
"context"
"fmt"
"go.flow.arcalot.io/engine/internal/tablefmt"
"go.flow.arcalot.io/engine/internal/tableprinter"
"io"
"reflect"
"sync"
"time"
Expand Down Expand Up @@ -568,3 +571,16 @@ func (s stageChangeHandler) OnStepComplete(
) {
s.onStepComplete(step, previousStage, previousStageOutputID, previousStageOutput, wg)
}

// PrintObjectNamespaceTable constructs and writes a tidy table of workflow
// Objects and their namespaces to the given output destination.
func PrintObjectNamespaceTable(output io.Writer, allNamespaces map[string]map[string]*schema.ObjectSchema, logger log.Logger) {
if len(allNamespaces) == 0 {
logger.Warningf("No namespaces found in workflow")
return
}
groupLists := tablefmt.ExtractGroupedLists[*schema.ObjectSchema](allNamespaces)
df := tablefmt.UnnestLongerSorted(groupLists)
df = tablefmt.SwapColumns(df)
tableprinter.PrintTwoColumnTable(output, []string{"object", "namespace"}, df)
}
Loading
Loading