Skip to content

Commit

Permalink
Merge pull request #207 from daniel-thom/optimize-time-series-cache
Browse files Browse the repository at this point in the history
Make TimeSeriesCache structs immutable
  • Loading branch information
daniel-thom authored Mar 23, 2021
2 parents 0bd9002 + 85bf587 commit b0f471f
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 71 deletions.
152 changes: 89 additions & 63 deletions src/time_series_cache.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,27 @@ end
Base.length(c::TimeSeriesCache) = _get_num_iterations(c)

"""
Return the next TimeSeries.TimeArray.
Return the TimeSeries.TimeArray starting at timestamp.
Reads from storage if the data is not already in cache.
Returns `nothing` when all data has been read. Call [`reset!`](@ref) to restart.
Call [`get_next_time`](@ref) to check the start time.
Timestamps must be read sequentially. Random access may be added in the future.
# Arguments
- `cache::StaticTimeSeriesCache`: cached instance
- `timestamp::Dates.DateTime`: starting timestamp for the time series array
Reads from storage if the data is not already in cache.
"""
function get_next_time_series_array!(cache::TimeSeriesCache)
if _get_iterations_remaining(cache) == 0
return
function get_time_series_array!(cache::TimeSeriesCache, timestamp::Dates.DateTime)
next_time = get_next_time(cache)
if next_time === nothing
throw(InvalidValue("timestamp = $timestamp is beyond the range of the time series"))
end

next_time = get_next_time(cache)
if _get_time_series(cache) === nothing || next_time > _get_last_cached_time(cache)
if timestamp != next_time
throw(InvalidValue("random access is not currently supported. next = $next_time"))
end

if next_time > _get_last_cached_time(cache)
@debug "get_next_time_series_array! update cache" next_time
_update!(cache)
else
Expand All @@ -56,6 +60,26 @@ function get_next_time_series_array!(cache::TimeSeriesCache)
return ta
end

"""
Return the next TimeSeries.TimeArray.
Returns `nothing` when all data has been read. Call [`reset!`](@ref) to restart.
Call [`get_next_time`](@ref) to check the start time.
Reads from storage if the data is not already in cache.
# Arguments
- `cache::StaticTimeSeriesCache`: cached instance
"""
function get_next_time_series_array!(cache::TimeSeriesCache)
next_time = get_next_time(cache)
if next_time === nothing
return
end

return get_time_series_array!(cache, next_time)
end

