diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index 12d78ce..c76a379 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -43,7 +43,7 @@ jobs: run: bundle exec rspec - name: Send test coverage to CodeClimate - uses: paambaati/codeclimate-action@v8.0.0 + uses: paambaati/codeclimate-action@v9.0.0 if: ${{ env.CC_TEST_REPORTER_ID }} with: coverageCommand: true diff --git a/Gemfile b/Gemfile index fe2c87d..fb89fcc 100644 --- a/Gemfile +++ b/Gemfile @@ -13,7 +13,7 @@ gem 'csv' gem 'base64' # A toolkit of support libraries and Ruby core extensions extracted from the Rails framework. (https://rubyonrails.org) -gem 'activesupport' +gem 'activesupport', '~> 7.1', '< 7.2' # A Ruby client library for Redis (https://github.com/redis/redis-rb) gem 'redis' diff --git a/Gemfile.lock b/Gemfile.lock index 6da15df..4a95e48 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -18,7 +18,7 @@ GEM base64 (0.2.0) bigdecimal (3.1.8) coderay (1.1.3) - concurrent-ruby (1.3.3) + concurrent-ruby (1.3.4) connection_pool (2.4.1) crack (1.0.0) bigdecimal @@ -55,14 +55,14 @@ GEM rb-inotify (~> 0.9, >= 0.9.10) lumberjack (1.2.10) method_source (1.1.0) - minitest (5.24.1) + minitest (5.25.1) mutex_m (0.2.0) nenv (0.3.0) notiffany (0.1.3) nenv (~> 0.1) shellany (~> 0.0) - parallel (1.25.1) - parser (3.3.4.0) + parallel (1.26.3) + parser (3.3.4.2) ast (~> 2.4.1) racc pry (0.14.2) @@ -75,12 +75,12 @@ GEM rb-fsevent (0.11.2) rb-inotify (0.11.1) ffi (~> 1.0) - redis (5.2.0) + redis (5.3.0) redis-client (>= 0.22.0) redis-client (0.22.2) connection_pool regexp_parser (2.9.2) - rexml (3.3.4) + rexml (3.3.6) strscan rspec (3.13.0) rspec-core (~> 3.13.0) @@ -88,7 +88,7 @@ GEM rspec-mocks (~> 3.13.0) rspec-core (3.13.0) rspec-support (~> 3.13.0) - rspec-expectations (3.13.1) + rspec-expectations (3.13.2) diff-lcs (>= 1.2.0, < 2.0) rspec-support (~> 3.13.0) rspec-mocks (3.13.1) @@ -106,7 +106,7 @@ GEM rubocop-ast (>= 1.31.1, < 2.0) ruby-progressbar (~> 1.7) unicode-display_width (>= 2.4.0, < 3.0) - rubocop-ast (1.32.0) + rubocop-ast (1.32.1) parser (>= 3.3.1.0) rubocop-performance (1.21.1) rubocop (>= 1.48.1, < 2.0) @@ -130,7 +130,8 @@ GEM tzinfo (2.0.6) concurrent-ruby (~> 1.0) unicode-display_width (2.5.0) - vcr (6.2.0) + vcr (6.3.1) + base64 webmock (3.23.1) addressable (>= 2.8.0) crack (>= 0.3.2) @@ -140,7 +141,7 @@ PLATFORMS ruby DEPENDENCIES - activesupport + activesupport (~> 7.1, < 7.2) amazing_print base64 csv diff --git a/README.md b/README.md index 4585ae4..50b7c8f 100644 --- a/README.md +++ b/README.md @@ -31,10 +31,10 @@ This enables SOLECTRUS to accurately calculate the electricity usage and costs f The Docker image supports multiple platforms: `linux/amd64`, `linux/arm64` -To force a data rebuild, you can delete the measurement from the InfluxDB database: +To force a data rebuild, you can send USR1 signal to the container: ```bash -influx delete --bucket ${INFLUX_BUCKET} --start '1970-01-01T00:00:00Z' --stop $(date -u +"%Y-%m-%dT%H:%M:%SZ") --predicate '_measurement="power_splitter"' --org ${INFLUX_ORG} --token ${INFLUX_TOKEN} +docker compose kill --signal USR1 power-splitter ``` ## Development diff --git a/app.rb b/app.rb index ddb9c19..9c7f277 100755 --- a/app.rb +++ b/app.rb @@ -14,7 +14,7 @@ logger = StdoutLogger.new -buildtime = ENV.fetch('BUILDTIME', nil) +buildtime = ENV.fetch('BUILDTIME', nil).presence buildtime = buildtime ? Time.parse(buildtime).localtime : '' logger.info 'Power Splitter for SOLECTRUS, ' \ diff --git a/lib/loop.rb b/lib/loop.rb index 1759e7c..e7c37b4 100644 --- a/lib/loop.rb +++ b/lib/loop.rb @@ -4,28 +4,72 @@ require 'redis_cache' class Loop - def initialize(config:) + def initialize(config:, max_count: nil) @config = config + @max_count = max_count end - attr_reader :config + attr_reader :config, :thread, :restarting, :max_count def start - # config.logger.info "--- Deleting all records from InfluxDB measurement '#{config.influx_measurement}'" - # influx_push.delete_measurement(config.influx_measurement) - # config.logger.info " Ok, deleted sucessfully\n\n" + Signal.trap('USR1') { restart } - process_historical_data - process_current_data + loop do + start_thread + + # There are two reasons we get here: + # 1. The thread finished accidentally, so we should stop by breaking the loop. + # 2. We received a USR1 signal and should restart. + break unless restarting + + # Restart requsted, so delete all data and loop again. + delete_all + @restarting = false + end rescue SystemExit, Interrupt config.logger.warn 'Exiting...' end + def restart + unless thread + config.logger.warn "\n--- No thread to restart..." + return + end + + config.logger.info "\n--- Restarting..." + @restarting = true + + # Terminate the thread + thread.exit + + # Wait for the thread to finish + timeout = Time.current + 5 + sleep(1) while thread.alive? && Time.current < timeout + + if thread.alive? + config.logger.warn 'Thread did not finish in time.' + thread.kill + config.logger.warn 'Thread killed.' + else + config.logger.info 'Thread exited cleanly.' + end + end + private + def start_thread + @thread = + Thread.new do # rubocop:disable ThreadSafety/NewThread + process_historical_data + process_current_data + end + thread.join + end + def process_current_data config.logger.info "\nStarting endless loop for processing current data..." + count = 0 last_time = nil loop do # Ensure that the last minutes of yesterday are processed @@ -37,6 +81,9 @@ def process_current_data last_time = Time.current process_day(Date.current) + count += 1 + break if max_count && count >= max_count + config.logger.info " Sleeping for #{config.influx_interval} seconds...\n\n" sleep(config.influx_interval) end @@ -69,6 +116,12 @@ def process_day(day) influx_push.push(splitted_powers) end + def delete_all + config.logger.info "\n--- Deleting all records from InfluxDB measurement '#{config.influx_measurement}'" + influx_push.delete_measurement(config.influx_measurement) + config.logger.info " Ok, deleted sucessfully\n\n" + end + def influx_push @influx_push ||= InfluxPush.new(config:) end diff --git a/lib/processor.rb b/lib/processor.rb index d0b46d7..665856c 100644 --- a/lib/processor.rb +++ b/lib/processor.rb @@ -13,7 +13,7 @@ def initialize(day_records:, config:) attr_reader :day_records, :config, :wallbox_present, :heatpump_present def call - group_by_hour( + group_by_5min( day_records.reduce([]) { |acc, elem| acc << split_power(elem) }, ).map { |elem| point(elem) } end @@ -40,12 +40,12 @@ def point(record) result end - def group_by_hour(splitted) + def group_by_5min(splitted) splitted - .group_by { |item| item[:time].hour } - .map do |_hour, items| + .group_by { |item| (item[:time].to_i - 1.minute) / 5.minutes } + .map do |_interval, items| { - time: items.first[:time].beginning_of_hour, + time: items.first[:time].beginning_of_minute, house_power_grid: sum(items, :house_power_grid), wallbox_power_grid: wallbox_present ? sum(items, :wallbox_power_grid) : nil, diff --git a/lib/splitter/base.rb b/lib/splitter/base.rb index 7e6d7e0..37f4163 100644 --- a/lib/splitter/base.rb +++ b/lib/splitter/base.rb @@ -18,7 +18,9 @@ def initialize( :heatpump_power def call + #:nocov: raise NotImplementedError + #:nocov: end private diff --git a/spec/cassettes/loop-restart.yml b/spec/cassettes/loop-restart.yml new file mode 100644 index 0000000..e3acd8f --- /dev/null +++ b/spec/cassettes/loop-restart.yml @@ -0,0 +1,129 @@ +--- +http_interactions: +- request: + method: post + uri: http://:8086/api/v2/query?org= + body: + encoding: UTF-8 + string: '{"query":"from(bucket: \"\")\n|\u003e range(start: 1970-01-01T01:00:00+01:00)\n|\u003e + filter(fn: (r) =\u003e r[\"_measurement\"] == \"power_splitter\")\n|\u003e + last()\n|\u003e keep(columns: [\"_time\"])\n|\u003e max(column: \"_time\")\n","dialect":{"header":true,"delimiter":",","annotations":["datatype","group","default"],"commentPrefix":"#","dateTimeFormat":"RFC3339"}}' + headers: + Accept-Encoding: + - gzip;q=1.0,deflate;q=0.6,identity;q=0.3 + Accept: + - "*/*" + User-Agent: + - influxdb-client-ruby/3.1.0 + Authorization: + - Token + Content-Type: + - application/json + response: + status: + code: 200 + message: OK + headers: + Content-Type: + - text/csv; charset=utf-8 + Vary: + - Accept-Encoding + X-Influxdb-Build: + - OSS + X-Influxdb-Version: + - v2.7.10 + Date: + - Wed, 21 Aug 2024 08:29:52 GMT + Transfer-Encoding: + - chunked + body: + encoding: UTF-8 + string: "\r\n" + recorded_at: Wed, 21 Aug 2024 08:29:52 GMT +- request: + method: post + uri: http://:8086/api/v2/query?org= + body: + encoding: UTF-8 + string: '{"query":"from(bucket: \"\")\n|\u003e range(start: 1970-01-01T01:00:00+01:00)\n|\u003e + filter(fn: (r) =\u003e r[\"_measurement\"] == \"SENEC\" and (r[\"_field\"] + == \"grid_power_plus\" or r[\"_field\"] == \"house_power\" or r[\"_field\"] + == \"wallbox_charge_power\") or r[\"_measurement\"] == \"Consumer\" and (r[\"_field\"] + == \"power\"))\n|\u003e first()\n|\u003e keep(columns: [\"_time\"])\n|\u003e + min(column: \"_time\")\n","dialect":{"header":true,"delimiter":",","annotations":["datatype","group","default"],"commentPrefix":"#","dateTimeFormat":"RFC3339"}}' + headers: + Accept-Encoding: + - gzip;q=1.0,deflate;q=0.6,identity;q=0.3 + Accept: + - "*/*" + User-Agent: + - influxdb-client-ruby/3.1.0 + Authorization: + - Token + Content-Type: + - application/json + response: + status: + code: 200 + message: OK + headers: + Content-Type: + - text/csv; charset=utf-8 + Vary: + - Accept-Encoding + X-Influxdb-Build: + - OSS + X-Influxdb-Version: + - v2.7.10 + Date: + - Wed, 21 Aug 2024 08:29:52 GMT + Transfer-Encoding: + - chunked + body: + encoding: UTF-8 + string: "\r\n" + recorded_at: Wed, 21 Aug 2024 08:29:52 GMT +- request: + method: post + uri: http://:8086/api/v2/query?org= + body: + encoding: UTF-8 + string: '{"query":"from(bucket: \"\")\n|\u003e range(start: 2024-08-21T00:00:00+02:00, + stop: 2024-08-21T10:00:00+02:00)\n|\u003e filter(fn: (r) =\u003e r[\"_measurement\"] + == \"SENEC\" and (r[\"_field\"] == \"grid_power_plus\" or r[\"_field\"] == + \"house_power\" or r[\"_field\"] == \"wallbox_charge_power\") or r[\"_measurement\"] + == \"Consumer\" and (r[\"_field\"] == \"power\"))\n|\u003e aggregateWindow(every: + 1m, fn: mean)\n|\u003e fill(usePrevious: true)\n","dialect":{"header":true,"delimiter":",","annotations":["datatype","group","default"],"commentPrefix":"#","dateTimeFormat":"RFC3339"}}' + headers: + Accept-Encoding: + - gzip;q=1.0,deflate;q=0.6,identity;q=0.3 + Accept: + - "*/*" + User-Agent: + - influxdb-client-ruby/3.1.0 + Authorization: + - Token + Content-Type: + - application/json + response: + status: + code: 200 + message: OK + headers: + Content-Type: + - text/csv; charset=utf-8 + Vary: + - Accept-Encoding + X-Influxdb-Build: + - OSS + X-Influxdb-Version: + - v2.7.10 + Date: + - Wed, 21 Aug 2024 08:29:52 GMT + Transfer-Encoding: + - chunked + body: + encoding: UTF-8 + string: "\r\n" + recorded_at: Wed, 21 Aug 2024 08:29:52 GMT +recorded_with: VCR 6.3.1 diff --git a/spec/cassettes/loop-start.yml b/spec/cassettes/loop-start.yml new file mode 100644 index 0000000..e3acd8f --- /dev/null +++ b/spec/cassettes/loop-start.yml @@ -0,0 +1,129 @@ +--- +http_interactions: +- request: + method: post + uri: http://:8086/api/v2/query?org= + body: + encoding: UTF-8 + string: '{"query":"from(bucket: \"\")\n|\u003e range(start: 1970-01-01T01:00:00+01:00)\n|\u003e + filter(fn: (r) =\u003e r[\"_measurement\"] == \"power_splitter\")\n|\u003e + last()\n|\u003e keep(columns: [\"_time\"])\n|\u003e max(column: \"_time\")\n","dialect":{"header":true,"delimiter":",","annotations":["datatype","group","default"],"commentPrefix":"#","dateTimeFormat":"RFC3339"}}' + headers: + Accept-Encoding: + - gzip;q=1.0,deflate;q=0.6,identity;q=0.3 + Accept: + - "*/*" + User-Agent: + - influxdb-client-ruby/3.1.0 + Authorization: + - Token + Content-Type: + - application/json + response: + status: + code: 200 + message: OK + headers: + Content-Type: + - text/csv; charset=utf-8 + Vary: + - Accept-Encoding + X-Influxdb-Build: + - OSS + X-Influxdb-Version: + - v2.7.10 + Date: + - Wed, 21 Aug 2024 08:29:52 GMT + Transfer-Encoding: + - chunked + body: + encoding: UTF-8 + string: "\r\n" + recorded_at: Wed, 21 Aug 2024 08:29:52 GMT +- request: + method: post + uri: http://:8086/api/v2/query?org= + body: + encoding: UTF-8 + string: '{"query":"from(bucket: \"\")\n|\u003e range(start: 1970-01-01T01:00:00+01:00)\n|\u003e + filter(fn: (r) =\u003e r[\"_measurement\"] == \"SENEC\" and (r[\"_field\"] + == \"grid_power_plus\" or r[\"_field\"] == \"house_power\" or r[\"_field\"] + == \"wallbox_charge_power\") or r[\"_measurement\"] == \"Consumer\" and (r[\"_field\"] + == \"power\"))\n|\u003e first()\n|\u003e keep(columns: [\"_time\"])\n|\u003e + min(column: \"_time\")\n","dialect":{"header":true,"delimiter":",","annotations":["datatype","group","default"],"commentPrefix":"#","dateTimeFormat":"RFC3339"}}' + headers: + Accept-Encoding: + - gzip;q=1.0,deflate;q=0.6,identity;q=0.3 + Accept: + - "*/*" + User-Agent: + - influxdb-client-ruby/3.1.0 + Authorization: + - Token + Content-Type: + - application/json + response: + status: + code: 200 + message: OK + headers: + Content-Type: + - text/csv; charset=utf-8 + Vary: + - Accept-Encoding + X-Influxdb-Build: + - OSS + X-Influxdb-Version: + - v2.7.10 + Date: + - Wed, 21 Aug 2024 08:29:52 GMT + Transfer-Encoding: + - chunked + body: + encoding: UTF-8 + string: "\r\n" + recorded_at: Wed, 21 Aug 2024 08:29:52 GMT +- request: + method: post + uri: http://:8086/api/v2/query?org= + body: + encoding: UTF-8 + string: '{"query":"from(bucket: \"\")\n|\u003e range(start: 2024-08-21T00:00:00+02:00, + stop: 2024-08-21T10:00:00+02:00)\n|\u003e filter(fn: (r) =\u003e r[\"_measurement\"] + == \"SENEC\" and (r[\"_field\"] == \"grid_power_plus\" or r[\"_field\"] == + \"house_power\" or r[\"_field\"] == \"wallbox_charge_power\") or r[\"_measurement\"] + == \"Consumer\" and (r[\"_field\"] == \"power\"))\n|\u003e aggregateWindow(every: + 1m, fn: mean)\n|\u003e fill(usePrevious: true)\n","dialect":{"header":true,"delimiter":",","annotations":["datatype","group","default"],"commentPrefix":"#","dateTimeFormat":"RFC3339"}}' + headers: + Accept-Encoding: + - gzip;q=1.0,deflate;q=0.6,identity;q=0.3 + Accept: + - "*/*" + User-Agent: + - influxdb-client-ruby/3.1.0 + Authorization: + - Token + Content-Type: + - application/json + response: + status: + code: 200 + message: OK + headers: + Content-Type: + - text/csv; charset=utf-8 + Vary: + - Accept-Encoding + X-Influxdb-Build: + - OSS + X-Influxdb-Version: + - v2.7.10 + Date: + - Wed, 21 Aug 2024 08:29:52 GMT + Transfer-Encoding: + - chunked + body: + encoding: UTF-8 + string: "\r\n" + recorded_at: Wed, 21 Aug 2024 08:29:52 GMT +recorded_with: VCR 6.3.1 diff --git a/spec/lib/loop_spec.rb b/spec/lib/loop_spec.rb index 4ad6681..a9314e3 100644 --- a/spec/lib/loop_spec.rb +++ b/spec/lib/loop_spec.rb @@ -2,7 +2,7 @@ require 'config' describe Loop do - subject(:loop) { described_class.new(config:) } + subject(:loop) { described_class.new(config:, max_count: 1) } let(:config) { Config.new(ENV.to_h, logger:) } let(:logger) { MemoryLogger.new } @@ -10,4 +10,30 @@ it 'can be initialized' do expect(loop).to be_a(described_class) end + + describe '#start', vcr: 'loop-start' do + subject(:start) { loop.start } + + it 'starts the loop' do + expect { start }.to(change { config.logger.info_messages.size }) + end + end + + describe '#restart', vcr: 'loop-restart' do + subject(:start) { loop.restart } + + context "when there's a thread" do + before { loop.__send__ :start_thread } + + it 'restarts the loop' do + expect { start }.to(change { config.logger.info_messages.size }) + end + end + + context "when there's no thread" do + it 'does nothing' do + expect { start }.not_to(change { config.logger.info_messages.size }) + end + end + end end diff --git a/spec/lib/processor_spec.rb b/spec/lib/processor_spec.rb index 0a9a49a..2600c88 100644 --- a/spec/lib/processor_spec.rb +++ b/spec/lib/processor_spec.rb @@ -32,7 +32,8 @@ expect(lines).to eq( [ - 'power_splitter heatpump_power_grid=10i,house_power_grid=25i,wallbox_power_grid=15i 1641034800', + 'power_splitter heatpump_power_grid=20i,house_power_grid=50i,wallbox_power_grid=30i 1641034800', + 'power_splitter heatpump_power_grid=0i,house_power_grid=0i,wallbox_power_grid=0i 1641036600', ], ) end