diff --git a/config.json b/config.json index 4c0f0ed..b2381f7 100644 --- a/config.json +++ b/config.json @@ -56,6 +56,11 @@ "MetadataErrorTo": "errors", "ErrorTopic": "errors", + "CacheTimeout": "60s", + "CacheInvalidationAllKafkaTopics": ["device-types", "protocols"], + "DeviceKafkaTopic": "devices", + "DeviceGroupKafkaTopic": "device-groups", + "KafkaTopicConfigs": { "camunda_incident": [ { diff --git a/go.mod b/go.mod index 2ecde91..5b77d59 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ toolchain go1.21.0 require ( github.com/SENERGY-Platform/converter v0.0.0-20231027124924-d43b98b1b5cc - github.com/SENERGY-Platform/marshaller v0.0.0-20231101072104-348b10942fb3 + github.com/SENERGY-Platform/marshaller v0.0.0-20231114081003-1dab78bdb7d5 github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 github.com/coocood/freecache v1.2.4 github.com/golang-jwt/jwt v3.2.2+incompatible @@ -16,10 +16,11 @@ require ( ) require ( - github.com/IBM/sarama v1.41.3 + github.com/IBM/sarama v1.42.1 github.com/SENERGY-Platform/models/go v0.0.0-20230824080159-16585960df38 + github.com/SENERGY-Platform/service-commons v0.0.0-20231114080900-3839f2f822b5 github.com/prometheus/client_golang v1.17.0 - github.com/testcontainers/testcontainers-go v0.23.0 + github.com/testcontainers/testcontainers-go v0.26.0 ) require ( @@ -27,7 +28,7 @@ require ( github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/Knetic/govaluate v3.0.0+incompatible // indirect github.com/Microsoft/go-winio v0.6.1 // indirect - github.com/Microsoft/hcsshim v0.11.2 // indirect + github.com/Microsoft/hcsshim v0.11.4 // indirect github.com/RyanCarrier/dijkstra v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect @@ -45,10 +46,10 @@ require ( github.com/eapache/go-resiliency v1.4.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect + github.com/go-ole/go-ole v1.3.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/google/go-cmp v0.6.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect @@ -59,6 +60,7 @@ require ( github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/klauspost/compress v1.17.2 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect + github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/moby/patternmatcher v0.6.0 // indirect @@ -68,20 +70,27 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0-rc5 // indirect github.com/opencontainers/runc v1.1.10 // indirect + github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/shirou/gopsutil/v3 v3.23.10 // indirect + github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/sirupsen/logrus v1.9.3 // indirect - golang.org/x/crypto v0.14.0 // indirect - golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect - golang.org/x/mod v0.13.0 // indirect - golang.org/x/net v0.17.0 // indirect - golang.org/x/sys v0.13.0 // indirect - golang.org/x/tools v0.14.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect + github.com/yusufpapurcu/wmi v1.2.3 // indirect + golang.org/x/crypto v0.15.0 // indirect + golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect + golang.org/x/mod v0.14.0 // indirect + golang.org/x/net v0.18.0 // indirect + golang.org/x/sys v0.14.0 // indirect + golang.org/x/tools v0.15.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect google.golang.org/grpc v1.59.0 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/go-playground/colors.v1 v1.2.0 // indirect diff --git a/go.sum b/go.sum index ccceb4c..ce7f0df 100644 --- a/go.sum +++ b/go.sum @@ -4,22 +4,24 @@ github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9 github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= -github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= -github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= +github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ= +github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/Knetic/govaluate v3.0.0+incompatible h1:7o6+MAPhYTCF0+fdvoz1xDedhRb4f6s9Tn1Tt7/WTEg= github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= -github.com/Microsoft/hcsshim v0.11.2 h1:63w4x0s9PjbJGGTQTNgCTExPCkyDXhx2AUVQDPDBAek= -github.com/Microsoft/hcsshim v0.11.2/go.mod h1:smjE4dvqPX9Zldna+t5FG3rnoHhaB7QYxPRqGcpAD9w= +github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7fz8= +github.com/Microsoft/hcsshim v0.11.4/go.mod h1:smjE4dvqPX9Zldna+t5FG3rnoHhaB7QYxPRqGcpAD9w= github.com/RyanCarrier/dijkstra v1.3.0 h1:lqrLMwKy8PtyDpWqZpcbzr9KkqnLJrDdQ9u3iNyNxm8= github.com/RyanCarrier/dijkstra v1.3.0/go.mod h1:9egjhC7eVsfREX6NrYS+1wHzk9C/9v2Cz26/bqpjjTc= github.com/SENERGY-Platform/converter v0.0.0-20231027124924-d43b98b1b5cc h1:Tn2CzUFEP1gOJRWxHWOOhWo4dDMHBHqjFcvsl56MxPs= github.com/SENERGY-Platform/converter v0.0.0-20231027124924-d43b98b1b5cc/go.mod h1:btnyQ+HafCfU36KacJKMBjUKbGpoQr+/8rGr5X6k1ZI= -github.com/SENERGY-Platform/marshaller v0.0.0-20231101072104-348b10942fb3 h1:AWyBhZLcH/n/PqQeSIKPrCKBzqyn+I6IN1o+48nP9FA= -github.com/SENERGY-Platform/marshaller v0.0.0-20231101072104-348b10942fb3/go.mod h1:sgIgN3nvju3WTtTUTPbQo4vMOe5JdoaVoH/sIZ9uZ6w= +github.com/SENERGY-Platform/marshaller v0.0.0-20231114081003-1dab78bdb7d5 h1:VbTAInS9EQZWfb+P/GUsDY8kkCTnuKWVk0yYQvdYSxE= +github.com/SENERGY-Platform/marshaller v0.0.0-20231114081003-1dab78bdb7d5/go.mod h1:Cg03bmXW0dsRczohmyvQiqr5OcO6aAZ/4FMfQntXQfQ= github.com/SENERGY-Platform/models/go v0.0.0-20230824080159-16585960df38 h1:PDvFwIJBnFyKt1KeepkABNIS57+WhVj4vBw7X5JGZJw= github.com/SENERGY-Platform/models/go v0.0.0-20230824080159-16585960df38/go.mod h1:bCREPNRN4P8oxLgpC3/ZKK4jXSy4MSPXoiomhohE+aw= +github.com/SENERGY-Platform/service-commons v0.0.0-20231114080900-3839f2f822b5 h1:0R+NXsHYlU/RtBjt8HmUtUtYljzSeoO4JNBcXmWficY= +github.com/SENERGY-Platform/service-commons v0.0.0-20231114080900-3839f2f822b5/go.mod h1:1JjqDM3qfVrqU37VXRUlLIFRVWbnaXttsMEKNH8yCRA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 h1:N7oVaKyGp8bttX0bfZGmcGkjz7DLQXhAn3DNd3T0ous= @@ -62,6 +64,9 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= +github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= @@ -72,6 +77,8 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= @@ -107,6 +114,9 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= +github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed h1:036IscGBfJsFIgJQzlui7nK1Ncm0tp2ktmPj8xO4N/0= +github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed/go.mod h1:ilwx/Dta8jXAgpFYFvSWEMwxmbWXyiUHkd5FwyKhb5k= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= @@ -125,6 +135,8 @@ github.com/opencontainers/image-spec v1.1.0-rc5 h1:Ygwkfw9bpDvs+c9E34SdgGOj41dX/ github.com/opencontainers/image-spec v1.1.0-rc5/go.mod h1:X4pATf0uXsnn3g5aiGIsVnJBR4mxhKzfwmvK/B2NTm8= github.com/opencontainers/runc v1.1.10 h1:EaL5WeO9lv9wmS6SASjszOeQdSctvpbu0DdBQBizE40= github.com/opencontainers/runc v1.1.10/go.mod h1:+/R6+KmDlh+hOO8NkjmgkG9Qzvypzk0yXxAPYYR65+M= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= @@ -132,6 +144,9 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= +github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b h1:0LFwY6Q3gMACTjAbMZBjXAqTOzOwFaj2Ld6cjeQ7Rig= +github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= @@ -144,6 +159,12 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5X github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/segmentio/kafka-go v0.4.44 h1:Vjjksniy0WSTZ7CuVJrz1k04UoZeTc77UV6Yyk6tLY4= github.com/segmentio/kafka-go v0.4.44/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= +github.com/shirou/gopsutil/v3 v3.23.10 h1:/N42opWlYzegYaVkWejXWJpbzKv2JDy3mrgGzKsh9hM= +github.com/shirou/gopsutil/v3 v3.23.10/go.mod h1:JIE26kpucQi+innVlAUnIEOSBhBUkirr5b44yr55+WE= +github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= +github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= +github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= +github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -156,8 +177,12 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/testcontainers/testcontainers-go v0.23.0 h1:ERYTSikX01QczBLPZpqsETTBO7lInqEP349phDOVJVs= -github.com/testcontainers/testcontainers-go v0.23.0/go.mod h1:3gzuZfb7T9qfcH2pHpV4RLlWrPjeWNQah6XlYQ32c4I= +github.com/testcontainers/testcontainers-go v0.26.0 h1:uqcYdoOHBy1ca7gKODfBd9uTHVK3a7UL848z09MVZ0c= +github.com/testcontainers/testcontainers-go v0.26.0/go.mod h1:ICriE9bLX5CLxL9OFQ2N+2N+f+803LNJ1utJb1+Inx0= +github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= @@ -167,21 +192,24 @@ github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gi github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= +github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= -golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= +golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA= +golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g= +golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ= +golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY= -golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -192,28 +220,34 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= +golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= -golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -225,24 +259,25 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44= -golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= -golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= +golang.org/x/tools v0.15.0 h1:zdAyfUGbYmuVokhzVmghFl2ZJh5QhcfebBgmVPFYA+8= +golang.org/x/tools v0.15.0/go.mod h1:hpksKq4dtpQWS1uQ61JkdqWM3LscIS6Slf+VVkm+wQk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 h1:AB/lmRny7e2pLhFEYIbl5qkDAUt2h0ZRO4wGPhZf+ik= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405/go.mod h1:67X1fPuzjcrkymZzZV1vvkFeTn2Rvc6lYF9MYFGCcwE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= diff --git a/lib/cacheinvalidator.go b/lib/cacheinvalidator.go new file mode 100644 index 0000000..4824156 --- /dev/null +++ b/lib/cacheinvalidator.go @@ -0,0 +1,59 @@ +/* + * Copyright 2023 InfAI (CC SES) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lib + +import ( + "context" + "github.com/SENERGY-Platform/external-task-worker/util" + "github.com/SENERGY-Platform/service-commons/pkg/cache/invalidator" + "github.com/SENERGY-Platform/service-commons/pkg/kafka" + "log" + "runtime/debug" + "time" +) + +func StartCacheInvalidator(ctx context.Context, conf util.Config) (err error) { + if conf.KafkaUrl == "" || conf.KafkaUrl == "-" { + return nil + } + kafkaConf := kafka.Config{ + KafkaUrl: conf.KafkaUrl, + StartOffset: kafka.LastOffset, + Debug: conf.Debug, + PartitionWatchInterval: time.Minute, + OnError: func(err error) { + log.Println("ERROR:", err) + debug.PrintStack() + }, + } + if len(conf.CacheInvalidationAllKafkaTopics) > 0 { + err = invalidator.StartCacheInvalidatorAll(ctx, kafkaConf, conf.CacheInvalidationAllKafkaTopics, nil) + if err != nil { + return err + } + } + + err = invalidator.StartKnownCacheInvalidators(ctx, kafkaConf, invalidator.KnownTopics{ + DeviceTopic: conf.DeviceKafkaTopic, + DeviceGroupTopic: conf.DeviceGroupKafkaTopic, + }, nil) + if err != nil { + return err + } + + return nil +} diff --git a/lib/camunda/cache/cache.go b/lib/camunda/cache/cache.go deleted file mode 100644 index 4e3c25e..0000000 --- a/lib/camunda/cache/cache.go +++ /dev/null @@ -1,132 +0,0 @@ -package cache - -import ( - "encoding/json" - "errors" - "github.com/bradfitz/gomemcache/memcache" - "github.com/coocood/freecache" - "log" - "sync" - "time" -) - -var DefaultL1Expiration = 10 //10sec -var DefaultL1Size = 100 * 1024 * 1024 //100MB -var DefaultL2Expiration int32 = 300 //60sec - -type LayeredCache struct { - l1 *freecache.Cache - l2 *memcache.Client - mux sync.Mutex - Debug bool - config *CacheConfig -} - -type Item struct { - Key string - Value []byte -} - -type CacheConfig struct { - L1Expiration int - L1Size int - L2Expiration int32 - L2MemcacheUrls []string - L2MemcachedTimeout time.Duration - L2MemcachedMaxIdleConns int -} - -var ErrNotFound = errors.New("key not found in cache") - -func New(config *CacheConfig) (result *LayeredCache) { - if config == nil { - config = &CacheConfig{} - } - if config.L1Expiration == 0 { - config.L1Expiration = DefaultL1Expiration - } - if config.L1Size == 0 { - config.L1Size = DefaultL1Size - } - if config.L2Expiration == 0 { - config.L2Expiration = DefaultL2Expiration - } - result = &LayeredCache{config: config, l1: freecache.NewCache(config.L1Size)} - if len(config.L2MemcacheUrls) > 0 { - result.l2 = memcache.New(config.L2MemcacheUrls...) - result.l2.Timeout = config.L2MemcachedTimeout - result.l2.MaxIdleConns = config.L2MemcachedMaxIdleConns - } - return -} - -func (this *LayeredCache) Use(key string, getter func() (interface{}, error), result interface{}) (err error) { - item, err := this.Get(key) - if err == nil { - err = json.Unmarshal(item.Value, result) - return - } - temp, err := getter() - if err != nil { - return err - } - value, err := json.Marshal(temp) - if err != nil { - return err - } - this.Set(key, value) - return json.Unmarshal(value, &result) -} - -func (this *LayeredCache) Invalidate(key string) (err error) { - this.l1.Del([]byte(key)) - if this.l2 != nil { - err = this.l2.Delete(key) - } - return -} - -func (this *LayeredCache) Get(key string) (item Item, err error) { - this.mux.Lock() - defer this.mux.Unlock() - item.Value, err = this.l1.Get([]byte(key)) - if err != nil && err != freecache.ErrNotFound { - log.Println("ERROR: in LayeredCache::l1.Get()", err) - } - if err != nil && this.l2 != nil { - if this.Debug { - log.Println("DEBUG: use l2 cache", key, err) - } - var temp *memcache.Item - temp, err = this.l2.Get(key) - if err == memcache.ErrCacheMiss { - err = ErrNotFound - return - } - if err != nil { - return - } - err := this.l1.Set([]byte(key), temp.Value, this.config.L1Expiration) - if err != nil { - log.Println("ERROR: in LayeredCache::l1.Set()", err) - } - item.Value = temp.Value - } - return -} - -func (this *LayeredCache) Set(key string, value []byte) { - this.mux.Lock() - defer this.mux.Unlock() - err := this.l1.Set([]byte(key), value, this.config.L1Expiration) - if err != nil { - log.Println("ERROR: in LayeredCache::l1.Set()", err) - } - if this.l2 != nil { - err = this.l2.Set(&memcache.Item{Value: value, Expiration: this.config.L2Expiration, Key: key}) - if err != nil { - log.Println("ERROR: in LayeredCache::l2.Set()", err) - } - } - return -} diff --git a/lib/camunda/cache/interface.go b/lib/camunda/cache/interface.go deleted file mode 100644 index 073f5d9..0000000 --- a/lib/camunda/cache/interface.go +++ /dev/null @@ -1,6 +0,0 @@ -package cache - -type Cache interface { - Use(key string, getter func() (interface{}, error), result interface{}) (err error) - Invalidate(key string) (err error) -} diff --git a/lib/camunda/cache/none.go b/lib/camunda/cache/none.go deleted file mode 100644 index ff5df68..0000000 --- a/lib/camunda/cache/none.go +++ /dev/null @@ -1,23 +0,0 @@ -package cache - -import "encoding/json" - -var None = &NoneCache{} - -type NoneCache struct{} - -func (this *NoneCache) Use(key string, getter func() (interface{}, error), result interface{}) (err error) { - temp, err := getter() - if err != nil { - return err - } - value, err := json.Marshal(temp) - if err != nil { - return err - } - return json.Unmarshal(value, &result) -} - -func (this *NoneCache) Invalidate(key string) (err error) { - return nil -} diff --git a/lib/camunda/camunda.go b/lib/camunda/camunda.go index 035122d..54d4ba9 100644 --- a/lib/camunda/camunda.go +++ b/lib/camunda/camunda.go @@ -21,12 +21,13 @@ import ( "encoding/json" "errors" "fmt" - "github.com/SENERGY-Platform/external-task-worker/lib/camunda/cache" "github.com/SENERGY-Platform/external-task-worker/lib/camunda/interfaces" "github.com/SENERGY-Platform/external-task-worker/lib/camunda/shards" "github.com/SENERGY-Platform/external-task-worker/lib/com" "github.com/SENERGY-Platform/external-task-worker/lib/messages" "github.com/SENERGY-Platform/external-task-worker/util" + "github.com/SENERGY-Platform/service-commons/pkg/cache" + "github.com/SENERGY-Platform/service-commons/pkg/signal" "io" "io/ioutil" "log" @@ -53,9 +54,15 @@ func NewCamundaWithShards(config util.Config, producer com.ProducerInterface, me } func NewCamunda(config util.Config, producer com.ProducerInterface, metrics interfaces.Metrics) (result *Camunda, err error) { - s, err := shards.New(config.ShardsDb, cache.New(&cache.CacheConfig{ - L1Expiration: 60, - })) + c, err := cache.New(cache.Config{ + CacheInvalidationSignalHooks: map[cache.Signal]cache.ToKey{ + signal.Known.CacheInvalidationAll: nil, + }, + }) + if err != nil { + return result, err + } + s, err := shards.New(config.ShardsDb, c) if err != nil { return result, err } @@ -99,7 +106,7 @@ func (this *Camunda) getShardTasks(shard string) (tasks []messages.CamundaExtern } defer resp.Body.Close() if resp.StatusCode != 200 { - temp, err := ioutil.ReadAll(resp.Body) + temp, err := io.ReadAll(resp.Body) err = errors.New(fmt.Sprintln(endpoint, resp.Status, resp.StatusCode, string(temp), err)) return tasks, err } diff --git a/lib/camunda/camunda_test.go b/lib/camunda/camunda_test.go index 709316a..3bd6812 100644 --- a/lib/camunda/camunda_test.go +++ b/lib/camunda/camunda_test.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "errors" - "github.com/SENERGY-Platform/external-task-worker/lib/camunda/cache" "github.com/SENERGY-Platform/external-task-worker/lib/camunda/shards" "github.com/SENERGY-Platform/external-task-worker/lib/messages" "github.com/SENERGY-Platform/external-task-worker/lib/prometheus" @@ -42,7 +41,7 @@ func TestGetTask(t *testing.T) { return } - s, err := shards.New(pgConn, cache.None) + s, err := shards.New(pgConn, nil) if err != nil { t.Error(err) return diff --git a/lib/camunda/shards/shards.go b/lib/camunda/shards/shards.go index b9c5d2b..6b8e580 100644 --- a/lib/camunda/shards/shards.go +++ b/lib/camunda/shards/shards.go @@ -4,12 +4,12 @@ import ( "context" "database/sql" "errors" - "github.com/SENERGY-Platform/external-task-worker/lib/camunda/cache" + "github.com/SENERGY-Platform/service-commons/pkg/cache" _ "github.com/lib/pq" "time" ) -func New(pgConnStr string, cache cache.Cache) (*Shards, error) { +func New(pgConnStr string, cache *cache.Cache) (*Shards, error) { db, err := initDbConnection(pgConnStr) if err != nil { return nil, err @@ -35,7 +35,7 @@ func initDbConnection(conStr string) (db *sql.DB, err error) { type Shards struct { db *sql.DB - cache cache.Cache + cache *cache.Cache } var ErrorNotFound = errors.New("no shard assigned to user") @@ -43,10 +43,9 @@ var ErrorNotFound = errors.New("no shard assigned to user") const CachePrefix = "user-shard." func (this *Shards) GetShardForUser(userId string) (shardUrl string, err error) { - err = this.cache.Use(CachePrefix+userId, func() (interface{}, error) { + return cache.Use(this.cache, CachePrefix+userId, func() (string, error) { return getShardForUser(this.db, userId) - }, &shardUrl) - return + }, time.Minute) } func getShardForUser(tx Tx, userId string) (shardUrl string, err error) { @@ -83,7 +82,10 @@ func (this *Shards) SetShardForUser(userId string, shardAddress string) (err err if err != nil { return } - return this.cache.Invalidate(CachePrefix + userId) + if this.cache != nil { + return this.cache.Remove(CachePrefix + userId) + } + return nil } func (this *Shards) EnsureShardForUser(userId string) (shardUrl string, err error) { @@ -93,9 +95,9 @@ func (this *Shards) EnsureShardForUser(userId string) (shardUrl string, err erro return shardUrl, err } - err = this.cache.Use(CachePrefix+userId, func() (interface{}, error) { + shardUrl, err = cache.Use(this.cache, CachePrefix+userId, func() (string, error) { return getShardForUser(tx, userId) - }, &shardUrl) + }, time.Minute) //more work is only necessary if no shard is assigned to the user if err != ErrorNotFound { @@ -121,7 +123,7 @@ func (this *Shards) EnsureShard(shardUrl string) (err error) { return } -//selects shard with the fewest users +// selects shard with the fewest users func selectShard(tx Tx) (shardUrl string, err error) { min := MaxInt counts, err := getShardUserCount(tx) @@ -170,10 +172,9 @@ func addShardForUser(tx Tx, userId string, shardAddress string) (err error) { } func (this *Shards) GetShards() (result []string, err error) { - err = this.cache.Use("shards", func() (interface{}, error) { + return cache.Use(this.cache, "shards", func() ([]string, error) { return getShards(this.db) - }, &result) - return + }, time.Minute) } func getShards(tx Tx) (result []string, err error) { diff --git a/lib/camunda/shards/shards_test.go b/lib/camunda/shards/shards_test.go index c03b741..ce00b1c 100644 --- a/lib/camunda/shards/shards_test.go +++ b/lib/camunda/shards/shards_test.go @@ -2,7 +2,6 @@ package shards import ( "context" - "github.com/SENERGY-Platform/external-task-worker/lib/camunda/cache" "github.com/SENERGY-Platform/external-task-worker/lib/test/docker" "reflect" "sync" @@ -22,7 +21,7 @@ func TestSelectShard(t *testing.T) { return } - s, err := New(pgConn, cache.None) + s, err := New(pgConn, nil) if err != nil { t.Error(err) return @@ -44,7 +43,7 @@ func TestSelectShardWithCache(t *testing.T) { return } - s, err := New(pgConn, cache.New(nil)) + s, err := New(pgConn, nil) if err != nil { t.Error(err) return diff --git a/lib/devicerepository/cache.go b/lib/devicerepository/cache.go deleted file mode 100644 index 34f7cb4..0000000 --- a/lib/devicerepository/cache.go +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2019 InfAI (CC SES) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package devicerepository - -import ( - "encoding/json" - "errors" - "github.com/coocood/freecache" - "log" -) - -var DefaultExpiration = 60 // 60sec -var L1Size = 40 * 1024 * 1024 //40MB -var Debug = false - -type Cache struct { - l1 *freecache.Cache -} - -type Item struct { - Key string - Value []byte -} - -var ErrNotFound = errors.New("key not found in cache") - -func NewCache() *Cache { - return &Cache{l1: freecache.NewCache(L1Size)} -} - -func (this *Cache) Get(key string) (item Item, err error) { - item.Value, err = this.l1.Get([]byte(key)) - if err != nil && err != freecache.ErrNotFound { - log.Println("ERROR: in Cache::l1.Get()", err) - } - return -} - -func (this *Cache) Set(key string, value []byte) { - this.SetWithExpiration(key, value, DefaultExpiration) -} - -func (this *Cache) SetWithExpiration(key string, value []byte, expirationInSec int) { - err := this.l1.Set([]byte(key), value, expirationInSec) - if err != nil { - log.Println("ERROR: in Cache::l1.Set()", err) - } - return -} - -func (this *Cache) Use(key string, getter func() (interface{}, error), result interface{}) (err error) { - item, err := this.Get(key) - if err == nil { - err = json.Unmarshal(item.Value, result) - return - } - temp, err := getter() - if err != nil { - return err - } - value, err := json.Marshal(temp) - if err != nil { - return err - } - this.Set(key, value) - return json.Unmarshal(value, &result) -} - -func (this *Cache) UseWithExpiration(key string, getter func() (result interface{}, expirationInSec int, err error), result interface{}) (err error) { - item, err := this.Get(key) - if err == nil { - err = json.Unmarshal(item.Value, result) - return - } - temp, expiration, err := getter() - if err != nil { - return err - } - value, err := json.Marshal(temp) - if err != nil { - return err - } - this.SetWithExpiration(key, value, expiration) - return json.Unmarshal(value, &result) -} diff --git a/lib/devicerepository/devicerepository.go b/lib/devicerepository/devicerepository.go index f89eb8f..accda53 100644 --- a/lib/devicerepository/devicerepository.go +++ b/lib/devicerepository/devicerepository.go @@ -17,37 +17,62 @@ package devicerepository import ( - "encoding/json" "errors" "github.com/SENERGY-Platform/external-task-worker/lib/devicerepository/model" "github.com/SENERGY-Platform/external-task-worker/util" + "github.com/SENERGY-Platform/service-commons/pkg/cache" + "github.com/SENERGY-Platform/service-commons/pkg/signal" "log" "net/url" + "time" ) type Iot struct { - cache *Cache - repoUrl string - keycloak Keycloak + cache *cache.Cache + repoUrl string + keycloak Keycloak + cacheTimeout time.Duration } -func NewIot(config util.Config) *Iot { - return &Iot{repoUrl: config.DeviceRepoUrl, cache: NewCache(), keycloak: Keycloak{config: config}} +func NewIot(config util.Config) (*Iot, error) { + c, err := cache.New(cache.Config{ + CacheInvalidationSignalHooks: map[cache.Signal]cache.ToKey{ + signal.Known.CacheInvalidationAll: nil, + signal.Known.DeviceCacheInvalidation: func(signalValue string) (cacheKey string) { + return "device." + signalValue + }, + signal.Known.DeviceTypeCacheInvalidation: func(signalValue string) (cacheKey string) { + return "device-type." + signalValue + }, + signal.Known.DeviceGroupInvalidation: func(signalValue string) (cacheKey string) { + return "device-group." + signalValue + }, + signal.Known.ProtocolInvalidation: func(signalValue string) (cacheKey string) { + return "protocol." + signalValue + }, + }, + }) + if err != nil { + return nil, err + } + cacheTimeout := time.Minute + if config.CacheTimeout != "" && config.CacheTimeout != "-" { + cacheTimeout, err = time.ParseDuration(config.CacheTimeout) + } + return &Iot{repoUrl: config.DeviceRepoUrl, cache: c, keycloak: Keycloak{config: config}, cacheTimeout: cacheTimeout}, err } func (this *Iot) GetToken(user string) (result Impersonate, err error) { - err = this.cache.UseWithExpiration("user_token."+user, func() (interface{}, int, error) { + return cache.UseWithExpInGet(this.cache, "user_token."+user, func() (Impersonate, time.Duration, error) { token, expirationInSec, err := this.keycloak.GetUserToken(user) - return token, int(expirationInSec) - 10, err - }, &result) - return + return token, time.Duration(expirationInSec-10) * time.Second, err + }, this.cacheTimeout) } func (this *Iot) GetDevice(token Impersonate, id string) (result model.Device, err error) { - err = this.cache.Use("device."+id, func() (interface{}, error) { + return cache.Use(this.cache, "device."+id, func() (model.Device, error) { return this.getDevice(token, id) - }, &result) - return + }, this.cacheTimeout) } func (this *Iot) getDevice(token Impersonate, id string) (result model.Device, err error) { @@ -56,10 +81,9 @@ func (this *Iot) getDevice(token Impersonate, id string) (result model.Device, e } func (this *Iot) GetProtocol(token Impersonate, id string) (result model.Protocol, err error) { - err = this.cache.Use("protocol."+id, func() (interface{}, error) { + return cache.Use(this.cache, "protocol."+id, func() (model.Protocol, error) { return this.getProtocol(token, id) - }, &result) - return + }, this.cacheTimeout) } func (this *Iot) getProtocol(token Impersonate, id string) (result model.Protocol, err error) { @@ -92,20 +116,22 @@ func (this *Iot) getServiceFromCache(id string) (service model.Service, err erro if err != nil { return service, err } - err = json.Unmarshal(item.Value, &service) - return + var ok bool + service, ok = item.(model.Service) + if !ok { + err = errors.New("unable to interpret cache value as model.Service") + } + return service, err } func (this *Iot) saveServiceToCache(service model.Service) { - buffer, _ := json.Marshal(service) - this.cache.Set("service."+service.Id, buffer) + _ = this.cache.Set("service."+service.Id, service, this.cacheTimeout) } func (this *Iot) GetDeviceType(token Impersonate, id string) (result model.DeviceType, err error) { - err = this.cache.Use("deviceType."+id, func() (interface{}, error) { + return cache.Use(this.cache, "device-type."+id, func() (model.DeviceType, error) { return this.getDeviceType(token, id) - }, &result) - return + }, this.cacheTimeout) } func (this *Iot) getDeviceType(token Impersonate, id string) (result model.DeviceType, err error) { @@ -114,10 +140,9 @@ func (this *Iot) getDeviceType(token Impersonate, id string) (result model.Devic } func (this *Iot) GetDeviceGroup(token Impersonate, id string) (result model.DeviceGroup, err error) { - err = this.cache.Use("deviceGroup."+id, func() (interface{}, error) { + return cache.Use(this.cache, "device-group."+id, func() (model.DeviceGroup, error) { return this.getDeviceGroup(token, id) - }, &result) - return + }, this.cacheTimeout) } func (this *Iot) getDeviceGroup(token Impersonate, id string) (result model.DeviceGroup, err error) { diff --git a/lib/devicerepository/factory.go b/lib/devicerepository/factory.go index f5c03e2..54afe91 100644 --- a/lib/devicerepository/factory.go +++ b/lib/devicerepository/factory.go @@ -22,6 +22,6 @@ type FactoryType struct{} var Factory FactoryType -func (FactoryType) Get(config util.Config) RepoInterface { +func (FactoryType) Get(config util.Config) (RepoInterface, error) { return NewIot(config) } diff --git a/lib/devicerepository/interface.go b/lib/devicerepository/interface.go index 17363c8..780d0f7 100644 --- a/lib/devicerepository/interface.go +++ b/lib/devicerepository/interface.go @@ -22,7 +22,7 @@ import ( ) type FactoryInterface interface { - Get(configType util.Config) RepoInterface + Get(configType util.Config) (RepoInterface, error) } type RepoInterface interface { diff --git a/lib/test/main_test.go b/lib/test/main_test.go index c00dcef..3d409c3 100644 --- a/lib/test/main_test.go +++ b/lib/test/main_test.go @@ -17,9 +17,15 @@ package test import ( + "context" + "github.com/SENERGY-Platform/external-task-worker/lib/test/docker" "io" + "log" + "os" "runtime/debug" "strings" + "sync" + "testing" ) var example = struct { @@ -62,8 +68,8 @@ var color = struct { Hex: "urn:infai:ses:characteristic:0fc343ce-4627-4c88-b1e0-d3ed29754af8", } -//helper to find log creation -//example: log.SetOutput(StackWriter{Out: log.Writer(), Compare: "ERROR: json: Unmarshal(nil *model.AspectNode)"}) +// helper to find log creation +// example: log.SetOutput(StackWriter{Out: log.Writer(), Compare: "ERROR: json: Unmarshal(nil *model.AspectNode)"}) type StackWriter struct { Out io.Writer Compare string @@ -75,3 +81,22 @@ func (s StackWriter) Write(p []byte) (n int, err error) { } return s.Out.Write(p) } + +// TODO: remove as soon as https://github.com/testcontainers/testcontainers-go/issues/1671 is fixed +func TestMain(m *testing.M) { + //this main is needed to keep ryuk from closing + //which is needed because ryuk is currently (github.com/testcontainers/testcontainers-go v0.26.0) not able to restart + //https://github.com/testcontainers/testcontainers-go/issues/1671 + var code int + defer os.Exit(code) + wg := &sync.WaitGroup{} + defer wg.Wait() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + //keep ryuk alive by keeping a container for the context of all tests + _, _, err := docker.Memcached(ctx, wg) + if err != nil { + log.Fatal(err) + } + code = m.Run() +} diff --git a/lib/test/mock/iot.go b/lib/test/mock/iot.go index 58b617c..45d84b2 100644 --- a/lib/test/mock/iot.go +++ b/lib/test/mock/iot.go @@ -50,7 +50,7 @@ func (this *RepoMock) GetDeviceGroup(token devicerepository.Impersonate, id stri return dg, nil } -func (this *RepoMock) Get(configType util.Config) devicerepository.RepoInterface { +func (this *RepoMock) Get(configType util.Config) (devicerepository.RepoInterface, error) { if this.ResetOnGetRepoInterface { this.devices = map[string]model.Device{} this.services = map[string]model.Service{} @@ -58,7 +58,7 @@ func (this *RepoMock) Get(configType util.Config) devicerepository.RepoInterface this.deviceTypes = map[string]model.DeviceType{} this.deviceGroups = map[string]model.DeviceGroup{} } - return this + return this, nil } func (this *RepoMock) GetDevice(token devicerepository.Impersonate, id string) (model.Device, error) { diff --git a/lib/test/worker_error_retry_test.go b/lib/test/worker_error_retry_test.go index e36e3aa..def9fb3 100644 --- a/lib/test/worker_error_retry_test.go +++ b/lib/test/worker_error_retry_test.go @@ -23,7 +23,6 @@ import ( "errors" "github.com/SENERGY-Platform/external-task-worker/lib" "github.com/SENERGY-Platform/external-task-worker/lib/camunda" - "github.com/SENERGY-Platform/external-task-worker/lib/camunda/cache" "github.com/SENERGY-Platform/external-task-worker/lib/camunda/shards" "github.com/SENERGY-Platform/external-task-worker/lib/devicerepository/model" "github.com/SENERGY-Platform/external-task-worker/lib/messages" @@ -82,7 +81,7 @@ func TestWorkerErrorRetries(t *testing.T) { return } - s, err := shards.New(pgConn, cache.None) + s, err := shards.New(pgConn, nil) if err != nil { t.Error(err) return diff --git a/lib/worker.go b/lib/worker.go index f204649..932b7c4 100644 --- a/lib/worker.go +++ b/lib/worker.go @@ -74,17 +74,18 @@ type DeviceGroupsHandler interface { func Worker(ctx context.Context, config util.Config, comFactory com.FactoryInterface, repoFactory devicerepository.FactoryInterface, camundaFactory interfaces.FactoryInterface, marshallerFactory marshaller.FactoryInterface, timescaleFactory timescale.FactoryInterface) { log.Println("start camunda worker") - w := New(ctx, config, comFactory, repoFactory, camundaFactory, marshallerFactory, timescaleFactory) + w, err := New(ctx, config, comFactory, repoFactory, camundaFactory, marshallerFactory, timescaleFactory) + if err != nil { + log.Fatal("FATAL-ERROR:", err) + } StartHealthCheckEndpoint(ctx, config, w) w.Loop(ctx) } -func New(ctx context.Context, config util.Config, comFactory com.FactoryInterface, repoFactory devicerepository.FactoryInterface, camundaFactory interfaces.FactoryInterface, marshallerFactory marshaller.FactoryInterface, timescaleFactory timescale.FactoryInterface) (w *CmdWorker) { - var err error - +func New(ctx context.Context, config util.Config, comFactory com.FactoryInterface, repoFactory devicerepository.FactoryInterface, camundaFactory interfaces.FactoryInterface, marshallerFactory marshaller.FactoryInterface, timescaleFactory timescale.FactoryInterface) (w *CmdWorker, err error) { metrics, err := prometheus.Start(ctx, config) if err != nil { - log.Fatal("ERROR: prometheus.Start", err) + return w, fmt.Errorf("prometheus.Start %w", err) } w = &CmdWorker{ @@ -97,7 +98,7 @@ func New(ctx context.Context, config util.Config, comFactory com.FactoryInterfac w.timescale, err = timescaleFactory(ctx, config) if err != nil { - log.Fatal("ERROR: comFactory.NewProducer", err) + return w, fmt.Errorf("comFactory.NewProducer %w", err) } if config.CompletionStrategy != util.OPTIMISTIC { @@ -107,22 +108,25 @@ func New(ctx context.Context, config util.Config, comFactory com.FactoryInterfac err = comFactory.NewConsumer(ctx, config, w.HandleTaskResponse, w.ErrorMessageHandler) } if err != nil { - log.Fatal("ERROR: comFactory.NewConsumer", err) + return w, fmt.Errorf("comFactory.NewConsumer %w", err) } } w.producer, err = comFactory.NewProducer(ctx, config) if err != nil { - log.Fatal("ERROR: comFactory.NewProducer", err) + return w, fmt.Errorf("comFactory.NewProducer %w", err) + } + w.repository, err = repoFactory.Get(config) + if err != nil { + return w, fmt.Errorf("repoFactory.Get %w", err) } - w.repository = repoFactory.Get(config) w.camunda, err = camundaFactory.Get(config, w.producer, metrics) if err != nil { - log.Fatal("ERROR: comFactory.NewProducer", err) + return w, fmt.Errorf("camundaFactory.Get %w", err) } filterEvents := config.TimescaleWrapperUrl == "" || config.TimescaleWrapperUrl == "-" w.deviceGroupsHandler = devicegroups.New(config.GroupScheduler, w.camunda, w.repository, w.CreateProtocolMessage, config.CamundaFetchLockDuration, config.SubResultExpirationInSeconds, config.SubResultDatabaseUrls, config.MemcachedTimeout, config.MemcachedMaxIdleConns, filterEvents) - return + return w, nil } func (w *CmdWorker) Loop(ctx context.Context) { diff --git a/main.go b/main.go index e167bc8..18ee635 100644 --- a/main.go +++ b/main.go @@ -57,6 +57,12 @@ func main() { cancel() }() + err = lib.StartCacheInvalidator(ctx, config) + if err != nil { + log.Println("WARNING: unable to start cache invalidator", err) + } + lib.Worker(ctx, config, comswitch.Factory, devicerepository.Factory, camunda.Factory, marshaller.Factory, timescale.Factory) + log.Println("worker stopped") } diff --git a/util/config.go b/util/config.go index 16e04df..46229f0 100644 --- a/util/config.go +++ b/util/config.go @@ -92,6 +92,11 @@ type Config struct { MetadataErrorTo string ErrorTopic string + CacheTimeout string + CacheInvalidationAllKafkaTopics []string + DeviceKafkaTopic string + DeviceGroupKafkaTopic string + KafkaTopicConfigs map[string][]kafka.ConfigEntry }