From 08dd562bdea20d52572d8651ab2b7bc4e8247834 Mon Sep 17 00:00:00 2001 From: Toby Schneider Date: Mon, 27 Nov 2017 18:35:56 -0500 Subject: [PATCH 1/7] 1. changed interprocess_portal to interprocess in configuration; 2. Added start of logger --- src/apps/middleware/CMakeLists.txt | 2 +- src/apps/middleware/gobyd/config.proto | 2 +- src/apps/middleware/gobyd/gobyd.cpp | 8 +- src/apps/middleware/logger/CMakeLists.txt | 4 + src/apps/middleware/logger/config.proto | 16 + src/apps/middleware/logger/logger.cpp | 96 ++++++ src/common/application_base3.h | 11 +- src/middleware/CMakeLists.txt | 1 + src/middleware/log.cpp | 7 + src/middleware/log.h | 286 ++++++++++++++++++ src/middleware/multi-thread-application.h | 2 +- src/middleware/serialize_parse.h | 1 - src/middleware/single-thread-application.h | 2 +- src/moos/protobuf/moos_gateway_config.proto | 2 +- src/test/middleware/CMakeLists.txt | 3 + src/test/middleware/log/CMakeLists.txt | 7 + src/test/middleware/log/test.cpp | 149 +++++++++ src/test/middleware/log/test.proto | 18 ++ .../middleware/multi_thread_app1/test.proto | 2 +- .../middleware/multi_thread_app2/test.proto | 2 +- .../middleware/single_thread_app1/test.proto | 2 +- 21 files changed, 604 insertions(+), 19 deletions(-) create mode 100644 src/apps/middleware/logger/CMakeLists.txt create mode 100644 src/apps/middleware/logger/config.proto create mode 100644 src/apps/middleware/logger/logger.cpp create mode 100644 src/middleware/log.cpp create mode 100644 src/middleware/log.h create mode 100644 src/test/middleware/log/CMakeLists.txt create mode 100644 src/test/middleware/log/test.cpp create mode 100644 src/test/middleware/log/test.proto diff --git a/src/apps/middleware/CMakeLists.txt b/src/apps/middleware/CMakeLists.txt index d5328ca2d..c858e69e9 100644 --- a/src/apps/middleware/CMakeLists.txt +++ b/src/apps/middleware/CMakeLists.txt @@ -1,2 +1,2 @@ add_subdirectory(gobyd) - +add_subdirectory(logger) diff --git a/src/apps/middleware/gobyd/config.proto b/src/apps/middleware/gobyd/config.proto index 93d55756a..75d5b2bbb 100644 --- a/src/apps/middleware/gobyd/config.proto +++ b/src/apps/middleware/gobyd/config.proto @@ -8,5 +8,5 @@ message GobyDaemonConfig { optional App3Config app = 1; optional int32 router_threads = 2 [default = 10]; - optional InterProcessPortalConfig interprocess_portal = 3; + optional InterProcessPortalConfig interprocess = 3; } diff --git a/src/apps/middleware/gobyd/gobyd.cpp b/src/apps/middleware/gobyd/gobyd.cpp index 83e30de9a..85a78c251 100644 --- a/src/apps/middleware/gobyd/gobyd.cpp +++ b/src/apps/middleware/gobyd/gobyd.cpp @@ -55,14 +55,14 @@ int main(int argc, char* argv[]) goby::Daemon::Daemon() : router_context_(new zmq::context_t(app_cfg().router_threads())), manager_context_(new zmq::context_t(1)), - router_(*router_context_, app_cfg().interprocess_portal()), - manager_(*manager_context_, app_cfg().interprocess_portal(), router_), + router_(*router_context_, app_cfg().interprocess()), + manager_(*manager_context_, app_cfg().interprocess(), router_), router_thread_(new std::thread([&] { router_.run(); })), manager_thread_(new std::thread([&] { manager_.run(); })) { - if(!app_cfg().interprocess_portal().has_platform()) + if(!app_cfg().interprocess().has_platform()) { - glog.is(WARN) && glog << "Using default platform name of " << app_cfg().interprocess_portal().platform() << std::endl; + glog.is(WARN) && glog << "Using default platform name of " << app_cfg().interprocess().platform() << std::endl; } } diff --git a/src/apps/middleware/logger/CMakeLists.txt b/src/apps/middleware/logger/CMakeLists.txt new file mode 100644 index 000000000..51a657667 --- /dev/null +++ b/src/apps/middleware/logger/CMakeLists.txt @@ -0,0 +1,4 @@ +protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS config.proto) + +add_executable(goby_logger logger.cpp ${PROTO_SRCS} ${PROTO_HDRS}) +target_link_libraries(goby_logger goby_middleware) diff --git a/src/apps/middleware/logger/config.proto b/src/apps/middleware/logger/config.proto new file mode 100644 index 000000000..cba46fd8f --- /dev/null +++ b/src/apps/middleware/logger/config.proto @@ -0,0 +1,16 @@ +import "goby/common/protobuf/app3.proto"; +import "goby/middleware/protobuf/interprocess_config.proto"; + +package goby.protobuf; + +message LoggerConfig +{ + optional goby.protobuf.App3Config app = 1; + optional goby.protobuf.InterProcessPortalConfig interprocess = 2; + + required string log_dir = 3; + + optional string type_regex = 4 [default = ".*"]; + optional string group_regex = 5 [default = ".*"]; +} + diff --git a/src/apps/middleware/logger/logger.cpp b/src/apps/middleware/logger/logger.cpp new file mode 100644 index 000000000..54c33f2bf --- /dev/null +++ b/src/apps/middleware/logger/logger.cpp @@ -0,0 +1,96 @@ +#include + +#include "goby/middleware/single-thread-application.h" +#include "goby/middleware/log.h" +#include "goby/common/time.h" + +#include "config.pb.h" + +using goby::glog; +using namespace goby::common::logger; + +void signal_handler(int sig); + +namespace goby +{ + + class Logger : public goby::SingleThreadApplication + { + public: + Logger() : + goby::SingleThreadApplication(1*boost::units::si::hertz), + log_(std::string(cfg().log_dir() + "/" + cfg().interprocess().platform() + "_" + goby::common::goby_file_timestamp() + ".goby").c_str(), std::ofstream::binary) + { + if(!log_.is_open()) + glog.is(DIE) && glog << "Failed to open log in directory: " << cfg().log_dir() << std::endl; + + namespace sp = std::placeholders; + interprocess().subscribe_regex(std::bind(&Logger::log, this, sp::_1, sp::_2, sp::_3, sp::_4), + {goby::MarshallingScheme::ALL_SCHEMES}, + cfg().type_regex(), + cfg().group_regex()); + } + + void log(const std::vector& data, int scheme, const std::string& type, const Group& group); + void loop() override + { + if(do_quit) quit(); + } + + + static std::atomic do_quit; + + private: + std::ofstream log_; + }; +} + +std::atomic goby::Logger::do_quit {false}; + + +int main(int argc, char* argv[]) +{ + // block signals from all but this main thread + sigset_t new_mask; + sigfillset(&new_mask); + sigset_t old_mask; + pthread_sigmask(SIG_BLOCK, &new_mask, &old_mask); + + std::thread t(std::bind(goby::run, argc, argv)); + + // unblock signals + sigset_t empty_mask; + sigemptyset(&empty_mask); + pthread_sigmask(SIG_SETMASK, &empty_mask, 0); + + struct sigaction action; + action.sa_handler = &signal_handler; + + // register the usual quitting signals + sigaction(SIGINT, &action, 0); + sigaction(SIGTERM, &action, 0); + sigaction(SIGQUIT, &action, 0); + + // wait for the app to quit + t.join(); + + return 0; +} + +void signal_handler(int sig) +{ + goby::Logger::do_quit = true; +} + + +void goby::Logger::log(const std::vector& data, int scheme, const std::string& type, const Group& group) +{ + glog.is(DEBUG1) && glog << "Received " << data.size() << " bytes to log to [scheme, type, group] = [" << scheme << ", " << type << ", " << group << "]" << std::endl; + + LogEntry entry(data, scheme, type, group); + + // TODO: add logger hook + // plugin.log(entry); + + entry.serialize(&log_); +} diff --git a/src/common/application_base3.h b/src/common/application_base3.h index da653f2f4..b86e6602b 100644 --- a/src/common/application_base3.h +++ b/src/common/application_base3.h @@ -40,12 +40,11 @@ namespace goby { - /// \brief Run a Goby application derived from MinimalApplicationBase. - /// blocks caller until MinimalApplicationBase::__run() returns - /// \param argc same as int main(int argc, char* argv) - /// \param argv same as int main(int argc, char* argv) - /// \return same as int main(int argc, char* argv) - + /// \brief Run a Goby application + /// blocks caller until ```__run()``` returns + /// \param argc same as ```int main(int argc, char* argv)``` + /// \param argv same as ```int main(int argc, char* argv)``` + /// \return same as ```int main(int argc, char* argv)``` template int run(int argc, char* argv[]); diff --git a/src/middleware/CMakeLists.txt b/src/middleware/CMakeLists.txt index bd3ae9ab5..cc143f9e1 100644 --- a/src/middleware/CMakeLists.txt +++ b/src/middleware/CMakeLists.txt @@ -7,6 +7,7 @@ set(SRC transport-interprocess-zeromq.cpp transport-intervehicle.cpp transport-interfaces.cpp + log.cpp ${PROTO_SRCS} ${PROTO_HDRS} ) diff --git a/src/middleware/log.cpp b/src/middleware/log.cpp new file mode 100644 index 000000000..812316999 --- /dev/null +++ b/src/middleware/log.cpp @@ -0,0 +1,7 @@ +#include "log.h" + +boost::bimap::type> goby::LogEntry::groups_; +boost::bimap::type> goby::LogEntry::types_; + +goby::uint::type goby::LogEntry::group_index_(1); +goby::uint::type goby::LogEntry::type_index_(1); diff --git a/src/middleware/log.h b/src/middleware/log.h new file mode 100644 index 000000000..a267f1058 --- /dev/null +++ b/src/middleware/log.h @@ -0,0 +1,286 @@ +#ifndef Log20171127H +#define Log20171127H + +#include +#include +#include + +#include "goby/common/logger.h" +#include "goby/common/exception.h" +#include "group.h" + +namespace goby +{ + template struct uint {}; + template<> struct uint<1> { typedef std::uint8_t type; }; + template<> struct uint<2> { typedef std::uint16_t type; }; + template<> struct uint<4> { typedef std::uint32_t type; }; + template<> struct uint<8> { typedef std::uint64_t type; }; + + class LogEntry + { + private: + + static constexpr int size_bytes_{4}; + static constexpr int scheme_bytes_{2}; + static constexpr int group_bytes_{2}; + static constexpr int type_bytes_{2}; + static constexpr int crc_bytes_ {4}; + static constexpr uint::type scheme_group_index_ { 0xFFFF }; + static constexpr uint::type scheme_type_index_ { 0xFFFE }; + + public: + LogEntry(const std::vector& data, int scheme, const std::string& type, const Group& group) + : data_(data), + scheme_(scheme), + type_(type), + group_(group) + { } + + LogEntry() { } + + template + void parse(Stream* s) + { + using namespace goby::common::logger; + using goby::glog; + + auto old_except_mask = s->exceptions(); + s->exceptions(std::ios::failbit | std::ios::badbit | std::ios::eofbit); + + uint::type scheme(0); + do + { + + char next_char = s->peek(); + if(next_char != magic_[0]) + { + glog.is(WARN) && glog << "Next character [" << next_char << "] is not the start of the expected magic word [" << magic_ << "]. Seeking until next magic word." << std::endl; + } + + std::string magic_read(magic_.size(), '\0'); + int discarded = 0; + + while(magic_read != magic_) + { + while(s->peek() != magic_[0]) + { + ++discarded; + s->get(); + } + s->read(&magic_read[0], magic_.size()); + } + + if(discarded != 0) + glog.is(WARN) && glog << "Found next magic word after skipping " << discarded << " bytes" << std::endl; + + boost::crc_32_type crc; + crc.process_bytes(&magic_read[0], magic_.size()); + + auto size(read_one::type>(s, &crc)); + auto fixed_field_size = scheme_bytes_ + group_bytes_ + type_bytes_ + crc_bytes_; + + if(size < fixed_field_size) + throw(goby::Exception("Invalid size read: " + std::to_string(size) + " as message must be at least " + std::to_string(fixed_field_size) + " bytes long")); + + auto data_size = size - fixed_field_size; + glog.is(DEBUG2) && glog << "Reading entry of " << size << " bytes (" << data_size << " bytes data)" << std::endl; + + scheme = read_one::type>(s, &crc); + auto group_index(read_one::type>(s, &crc)); + auto type_index(read_one::type>(s, &crc)); + + data_.resize(data_size); + s->read(reinterpret_cast(&data_[0]), data_size); + crc.process_bytes(&data_[0], data_.size()); + + auto calculated_crc = crc.checksum(); + + auto given_crc(read_one::type>(s)); + + if(calculated_crc != given_crc) + { + data_.clear(); + throw(goby::Exception("Invalid CRC on packet: given: " + std::to_string(given_crc) + ", calculated: " + std::to_string(calculated_crc))); + } + + if(scheme == scheme_group_index_) + { + std::string group(data_.begin(), data_.end()); + glog.is(DEBUG1) && glog << "Mapping group [" << group << "] to index: " << group_index << std::endl; + groups_.left.insert({ group, group_index }); + data_.clear(); + } + else if(scheme == scheme_type_index_) + { + std::string type(data_.begin(), data_.end()); + glog.is(DEBUG1) && glog << "Mapping type [" << type << "] to index: " << type_index << std::endl; + types_.left.insert({ type, type_index }); + data_.clear(); + } + else + { + scheme_ = scheme; + + std::string type = "_unknown" + std::to_string(type_index) + "_"; + auto type_it = types_.right.find(type_index); + if(type_it != types_.right.end()) + type = type_it->second; + else + glog.is(WARN) && glog << "No type entry in file for type index: " << type_index << std::endl; + type_ = type; + + std::string group = "_unknown" + std::to_string(group_index) + "_"; + auto group_it = groups_.right.find(group_index); + if(group_it != groups_.right.end()) + group = group_it->second; + else + glog.is(WARN) && glog << "No group entry in file for group index: " << group_index << std::endl; + group_ = goby::DynamicGroup(group); + } + } + while(scheme == scheme_group_index_ || scheme == scheme_type_index_); + + s->exceptions(old_except_mask); + } + + // [GBY3][size: 4][scheme: 2][group: 2][type: 2][data][crc32: 4] + // if scheme == 0xFFFF what follows is not data, but the string value for the group index + // if scheme == 0xFFFE what follows is not data, but the string value for the group index + template + void serialize(Stream* s) + { + auto old_except_mask = s->exceptions(); + s->exceptions(std::ios::failbit | std::ios::badbit | std::ios::eofbit); + + std::string group(group_); + + // insert indexing entry if the first time we saw this group + if(groups_.left.count(group) == 0) + { + auto index = group_index_++; + groups_.left.insert({ group, index }); + _serialize(s, scheme_group_index_, + index, 0, group.data(), group.size()); + } + if(types_.left.count(type_) == 0) + { + auto index = type_index_++; + types_.left.insert({ type_, index }); + _serialize(s, scheme_type_index_, + 0, index, type_.data(), type_.size()); + } + + auto group_index = groups_.left.at(group); + auto type_index = types_.left.at(type_); + + // insert actual data + _serialize(s, scheme_, group_index, type_index, + reinterpret_cast(&data_[0]), data_.size()); + + s->exceptions(old_except_mask); + } + + const std::vector& data() { return data_; } + int scheme() { return scheme_; } + const std::string& type() { return type_; } + const Group& group() { return group_; } + + static void reset() + { + groups_.clear(); + types_.clear(); + group_index_ = 1; + type_index_ = 1; + } + + + private: + + template + void _serialize(Stream* s, + uint::type scheme, + uint::type group_index, + uint::type type_index, + const char* data, + int data_size) + { + std::string group_str(netint_to_string(group_index)); + std::string type_str(netint_to_string(type_index)); + std::string scheme_str(netint_to_string(scheme)); + + uint::type size = scheme_bytes_ + group_bytes_ + type_bytes_ + data_size + crc_bytes_; + std::string size_str(netint_to_string(size)); + + auto header = magic_ + size_str + scheme_str + group_str + type_str; + s->write(header.data(), header.size()); + s->write(data, data_size); + + boost::crc_32_type crc; + crc.process_bytes(header.data(), header.size()); + crc.process_bytes(data, data_size); + + uint::type cs(crc.checksum()); + std::string cs_str(netint_to_string(cs)); + s->write(cs_str.data(), cs_str.size()); + } + + template + Unsigned read_one(Stream* s, boost::crc_32_type* crc = 0) + { + auto size = std::numeric_limits::digits/8; + std::string str(size, '\0'); + s->read(&str[0], size); + if(crc) crc->process_bytes(&str[0], size); + return string_to_netint(str); + } + + + template + std::string netint_to_string(Unsigned u) + { + auto size = std::numeric_limits::digits/8; + std::string s(size, '\0'); + for(int i = 0; i < size; ++i) + s[i] = (u >> (size-(i+1))*8) & 0xff; + return s; + } + + template + Unsigned string_to_netint(std::string s) + { + Unsigned u(0); + auto size = std::numeric_limits::digits/8; + if(s.size() > size) + s.erase(0, s.size() - size); + if(s.size() < size) + s.insert(0, size - s.size(), '\0'); + + + for(int i = 0; i < size; ++i) + u |= (s[i] & 0xff) << ((size-(i+1))*8); + return u; + } + + private: + std::vector data_; + int scheme_; + std::string type_; + Group group_; + + + + static boost::bimap::type> groups_; + static uint::type group_index_; + + static boost::bimap::type> types_; + static uint::type type_index_; + + const std::string magic_ { "GBY3" }; + }; + + +} + + +#endif diff --git a/src/middleware/multi-thread-application.h b/src/middleware/multi-thread-application.h index aa4d6ede3..4f89ade02 100644 --- a/src/middleware/multi-thread-application.h +++ b/src/middleware/multi-thread-application.h @@ -89,7 +89,7 @@ namespace goby MultiThreadApplication(boost::units::quantity loop_freq) : MainThreadBase(goby::common::ApplicationBase3::app_cfg(), &portal_, loop_freq), - portal_(goby::common::ApplicationBase3::app_cfg().interprocess_portal()) + portal_(goby::common::ApplicationBase3::app_cfg().interprocess()) { goby::glog.set_lock_action(goby::common::logger_lock::lock); } diff --git a/src/middleware/serialize_parse.h b/src/middleware/serialize_parse.h index 40f25bd94..cccce0bb0 100644 --- a/src/middleware/serialize_parse.h +++ b/src/middleware/serialize_parse.h @@ -13,7 +13,6 @@ namespace goby { - // // MarshallingScheme // diff --git a/src/middleware/single-thread-application.h b/src/middleware/single-thread-application.h index f8ca68ade..4ae2acb04 100644 --- a/src/middleware/single-thread-application.h +++ b/src/middleware/single-thread-application.h @@ -47,7 +47,7 @@ namespace goby SingleThreadApplication(boost::units::quantity loop_freq) : MainThread(goby::common::ApplicationBase3::app_cfg(), loop_freq), - portal_(goby::common::ApplicationBase3::app_cfg().interprocess_portal()) + portal_(goby::common::ApplicationBase3::app_cfg().interprocess()) { MainThread::set_transporter(&portal_); } diff --git a/src/moos/protobuf/moos_gateway_config.proto b/src/moos/protobuf/moos_gateway_config.proto index df12ee87e..aed356444 100644 --- a/src/moos/protobuf/moos_gateway_config.proto +++ b/src/moos/protobuf/moos_gateway_config.proto @@ -4,7 +4,7 @@ import "goby/middleware/protobuf/interprocess_config.proto"; message GobyMOOSGatewayConfig { optional goby.protobuf.App3Config app = 1; - optional goby.protobuf.InterProcessPortalConfig interprocess_portal = 2; + optional goby.protobuf.InterProcessPortalConfig interprocess = 2; optional string moos_server = 3 [default = "localhost"]; optional int32 moos_port = 4 [default = 9000]; diff --git a/src/test/middleware/CMakeLists.txt b/src/test/middleware/CMakeLists.txt index bb2ae8dbd..61a8a8af0 100644 --- a/src/test/middleware/CMakeLists.txt +++ b/src/test/middleware/CMakeLists.txt @@ -9,3 +9,6 @@ add_subdirectory(middleware_regex) add_subdirectory(single_thread_app1) add_subdirectory(multi_thread_app1) add_subdirectory(multi_thread_app2) + +add_subdirectory(log) + diff --git a/src/test/middleware/log/CMakeLists.txt b/src/test/middleware/log/CMakeLists.txt new file mode 100644 index 000000000..fd7d6625c --- /dev/null +++ b/src/test/middleware/log/CMakeLists.txt @@ -0,0 +1,7 @@ +protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS test.proto) + +add_executable(goby_test_middleware_log test.cpp ${PROTO_SRCS} ${PROTO_HDRS}) +target_link_libraries(goby_test_middleware_log goby_middleware) + +add_test(goby_test_middleware_log ${goby_BIN_DIR}/goby_test_middleware_log) + diff --git a/src/test/middleware/log/test.cpp b/src/test/middleware/log/test.cpp new file mode 100644 index 000000000..91db80232 --- /dev/null +++ b/src/test/middleware/log/test.cpp @@ -0,0 +1,149 @@ +#include "goby/common/logger.h" +#include "goby/middleware/log.h" +#include "goby/middleware/serialize_parse.h" + +#include "test.pb.h" + +const goby::Group tempgroup("groups::temp"); +const goby::Group ctdgroup("groups::ctd"); +int nctd = 5; + +void read_log(int test) +{ + goby::LogEntry::reset(); + + std::ifstream in_log_file("/tmp/goby3_test_log.goby"); + try + { + goby::LogEntry entry; + entry.parse(&in_log_file); + assert(entry.scheme() == goby::MarshallingScheme::PROTOBUF); + assert(entry.group() == tempgroup); + assert(entry.type() == TempSample::descriptor()->full_name()); + assert(test != 3 && test != 4); + } + catch(goby::Exception& e) + { + std::cerr << e.what() << std::endl; + assert(test == 3 || test == 4); + } + + if(test == 4) + { + goby::LogEntry entry; + entry.parse(&in_log_file); + assert(entry.scheme() == goby::MarshallingScheme::PROTOBUF); + assert(entry.group() == "_unknown1_"); + assert(entry.type() == TempSample::descriptor()->full_name()); + } + + + for(int i = 0; i < nctd; ++i) + { + goby::LogEntry entry; + entry.parse(&in_log_file); + assert(entry.scheme() == goby::MarshallingScheme::DCCL); + assert(entry.group() == ctdgroup); + assert(entry.type() == CTDSample::descriptor()->full_name()); + } + + // eof + try + { + goby::LogEntry entry; + entry.parse(&in_log_file); + bool expected_eof = false; + assert(expected_eof); + } + catch (std::ifstream::failure e) { + assert(in_log_file.eof()); + } +} + +void write_log(int test) +{ + goby::LogEntry::reset(); + std::ofstream out_log_file("/tmp/goby3_test_log.goby"); + + switch(test) + { + default: break; + case 1: + // insert some chars at the beginning of the file + out_log_file << "foo"; + break; + } + + TempSample t; + { + t.set_temperature(500); + std::vector data(t.ByteSize()); + t.SerializeToArray(&data[0], data.size()); + goby::LogEntry entry(data, goby::MarshallingScheme::PROTOBUF, TempSample::descriptor()->full_name(), tempgroup); + entry.serialize(&out_log_file); + } + + switch(test) + { + default: break; + case 2: + // insert some chars at the middle of the file + out_log_file << "bar"; + break; + case 3: + { + + // corrupt the previous entry + auto pos = out_log_file.tellp(); + out_log_file.seekp(pos-std::ios::streamoff(6)); + out_log_file.put(0); + out_log_file.seekp(pos); + break; + } + + case 4: + { + // corrupt the start (index) + auto pos = out_log_file.tellp(); + out_log_file.seekp(7, out_log_file.beg); + out_log_file.put(0); + out_log_file.seekp(pos); + break; + } + + + } + + + std::vector ctds; + for(int i = 0; i < nctd; ++i) + { + CTDSample ctd; + ctd.set_temperature(i*100); + std::vector data(ctd.ByteSize()); + ctd.SerializeToArray(&data[0], data.size()); + goby::LogEntry entry(data, goby::MarshallingScheme::DCCL, CTDSample::descriptor()->full_name(), ctdgroup); + entry.serialize(&out_log_file); + ctds.push_back(ctd); + } +} + +int main(int argc, char* argv[]) +{ + goby::glog.add_stream(goby::common::logger::DEBUG3, &std::cerr); + goby::glog.set_name(argv[0]); + + int ntests = 5; + + for(int test = 0; test < ntests; ++test) + { + std::cout << "Running test " << test << std::endl; + write_log(test); + read_log(test); + } + + + + std::cout << "all tests passed" << std::endl; +} + diff --git a/src/test/middleware/log/test.proto b/src/test/middleware/log/test.proto new file mode 100644 index 000000000..9c6523885 --- /dev/null +++ b/src/test/middleware/log/test.proto @@ -0,0 +1,18 @@ +import "dccl/protobuf/option_extensions.proto"; + +message CTDSample +{ + option (dccl.msg).id = 127; + option (dccl.msg).max_bytes = 32; + option (dccl.msg).codec_version = 3; + + optional double salinity = 1 [(dccl.field) = { min: 0 max: 40 precision: 1}]; + optional double temperature = 2 [(dccl.field) = { min: 3 max: 30 precision: 1}]; + optional double depth = 3 [(dccl.field) = { min: 0 max: 5000 }]; +} + + +message TempSample +{ + optional double temperature = 1; +} diff --git a/src/test/middleware/multi_thread_app1/test.proto b/src/test/middleware/multi_thread_app1/test.proto index 3b90e4650..bb5133c24 100644 --- a/src/test/middleware/multi_thread_app1/test.proto +++ b/src/test/middleware/multi_thread_app1/test.proto @@ -5,7 +5,7 @@ import "goby/middleware/protobuf/interprocess_config.proto"; message TestConfig { optional goby.protobuf.App3Config app = 1; - optional goby.protobuf.InterProcessPortalConfig interprocess_portal = 2; + optional goby.protobuf.InterProcessPortalConfig interprocess = 2; } diff --git a/src/test/middleware/multi_thread_app2/test.proto b/src/test/middleware/multi_thread_app2/test.proto index 3664d78cb..ff2cbf4e4 100644 --- a/src/test/middleware/multi_thread_app2/test.proto +++ b/src/test/middleware/multi_thread_app2/test.proto @@ -5,7 +5,7 @@ import "goby/middleware/protobuf/interprocess_config.proto"; message TestConfig { optional goby.protobuf.App3Config app = 1; - optional goby.protobuf.InterProcessPortalConfig interprocess_portal = 2; + optional goby.protobuf.InterProcessPortalConfig interprocess = 2; optional int32 num_messages = 3 [default = 10000]; optional int32 num_rx_threads = 4 [default = 10]; diff --git a/src/test/middleware/single_thread_app1/test.proto b/src/test/middleware/single_thread_app1/test.proto index 24fc8095f..a3ace0fea 100644 --- a/src/test/middleware/single_thread_app1/test.proto +++ b/src/test/middleware/single_thread_app1/test.proto @@ -5,7 +5,7 @@ import "goby/middleware/protobuf/interprocess_config.proto"; message TestConfig { optional goby.protobuf.App3Config app = 1; - optional goby.protobuf.InterProcessPortalConfig interprocess_portal = 2; + optional goby.protobuf.InterProcessPortalConfig interprocess = 2; } From 67f7725b3782d5dc039521002927fa1f64d156d9 Mon Sep 17 00:00:00 2001 From: Toby Schneider Date: Tue, 28 Nov 2017 12:02:29 -0500 Subject: [PATCH 2/7] Update to group integer accesS --- src/middleware/group.h | 17 +++++++++-------- src/middleware/log.h | 8 ++++---- src/middleware/transport-intervehicle.h | 2 +- src/test/middleware/log/test.cpp | 1 + 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/middleware/group.h b/src/middleware/group.h index ed12b0056..21faa5e21 100644 --- a/src/middleware/group.h +++ b/src/middleware/group.h @@ -10,11 +10,11 @@ namespace goby public: constexpr Group(const char* c = "") : c_(c) { } constexpr Group(int i) : i_(i) { } - - constexpr operator int() const { return i_; } + + constexpr int numeric() const { return i_; } constexpr const char* c_str() const { return c_; } - + operator std::string() const { if(c_ != nullptr) return std::string(c_); @@ -35,7 +35,7 @@ namespace goby template void check_validity() { - static_assert((int(group) != 0) || (group.c_str()[0] != '\0'), "goby::Group must have non-zero length string or non-zero integer value."); + static_assert((group.numeric() != 0) || (group.c_str()[0] != '\0'), "goby::Group must have non-zero length string or non-zero integer value."); } inline void check_validity_runtime(const Group& group) @@ -51,11 +51,12 @@ namespace goby { public: DynamicGroup(const std::string& s) : s_(new std::string(s)) - { - Group::set_c_str(s_->c_str()); - } - DynamicGroup(int i) : Group(i) { } + { + Group::set_c_str(s_->c_str()); + } + DynamicGroup(int i) : Group(i) { } + private: std::unique_ptr s_; }; diff --git a/src/middleware/log.h b/src/middleware/log.h index a267f1058..8be745c1f 100644 --- a/src/middleware/log.h +++ b/src/middleware/log.h @@ -34,17 +34,17 @@ namespace goby : data_(data), scheme_(scheme), type_(type), - group_(group) + group_(std::string(group)) { } - LogEntry() { } + LogEntry() : group_("") { } template void parse(Stream* s) { using namespace goby::common::logger; using goby::glog; - + auto old_except_mask = s->exceptions(); s->exceptions(std::ios::failbit | std::ios::badbit | std::ios::eofbit); @@ -266,7 +266,7 @@ namespace goby std::vector data_; int scheme_; std::string type_; - Group group_; + DynamicGroup group_; diff --git a/src/middleware/transport-intervehicle.h b/src/middleware/transport-intervehicle.h index ce1218581..fb83bb002 100644 --- a/src/middleware/transport-intervehicle.h +++ b/src/middleware/transport-intervehicle.h @@ -181,7 +181,7 @@ namespace goby goby::protobuf::DCCLSubscription dccl_subscription; dccl_subscription.set_dccl_id(dccl_id); - dccl_subscription.set_group(group); + dccl_subscription.set_group(group.numeric()); dccl_subscription.set_protobuf_name(SerializerParserHelper::type_name()); _insert_file_desc_with_dependencies(Data::descriptor()->file(), &dccl_subscription); Base::inner_.template publish(dccl_subscription); diff --git a/src/test/middleware/log/test.cpp b/src/test/middleware/log/test.cpp index 91db80232..4f9101bea 100644 --- a/src/test/middleware/log/test.cpp +++ b/src/test/middleware/log/test.cpp @@ -33,6 +33,7 @@ void read_log(int test) goby::LogEntry entry; entry.parse(&in_log_file); assert(entry.scheme() == goby::MarshallingScheme::PROTOBUF); + // corrupted index assert(entry.group() == "_unknown1_"); assert(entry.type() == TempSample::descriptor()->full_name()); } From fc83832da911d5e2408e5d4b7d9f8d867b02a32b Mon Sep 17 00:00:00 2001 From: Toby Schneider Date: Tue, 28 Nov 2017 13:45:17 -0500 Subject: [PATCH 3/7] Write-protect file before closing logger --- src/apps/middleware/logger/logger.cpp | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/apps/middleware/logger/logger.cpp b/src/apps/middleware/logger/logger.cpp index 54c33f2bf..90462b81e 100644 --- a/src/apps/middleware/logger/logger.cpp +++ b/src/apps/middleware/logger/logger.cpp @@ -1,4 +1,6 @@ #include +#include +#include #include "goby/middleware/single-thread-application.h" #include "goby/middleware/log.h" @@ -19,7 +21,8 @@ namespace goby public: Logger() : goby::SingleThreadApplication(1*boost::units::si::hertz), - log_(std::string(cfg().log_dir() + "/" + cfg().interprocess().platform() + "_" + goby::common::goby_file_timestamp() + ".goby").c_str(), std::ofstream::binary) + log_file_path_(std::string(cfg().log_dir() + "/" + cfg().interprocess().platform() + "_" + goby::common::goby_file_timestamp() + ".goby")), + log_(log_file_path_.c_str(), std::ofstream::binary) { if(!log_.is_open()) glog.is(DIE) && glog << "Failed to open log in directory: " << cfg().log_dir() << std::endl; @@ -31,6 +34,13 @@ namespace goby cfg().group_regex()); } + ~Logger() + { + log_.close(); + // set read only + chmod(log_file_path_.c_str(), S_IRUSR | S_IRGRP); + } + void log(const std::vector& data, int scheme, const std::string& type, const Group& group); void loop() override { @@ -41,6 +51,7 @@ namespace goby static std::atomic do_quit; private: + std::string log_file_path_; std::ofstream log_; }; } From b9eee98e85b6949cfc48896d296937ddf9d47833 Mon Sep 17 00:00:00 2001 From: Toby Schneider Date: Tue, 28 Nov 2017 19:08:07 -0500 Subject: [PATCH 4/7] Added standalone multithread application --- src/middleware/multi-thread-application.h | 143 ++++++++++++++++------ 1 file changed, 108 insertions(+), 35 deletions(-) diff --git a/src/middleware/multi-thread-application.h b/src/middleware/multi-thread-application.h index 4f89ade02..479dd28a2 100644 --- a/src/middleware/multi-thread-application.h +++ b/src/middleware/multi-thread-application.h @@ -65,35 +65,33 @@ namespace goby std::unique_ptr> forwarder_; }; - - template - class MultiThreadApplication + + template + class MultiThreadApplicationBase : public goby::common::ApplicationBase3, - public goby::Thread> + public goby::Thread { private: - goby::InterThreadTransporter interthread_; - goby::InterProcessPortal portal_; - std::map>> alive_; std::map>> threads_; public: - using MainThreadBase = goby::Thread>; - - MultiThreadApplication(double loop_freq_hertz = 0) : - MultiThreadApplication(loop_freq_hertz*boost::units::si::hertz) - { } + using MainThreadBase = goby::Thread; - MultiThreadApplication(boost::units::quantity loop_freq) - : MainThreadBase(goby::common::ApplicationBase3::app_cfg(), &portal_, loop_freq), - portal_(goby::common::ApplicationBase3::app_cfg().interprocess()) - { - goby::glog.set_lock_action(goby::common::logger_lock::lock); - } - virtual ~MultiThreadApplication() { } + MultiThreadApplicationBase(double loop_freq_hertz = 0) : + MultiThreadApplicationBase(loop_freq_hertz*boost::units::si::hertz) + { } + + + MultiThreadApplicationBase(boost::units::quantity loop_freq, Transporter* portal) + : MainThreadBase(goby::common::ApplicationBase3::app_cfg(), portal, loop_freq) + { + + goby::glog.set_lock_action(goby::common::logger_lock::lock); + } + virtual ~MultiThreadApplicationBase() { } template @@ -103,26 +101,101 @@ namespace goby template void join_thread(int index = -1); - protected: - goby::InterProcessPortal& interprocess() { return portal_; } - goby::InterThreadTransporter& interthread() { return portal_.inner(); } - - + protected: void quit() override; private: - void run() override + + void run() override { MainThreadBase::run_once(); } template void _launch_thread(int index, Lambda thread_lambda); }; + + + template + class MultiThreadApplication + : public MultiThreadApplicationBase> + { + + private: + goby::InterThreadTransporter interthread_; + goby::InterProcessPortal portal_; + using Base = MultiThreadApplicationBase>; + + public: + MultiThreadApplication(double loop_freq_hertz = 0) : + MultiThreadApplication(loop_freq_hertz*boost::units::si::hertz) + { } + + MultiThreadApplication(boost::units::quantity loop_freq) + : Base(loop_freq, &portal_), + portal_(interthread_, goby::common::ApplicationBase3::app_cfg().interprocess()) + { } + virtual ~MultiThreadApplication() { } + + protected: + goby::InterThreadTransporter& interthread() { return portal_.inner(); } + goby::InterProcessPortal& interprocess() { return portal_; } + + }; + + template + class MultiThreadStandaloneApplication + : public MultiThreadApplicationBase + { + + private: + using Base = MultiThreadApplicationBase; + goby::InterThreadTransporter interthread_; + + public: + MultiThreadStandaloneApplication(double loop_freq_hertz = 0) : + MultiThreadStandaloneApplication(loop_freq_hertz*boost::units::si::hertz) + { } + + MultiThreadStandaloneApplication(boost::units::quantity loop_freq) + : Base(loop_freq, &interthread_) + { + + } + virtual ~MultiThreadStandaloneApplication() { } + + protected: + goby::InterThreadTransporter& interthread() { return interthread_; } + + }; + + template + class MultiThreadTest + : public MultiThreadStandaloneApplication + { + + private: + using Base = MultiThreadStandaloneApplication; + + public: + MultiThreadTest(boost::units::quantity loop_freq = 0*boost::units::si::hertz) + : Base(loop_freq) + { + + } + virtual ~MultiThreadTest() { } + + protected: + // so we can add on threads that publish to the outside for testing + goby::InterThreadTransporter& interprocess() { return Base::interthread(); } + + }; + + } -template +template template -void goby::MultiThreadApplication::launch_thread() +void goby::MultiThreadApplicationBase::launch_thread() { const Config& cfg = goby::common::ApplicationBase3::app_cfg(); int index = -1; @@ -135,9 +208,9 @@ void goby::MultiThreadApplication::launch_thread() _launch_thread(index, thread_lambda); } -template +template template -void goby::MultiThreadApplication::launch_thread(int index) +void goby::MultiThreadApplicationBase::launch_thread(int index) { const Config& cfg = goby::common::ApplicationBase3::app_cfg(); std::type_index type_i = std::type_index(typeid(ThreadType)); @@ -149,9 +222,9 @@ void goby::MultiThreadApplication::launch_thread(int index) _launch_thread(index, thread_lambda); } -template +template template -void goby::MultiThreadApplication::_launch_thread(int index, Lambda thread_lambda) + void goby::MultiThreadApplicationBase::_launch_thread(int index, Lambda thread_lambda) { std::type_index type_i = std::type_index(typeid(ThreadType)); @@ -166,9 +239,9 @@ void goby::MultiThreadApplication::_launch_thread(int index, Lambda thre } -template +template template -void goby::MultiThreadApplication::join_thread(int index /* = -1 */) +void goby::MultiThreadApplicationBase::join_thread(int index /* = -1 */) { auto type_i = std::type_index(typeid(ThreadType)); @@ -182,8 +255,8 @@ void goby::MultiThreadApplication::join_thread(int index /* = -1 */) } -template -void goby::MultiThreadApplication::quit() +template +void goby::MultiThreadApplicationBase::quit() { for(auto& amap : alive_) { From fde59f02c3f07e08589ddc8ad9be1516719301bf Mon Sep 17 00:00:00 2001 From: Toby Schneider Date: Tue, 28 Nov 2017 22:40:18 -0500 Subject: [PATCH 5/7] Made configuration validation of required fields optional (useful for standalone tests not to do this). Attempted to improve shutdown of applications, though not entirely successful yet, when not running loop() --- src/common/application_base3.h | 8 +- src/common/configuration_reader.cpp | 5 +- src/common/configuration_reader.h | 3 +- src/middleware/multi-thread-application.h | 130 ++++++++++++---------- src/middleware/thread.h | 20 +++- src/middleware/transport-interfaces.cpp | 41 ++++--- src/middleware/transport-interfaces.h | 7 +- 7 files changed, 121 insertions(+), 93 deletions(-) diff --git a/src/common/application_base3.h b/src/common/application_base3.h index b86e6602b..08fe80a2c 100644 --- a/src/common/application_base3.h +++ b/src/common/application_base3.h @@ -54,7 +54,7 @@ namespace goby class ApplicationBase3 { public: - ApplicationBase3(); + ApplicationBase3(bool check_required_configuration = true); virtual ~ApplicationBase3() { } protected: @@ -65,7 +65,7 @@ namespace goby virtual void quit() { alive_ = false; } /// \brief Accesses configuration object passed at launch - const Config& app_cfg() { return cfg_; } + Config& app_cfg() { return cfg_; } const std::chrono::system_clock::time_point& start_time() const { return start_time_; } @@ -107,7 +107,7 @@ template char** goby::common::ApplicationBase3::argv_ = 0; template -goby::common::ApplicationBase3::ApplicationBase3() +goby::common::ApplicationBase3::ApplicationBase3(bool check_required_configuration) : alive_(true) { using goby::glog; @@ -121,7 +121,7 @@ goby::common::ApplicationBase3::ApplicationBase3() try { std::string application_name; - common::ConfigReader::read_cfg(argc_, argv_, &cfg_, &application_name, &od, &var_map); + common::ConfigReader::read_cfg(argc_, argv_, &cfg_, &application_name, &od, &var_map, check_required_configuration); __set_application_name(application_name); // incorporate some parts of the AppBaseConfig that are common diff --git a/src/common/configuration_reader.cpp b/src/common/configuration_reader.cpp index 791dc6719..f1d89495c 100644 --- a/src/common/configuration_reader.cpp +++ b/src/common/configuration_reader.cpp @@ -41,7 +41,8 @@ void goby::common::ConfigReader::read_cfg(int argc, google::protobuf::Message* message, std::string* application_name, boost::program_options::options_description* od_all, - boost::program_options::variables_map* var_map) + boost::program_options::variables_map* var_map, + bool check_required_configuration /*= true*/) { if(!argv) return; @@ -170,7 +171,7 @@ void goby::common::ConfigReader::read_cfg(int argc, } // now the proto message must have all required fields - if(!message->IsInitialized()) + if(check_required_configuration && !message->IsInitialized()) { std::vector< std::string > errors; message->FindInitializationErrors(&errors); diff --git a/src/common/configuration_reader.h b/src/common/configuration_reader.h index 3683c2890..ba935e2e6 100644 --- a/src/common/configuration_reader.h +++ b/src/common/configuration_reader.h @@ -75,7 +75,8 @@ namespace goby google::protobuf::Message* message, std::string* application_name, boost::program_options::options_description* od_all, - boost::program_options::variables_map* var_map); + boost::program_options::variables_map* var_map, + bool check_required_configuration = true); static void get_protobuf_program_options( boost::program_options::options_description& po_desc, diff --git a/src/middleware/multi-thread-application.h b/src/middleware/multi-thread-application.h index 479dd28a2..e1d030140 100644 --- a/src/middleware/multi-thread-application.h +++ b/src/middleware/multi-thread-application.h @@ -73,8 +73,16 @@ namespace goby { private: - std::map>> alive_; - std::map>> threads_; + struct ThreadManagement + { + std::atomic alive{true}; + std::unique_ptr thread; + std::shared_ptr poll_cv; + std::shared_ptr poll_mutex; + }; + + + std::map> threads_; public: @@ -85,19 +93,21 @@ namespace goby { } - MultiThreadApplicationBase(boost::units::quantity loop_freq, Transporter* portal) - : MainThreadBase(goby::common::ApplicationBase3::app_cfg(), portal, loop_freq) - { - - goby::glog.set_lock_action(goby::common::logger_lock::lock); - } + MultiThreadApplicationBase(boost::units::quantity loop_freq, Transporter* portal, bool check_required_configuration = true) + : goby::common::ApplicationBase3(check_required_configuration), + MainThreadBase(goby::common::ApplicationBase3::app_cfg(), portal, loop_freq) + { + goby::glog.set_lock_action(goby::common::logger_lock::lock); + } virtual ~MultiThreadApplicationBase() { } template - void launch_thread(); + void launch_thread() + { _launch_thread(-1); } template - void launch_thread(int index); + void launch_thread(int index) + { _launch_thread(index); } template void join_thread(int index = -1); @@ -109,8 +119,9 @@ namespace goby void run() override { MainThreadBase::run_once(); } - template - void _launch_thread(int index, Lambda thread_lambda); + template + void _launch_thread(int index); + }; @@ -156,8 +167,8 @@ namespace goby MultiThreadStandaloneApplication(loop_freq_hertz*boost::units::si::hertz) { } - MultiThreadStandaloneApplication(boost::units::quantity loop_freq) - : Base(loop_freq, &interthread_) + MultiThreadStandaloneApplication(boost::units::quantity loop_freq, bool check_required_configuration = true) + : Base(loop_freq, &interthread_, check_required_configuration) { } @@ -178,7 +189,7 @@ namespace goby public: MultiThreadTest(boost::units::quantity loop_freq = 0*boost::units::si::hertz) - : Base(loop_freq) + : Base(loop_freq, false) { } @@ -190,52 +201,46 @@ namespace goby }; - + // selects which constructor to use based on whether the thread is launched with an index or not + template + struct ThreadTypeSelector { }; + template + struct ThreadTypeSelector + { + static std::shared_ptr thread(const Config& cfg, int index = -1) + { return std::make_shared(cfg); }; + }; + template + struct ThreadTypeSelector + { + static std::shared_ptr thread(const Config& cfg, int index) + { return std::make_shared(cfg, index); }; + }; } template -template -void goby::MultiThreadApplicationBase::launch_thread() -{ +template + void goby::MultiThreadApplicationBase::_launch_thread(int index) +{ const Config& cfg = goby::common::ApplicationBase3::app_cfg(); - int index = -1; std::type_index type_i = std::type_index(typeid(ThreadType)); - auto thread_lambda = [this, type_i, index, &cfg]() - { - ThreadType goby_thread(cfg); - goby_thread.run(alive_[type_i][index]); - }; - _launch_thread(index, thread_lambda); -} -template -template -void goby::MultiThreadApplicationBase::launch_thread(int index) -{ - const Config& cfg = goby::common::ApplicationBase3::app_cfg(); - std::type_index type_i = std::type_index(typeid(ThreadType)); - auto thread_lambda = [this, type_i, index, &cfg]() + if(threads_[type_i].count(index)) + throw(Exception(std::string("Thread of type: ") + type_i.name() + " and index " + std::to_string(index) + " was already launched.")); + + auto& thread_manager = threads_[type_i][index]; + + // copy configuration + auto thread_lambda = [this, type_i, index, cfg, &thread_manager]() { - ThreadType goby_thread(cfg, index); - goby_thread.run(alive_[type_i][index]); + std::shared_ptr goby_thread(ThreadTypeSelector::thread(cfg, index)); + thread_manager.poll_cv = goby_thread->interthread().cv(); + thread_manager.poll_mutex = goby_thread->interthread().poll_mutex(); + goby_thread->run(thread_manager.alive); }; - _launch_thread(index, thread_lambda); -} -template -template - void goby::MultiThreadApplicationBase::_launch_thread(int index, Lambda thread_lambda) -{ - std::type_index type_i = std::type_index(typeid(ThreadType)); - if(threads_[type_i].count(index)) - throw(Exception(std::string("Thread of type: ") + type_i.name() + " and index " + std::to_string(index) + " was already launched.")); - - alive_[type_i][index] = true; - - threads_[type_i].insert( - std::make_pair(index, - std::unique_ptr(new std::thread(thread_lambda)))); + thread_manager.thread = std::unique_ptr(new std::thread(thread_lambda)); } @@ -248,9 +253,9 @@ void goby::MultiThreadApplicationBase::join_thread(int inde if(!threads_[type_i].count(index)) throw(Exception(std::string("No running thread of type: ") + type_i.name() + " and index " + std::to_string(index) + " to join.")); - alive_[type_i][index] = false; - threads_[type_i][index]->join(); - alive_[type_i].erase(index); + + threads_[type_i][index].alive = false; + threads_[type_i][index].thread->join(); threads_[type_i].erase(index); } @@ -258,16 +263,19 @@ void goby::MultiThreadApplicationBase::join_thread(int inde template void goby::MultiThreadApplicationBase::quit() { - for(auto& amap : alive_) - { - for (auto& a : amap.second) - a.second = false; - } - + // join the threads for(auto& tmap : threads_) { for(auto & t : tmap.second) - t.second->join(); + { + { + std::unique_lock lock(*t.second.poll_mutex); + t.second.alive = false; + } + // notify condition variables + t.second.poll_cv->notify_all(); + t.second.thread->join(); + } } goby::common::ApplicationBase3::quit(); diff --git a/src/middleware/thread.h b/src/middleware/thread.h index 43c21d3a7..2c13b1bf6 100644 --- a/src/middleware/thread.h +++ b/src/middleware/thread.h @@ -26,6 +26,8 @@ #include #include #include +#include + #include @@ -68,7 +70,13 @@ namespace goby void run(std::atomic& alive) { alive_ = &alive; - while(alive) run_once(); + while(alive) + { + std::unique_lock lock(*transporter_->poll_mutex()); + + run_once(&lock); + } + } int index() { return index_; } @@ -105,7 +113,7 @@ namespace goby decltype(loop_frequency_) loop_frequency() { return loop_frequency_; } double loop_max_frequency() { return std::numeric_limits::infinity(); } - void run_once(); + void run_once(std::unique_lock* lock = nullptr); TransporterType& transporter() { return *transporter_; } @@ -118,7 +126,7 @@ namespace goby template - void goby::Thread::run_once() + void goby::Thread::run_once(std::unique_lock* lock) { if(!transporter_) throw(goby::Exception("Null transporter")); @@ -126,12 +134,12 @@ template if(loop_frequency_hertz() == std::numeric_limits::infinity()) { // call loop as fast as possible - transporter_->poll(std::chrono::seconds(0)); + transporter_->poll(std::chrono::seconds(0), lock); loop(); } else if(loop_frequency_hertz() > 0) { - int events = transporter_->poll(loop_time_); + int events = transporter_->poll(loop_time_, lock); // timeout if(events == 0) @@ -144,7 +152,7 @@ template else { // don't call loop() - transporter_->poll(); + transporter_->poll(std::chrono::system_clock::time_point::max(), lock); } } diff --git a/src/middleware/transport-interfaces.cpp b/src/middleware/transport-interfaces.cpp index e1496c479..649bd91ec 100644 --- a/src/middleware/transport-interfaces.cpp +++ b/src/middleware/transport-interfaces.cpp @@ -1,44 +1,53 @@ #include "transport-interfaces.h" -int goby::PollerInterface::poll(const std::chrono::system_clock::time_point& timeout) +int goby::PollerInterface::poll(const std::chrono::system_clock::time_point& timeout, + std::unique_lock* lock) { - return _poll_all(timeout); + return _poll_all(timeout, lock); } -int goby::PollerInterface::poll(std::chrono::system_clock::duration wait_for) +int goby::PollerInterface::poll(std::chrono::system_clock::duration wait_for, + std::unique_lock* lock) { if(wait_for == std::chrono::system_clock::duration::max()) - return poll(); + return poll(std::chrono::system_clock::time_point::max(), lock); else - return poll(std::chrono::system_clock::now() + wait_for); + return poll(std::chrono::system_clock::now() + wait_for, lock); } -int goby::PollerInterface::_poll_all(const std::chrono::system_clock::time_point& timeout) -{ - std::unique_lock lock(*poll_mutex_); +int goby::PollerInterface::_poll_all(const std::chrono::system_clock::time_point& timeout, + std::unique_lock* lock) +{ + + std::unique_ptr> our_lock; + if(lock == nullptr) + { + our_lock.reset(new std::unique_lock(*poll_mutex_)); + lock = our_lock.get(); + } + int poll_items = _transporter_poll(); - - while(poll_items == 0) // no items, so wait - { + if(poll_items == 0) + { if(timeout == std::chrono::system_clock::time_point::max()) { - cv_->wait(lock); // wait_until doesn't work well with time_point::max() + cv_->wait(*lock); // wait_until doesn't work well with time_point::max() poll_items = _transporter_poll(); - + if(poll_items == 0) goby::glog.is(goby::common::logger::DEBUG1) && goby::glog << "PollerInterface condition_variable: spurious wakeup" << std::endl; - + } else { - if(cv_->wait_until(lock, timeout) == std::cv_status::no_timeout) + if(cv_->wait_until(*lock, timeout) == std::cv_status::no_timeout) poll_items = _transporter_poll(); else return poll_items; } } - + return poll_items; } diff --git a/src/middleware/transport-interfaces.h b/src/middleware/transport-interfaces.h index a044b16f0..c325a2219 100644 --- a/src/middleware/transport-interfaces.h +++ b/src/middleware/transport-interfaces.h @@ -71,8 +71,8 @@ namespace goby { } - int poll(const std::chrono::system_clock::time_point& timeout = std::chrono::system_clock::time_point::max()); - int poll(std::chrono::system_clock::duration wait_for); + int poll(const std::chrono::system_clock::time_point& timeout = std::chrono::system_clock::time_point::max(), std::unique_lock* lock = nullptr); + int poll(std::chrono::system_clock::duration wait_for, std::unique_lock* lock = nullptr); std::shared_ptr poll_mutex() { return poll_mutex_; } std::shared_ptr cv() { return cv_; } @@ -84,7 +84,8 @@ namespace goby private: // poll all the transporters for data, including a timeout (only called by the outside-most Poller) - int _poll_all(const std::chrono::system_clock::time_point& timeout); + int _poll_all(const std::chrono::system_clock::time_point& timeout, + std::unique_lock* lock = nullptr); std::shared_ptr poll_mutex_; // signaled when there's no data for this thread to read during _poll() From fa3a1626bcbcb8e443f1149af090bbf9c16586a6 Mon Sep 17 00:00:00 2001 From: Toby Schneider Date: Wed, 29 Nov 2017 11:06:59 -0500 Subject: [PATCH 6/7] Fixed errors in interthread - reader/writer locking was not working correctly, so switched to C++14 and it's shared_timed_mutex which is a much cleaner solution. Also, properly locked poller_mutexs to ensure that there isn't a window where a condition variable notify is missed --- CMakeLists.txt | 4 +- src/middleware/multi-thread-application.h | 28 ++-- src/middleware/poller.h | 6 +- src/middleware/thread.h | 27 ++-- src/middleware/transport-common.h | 2 +- src/middleware/transport-interfaces.cpp | 41 +++--- src/middleware/transport-interfaces.h | 9 +- .../transport-interprocess-zeromq.h | 3 +- src/middleware/transport-interprocess.h | 6 +- src/middleware/transport-interthread.cpp | 4 +- src/middleware/transport-interthread.h | 122 ++++++++---------- src/middleware/transport-intervehicle.h | 9 +- 12 files changed, 123 insertions(+), 138 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 494e26ddf..c3b756629 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,9 +11,9 @@ cmake_policy(SET CMP0009 NEW) project(goby) if(CMAKE_VERSION VERSION_LESS 3.1) - add_definitions(-std=c++11) + add_definitions(-std=c++14) else() - set(CMAKE_CXX_STANDARD 11) + set(CMAKE_CXX_STANDARD 14) set(CMAKE_CXX_STANDARD_REQUIRED ON) endif() diff --git a/src/middleware/multi-thread-application.h b/src/middleware/multi-thread-application.h index e1d030140..88a7b1127 100644 --- a/src/middleware/multi-thread-application.h +++ b/src/middleware/multi-thread-application.h @@ -37,6 +37,7 @@ namespace goby template class SimpleThread : public Thread> { + using SimpleThreadBase = Thread>; public: SimpleThread(const Config& cfg, double loop_freq_hertz = 0, int index = -1) : SimpleThread(cfg, loop_freq_hertz*boost::units::si::hertz, index) { } @@ -48,6 +49,11 @@ namespace goby interthread_.reset(new goby::InterThreadTransporter); forwarder_.reset(new goby::InterProcessForwarder(*interthread_)); Thread>::set_transporter(forwarder_.get()); + + interthread_->template subscribe( + [this](const bool& shutdown) + { if(shutdown) SimpleThreadBase::thread_quit(); } + ); } goby::InterProcessForwarder& interprocess() @@ -84,7 +90,8 @@ namespace goby std::map> threads_; - + goby::InterThreadTransporter interthread_; + public: using MainThreadBase = goby::Thread; @@ -111,7 +118,9 @@ namespace goby template void join_thread(int index = -1); - protected: + protected: + goby::InterThreadTransporter& interthread() { return interthread_; } + void quit() override; private: @@ -132,7 +141,6 @@ namespace goby { private: - goby::InterThreadTransporter interthread_; goby::InterProcessPortal portal_; using Base = MultiThreadApplicationBase>; @@ -143,7 +151,7 @@ namespace goby MultiThreadApplication(boost::units::quantity loop_freq) : Base(loop_freq, &portal_), - portal_(interthread_, goby::common::ApplicationBase3::app_cfg().interprocess()) + portal_(Base::interthread(), goby::common::ApplicationBase3::app_cfg().interprocess()) { } virtual ~MultiThreadApplication() { } @@ -160,7 +168,6 @@ namespace goby private: using Base = MultiThreadApplicationBase; - goby::InterThreadTransporter interthread_; public: MultiThreadStandaloneApplication(double loop_freq_hertz = 0) : @@ -168,14 +175,13 @@ namespace goby { } MultiThreadStandaloneApplication(boost::units::quantity loop_freq, bool check_required_configuration = true) - : Base(loop_freq, &interthread_, check_required_configuration) + : Base(loop_freq, &Base::interthread(), check_required_configuration) { } virtual ~MultiThreadStandaloneApplication() { } protected: - goby::InterThreadTransporter& interthread() { return interthread_; } }; @@ -233,6 +239,7 @@ template // copy configuration auto thread_lambda = [this, type_i, index, cfg, &thread_manager]() { +// std::cout << std::this_thread::get_id() << ": thread " << index << std::endl; std::shared_ptr goby_thread(ThreadTypeSelector::thread(cfg, index)); thread_manager.poll_cv = goby_thread->interthread().cv(); thread_manager.poll_mutex = goby_thread->interthread().poll_mutex(); @@ -263,17 +270,12 @@ void goby::MultiThreadApplicationBase::join_thread(int inde template void goby::MultiThreadApplicationBase::quit() { + interthread_.publish(true); // join the threads for(auto& tmap : threads_) { for(auto & t : tmap.second) { - { - std::unique_lock lock(*t.second.poll_mutex); - t.second.alive = false; - } - // notify condition variables - t.second.poll_cv->notify_all(); t.second.thread->join(); } } diff --git a/src/middleware/poller.h b/src/middleware/poller.h index f3d080a66..e20fe8143 100644 --- a/src/middleware/poller.h +++ b/src/middleware/poller.h @@ -19,16 +19,16 @@ namespace goby PollerInterface* inner_poller() { return inner_poller_; } private: - int _transporter_poll() + int _transporter_poll(std::unique_ptr>& lock) override { // work from the inside out int inner_poll_items = 0; if(inner_poller_) // recursively call inner poll - inner_poll_items += static_cast(inner_poller_)->_transporter_poll(); + inner_poll_items += static_cast(inner_poller_)->_transporter_poll(lock); int poll_items = 0; if(!inner_poll_items) - poll_items += static_cast(this)->template _poll(); + poll_items += static_cast(this)->template _poll(lock); // goby::glog.is(goby::common::logger::DEBUG3) && goby::glog << "Poller::transporter_poll(): " << typeid(*this).name() << " this: " << this << " (" << poll_items << " items) "<< " inner_poller_: " << inner_poller_ << " (" << inner_poll_items << " items) " << std::endl; diff --git a/src/middleware/thread.h b/src/middleware/thread.h index 2c13b1bf6..dcffd1f2a 100644 --- a/src/middleware/thread.h +++ b/src/middleware/thread.h @@ -33,6 +33,8 @@ #include "goby/common/exception.h" +#include "group.h" + namespace goby { template @@ -72,9 +74,7 @@ namespace goby alive_ = &alive; while(alive) { - std::unique_lock lock(*transporter_->poll_mutex()); - - run_once(&lock); + run_once(); } } @@ -113,20 +113,26 @@ namespace goby decltype(loop_frequency_) loop_frequency() { return loop_frequency_; } double loop_max_frequency() { return std::numeric_limits::infinity(); } - void run_once(std::unique_lock* lock = nullptr); + void run_once(); TransporterType& transporter() { return *transporter_; } const Config& cfg() { return cfg_; } - void thread_quit() { (*alive_) = false; } + void thread_quit() { (*alive_) = false; } + + static constexpr goby::Group shutdown_group_ { "goby::ThreadShutdown" }; + }; + } - template - void goby::Thread::run_once(std::unique_lock* lock) + constexpr goby::Group goby::Thread::shutdown_group_; + +template + void goby::Thread::run_once() { if(!transporter_) throw(goby::Exception("Null transporter")); @@ -134,12 +140,12 @@ template if(loop_frequency_hertz() == std::numeric_limits::infinity()) { // call loop as fast as possible - transporter_->poll(std::chrono::seconds(0), lock); + transporter_->poll(std::chrono::seconds(0)); loop(); } else if(loop_frequency_hertz() > 0) { - int events = transporter_->poll(loop_time_, lock); + int events = transporter_->poll(loop_time_); // timeout if(events == 0) @@ -152,9 +158,8 @@ template else { // don't call loop() - transporter_->poll(std::chrono::system_clock::time_point::max(), lock); + transporter_->poll(); } } - #endif diff --git a/src/middleware/transport-common.h b/src/middleware/transport-common.h index f3eb782d0..c01ecd03b 100644 --- a/src/middleware/transport-common.h +++ b/src/middleware/transport-common.h @@ -49,7 +49,7 @@ namespace goby private: friend Poller; - int _poll() + int _poll(std::unique_ptr>& lock) { return 0; } }; diff --git a/src/middleware/transport-interfaces.cpp b/src/middleware/transport-interfaces.cpp index 649bd91ec..30ea5c6f0 100644 --- a/src/middleware/transport-interfaces.cpp +++ b/src/middleware/transport-interfaces.cpp @@ -1,39 +1,36 @@ #include "transport-interfaces.h" +#include "goby/common/exception.h" +#include - -int goby::PollerInterface::poll(const std::chrono::system_clock::time_point& timeout, - std::unique_lock* lock) +int goby::PollerInterface::poll(const std::chrono::system_clock::time_point& timeout) { - return _poll_all(timeout, lock); + return _poll_all(timeout); } -int goby::PollerInterface::poll(std::chrono::system_clock::duration wait_for, - std::unique_lock* lock) +int goby::PollerInterface::poll(std::chrono::system_clock::duration wait_for) { if(wait_for == std::chrono::system_clock::duration::max()) - return poll(std::chrono::system_clock::time_point::max(), lock); + return poll(); else - return poll(std::chrono::system_clock::now() + wait_for, lock); + return poll(std::chrono::system_clock::now() + wait_for); } -int goby::PollerInterface::_poll_all(const std::chrono::system_clock::time_point& timeout, - std::unique_lock* lock) +int goby::PollerInterface::_poll_all(const std::chrono::system_clock::time_point& timeout) { + // hold this lock until either we find a polled item or we wait on the condition variable + std::unique_ptr> lock(new std::unique_lock(*poll_mutex_)); +// std::cout << std::this_thread::get_id() << " _poll_all locking: " << poll_mutex_.get() << std::endl; - std::unique_ptr> our_lock; - if(lock == nullptr) - { - our_lock.reset(new std::unique_lock(*poll_mutex_)); - lock = our_lock.get(); - } - - int poll_items = _transporter_poll(); - if(poll_items == 0) - { + int poll_items = _transporter_poll(lock); + while(poll_items == 0) + { + if(!lock) + throw(goby::Exception("Poller lock was released by poll() but no poll items were returned")); + if(timeout == std::chrono::system_clock::time_point::max()) { cv_->wait(*lock); // wait_until doesn't work well with time_point::max() - poll_items = _transporter_poll(); + poll_items = _transporter_poll(lock); if(poll_items == 0) goby::glog.is(goby::common::logger::DEBUG1) && goby::glog << "PollerInterface condition_variable: spurious wakeup" << std::endl; @@ -42,7 +39,7 @@ int goby::PollerInterface::_poll_all(const std::chrono::system_clock::time_point else { if(cv_->wait_until(*lock, timeout) == std::cv_status::no_timeout) - poll_items = _transporter_poll(); + poll_items = _transporter_poll(lock); else return poll_items; } diff --git a/src/middleware/transport-interfaces.h b/src/middleware/transport-interfaces.h index c325a2219..017db6837 100644 --- a/src/middleware/transport-interfaces.h +++ b/src/middleware/transport-interfaces.h @@ -71,8 +71,8 @@ namespace goby { } - int poll(const std::chrono::system_clock::time_point& timeout = std::chrono::system_clock::time_point::max(), std::unique_lock* lock = nullptr); - int poll(std::chrono::system_clock::duration wait_for, std::unique_lock* lock = nullptr); + int poll(const std::chrono::system_clock::time_point& timeout = std::chrono::system_clock::time_point::max()); + int poll(std::chrono::system_clock::duration wait_for); std::shared_ptr poll_mutex() { return poll_mutex_; } std::shared_ptr cv() { return cv_; } @@ -80,12 +80,11 @@ namespace goby private: template friend class Poller; // poll the transporter for data - virtual int _transporter_poll() = 0; + virtual int _transporter_poll(std::unique_ptr>& lock) = 0; private: // poll all the transporters for data, including a timeout (only called by the outside-most Poller) - int _poll_all(const std::chrono::system_clock::time_point& timeout, - std::unique_lock* lock = nullptr); + int _poll_all(const std::chrono::system_clock::time_point& timeout); std::shared_ptr poll_mutex_; // signaled when there's no data for this thread to read during _poll() diff --git a/src/middleware/transport-interprocess-zeromq.h b/src/middleware/transport-interprocess-zeromq.h index 4a02cf158..c5fda7dd4 100644 --- a/src/middleware/transport-interprocess-zeromq.h +++ b/src/middleware/transport-interprocess-zeromq.h @@ -161,7 +161,7 @@ namespace goby } - int _poll() + int _poll(std::unique_ptr>& lock) { int items = 0; protobuf::InprocControl control_msg; @@ -171,6 +171,7 @@ namespace goby { case protobuf::InprocControl::RECEIVE: ++items; + if(lock) lock.reset(); for(auto &sub : subscriptions_) { const auto& data = control_msg.received_data(); diff --git a/src/middleware/transport-interprocess.h b/src/middleware/transport-interprocess.h index b59d6a668..0d1461fde 100644 --- a/src/middleware/transport-interprocess.h +++ b/src/middleware/transport-interprocess.h @@ -92,8 +92,8 @@ namespace goby private: friend PollerType; - int _poll() - { return static_cast(this)->_poll(); } + int _poll(std::unique_ptr>& lock) + { return static_cast(this)->_poll(lock); } }; template @@ -190,7 +190,7 @@ namespace goby sub->post(bytes.begin(), bytes.end(), data->marshalling_scheme(), data->type(), data->group()); } - int _poll() + int _poll(std::unique_ptr>& lock) { return 0; } // A forwarder is a shell, only the inner Transporter has data private: diff --git a/src/middleware/transport-interthread.cpp b/src/middleware/transport-interthread.cpp index 732f6e8d9..fda8a8d0e 100644 --- a/src/middleware/transport-interthread.cpp +++ b/src/middleware/transport-interthread.cpp @@ -1,6 +1,4 @@ #include "transport-interthread.h" std::unordered_map> goby::SubscriptionStoreBase::stores_; -std::mutex goby::SubscriptionStoreBase::stores_mutex_; -std::condition_variable goby::SubscriptionStoreBase::stores_cv_; -std::atomic goby::SubscriptionStoreBase::pollers_(0); +std::shared_timed_mutex goby::SubscriptionStoreBase::stores_mutex_; diff --git a/src/middleware/transport-interthread.h b/src/middleware/transport-interthread.h index 5b3faf463..127cafe0b 100644 --- a/src/middleware/transport-interthread.h +++ b/src/middleware/transport-interthread.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -12,41 +13,17 @@ #include "transport-common.h" namespace goby -{ - // increments and decrements a reader count using RAII - // once the reader count goes to zero, notifies a condition variable (for the writer(s) to use) - struct ReaderRegister - { - ReaderRegister(std::atomic& counter, std::condition_variable& cv) : counter_(counter), cv_(cv) - { - ++counter; - } - ~ReaderRegister() - { - --counter_; - - if(counter_ == 0) cv_.notify_all(); - } - - std::atomic& counter_; - std::condition_variable& cv_; - }; - - +{ class SubscriptionStoreBase { public: // returns number of data items posted to callbacks - static int poll_all(std::thread::id thread_id) + static int poll_all(std::thread::id thread_id, std::unique_ptr>& lock) { - stores_mutex_.lock(); - // multiple readers - ReaderRegister r(pollers_, stores_cv_); - stores_mutex_.unlock(); - + std::shared_lock stores_lock(stores_mutex_); int poll_items = 0; for (auto const &s : stores_) - poll_items += s.second->poll(thread_id); + poll_items += s.second->poll(thread_id, lock); return poll_items; } @@ -54,41 +31,45 @@ namespace goby template static void insert() { - // check the store, and if there isn't one for this type, create one - std::unique_lock lock(stores_mutex_); - while(pollers_ > 0) // wait for readers - stores_cv_.wait(lock); - + std::lock_guard lock(stores_mutex_); auto index = std::type_index(typeid(StoreType)); if(!stores_.count(index)) stores_.insert(std::make_pair(index, std::unique_ptr(new StoreType))); } protected: - virtual int poll(std::thread::id thread_id) = 0; + virtual int poll(std::thread::id thread_id, std::unique_ptr>& lock) = 0; private: // stores a map of Datas to SubscriptionStores so that can call poll() on all the stores static std::unordered_map> stores_; - static std::mutex stores_mutex_; - static std::condition_variable stores_cv_; - static std::atomic pollers_; // number of active readers on stores_mutex_ + static std::shared_timed_mutex stores_mutex_; }; + + struct DataProtection + { + DataProtection(std::shared_ptr dm, + std::shared_ptr pcv, + std::shared_ptr pm) + : data_mutex(dm), poller_cv(pcv), poller_mutex(pm) {} + + std::shared_ptr data_mutex; + std::shared_ptr poller_cv; + std::shared_ptr poller_mutex; + }; template class SubscriptionStore : public SubscriptionStoreBase { public: - static void subscribe(std::function)> func, const Group& group, std::thread::id thread_id, std::shared_ptr mutex, std::shared_ptr cv) + static void subscribe(std::function)> func, const Group& group, std::thread::id thread_id, std::shared_ptr data_mutex, std::shared_ptr cv, std::shared_ptr poller_mutex) { { - std::unique_lock lock(subscription_mutex_); - while(subscription_readers_ > 0) - subscription_cv_.wait(lock); - + std::lock_guard lock(subscription_mutex_); + // insert callback auto it = subscription_callbacks_.insert(std::make_pair(thread_id, Callback(group, func))); // insert group with iterator to callback @@ -106,7 +87,7 @@ namespace goby // if we don't have a condition variable already for this thread, store it if(!data_protection_.count(thread_id)) - data_protection_.insert(std::make_pair(thread_id, std::make_pair(mutex, cv))); + data_protection_.insert(std::make_pair(thread_id, DataProtection(data_mutex, cv, poller_mutex))); } // try inserting a copy of this templated class via the base class for SubscriptionStoreBase::poll_all to use @@ -115,13 +96,13 @@ namespace goby static void publish(std::shared_ptr data, const Group& group, const goby::protobuf::TransporterConfig& transport_cfg) { +// std::cout << std::this_thread::get_id() << "publishing to : " << group << std::endl; + // push new data // build up local vector of relevant condition variables while locked - std::vector, std::shared_ptr>> cv_to_notify; + std::vector cv_to_notify; { - subscription_mutex_.lock(); - ReaderRegister r(subscription_readers_, subscription_cv_); - subscription_mutex_.unlock(); + std::shared_lock lock(subscription_mutex_); auto range = subscription_groups_.equal_range(group); for (auto it = range.first; it != range.second; ++it) @@ -132,7 +113,7 @@ namespace goby if(thread_id != std::this_thread::get_id() || transport_cfg.echo()) { // protect the DataQueue we are writing to - std::unique_lock lock(*(data_protection_.find(thread_id)->second.first)); + std::unique_lock lock(*(data_protection_.find(thread_id)->second.data_mutex)); auto queue_it = data_.find(thread_id); queue_it->second.insert(group, data); cv_to_notify.push_back(data_protection_.at(thread_id)); @@ -141,25 +122,30 @@ namespace goby } // unlock and notify condition variables from local vector - for (const auto& cv_mutex_pair : cv_to_notify) + for (const auto& data_protection : cv_to_notify) { - cv_mutex_pair.second->notify_all(); + { + // lock to ensure the other thread isn't in the limbo region + // between _poll_all() and wait(), where the condition variable + // signal would be lost + +// std::cout << std::this_thread::get_id() << "publish notify locking: " << data_protection.poller_mutex.get() << std::endl; + std::lock_guard(*data_protection.poller_mutex); + } + data_protection.poller_cv->notify_all(); } } private: - int poll(std::thread::id thread_id) + int poll(std::thread::id thread_id, std::unique_ptr>& lock) { std::vector)>, std::shared_ptr>> data_callbacks; int poll_items_count = 0; - subscription_mutex_.lock(); - ReaderRegister r(subscription_readers_, subscription_cv_); - subscription_mutex_.unlock(); - + std::shared_lock sub_lock(subscription_mutex_); auto queue_it = data_.find(thread_id); if(queue_it == data_.end()) @@ -167,7 +153,7 @@ namespace goby { - std::unique_lock lock(*(data_protection_.find(thread_id)->second.first)); + std::unique_lock data_lock(*(data_protection_.find(thread_id)->second.data_mutex)); // loop over all Groups stored in this DataQueue for (auto data_it = queue_it->second.cbegin(), end = queue_it->second.cend(); data_it != end; ++data_it) @@ -185,6 +171,8 @@ namespace goby for(auto& datum : data_it->second) { ++poll_items_count; + // we have data, no need to keep this lock any longer + if(lock) lock.reset(); data_callbacks.push_back(std::make_pair(callback, datum)); } } @@ -239,11 +227,9 @@ namespace goby // threads that are subscribed to a given group static std::unordered_multimap subscription_groups_; // condition variable to use for data - static std::unordered_map, std::shared_ptr>> data_protection_; + static std::unordered_map data_protection_; - static std::mutex subscription_mutex_; // protects subscription_callbacks, subscription_groups, data_protection, and the overarching data_ map (but not the DataQueues within it, which are protected by the mutexes stored in data_protection_)) - static std::condition_variable subscription_cv_; - static std::atomic subscription_readers_; // number of active readers on subscription_mutex_ + static std::shared_timed_mutex subscription_mutex_; // protects subscription_callbacks, subscription_groups, data_protection, and the overarching data_ map (but not the DataQueues within it, which are protected by the mutexes stored in data_protection_)) // data for a given thread @@ -259,14 +245,10 @@ namespace goby template std::unordered_multimap::subscription_callbacks_)::const_iterator> SubscriptionStore::subscription_groups_; template - std::unordered_map, std::shared_ptr>> SubscriptionStore::data_protection_; + std::unordered_map SubscriptionStore::data_protection_; template - std::mutex SubscriptionStore::subscription_mutex_; - template - std::condition_variable SubscriptionStore::subscription_cv_; - template - std::atomic SubscriptionStore::subscription_readers_; + std::shared_timed_mutex SubscriptionStore::subscription_mutex_; class InterThreadTransporter : public StaticTransporterInterface, @@ -309,21 +291,21 @@ namespace goby void subscribe_dynamic(std::function f, const Group& group) { check_validity_runtime(group); - SubscriptionStore::subscribe([=](std::shared_ptr pd) { f(*pd); }, group, std::this_thread::get_id(), data_mutex_, Poller::cv()); + SubscriptionStore::subscribe([=](std::shared_ptr pd) { f(*pd); }, group, std::this_thread::get_id(), data_mutex_, Poller::cv(), Poller::poll_mutex()); } template()> void subscribe_dynamic(std::function)> f, const Group& group) { check_validity_runtime(group); - SubscriptionStore::subscribe(f, group, std::this_thread::get_id(), data_mutex_, Poller::cv()); + SubscriptionStore::subscribe(f, group, std::this_thread::get_id(), data_mutex_, Poller::cv(), Poller::poll_mutex()); } private: friend Poller; - int _poll() - { return SubscriptionStoreBase::poll_all(std::this_thread::get_id()); } + int _poll(std::unique_ptr>& lock) + { return SubscriptionStoreBase::poll_all(std::this_thread::get_id(), lock); } private: // protects this thread's DataQueue diff --git a/src/middleware/transport-intervehicle.h b/src/middleware/transport-intervehicle.h index fb83bb002..8b6793db3 100644 --- a/src/middleware/transport-intervehicle.h +++ b/src/middleware/transport-intervehicle.h @@ -124,8 +124,8 @@ namespace goby private: friend PollerType; - int _poll() - { return static_cast(this)->_poll(); } + int _poll(std::unique_ptr>& lock) + { return static_cast(this)->_poll(lock); } }; template @@ -200,7 +200,7 @@ namespace goby - int _poll() + int _poll(std::unique_ptr>& lock) { return 0; } void _receive_dccl_data_forwarded(const goby::protobuf::DCCLForwardedData& packets) @@ -301,7 +301,7 @@ namespace goby subscriptions_[dccl_id].insert(std::make_pair(group, subscription)); } - int _poll() + int _poll(std::unique_ptr>& lock) { int items = 0; goby::acomms::protobuf::ModemTransmission msg; @@ -309,6 +309,7 @@ namespace goby { _receive(msg); ++items; + if(lock) lock.reset(); } return items; } From 1cd6732edef8f60ca1c59111eb5aed07057164a7 Mon Sep 17 00:00:00 2001 From: Toby Schneider Date: Wed, 29 Nov 2017 13:09:12 -0500 Subject: [PATCH 7/7] Added option to launch_thread from MultiThreadApplications using a different configuration object than the main application --- src/middleware/multi-thread-application.h | 37 ++++++++++++++--------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/src/middleware/multi-thread-application.h b/src/middleware/multi-thread-application.h index 88a7b1127..d6dade602 100644 --- a/src/middleware/multi-thread-application.h +++ b/src/middleware/multi-thread-application.h @@ -111,10 +111,18 @@ namespace goby template void launch_thread() - { _launch_thread(-1); } + { _launch_thread(-1, goby::common::ApplicationBase3::app_cfg()); } template void launch_thread(int index) - { _launch_thread(index); } + { _launch_thread(index, goby::common::ApplicationBase3::app_cfg()); } + + template + void launch_thread(const ThreadConfig& cfg) + { _launch_thread(-1, cfg); } + template + void launch_thread(int index, const ThreadConfig& cfg) + { _launch_thread(index, cfg); } + template void join_thread(int index = -1); @@ -128,8 +136,8 @@ namespace goby void run() override { MainThreadBase::run_once(); } - template - void _launch_thread(int index); + template + void _launch_thread(int index, const ThreadConfig& cfg); }; @@ -208,27 +216,26 @@ namespace goby }; // selects which constructor to use based on whether the thread is launched with an index or not - template + template struct ThreadTypeSelector { }; - template - struct ThreadTypeSelector + template + struct ThreadTypeSelector { - static std::shared_ptr thread(const Config& cfg, int index = -1) + static std::shared_ptr thread(const ThreadConfig& cfg, int index = -1) { return std::make_shared(cfg); }; }; - template - struct ThreadTypeSelector + template + struct ThreadTypeSelector { - static std::shared_ptr thread(const Config& cfg, int index) + static std::shared_ptr thread(const ThreadConfig& cfg, int index) { return std::make_shared(cfg, index); }; }; } template -template - void goby::MultiThreadApplicationBase::_launch_thread(int index) + template + void goby::MultiThreadApplicationBase::_launch_thread(int index, const ThreadConfig& cfg) { - const Config& cfg = goby::common::ApplicationBase3::app_cfg(); std::type_index type_i = std::type_index(typeid(ThreadType)); if(threads_[type_i].count(index)) @@ -240,7 +247,7 @@ template auto thread_lambda = [this, type_i, index, cfg, &thread_manager]() { // std::cout << std::this_thread::get_id() << ": thread " << index << std::endl; - std::shared_ptr goby_thread(ThreadTypeSelector::thread(cfg, index)); + std::shared_ptr goby_thread(ThreadTypeSelector::thread(cfg, index)); thread_manager.poll_cv = goby_thread->interthread().cv(); thread_manager.poll_mutex = goby_thread->interthread().poll_mutex(); goby_thread->run(thread_manager.alive);