Skip to content

Commit

Permalink
feat(client): progressive results
Browse files Browse the repository at this point in the history
  • Loading branch information
rcarriga committed Jul 23, 2022
1 parent 1b0962e commit af767c6
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 90 deletions.
43 changes: 28 additions & 15 deletions lua/neotest/client/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ end

---Run the given tree
---@async
---@param tree? neotest.Tree
---@param tree neotest.Tree
---@param args table
---@field adapter string: Adapter ID
---@field strategy string: Strategy to run commands with
Expand All @@ -48,28 +48,41 @@ function NeotestClient:run_tree(tree, args)
table.insert(pos_ids, pos.id)
end

local pos = tree:data()
local adapter_id, adapter = self:_get_adapter(pos.id, args.adapter)
if not adapter_id then
logger.error("Adapter not found for position", pos.id)
local root = tree:data()
local adapter_id, adapter = self:_get_adapter(root.id, args.adapter)
if not adapter_id or not adapter then
logger.error("Adapter not found for position", root.id)
return
end
self._state:update_running(adapter_id, pos.id, pos_ids)
local success, results = pcall(self._runner._run_tree, self._runner, tree, args, adapter)
self._state:update_running(adapter_id, root.id, pos_ids)
local all_results = {}
local success, error = pcall(
self._runner.run_tree,
self._runner,
tree,
args,
adapter,
function(results)
for pos_id, result in pairs(results) do
all_results[pos_id] = result
end
self._state:update_results(adapter_id, results)
end
)
if not success then
lib.notify(("%s: %s"):format(adapter.name, results), "warn")
results = {}
lib.notify(("%s: %s"):format(adapter.name, error), "warn")
all_results = {}
for _, pos in tree:iter() do
results[pos.id] = { status = "skipped" }
all_results[pos.id] = { status = "skipped" }
end
end
if pos.type ~= "test" then
self._runner:collect_results(tree, results)
if root.type ~= "test" then
self._runner:fill_results(tree, all_results)
end
if pos.type == "test" or pos.type == "namespace" then
results[pos.path] = nil
if root.type == "test" or root.type == "namespace" then
all_results[root.path] = nil
end
self._state:update_results(adapter_id, results)
self._state:update_results(adapter_id, all_results)
end

---@async
Expand Down
192 changes: 134 additions & 58 deletions lua/neotest/client/runner.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,83 +19,146 @@ end
---@param tree neotest.Tree
---@param args table
---@param adapter neotest.Adapter
---@return table<string, neotest.Result>
function TestRunner:_run_tree(tree, args, adapter)
args = args or {}
args.strategy = args.strategy or "integrated"
local position = tree:data()
function TestRunner:run_tree(tree, args, adapter, on_results)
local results = {}
local results_callback = function(results_)
on_results(results_)
for pos_id, result in ipairs(results_) do
results[pos_id] = result
end
end

local spec = adapter.build_spec(vim.tbl_extend("force", args, {
tree = tree,
}))
args = vim.tbl_extend("keep", args or {}, { strategy = "integrated" })

local results = {}
self:_run_tree(tree, args, adapter, results_callback)
on_results(results)
end

function TestRunner:_run_tree(tree, args, adapter, results_callback)
local spec = adapter.build_spec(vim.tbl_extend("force", args, { tree = tree }))

if not spec then
local function run_pos_types(pos_type)
local async_runners = {}
for _, node in tree:iter_nodes() do
if node:data().type == pos_type then
table.insert(async_runners, function()
return self:_run_tree(node, args, adapter)
end)
end
self:_run_broken_down_tree(tree, args, adapter, results_callback)
return
end
self:_run_spec(spec, tree, args, adapter, results_callback)
end

function TestRunner:_stream_queue()
local sender, receiver = async.control.channel.mpsc()

