Skip to content

Commit

Permalink
Merge pull request #21 from eclipse-ecal/global-dataread-datawrite-re…
Browse files Browse the repository at this point in the history
…moval

global process dataclock, dataread bytes, datawrite bytes removed
  • Loading branch information
rex-schilasky authored Feb 15, 2024
2 parents d0429ff + 3b37231 commit bc8c5f8
Show file tree
Hide file tree
Showing 24 changed files with 42 additions and 114 deletions.
2 changes: 0 additions & 2 deletions samples/cpp/monitoring/monitoring_rec/src/monitoring_rec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ int main(int argc, char **argv)
std::cout << "pname : " << process.pname() << std::endl; // process name
std::cout << "uname : " << process.uname() << std::endl; // unit name
std::cout << "pparam : " << process.pparam() << std::endl; // process parameter
std::cout << "datawrite : " << process.datawrite() << std::endl; // data write bytes per sec
std::cout << "dataread : " << process.dataread() << std::endl; // date read bytes per sec
std::cout << "severity : " << process.state().severity() << std::endl; // process state severity
std::cout << "info : " << process.state().info() << std::endl; // process state info
std::cout << "tsync state : " << process.tsync_state() << std::endl; // time sync state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ message LayerParShm
repeated string memory_file_list = 1; // list of memory file names
}

message LayerParInproc
{
}

message LayerParTcp
{
int32 port = 1; // tcp writers port number
Expand All @@ -43,15 +39,19 @@ message ConnnectionPar // connection parameter for read
{
LayerParUdpMC layer_par_udpmc = 1; // parameter for ecal udp multicast
LayerParShm layer_par_shm = 2; // parameter for ecal shared memory
// 3 = parameter for ecal inner process
LayerParTcp layer_par_tcp = 4; // parameter for ecal tcp
}

enum eTLayerType // transport layer
{
tl_none = 0; // undefined
tl_ecal_udp_mc = 1; // ecal udp multicast
// 2 = ecal udp unicast (not supported anymore)
// 3 = ecal udp metal (not supported anymore)
tl_ecal_shm = 4; // ecal shared memory
tl_ecal_tcp = 5; // ecal tcp
// 42 = inproc (not supported anymore)
tl_all = 255; // all layer
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ message Process // process
string pname = 4; // process name
string uname = 5; // unit name
string pparam = 6; // process parameter
int64 datawrite = 10; // data write bytes per sec
int64 dataread = 11; // data read bytes per sec
// 10 = data write bytes per sec
// 11 = data read bytes per sec
ProcessState state = 12; // process state info
eTSyncState tsync_state = 13; // time synchronization state
string tsync_mod_name = 14; // time synchronization module name
Expand Down
4 changes: 2 additions & 2 deletions samples/cpp/monitoring/monitoring_reg/src/protobuf/ecal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ message Content // topic content
int64 id = 1; // sample id
int64 clock = 2; // internal used clock
int64 time = 3; // time the content was updated
int64 hash = 7; // unique hash for that sample
int32 size = 6; // size (additional for none payload "header only samples")
bytes payload = 4; // octet stream
int32 size = 6; // size (redundant for compatibility)
int64 hash = 7; // unique hash for that sample
}

enum eCmdType // command type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ message LayerParShm
repeated string memory_file_list = 1; // list of memory file names
}

message LayerParInproc
{
}