"""
Return the timestamp for the next read with [`get_next_time_series_array!`](@ref).
Expand All @@ -66,7 +90,7 @@ function get_next_time(cache::TimeSeriesCache)
return
end

return cache.common.next_time
return cache.common.next_time[]
end

"""
Expand All @@ -75,78 +99,79 @@ Reset parameters in order to start reading data from the beginning with
"""
reset!(cache::TimeSeriesCache) = _reset!(cache.common)

_get_component(c::TimeSeriesCache) = c.common.component
_get_last_cached_time(c::TimeSeriesCache) = c.common.last_cached_time
_get_length_available(c::TimeSeriesCache) = c.common.length_available
_set_length_available!(c::TimeSeriesCache, len) = c.common.length_available = len
_get_length_remaining(c::TimeSeriesCache) = c.common.length_remaining
_decrement_length_remaining!(c::TimeSeriesCache, num) = c.common.length_remaining -= num
_get_component(c::TimeSeriesCache) = _get_component(c.common)
_get_last_cached_time(c::TimeSeriesCache) = c.common.last_cached_time[]
_get_length_available(c::TimeSeriesCache) = c.common.length_available[]
_set_length_available!(c::TimeSeriesCache, len) = c.common.length_available[] = len
_get_length_remaining(c::TimeSeriesCache) = c.common.length_remaining[]
_decrement_length_remaining!(c::TimeSeriesCache, num) = c.common.length_remaining[] -= num
_get_name(c::TimeSeriesCache) = c.common.name
_get_num_iterations(c::TimeSeriesCache) = c.common.num_iterations
_get_ignore_scaling_factors(c::TimeSeriesCache) = c.common.ignore_scaling_factors
_get_type(c::TimeSeriesCache) = c.common.time_series_type
_get_time_series(c::TimeSeriesCache) = c.common.ts
_set_time_series!(c::TimeSeriesCache, ts) = c.common.ts = ts
_get_iterations_remaining(c::TimeSeriesCache) = c.common.iterations_remaining
_decrement_iterations_remaining!(c::TimeSeriesCache) = c.common.iterations_remaining -= 1

mutable struct TimeSeriesCacheCommon
ts::Union{Nothing, TimeSeriesData}
time_series_type::DataType
component::InfrastructureSystemsComponent
_get_type(c::TimeSeriesCache) = typeof(c.common.ts[])
_get_time_series(c::TimeSeriesCache) = c.common.ts[]
_set_time_series!(c::TimeSeriesCache, ts) = c.common.ts[] = ts
_get_iterations_remaining(c::TimeSeriesCache) = c.common.iterations_remaining[]
_decrement_iterations_remaining!(c::TimeSeriesCache) = c.common.iterations_remaining[] -= 1

struct TimeSeriesCacheCommon{T <: TimeSeriesData, U <: InfrastructureSystemsComponent}
ts::Base.RefValue{T}
component::U
name::String
orig_next_time::Dates.DateTime
next_time::Dates.DateTime
last_cached_time::Dates.DateTime
next_time::Base.RefValue{Dates.DateTime}
last_cached_time::Base.RefValue{Dates.DateTime}
"Total length"
len::Int
"Cached data available to read"
length_available::Int
length_available::Base.RefValue{Int}
"Length remaining to be read on disk"
length_remaining::Int
length_remaining::Base.RefValue{Int}
"Total iterations to traverse all data"
num_iterations::Int
iterations_remaining::Int
iterations_remaining::Base.RefValue{Int}
ignore_scaling_factors::Bool

function TimeSeriesCacheCommon(;
ts,
time_series_type,
component,
name,
next_time,
len,
num_iterations,
ignore_scaling_factors,
)
new(
ts,
time_series_type,
new{typeof(ts), typeof(component)}(
Ref(ts),
component,
name,
next_time,
next_time,
next_time - Dates.Minute(1),
Ref(next_time),
Ref(next_time - Dates.Minute(1)),
len,
0,
len,
num_iterations,
Ref(0),
Ref(len),
num_iterations,
Ref(num_iterations),
ignore_scaling_factors,
)
end
end

_get_component(c::TimeSeriesCacheCommon) = c.component

function _reset!(common::TimeSeriesCacheCommon)
common.next_time = common.orig_next_time
common.length_available = common.len
common.length_remaining = common.len
common.iterations_remaining = common.num_iterations
common.ts = nothing
common.next_time[] = common.orig_next_time
common.last_cached_time[] = common.orig_next_time - Dates.Minute(1)
common.length_available[] = common.len
common.length_remaining[] = common.len
common.iterations_remaining[] = common.num_iterations
return
end

mutable struct ForecastCache <: TimeSeriesCache
common::TimeSeriesCacheCommon
struct ForecastCache{T <: TimeSeriesData, U <: InfrastructureSystemsComponent} <:
TimeSeriesCache
common::TimeSeriesCacheCommon{T, U}
in_memory_count::Int
horizon::Int
end
Expand Down Expand Up @@ -187,18 +212,19 @@ function ForecastCache(
end

# Get one instance to assess data size.
vals = get_time_series_values(
ts = get_time_series(
T,
component,
name;
start_time = start_time,
len = get_horizon(ts_metadata),
)
vals = get_time_series_values(component, ts, start_time, len = get_horizon(ts_metadata))
row_size = _get_row_size(vals)

count = get_count(ts_metadata)
if start_time != initial_timestamp
count -= (start_time - initial_timestamp) / get_interval(ts_metadata)
count -= (start_time - initial_timestamp) ÷ get_interval(ts_metadata)
end

window_size = row_size * horizon
Expand All @@ -207,8 +233,7 @@ function ForecastCache(

return ForecastCache(
TimeSeriesCacheCommon(
ts = nothing,
time_series_type = T,
ts = ts,
component = component,
name = name,
next_time = start_time,
Expand Down Expand Up @@ -250,16 +275,17 @@ function _update!(cache::ForecastCache)
end

function _increment_next_time!(cache::ForecastCache, len)
cache.common.next_time += get_interval(cache.common.ts)
cache.common.next_time[] += get_interval(_get_time_series(cache))
end

function _set_last_cached_time!(cache::ForecastCache, next_time)
interval = get_interval(cache.common.ts)
cache.common.last_cached_time = next_time + (cache.in_memory_count - 1) * interval
interval = get_interval(_get_time_series(cache))
cache.common.last_cached_time[] = next_time + (cache.in_memory_count - 1) * interval
end

struct StaticTimeSeriesCache <: TimeSeriesCache
common::TimeSeriesCacheCommon
struct StaticTimeSeriesCache{T <: TimeSeriesData, U <: InfrastructureSystemsComponent} <:
TimeSeriesCache
common::TimeSeriesCacheCommon{T, U}
in_memory_rows::Int
end

Expand Down Expand Up @@ -294,11 +320,12 @@ function StaticTimeSeriesCache(

len = length(ts_metadata)
if start_time != initial_timestamp
len -= (start_time - initial_timestamp) / get_resolution(ts_metadata)
len -= (start_time - initial_timestamp) ÷ get_resolution(ts_metadata)
end

# Get an instance to assess data size.
vals = get_time_series_values(T, component, name; len = 1)
ts = get_time_series(T, component, name; start_time = start_time, len = 1)
vals = get_time_series_values(component, ts, start_time, len = 1)
row_size = _get_row_size(vals)
max_chunk_size = row_size * len
in_memory_rows = minimum((trunc(Int, cache_size_bytes / row_size), len))
Expand All @@ -307,8 +334,7 @@ function StaticTimeSeriesCache(
num_iterations = ceil(Int, len / in_memory_rows)
return StaticTimeSeriesCache(
TimeSeriesCacheCommon(
ts = nothing,
time_series_type = T,
ts = ts,
component = component,
name = name,
next_time = start_time,
Expand Down Expand Up @@ -347,12 +373,12 @@ function _update!(cache::StaticTimeSeriesCache)
end

function _set_last_cached_time!(c::StaticTimeSeriesCache, next_time)
resolution = get_resolution(c.common.ts)
c.common.last_cached_time = next_time + (c.in_memory_rows - 1) * resolution
resolution = get_resolution(_get_time_series(c))
c.common.last_cached_time[] = next_time + (c.in_memory_rows - 1) * resolution
end

function _increment_next_time!(cache::StaticTimeSeriesCache, len)
cache.common.next_time += len * get_resolution(cache.common.ts)
cache.common.next_time[] += len * get_resolution(_get_time_series(cache))
end

function _get_row_size(vals)
Expand Down
14 changes: 6 additions & 8 deletions test/test_time_series_cache.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@

cache = IS.ForecastCache(IS.Deterministic, component, "test")
@test cache.in_memory_count == 168
@test cache.common.next_time == initial_timestamp
@test cache.common.ts === nothing
@test IS.get_next_time(cache) == initial_timestamp
@test length(cache) == cache.common.num_iterations == 168

# Iterate over all initial times with default cache size.
Expand Down Expand Up @@ -70,18 +69,18 @@
# Test caching internals.
cache = IS.ForecastCache(IS.Deterministic, component, "test"; cache_size_bytes = 1024)
@test cache.in_memory_count == 5
@test cache.common.next_time == initial_timestamp
@test IS.get_next_time(cache) == initial_timestamp
for it in initial_times[1:(cache.in_memory_count)]
ta = IS.get_next_time_series_array!(cache)
@test cache.common.last_cached_time == initial_times[5]
@test IS._get_last_cached_time(cache) == initial_times[5]
@test TimeSeries.timestamp(ta) ==
IS.get_time_series_timestamps(component, forecast, it)
@test TimeSeries.values(ta) == IS.get_time_series_values(component, forecast, it)
end

# The next access should trigger a read.
ta = IS.get_next_time_series_array!(cache)
@test cache.common.last_cached_time == initial_times[10]
@test IS._get_last_cached_time(cache) == initial_times[10]
@test TimeSeries.timestamp(ta) ==
IS.get_time_series_timestamps(component, forecast, initial_times[6])
@test TimeSeries.values(ta) ==
Expand All @@ -107,8 +106,7 @@ end

cache = IS.StaticTimeSeriesCache(IS.SingleTimeSeries, component, "test")
@test cache.in_memory_rows == 365
@test cache.common.next_time == initial_timestamp
@test cache.common.ts === nothing
@test IS.get_next_time(cache) == initial_timestamp
@test length(cache) == cache.common.num_iterations == 1

# Iterate over all initial times with default cache size.
Expand Down Expand Up @@ -168,7 +166,7 @@ end
)
@test cache.in_memory_rows == 128
@test cache.common.num_iterations == 2
@test cache.common.length_remaining == 365 - 128
@test IS._get_length_remaining(cache) == 365 - 128
for i in 1:2
ta = IS.get_next_time_series_array!(cache)
it = initial_timestamp + i * cache.in_memory_rows * resolution
Expand Down

0 comments on commit b0f471f

Please sign in to comment.