local producer = function(output_stream)
local orig = ""
local pending_data = nil
for data in output_stream do
orig = orig .. data
local ends_with_newline = vim.endswith(data, "\n")
local next_lines = vim.split(data, "\n", { plain = true, trimempty = true })
if pending_data then
next_lines[1] = pending_data .. next_lines[1]
pending_data = nil
end
local all_results = {}
if #async_runners == 0 then
return {}
if not ends_with_newline then
pending_data = table.remove(next_lines, #next_lines)
end
for i, res in ipairs(async.util.join(async_runners)) do
all_results[i] = res[1]
for _, line in ipairs(next_lines) do
sender.send(line)
end
return vim.tbl_extend("error", {}, unpack(all_results))
end
end

if position.type == "dir" then
logger.warn(("%s doesn't support running directories, attempting files"):format(adapter.name))
results = run_pos_types("file")
elseif position.type ~= "test" then
logger.warn(("%s doesn't support running %ss"):format(adapter.name, position.type))
results = run_pos_types("test")
else
error(("%s returned no data to run tests"):format(adapter.name))
end
else
spec.strategy =
vim.tbl_extend("force", spec.strategy or {}, config.strategies[args.strategy] or {})
local consumer = function()
return receiver.recv()
end
return producer, consumer
end

---@param spec neotest.RunSpec
---@param adapter neotest.Adapter
function TestRunner:_run_spec(spec, tree, args, adapter, results_callback)
local position = tree:data()
spec.strategy =
vim.tbl_extend("force", spec.strategy or {}, config.strategies[args.strategy] or {})
spec.env = vim.tbl_extend("force", spec.env or {}, args.env or {})
spec.cwd = args.cwd or spec.cwd
if vim.tbl_isempty(spec.env or {}) then
spec.env = nil
end
local process_result =
self._processes:run(self:_create_process_key(adapter.name, position.id), spec, args)
results = adapter.results(spec, process_result, tree)
if vim.tbl_isempty(results) then
if #tree:children() ~= 0 then
logger.warn("Results returned were empty, setting all positions to failed")
for _, pos in tree:iter() do
results[pos.id] = {
status = "failed",
errors = {},
output = process_result.output,
}
end
else
results[tree:data().id] = { status = "skipped", output = process_result.output }
end
else
for _, result in pairs(results) do
if not result.output then
result.output = process_result.output
end


local proc_key = self:_create_process_key(adapter.name, position.id)
local producer, consumer = self:_stream_queue()

local process_result = self._processes:run(proc_key, spec, args, spec.stream and producer)
if spec.stream then
async.run(function()
for stream_results in spec.stream(consumer) do
results_callback(stream_results)
end
end)
end

local results = adapter.results(spec, process_result, tree)

if vim.tbl_isempty(results) then
results_callback(self:_fill_empty_results(tree, process_result.output))
return
end

self:fill_results(tree, results)

for _, result in pairs(results) do
if not result.output then
result.output = process_result.output
end
end

results_callback(results)
end

function TestRunner:_fill_empty_results(tree, output_path)
if #tree:children() == 0 then
return { [tree:data().id] = { status = "skipped", output = output_path } }
end
local results = {}
logger.warn("Results returned were empty, setting all positions to failed")
for _, pos in tree:iter() do
results[pos.id] = {
status = "failed",
errors = {},
output = output_path,
}
end
return results
end

function TestRunner:_run_broken_down_tree(tree, args, adapter, results_callback)
local position = tree:data()
local function run_pos_types(pos_type)
local async_runners = {}
for _, node in tree:iter_nodes() do
if node:data().type == pos_type then
table.insert(async_runners, function()
self:_run_tree(node, args, adapter, results_callback)
end)
end
end
if #async_runners == 0 then
return {}
end
async.util.join(async_runners)
end

if position.type == "dir" then
logger.warn(("%s doesn't support running directories, attempting files"):format(adapter.name))
return run_pos_types("file")
elseif position.type ~= "test" then
logger.warn(("%s doesn't support running %ss"):format(adapter.name, position.type))
return run_pos_types("test")
end
error(("%s returned no data to run tests"):format(adapter.name))
end

function TestRunner:_create_process_key(adapter_id, pos_id)
return adapter_id .. "-" .. pos_id
end
Expand Down Expand Up @@ -146,8 +209,11 @@ function TestRunner:attach(position, adapter_id)
end

---@async
function TestRunner:collect_results(tree, results)
---@param tree neotest.Tree
---@param results table<string, neotest.Result>
function TestRunner:fill_results(tree, results)
local root = tree:data()
local missing_tests = {}
for _, node in tree:iter_nodes() do
local pos = node:data()

Expand Down Expand Up @@ -176,6 +242,10 @@ function TestRunner:collect_results(tree, results)

results[parent_pos.id] = parent_result
end
else
if pos.type == "test" then
missing_tests[#missing_tests + 1] = pos.id
end
end
end

Expand All @@ -186,7 +256,7 @@ function TestRunner:collect_results(tree, results)
if pos.type == "file" then
-- Files not being present means that they were skipped (probably)
if not results[pos.id] and root_result then
results[pos.id] = { status = "skipped", output = root.output }
results[pos.id] = { status = "skipped", output = root_result.output }
end
else
-- Tests and namespaces not being present means that they failed to even start, count as root result
Expand All @@ -196,6 +266,12 @@ function TestRunner:collect_results(tree, results)
end
end
end

for _, test_id in ipairs(missing_tests) do
for parent in tree:get_key(test_id):iter_parents() do
results[parent:data().id] = nil
end
end
end

return function(processes)
Expand Down
10 changes: 9 additions & 1 deletion lua/neotest/client/strategies/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ end
---@param spec neotest.RunSpec
---@param args? table
---@return neotest.StrategyResult
function NeotestProcessTracker:run(pos_id, spec, args)
function NeotestProcessTracker:run(pos_id, spec, args, process_stream)
--TODO Break this up so we can use instance.output_stream before awaiting finish
local strategy = self:_get_strategy(args)
logger.info("Starting process", pos_id, "with strategy", args.strategy)
logger.debug("Strategy spec", spec)
Expand All @@ -38,6 +39,13 @@ function NeotestProcessTracker:run(pos_id, spec, args)
return { code = 1, output = output_path }
end
self._instances[pos_id] = instance
if process_stream then
async.run(function()
for data in instance.output_stream() do
process_stream(data)
end
end)
end
local code = instance.result()
logger.info("Process for position", pos_id, "exited with code", code)
local output = instance.output()
Expand Down
Loading

0 comments on commit af767c6

Please sign in to comment.