Stack cache and acceleration (rebased) #785

Merged 12 commits on Jun 14, 2022
16 changes: 1 addition & 15 deletions Project.toml
@@ -46,18 +46,4 @@ ScientificTypes = "3"
StatisticalTraits = "3"
StatsBase = "0.32, 0.33"
Tables = "0.2, 1.0"
-julia = "1.6"
-
-[extras]
-DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
-DecisionTree = "7806a523-6efd-50cb-b5f6-3fa6f1930dbb"
-Distances = "b4f34e82-e78d-54a5-968a-f98e89d6e8f7"
-Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
-MultivariateStats = "6f286f6a-111f-5878-ab1e-185364afe411"
-NearestNeighbors = "b8a86587-4115-5ab1-83bc-aa920d37bbce"
-StableRNGs = "860ef19b-820b-49d6-a774-d7a799459cd3"
-Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
-TypedTables = "9d95f2ec-7b3d-5a63-8d20-e2491e220bb9"
-
-[targets]
-test = ["DataFrames", "DecisionTree", "Distances", "Logging", "MultivariateStats", "NearestNeighbors", "StableRNGs", "Test", "TypedTables"]
+julia = "1.6"
9 changes: 5 additions & 4 deletions src/composition/learning_networks/machines.jl
@@ -328,7 +328,7 @@ end

"""

-return!(mach::Machine{<:Surrogate}, model, verbosity)
+return!(mach::Machine{<:Surrogate}, model, verbosity; acceleration=CPU1())

The last call in custom code defining the `MLJBase.fit` method for a
new composite model type. Here `model` is the instance of the new type
@@ -345,7 +345,7 @@ the following:
handles smart updating (namely, an `MLJBase.update` fallback for
composite models).

-- Calls `fit!(mach, verbosity=verbosity)`.
+- Calls `fit!(mach, verbosity=verbosity, acceleration=acceleration)`.

- Moves any data in source nodes of the learning network into `cache`
(for data-anonymization purposes).
@@ -388,11 +388,12 @@ end
"""
function return!(mach::Machine{<:Surrogate},
model::Union{Model,Nothing},
-verbosity)
+verbosity;
+acceleration=CPU1())

network_model_names_ = network_model_names(model, mach)

-verbosity isa Nothing || fit!(mach, verbosity=verbosity)
+verbosity isa Nothing || fit!(mach, verbosity=verbosity, acceleration=acceleration)
setfield!(mach.fitresult, :network_model_names, network_model_names_)

# anonymize the data
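To make the new keyword concrete, here is a minimal sketch of a composite model's `fit` forwarding `acceleration` to `return!`. It assumes the standard learning-network idiom of this era of MLJBase (subtyping `DeterministicComposite`, wrapping the network in a surrogate `Deterministic()` machine); `MyWrapper` and its fields are hypothetical and not part of this PR.

```julia
using MLJBase, ComputationalResources

# hypothetical composite wrapping a single deterministic supervised model
mutable struct MyWrapper <: DeterministicComposite
    atom           # any deterministic supervised model
    acceleration   # e.g. CPU1() or CPUThreads()
end

function MLJBase.fit(model::MyWrapper, verbosity, X, y)
    Xs = source(X)
    ys = source(y)
    mach1 = machine(model.atom, Xs, ys)
    yhat = predict(mach1, Xs)
    # surrogate machine exposing the learning network
    mach = machine(Deterministic(), Xs, ys; predict=yhat)
    # forward the user's chosen resource so the network fit can be parallelized
    return!(mach, model, verbosity; acceleration=model.acceleration)
end
```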
25 changes: 23 additions & 2 deletions src/composition/learning_networks/nodes.jl
@@ -201,14 +201,17 @@ end
acceleration=CPU1())

Train all machines required to call the node `N`, in an appropriate
-order. These machines are those returned by `machines(N)`.
+order, but parallelizing where possible using the specified `acceleration`
+mode. These machines are those returned by `machines(N)`.

Supported modes of `acceleration`: `CPU1()`, `CPUThreads()`.

"""
fit!(y::Node; acceleration=CPU1(), kwargs...) =
fit!(y::Node, acceleration; kwargs...)

fit!(y::Node, ::AbstractResource; kwargs...) =
error("Only `acceleration=CPU1()` currently supported")
error("Only `acceleration=CPU1()` and `acceleration=CPUThreads()` currently supported")

function fit!(y::Node, ::CPU1; kwargs...)

Expand All @@ -230,6 +233,24 @@ function fit!(y::Node, ::CPU1; kwargs...)

return y
end

function fit!(y::Node, ::CPUThreads; kwargs...)
_machines = machines(y)

# flush the fit_okay channels:
for mach in _machines
flush!(mach.fit_okay)
end

# fit the machines using multithreading
@sync for mach in _machines
Threads.@spawn fit_only!(mach, true; kwargs...)
end

return y

end

fit!(S::Source; args...) = S

# allow arguments of `Nodes` and `Machine`s to appear
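A hedged usage sketch of the new mode: two base learners with no interdependence should fit concurrently under `CPUThreads()`. `DecisionTreeRegressor` is assumed to be supplied by an interface package (e.g. MLJDecisionTreeInterface), and Julia must be started with more than one thread (e.g. `julia --threads=4`) for any speedup.

```julia
using MLJBase, ComputationalResources, MLJDecisionTreeInterface

X, y = make_regression(100, 3)
Xs = source(X)
ys = source(y)

mach1 = machine(DecisionTreeRegressor(), Xs, ys)
mach2 = machine(DecisionTreeRegressor(max_depth=2), Xs, ys)

# a node averaging the two predictions; its two machines are independent
yhat = 0.5*(predict(mach1, Xs) + predict(mach2, Xs))

# all machines are spawned at once; the `fit_okay` channels flushed in the
# method above let each task wait for its upstream machines to finish first
fit!(yhat, acceleration=CPUThreads(), verbosity=0)
```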
60 changes: 37 additions & 23 deletions src/composition/models/stacking.jl
@@ -31,9 +31,11 @@ mutable struct DeterministicStack{modelnames, inp_scitype, tg_scitype} <: Deterministic
metalearner::Deterministic
resampling
measures::Union{Nothing,AbstractVector}
-function DeterministicStack(modelnames, models, metalearner, resampling, measures)
+cache::Bool
+acceleration::AbstractResource
+function DeterministicStack(modelnames, models, metalearner, resampling, measures, cache, acceleration)
inp_scitype, tg_scitype = input_target_scitypes(models, metalearner)
-return new{modelnames, inp_scitype, tg_scitype}(models, metalearner, resampling, measures)
+return new{modelnames, inp_scitype, tg_scitype}(models, metalearner, resampling, measures, cache, acceleration)
end
end

@@ -42,9 +44,11 @@ mutable struct ProbabilisticStack{modelnames, inp_scitype, tg_scitype} <: Probabilistic
metalearner::Probabilistic
resampling
measures::Union{Nothing,AbstractVector}
-function ProbabilisticStack(modelnames, models, metalearner, resampling, measures)
+cache::Bool
+acceleration::AbstractResource
+function ProbabilisticStack(modelnames, models, metalearner, resampling, measures, cache, acceleration)
inp_scitype, tg_scitype = input_target_scitypes(models, metalearner)
-return new{modelnames, inp_scitype, tg_scitype}(models, metalearner, resampling, measures)
+return new{modelnames, inp_scitype, tg_scitype}(models, metalearner, resampling, measures, cache, acceleration)
end
end

@@ -54,7 +58,7 @@ const Stack{modelnames, inp_scitype, tg_scitype} =
ProbabilisticStack{modelnames, inp_scitype, tg_scitype}}

"""
-Stack(;metalearner=nothing, resampling=CV(), name1=model1, name2=model2, ...)
+Stack(; metalearner=nothing, name1=model1, name2=model2, ..., keyword_options...)

Implements the two-layer generalized stack algorithm introduced by
[Wolpert
@@ -89,12 +93,17 @@ When training a machine bound to such an instance:
model will optimize the squared error.

- `resampling`: The resampling strategy used
to prepare out-of-sample predictions of the base learners.

- `measures`: A measure or iterable over measures, to perform an internal
evaluation of the learners in the Stack while training. This is not for the
evaluation of the Stack itself.

- `cache`: Whether machines created in the learning network will cache data or not.

- `acceleration`: A supported `AbstractResource` to define the training parallelization
mode of the stack.

- `name1=model1, name2=model2, ...`: the `Supervised` model instances
to be used as base learners. The provided names become properties
of the instance created to allow hyper-parameter access
Expand Down Expand Up @@ -139,15 +148,15 @@ evaluate!(mach; resampling=Holdout(), measure=rmse)

```

The internal evaluation report can be accessed like this
and provides a PerformanceEvaluation object for each model:

```julia
report(mach).cv_report
```
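A hedged construction sketch showing the new `cache` and `acceleration` options
(the base learners and metalearner below are placeholders, assumed to be loaded
from interface packages):

```julia
stack = Stack(;metalearner=LinearRegressor(),
              resampling=CV(nfolds=3),
              measures=rmse,
              cache=false,                # don't cache data in the network machines
              acceleration=CPUThreads(),  # fit the learning network in parallel
              lr=LinearRegressor(),
              tree=DecisionTreeRegressor())
```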

"""
-function Stack(;metalearner=nothing, resampling=CV(), measure=nothing, measures=measure, named_models...)
+function Stack(;metalearner=nothing, resampling=CV(), measure=nothing, measures=measure, cache=true, acceleration=CPU1(), named_models...)
metalearner === nothing &&
throw(ArgumentError("No metalearner specified. Use Stack(metalearner=...)"))

@@ -159,9 +168,9 @@ function Stack(;metalearner=nothing, resampling=CV(), measure=nothing, measures=
end

if metalearner isa Deterministic
-stack = DeterministicStack(modelnames, models, metalearner, resampling, measures)
+stack = DeterministicStack(modelnames, models, metalearner, resampling, measures, cache, acceleration)
elseif metalearner isa Probabilistic
-stack = ProbabilisticStack(modelnames, models, metalearner, resampling, measures)
+stack = ProbabilisticStack(modelnames, models, metalearner, resampling, measures, cache, acceleration)
else
throw(ArgumentError("The metalearner should be a subtype
of $(Union{Deterministic, Probabilistic})"))
@@ -202,13 +211,16 @@ function MMI.clean!(stack::Stack{modelnames, inp_scitype, tg_scitype}) where {modelnames, inp_scitype, tg_scitype}
end


-Base.propertynames(::Stack{modelnames}) where modelnames = tuple(:resampling, :metalearner, modelnames...)
+Base.propertynames(::Stack{modelnames}) where modelnames =
+tuple(:metalearner, :resampling, :measures, :cache, :acceleration, modelnames...)


function Base.getproperty(stack::Stack{modelnames}, name::Symbol) where modelnames
name === :metalearner && return getfield(stack, :metalearner)
name === :resampling && return getfield(stack, :resampling)
name == :measures && return getfield(stack, :measures)
name === :cache && return getfield(stack, :cache)
name == :acceleration && return getfield(stack, :acceleration)
models = getfield(stack, :models)
for j in eachindex(modelnames)
name === modelnames[j] && return models[j]
@@ -221,6 +233,8 @@ function Base.setproperty!(stack::Stack{modelnames}, _name::Symbol, val) where modelnames
_name === :metalearner && return setfield!(stack, :metalearner, val)
_name === :resampling && return setfield!(stack, :resampling, val)
_name === :measures && return setfield!(stack, :measures, val)
_name === :cache && return setfield!(stack, :cache, val)
_name === :acceleration && return setfield!(stack, :acceleration, val)
idx = findfirst(==(_name), modelnames)
idx isa Nothing || return getfield(stack, :models)[idx] = val
error("type Stack has no property $name")
@@ -272,7 +286,7 @@ internal_stack_report(m::Stack, verbosity::Int, tt_pairs, folds_evaluations::Vararg{AbstractNode})
"""
internal_stack_report(m::Stack, verbosity::Int, y::AbstractNode, folds_evaluations::Vararg{AbstractNode})

When measure/measures is provided, the folds_evaluation will have been filled by
`store_for_evaluation`. This function does no heavy work (it does not construct
nodes corresponding to measures); it simply unpacks all the folds_evaluations
into a single node that can be evaluated later.
"""
@@ -304,10 +318,10 @@ function internal_stack_report(stack::Stack{modelnames,}, verbosity::Int, tt_pairs, folds_evaluations...)
fitted_params_per_fold=[],
report_per_fold=[],
train_test_pairs=tt_pairs
)
for model in getfield(stack, :models)]
)

# Update the results
index = 1
for foldid in 1:nfolds
@@ -330,7 +344,7 @@ end
end

# Update per_fold
model_results.per_fold[i][foldid] =
reports_each_observation(measure) ? MLJBase.aggregate(loss, measure) : loss
end
index += 1
@@ -366,7 +380,7 @@ end
oos_set(m::Stack, folds::AbstractNode, Xs::Source, ys::Source)

This function builds the out-of-sample dataset that is later used by the `judge`
for its own training. It also returns the folds_evaluations object if internal
cross-validation results are requested.
"""
function oos_set(m::Stack, Xs::Source, ys::Source, tt_pairs)
@@ -384,7 +398,7 @@ function oos_set(m::Stack, Xs::Source, ys::Source, tt_pairs)
# predictions are subsequently used as an input to the metalearner
Zfold = []
for model in getfield(m, :models)
-mach = machine(model, Xtrain, ytrain)
+mach = machine(model, Xtrain, ytrain, cache=m.cache)
ypred = predict(mach, Xtest)
# Internal evaluation on the fold if required
push!(folds_evaluations, store_for_evaluation(mach, Xtest, ytest, m.measures))
@@ -417,15 +431,15 @@ function fit(m::Stack, verbosity::Int, X, y)

Xs = source(X)
ys = source(y)

Zval, yval, folds_evaluations = oos_set(m, Xs, ys, tt_pairs)

-metamach = machine(m.metalearner, Zval, yval)
+metamach = machine(m.metalearner, Zval, yval, cache=m.cache)

# Each model is retrained on the original full training set
Zpred = []
for model in getfield(m, :models)
-mach = machine(model, Xs, ys)
+mach = machine(model, Xs, ys, cache=m.cache)
ypred = predict(mach, Xs)
ypred = pre_judge_transform(ypred, typeof(model), target_scitype(model))
push!(Zpred, ypred)
@@ -438,6 +452,6 @@ function fit(m::Stack, verbosity::Int, X, y)

# We can infer the Surrogate by two calls to supertype
mach = machine(supertype(supertype(typeof(m)))(), Xs, ys; predict=ŷ, internal_report...)
-return!(mach, m, verbosity)
+return!(mach, m, verbosity, acceleration=m.acceleration)
end
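Putting the pieces together, a hedged end-to-end sketch (again assuming a
decision-tree interface package is loaded and Julia was started with several
threads):

```julia
using MLJBase, ComputationalResources, MLJDecisionTreeInterface

X, y = make_regression(200, 5)

stack = Stack(;metalearner=DecisionTreeRegressor(),
              resampling=CV(nfolds=3),
              cache=false,
              acceleration=CPUThreads(),
              tree1=DecisionTreeRegressor(),
              tree2=DecisionTreeRegressor(max_depth=3))

mach = machine(stack, X, y)
fit!(mach, verbosity=0)  # per-fold base learners are fit according to `acceleration`
predict(mach, X)
```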
26 changes: 26 additions & 0 deletions test/Project.toml
@@ -0,0 +1,26 @@
[deps]
CategoricalArrays = "324d7699-5711-5eae-9e2f-1d82baa6b597"
ComputationalResources = "ed09eef8-17a6-5b46-8889-db040fac31e3"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
DecisionTree = "7806a523-6efd-50cb-b5f6-3fa6f1930dbb"
Distances = "b4f34e82-e78d-54a5-968a-f98e89d6e8f7"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
LossFunctions = "30fc2ffe-d236-52d8-8643-a9d8f7c094a7"
MLJModelInterface = "e80e1ace-859a-464e-9ed9-23947d8ae3ea"
MultivariateStats = "6f286f6a-111f-5878-ab1e-185364afe411"
NearestNeighbors = "b8a86587-4115-5ab1-83bc-aa920d37bbce"
OrderedCollections = "bac558e1-5e72-5ebc-8fee-abe8a469f55d"
Parameters = "d96e819e-fc66-5662-9728-84c9c7592b0a"
ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
ScientificTypes = "321657f4-b219-11e9-178b-2701a2544e81"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
StableRNGs = "860ef19b-820b-49d6-a774-d7a799459cd3"
Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"
Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
TypedTables = "9d95f2ec-7b3d-5a63-8d20-e2491e220bb9"
8 changes: 4 additions & 4 deletions test/composition/learning_networks/machines.jl
@@ -155,9 +155,9 @@ zhat = inverse_transform(standM, uhat)
yhat = exp(zhat)
enode = @node mae(ys, yhat)

-@testset "replace method for learning network machines" begin
+@testset "replace method for learning network machines, acceleration: $(typeof(accel))" for accel in (CPU1(), CPUThreads())

-fit!(yhat, verbosity=0)
+fit!(yhat, verbosity=0, acceleration=accel)

# test nested reporting:
r = MLJBase.report(yhat)
@@ -199,7 +199,7 @@ enode = @node mae(ys, yhat)
knnM2 = machines(yhat2, knn) |> first
hotM2 = machines(yhat2, hot) |> first

-@test_mach_sequence(fit!(yhat2, force=true),
+@test_mach_sequence(fit!(yhat2, force=true, acceleration=accel),
[(:train, standM2), (:train, hotM2),
(:train, knnM2), (:train, oakM2)],
[(:train, hotM2), (:train, standM2),
@@ -218,7 +218,7 @@ enode = @node mae(ys, yhat)
# this change should trigger retraining of all machines except the
# univariate standardizer:
hot2.drop_last = true
-@test_mach_sequence(fit!(yhat2),
+@test_mach_sequence(fit!(yhat2, acceleration=accel),
[(:skip, standM2), (:update, hotM2),
(:train, knnM2), (:train, oakM2)],
[(:update, hotM2), (:skip, standM2),