message LayerParTcp
{
int32 port = 1; // tcp writers port number
Expand All @@ -43,15 +39,19 @@ message ConnnectionPar // connection parameter for read
{
LayerParUdpMC layer_par_udpmc = 1; // parameter for ecal udp multicast
LayerParShm layer_par_shm = 2; // parameter for ecal shared memory
// 3 = parameter for ecal inner process
LayerParTcp layer_par_tcp = 4; // parameter for ecal tcp
}

enum eTLayerType // transport layer
{
tl_none = 0; // undefined
tl_ecal_udp_mc = 1; // ecal udp multicast
// 2 = ecal udp unicast (not supported anymore)
// 3 = ecal udp metal (not supported anymore)
tl_ecal_shm = 4; // ecal shared memory
tl_ecal_tcp = 5; // ecal tcp
// 42 = inproc (not supported anymore)
tl_all = 255; // all layer
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ message Process // process
string pname = 4; // process name
string uname = 5; // unit name
string pparam = 6; // process parameter
int64 datawrite = 10; // data write bytes per sec
int64 dataread = 11; // data read bytes per sec
// 10 = data write bytes per sec
// 11 = data read bytes per sec
ProcessState state = 12; // process state info
eTSyncState tsync_state = 13; // time synchronization state
string tsync_mod_name = 14; // time synchronization module name
Expand Down
5 changes: 0 additions & 5 deletions src/core/include/ecal/types/monitoring.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ namespace eCAL
{
rclock = 0;
pid = 0;
datawrite = 0;
dataread = 0;
state_severity = 0;
state_severity_level = 0;
tsync_state = 0;
Expand All @@ -131,9 +129,6 @@ namespace eCAL
std::string uname; //!< unit name
std::string pparam; //!< process parameter

int64_t datawrite; //!< data write bytes per sec
int64_t dataread; //!< data read bytes per sec

int32_t state_severity; //!< process state info severity:
//!< proc_sev_unknown = 0 (condition unknown)
//!< proc_sev_healthy = 1 (process healthy)
Expand Down
8 changes: 0 additions & 8 deletions src/core/src/ecal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,6 @@ namespace eCAL
{
for (size_t i = 0; i < static_cast<size_t>(argc_); ++i) if (argv_[i] != nullptr) g_task_parameter.emplace_back(argv_[i]);
}

g_process_wclock = 0;
g_process_wbytes = 0;
g_process_wbytes_sum = 0;

g_process_rclock = 0;
g_process_rbytes = 0;
g_process_rbytes_sum = 0;
}
g_globals_ctx_ref_cnt++;

Expand Down
8 changes: 0 additions & 8 deletions src/core/src/ecal_global_accessors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,6 @@ namespace eCAL
eCAL_Process_eSeverity g_process_severity(eCAL_Process_eSeverity::proc_sev_unknown);
eCAL_Process_eSeverity_Level g_process_severity_level(eCAL_Process_eSeverity_Level::proc_sev_level1);

std::atomic<long long> g_process_wclock;
std::atomic<long long> g_process_wbytes;
std::atomic<long long> g_process_wbytes_sum;

std::atomic<long long> g_process_rclock;
std::atomic<long long> g_process_rbytes;
std::atomic<long long> g_process_rbytes_sum;

CGlobals* g_globals()
{
return g_globals_ctx;
Expand Down
8 changes: 0 additions & 8 deletions src/core/src/ecal_global_accessors.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,4 @@ namespace eCAL

extern eCAL_Process_eSeverity g_process_severity;
extern eCAL_Process_eSeverity_Level g_process_severity_level;

extern std::atomic<long long> g_process_wclock;
extern std::atomic<long long> g_process_wbytes;
extern std::atomic<long long> g_process_wbytes_sum;

extern std::atomic<long long> g_process_rclock;
extern std::atomic<long long> g_process_rbytes;
extern std::atomic<long long> g_process_rbytes_sum;
}
4 changes: 0 additions & 4 deletions src/core/src/monitoring/ecal_monitoring_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,6 @@ namespace eCAL
const int process_id = sample_process.pid;
const std::string& process_param = sample_process.pparam;
const std::string& unit_name = sample_process.uname;
const long long process_datawrite = sample_process.datawrite;
const long long process_dataread = sample_process.dataread;
const auto& sample_process_state = sample_process.state;
const int process_state_severity = sample_process_state.severity;
const int process_state_severity_level = sample_process_state.severity_level;
Expand Down Expand Up @@ -392,8 +390,6 @@ namespace eCAL

// update flexible content
ProcessInfo.rclock++;
ProcessInfo.datawrite = process_datawrite;
ProcessInfo.dataread = process_dataread;
ProcessInfo.state_severity = process_state_severity;
ProcessInfo.state_severity_level = process_state_severity_level;
ProcessInfo.state_info = process_state_info;
Expand Down
11 changes: 2 additions & 9 deletions src/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,9 @@ namespace eCAL
break;
}

// update globals
g_process_rclock++;
g_process_rbytes_sum += payload_size;

// apply sample to data reader
std::vector<std::shared_ptr<CDataReader>> readers_to_apply;

// Lock the sync map only while extracting the relevant shared pointers to the Datareaders.
// Apply the samples to the readers afterward.
{
Expand Down Expand Up @@ -215,10 +212,6 @@ namespace eCAL
{
if (!m_created) return false;

// update globals
g_process_rclock++;
g_process_rbytes_sum += len_;

// apply sample to data reader
size_t applied_size(0);
std::vector<std::shared_ptr<CDataReader>> readers_to_apply;
Expand Down
6 changes: 0 additions & 6 deletions src/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -766,9 +766,6 @@ namespace eCAL
{
// increase write clock
m_clock++;

// statistics
g_process_wclock++;
}

std::string CDataWriter::Dump(const std::string& indent_ /* = "" */)
Expand Down Expand Up @@ -1208,9 +1205,6 @@ namespace eCAL
const std::hash<SSndHash> hf;
const size_t snd_hash = hf(SSndHash(m_topic_id, m_clock));

// increase overall sum send
g_process_wbytes_sum += len_;

// store size for monitoring
m_topic_size = len_;

Expand Down
20 changes: 18 additions & 2 deletions src/core/src/readwrite/tcp/ecal_writer_tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,28 @@ namespace eCAL
std::vector<char> serialized_proto_header;
SerializeToBuffer(proto_header, serialized_proto_header);

// Get size of ecal payload sample
// Get size of un-altered proto header size
auto proto_header_size = static_cast<uint16_t>(serialized_proto_header.size());

// Compute needed padding for aligning the payload
//const size_t minimal_header_size = ecal_magic_size + sizeof(uint16_t) + ecal_sample_size;
constexpr size_t alignment_bytes = 8;
const size_t minimal_header_size = ecal_magic_size + sizeof(uint16_t) + proto_header_size;
const size_t padding_size = (alignment_bytes - (minimal_header_size % alignment_bytes)) % alignment_bytes;

// Add more bytes to the protobuf message to blow it up to the alignment
// Aligning the user payload this way should be 100% compatible with previous
// versions. It's most certainly bad style though and we should improve this
// in a future eCAL version.
//
// TODO: REMOVE ME FOR ECAL6
proto_header.padding.resize(padding_size);

// Serialize payload sample again (now with padding) and reread size
serialized_proto_header.clear();
SerializeToBuffer(proto_header, serialized_proto_header);
proto_header_size = static_cast<uint16_t>(serialized_proto_header.size());

// prepare the header buffer
// 'ECAL' + proto header size field + proto header
m_header_buffer.resize(ecal_magic_size + sizeof(uint16_t) + proto_header_size);

Expand Down
10 changes: 0 additions & 10 deletions src/core/src/registration/ecal_registration_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,6 @@ namespace eCAL
process_sample_process.pname = Process::GetProcessName();
process_sample_process.uname = Process::GetUnitName();
process_sample_process.pparam = Process::GetProcessParameter();
process_sample_process.datawrite = g_process_wbytes;
process_sample_process.dataread = g_process_rbytes;
process_sample_process.state.severity = static_cast<Registration::eProcessSeverity>(g_process_severity);
process_sample_process.state.severity_level = static_cast<Registration::eProcessSeverityLevel>(g_process_severity_level);
process_sample_process.state.info = g_process_info;
Expand Down Expand Up @@ -533,14 +531,6 @@ namespace eCAL

void CRegistrationProvider::RegisterSendThread()
{
// calculate average receive bytes
g_process_rbytes = static_cast<long long>(((double)g_process_rbytes_sum / m_reg_refresh) * 1000.0);
g_process_rbytes_sum = 0;

// calculate average write bytes
g_process_wbytes = static_cast<long long>(((double)g_process_wbytes_sum / m_reg_refresh) * 1000.0);
g_process_wbytes_sum = 0;

#if ECAL_CORE_SUBSCRIBER
// refresh subscriber registration
if (g_subgate() != nullptr) g_subgate()->RefreshRegistrations();
Expand Down
8 changes: 0 additions & 8 deletions src/core/src/serialization/ecal_serialize_monitoring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ namespace
eCAL::nanopb::encode_string(pb_process_.uname, process_.uname);
// pparam
eCAL::nanopb::encode_string(pb_process_.pparam, process_.pparam);
// datawrite
pb_process_.datawrite = process_.datawrite;
// dataread
pb_process_.dataread = process_.dataread;
// state
pb_process_.has_state = true;
// state.severity
Expand Down Expand Up @@ -518,10 +514,6 @@ namespace
process_.rclock = pb_process_.rclock;
// pid
process_.pid = pb_process_.pid;
// datawrite
process_.datawrite = pb_process_.datawrite;
// dataread
process_.dataread = pb_process_.dataread;
// state.severity
process_.state_severity = pb_process_.state.severity;
// state.severity_level
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ namespace
eCAL::nanopb::encode_string(pb_sample_.process.uname, registration_.process.uname);
// pparam
eCAL::nanopb::encode_string(pb_sample_.process.pparam, registration_.process.pparam);
// datawrite
pb_sample_.process.datawrite = registration_.process.datawrite;
// dataread
pb_sample_.process.dataread = registration_.process.dataread;
// state.severity
pb_sample_.process.state.severity = static_cast<eCAL_pb_eProcessSeverity>(registration_.process.state.severity);
// state.severity_level
Expand Down Expand Up @@ -359,10 +355,6 @@ namespace
registration_.process.rclock = pb_sample_.process.rclock;
// pid
registration_.process.pid = pb_sample_.process.pid;
// datawrite
registration_.process.datawrite = pb_sample_.process.datawrite;
// dataread
registration_.process.dataread = pb_sample_.process.dataread;
// state.severity
registration_.process.state.severity = static_cast<eCAL::Registration::eProcessSeverity>(pb_sample_.process.state.severity);
// state.severity_level
Expand Down
6 changes: 2 additions & 4 deletions src/core/src/serialization/ecal_struct_sample_registration.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,13 @@ namespace eCAL
// Process information
struct Process
{
int32_t rclock = 0; // registration clock
int32_t rclock = 0; // registration clock
std::string hname; // host name
std::string hgname; // host group name
int32_t pid = 0; // process id
int32_t pid = 0; // process id
std::string pname; // process name
std::string uname; // unit name
std::string pparam; // process parameter
int64_t datawrite = 0; // data write bytes per sec
int64_t dataread = 0; // data read bytes per sec
ProcessState state; // process state info
eTSyncState tsync_state = tsync_none; // time synchronization state
std::string tsync_mod_name; // time synchronization module name
Expand Down
12 changes: 4 additions & 8 deletions src/core/src/serialization/nanopb/process.pb.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ typedef struct _eCAL_pb_Process {
pb_callback_t pname; /* process name */
pb_callback_t uname; /* unit name */
pb_callback_t pparam; /* process parameter */
int64_t datawrite; /* data write bytes per sec */
int64_t dataread; /* data read bytes per sec */
/* 10 = data write bytes per sec
11 = data read bytes per sec */
bool has_state;
eCAL_pb_ProcessState state; /* process state info */
eCAL_pb_eTSyncState tsync_state; /* time synchronization state */
Expand Down Expand Up @@ -85,9 +85,9 @@ extern "C" {

/* Initializer values for message structs */
#define eCAL_pb_ProcessState_init_default {_eCAL_pb_eProcessSeverity_MIN, {{NULL}, NULL}, _eCAL_pb_eProcessSeverityLevel_MIN}
#define eCAL_pb_Process_init_default {0, {{NULL}, NULL}, 0, {{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}, 0, 0, false, eCAL_pb_ProcessState_init_default, _eCAL_pb_eTSyncState_MIN, {{NULL}, NULL}, 0, {{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}}
#define eCAL_pb_Process_init_default {0, {{NULL}, NULL}, 0, {{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}, false, eCAL_pb_ProcessState_init_default, _eCAL_pb_eTSyncState_MIN, {{NULL}, NULL}, 0, {{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}}
#define eCAL_pb_ProcessState_init_zero {_eCAL_pb_eProcessSeverity_MIN, {{NULL}, NULL}, _eCAL_pb_eProcessSeverityLevel_MIN}
#define eCAL_pb_Process_init_zero {0, {{NULL}, NULL}, 0, {{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}, 0, 0, false, eCAL_pb_ProcessState_init_zero, _eCAL_pb_eTSyncState_MIN, {{NULL}, NULL}, 0, {{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}}
#define eCAL_pb_Process_init_zero {0, {{NULL}, NULL}, 0, {{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}, false, eCAL_pb_ProcessState_init_zero, _eCAL_pb_eTSyncState_MIN, {{NULL}, NULL}, 0, {{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}}

/* Field tags (for use in manual encoding/decoding) */
#define eCAL_pb_ProcessState_severity_tag 1
Expand All @@ -99,8 +99,6 @@ extern "C" {
#define eCAL_pb_Process_pname_tag 4
#define eCAL_pb_Process_uname_tag 5
#define eCAL_pb_Process_pparam_tag 6
#define eCAL_pb_Process_datawrite_tag 10
#define eCAL_pb_Process_dataread_tag 11
#define eCAL_pb_Process_state_tag 12
#define eCAL_pb_Process_tsync_state_tag 13
#define eCAL_pb_Process_tsync_mod_name_tag 14
Expand All @@ -124,8 +122,6 @@ X(a, STATIC, SINGULAR, INT32, pid, 3) \
X(a, CALLBACK, SINGULAR, STRING, pname, 4) \
X(a, CALLBACK, SINGULAR, STRING, uname, 5) \
X(a, CALLBACK, SINGULAR, STRING, pparam, 6) \
X(a, STATIC, SINGULAR, INT64, datawrite, 10) \
X(a, STATIC, SINGULAR, INT64, dataread, 11) \
X(a, STATIC, OPTIONAL, MESSAGE, state, 12) \
X(a, STATIC, SINGULAR, UENUM, tsync_state, 13) \
X(a, CALLBACK, SINGULAR, STRING, tsync_mod_name, 14) \
Expand Down
Loading

0 comments on commit bc8c5f8

Please sign in to comment.