diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..540d38f --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +*.css linguist-detectable=false diff --git a/CHANGELOG.md b/CHANGELOG.md index f22b0eb..6edb6ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,22 @@ # Change Log +## [2.3.0] - 24-03-2018 +### Extensions API +- Added `reset` in `DispatchErrorExtension` which is called when previously failed is no longer failing + +### Updates +- Added `stats dyn` and `stats queue` +- Backup keys only visible with `--with-key` parameter +- Max bulk items can be set to 5-500 + +### Fixes +- Dynamic buffer recovery check fix +- Crash when rotating an invalid file + +### Internal Updates +- Abstract server and I/O service +- Use shared pointer to keep reference for the session in the handler + ## [2.2.1] - 19-03-2018 ### Extensions API - Added `LogExtension::Level` enum for readability of `LogExtension::Data::level` diff --git a/CMakeLists.txt b/CMakeLists.txt index 483eb21..10165a3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,8 +11,8 @@ option (use_boost "Use boost or standalone networking lib" OFF) option (old_toolchain "Is old toolchain" OFF) set (RESIDUE_MAJOR "2") -set (RESIDUE_MINOR "2") -set (RESIDUE_PATCH "1") +set (RESIDUE_MINOR "3") +set (RESIDUE_PATCH "0") set (RESIDUE_VERSION "${RESIDUE_MAJOR}.${RESIDUE_MINOR}.${RESIDUE_PATCH}") set (RESIDUE_NAME "Residue") diff --git a/README.md b/README.md index 0a59e3d..c5818b9 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,8 @@ Some of the notable features are listed below * Encryption: All the incoming and outgoing data is automatically encrypted. Residue also has several layers and techniques to secure the requests and to prevent unauthorised application from logging without compromising the speed. * Compression: Residue server and official client libraries provide ability to compress the network packets which makes a big difference in amount of data transferred over the wire. * Speed is not compromised in any way and there is a lot of testing being done every day to ensure that logging server meets the speed requirements. + * Dynamic buffers allow logs to be queued up in the memory while there is some I/O error (e.g, disk full) and cleared as soon as disk errors are fixed + * Residue extensions allow certain code to be executed in case of specific events (e.g, log dispatch failure) - see [Extensions API](/docs/CONFIGURATION.md#extensions) * Reliability: We are actively working on making the server much more reliable from restarting from the crash to handling bad traffic. * Residue comes with various [tools](https://github.com/topics/residue-tools) for devops and [client libraries](https://github.com/topics/residue-client) for easy integration. * There are many more features available. Please feel free to download your copy of residue binary and give it a try. diff --git a/docs/CLI_COMMANDS.md b/docs/CLI_COMMANDS.md index 73723dc..830bff5 100644 --- a/docs/CLI_COMMANDS.md +++ b/docs/CLI_COMMANDS.md @@ -71,9 +71,31 @@ Only check the schedule for this logger rotation ### `stats` Displays server stats and number of active sessions -##### `list` +#### `list` Lists for active sessions (received, sent and how long session has been active for and associated clients if registered) +#### `dyn` +List dynamic buffer status + +#### `queue` +List processing queue status + +You can also run sampling on the queue by providing `sampling`. Thread will be stalled for the amount of seconds value you provide (valid value from 3 to 10) + +For example + +``` +stats queue sampling 6 --client-id unmanaged +``` + +It will list the speed of each queue, e.g, + +``` +Queue For: unmanaged Active: 8376 Backlog: 7442 Speed: 5 items/s (incl. bulk) +``` + +Sampling is done only on a non-empty _active_ queue + ##### `--client-id ` Filters stats for specified client. Some of the clients may not be listed as they're only registered when server receives anything from them. diff --git a/samples/extensions/failure-notify/README.md b/samples/extensions/failure-notify/README.md index 038cd06..6d6119c 100644 --- a/samples/extensions/failure-notify/README.md +++ b/samples/extensions/failure-notify/README.md @@ -9,7 +9,9 @@ To install this extension simply add following lines in `extensions` config "module": "path/to/failure-notify.so", "description": "Notifies the recipients when log failures have hit the threshold", "config": { + "script": "send.sh", "threshold": 20000, + "repeat": false, "recipients": ["operations@example.com"] } } diff --git a/samples/extensions/failure-notify/failure-notify.cc b/samples/extensions/failure-notify/failure-notify.cc index 0bfcf85..cbd392c 100644 --- a/samples/extensions/failure-notify/failure-notify.cc +++ b/samples/extensions/failure-notify/failure-notify.cc @@ -14,11 +14,16 @@ Extension::Result FailureNotify::execute(const DispatchErrorExtension::Data* con return {1, true}; } + if (!(conf().get("repeat", true)) && m_sent) { + return {0, true}; + } + m_failureCount++; if (m_failureCount >= conf().get("threshold", 200UL)) { notifyRecipients(data); m_failureCount = 0UL; + m_sent = true; } return {0, true}; @@ -33,11 +38,14 @@ void FailureNotify::notifyRecipients(const DispatchErrorExtension::Data* const d JsonDoc recipient(recipientNode); std::string email = recipient.as(""); writeLog("Notifying " + email); - - ss << "echo 'Residue dispatch error " - << std::strerror(data->errorNumber) << ". File: " - << data->filename << "' | mail -s 'residue err' " << email; + ss << conf().get("script", "send.sh") << " '" << std::strerror(data->errorNumber) << "' '" << data->filename << "' " << email; system(ss.str().c_str()); } } } + +void FailureNotify::reset() +{ + m_failureCount = 0; + m_sent = false; +} diff --git a/samples/extensions/failure-notify/failure-notify.h b/samples/extensions/failure-notify/failure-notify.h index a90f011..b213f8e 100644 --- a/samples/extensions/failure-notify/failure-notify.h +++ b/samples/extensions/failure-notify/failure-notify.h @@ -8,7 +8,7 @@ using namespace residue; class FailureNotify : public DispatchErrorExtension { public: - FailureNotify() : DispatchErrorExtension("failure-notify"), m_failureCount(0UL) + FailureNotify() : DispatchErrorExtension("failure-notify"), m_failureCount(0UL), m_sent(false) { } @@ -16,8 +16,10 @@ class FailureNotify : public DispatchErrorExtension { virtual ~FailureNotify() = default; virtual Extension::Result execute(const DispatchErrorExtension::Data* const) override; + virtual void reset() override; private: unsigned long m_failureCount; + bool m_sent; void notifyRecipients(const DispatchErrorExtension::Data* const data); }; diff --git a/samples/extensions/failure-notify/send.sh b/samples/extensions/failure-notify/send.sh new file mode 100755 index 0000000..430c1c8 --- /dev/null +++ b/samples/extensions/failure-notify/send.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +echo "Residue dispatch error $1. File: $2" | mail -s 'residue err' $3 diff --git a/samples/keys/encrypted-privkey.pem b/samples/keys/encrypted-privkey.pem deleted file mode 100644 index 41701ac..0000000 --- a/samples/keys/encrypted-privkey.pem +++ /dev/null @@ -1,51 +0,0 @@ ------BEGIN RSA PRIVATE KEY----- -MIIJJwIBAAKCAgEAuOrPTlTLs3FnwYHChUWgDuiE2rY6pM7OKVI5r1/kT3iflTMd -kc4N3NYjx0r5PMlvW+77ANvuu/9UGmcTsI4u1CT4l56Zn3jBDD+XUJZT6R7V7Bk8 -ISSDYjuetYoHb2cvlr1PENde5IvjQQq4iQeKFuM7tyYrWDBInryXZ1zSaJ2ufjxx -8/AuorIGHQclneUi/i8PwjJWIXiwkZmAEbzMed4Pq4ItcyRs+6pjujyzTNDNQ16u -omK09XmvK0ySF2gIPywFApXKnmIlUbzk1M/HSHnm7Wqr0MDZ7c2uRMGYKrDtylGP -nXq8qHOzc2/RyDVZLS8Ug0TXGPzDqyXaz3Np1dx+MzBybb6tKt2FDoOARiFXyios -YOpgYY+k+kLUDiHEQ4NQxkIQZ4C6Prua3IFeT987/aio45fBcUta4lUIEp0kNglp -MnRg8/BNr8dleTj1pRSXZnrnCDd+7jZaKQOX8xKPNmdZd36S0CYWn1E9LApzI0bv -/qSthyn4PzeirSI9acckmHvCHEdd5KoRvvhftiuLXfdZAsWKnzTiHR/OjSVPtEtc -2Yqikfxp8al/K4LBq8hS7ly9nGEH2h1mo2SjJVvajuj5Hj173t7Bb6npD3Vp/iog -RyLh2JQ8+0jSLHAnz3aP+rvCYrtNP1W/SOvHx3Au/RaJAPDxh5UxPlUZxqkCAREC -ggIAEwkc3qbYu+JDJr5MfqkMtj2VNKHRVLrvk0/+aKQ5YoinBBCOVonq1ruw3At7 -igWtWIXHA9pnomlXt2x+Qx2x/0AKiBQXV/IT32DmKi2XslnDMqg+qQ8Nhlj1/BmA -w/BE5hsEX9nvalmxvyqD7/0C68SRcPiilF9SxQviaMIGmdP/I5VH9zqbY5JTc+3p -hPW0ixtuzHYfdGMDHgwJazzY0E9Ub8XfCBaSv43GgFkSdRV+irMokLdWZJBKgLFL -RiyXb+otsTURF9VluVr5cEKQv+5uXDRkgqpwyWvC4W5IIoMniYhlX0Ua8zkWPNMK -TRSM8dPnwjgHFWVQYK1O/sRZ8/S1l4r4Qs2zunNXfvXTBF2prfmqJVCAys4YGmwx -MK1wlbo6xw7DPWfrR9PTFMa1IWcuPbuGWZooB7ZlNe0nCMHpqmfLMaY6I1tJNFZC -q5Juv2yh7GBY/EnIuLgRMs85sGVhj9c7rXmF/gmQZWCkJQzGk8/eprHvS7T3Hvfd -D/YuCwvafBex7pFZRtZO6+xZGiwhdAz+sTPvfm73Ng6/ZjhLSDt8TEnx9+/EjhIT -2DN/ahigTp32XNpyl2Man8ibbqmhdIEybNbYqG3uChPmp6FFHr9sS1ALi8SXVp75 -sCwSOEeh3Hh2oYjOKxBQAb+dfte3n6C3GcQv1Pq0odGOrf0CggEBAPwjo/47QKjv -HfIXb4Zvqo31PSz6sT5RyhSc7MY1CZzhUfLnLHDolphSQGOXGI/jkBu9Dtf1mxb1 -Jy2aoguFBF4djmgEHmQLS+O7CUFNyoq1A+4lOqHJ+lvoMBl4aTEGPL/M2jCSNxAz -CmOIr3++EvNEFcqBKxqAIAGSu7776X+ECCRghx4QkvPrjUWh3IpIyODUPxearcHy -NPI0g3CG+5fp6tt3SjcDpXE05sH0K8c0MsSAO6aDdY6ZgYR+pHv24UhQoIff2xUH -L4g9V/XwDqkJNaa8j9bEUjdt2pfhPtlqwhdX3vL01bKUerNnFvOdo/HYTUAMA5It -nDkD0fsBlDUCggEBALu/qn6CWv5Z1R1AO1/qKAWEhTq813l0nDtCeoYu4oJUHn9T -MK92Dq3DjbwBQ0KbMiJQBNeaKzmQkXsNoiUscAffnfpBeuYg4t7wF9JrdR/Dpr4E -DwyYDVf44EVfP/WF4DDFeTCDhq1KIffuqUtEjJsTur2yyVv0PpOXMhDVlktDP3jC -yZwVoGHF8GcoEyz+gVTf3BhdUnjLC8OVQ2ko1SFl5y4JhaneFk12gjfuy34lcl29 -bdxOoNGC4x9TE0vq7H8gXgxT/UqFJphXI4TvNq3BVK2Q27A/plTH1zrlGaF5xhUT -vhID/iDl/bq8s0ULnHNe6k+KTDYX1msJkYr3TyUCggEBAJRRM0pA+NvX839ZFG0y -oI+9b0eihmDk0TlNP/wfMtTAxssekpzFDUqKvHbRWb4NY9QU244YAOBT+O2mQTPz -5HOY6ltNt4YkwzqqQa3xhjN5iddhMYxYsWM9SXhk8pVO9o7w+NFG8za0nLMFG/DK -KUPNsncezg+WqWpWUFI51KVcuX7PXoon3fjk6bB9VI2UOe2qBv7TdUTo09nEiY18 -k/8CEa5GK6fkBvdMS4EmN96XLOwPMiW2vZ8tHwKk2TnchITkIjHO+Vep37mNfwkj -zGNu1EP2crqRmcZAnrOxrH/kcivZVfhTyP+iooemDYA+nKxhHmHo8wqxavRcmaK1 -onkCggEBAI+SkW/NGGgmk+kxHlhnxEB0ZeGfd5kc7/EUuAxB+IHH2xYScIYtGkik -mY/Euvay6ho9MOEbindQbz/7XeAxCmBftQqqiyhzgFAhAyhwSoGzu757kwmhc52g -MwffbSUqI+kPesq+7oSD+9u2gXXLAhw8Q4IBMJGcqFK+6gzfkQxgmfLvTuDFPmji -qMda4XzCnyLJTfSDmWtuGA4IuxQuSKERsMjaGutPend4ve6Jbm+GDCmQ24paPr5V -CAjlLN+zpcqgR+tPSTj8aM7ZOUeJwGbBBIS6EWipJNdrpJZy9Xt7PR8tNv63wlVk -lNoX1GH50f3fK6Y8lKHV/lHaJADbPIUCggEAEBmR1RSESuFEE9Yj0MmyAgTbYl4m -Ur6ev64MUzVtdzv+jnxRuaW1WXPoAPy42D7U/U5QWFuzAOyIldNHioeramkbwDue -qJ8wq1GXvlARZVtVqPvuLy+GBpNyrd08dfxWYqOzEwpULnkfEJqfJHOHbDremwUt -0Qi7VcWKEffS1v5gD2luOs1KfSHsr6OrQAt6eG3mep35kBEZ3gIiuru3uJFzMeJL -ggmD4lGpHwLemlvIxSrVVf+fPyuj2D02O0Hvup9EYH9+7YkpoTc4D/QdLX0JKc6P -W9vkBgHgvTQEr+QfPGhpJTlMBUMEgBNLTGp+bG6mQry8o+sBUaqbIRorHg== ------END RSA PRIVATE KEY----- diff --git a/samples/keys/encrypted-pubkey.pem b/samples/keys/encrypted-pubkey.pem deleted file mode 100644 index cfa78b8..0000000 --- a/samples/keys/encrypted-pubkey.pem +++ /dev/null @@ -1,9 +0,0 @@ ------BEGIN PUBLIC KEY----- -MIIBIDANBgkqhkiG9w0BAQEFAAOCAQ0AMIIBCAKCAQEAq5rcQoTQfEOPx0A1GRGc -PmTCZjeofZ6BFup6ZQcOzgDbZNKVWT2uJkMG3zXHojZZIfFl2nUSe3l/dxMGxvVi -WwmtoZtUvBv2yZhS33V1/DhWcef/F9G1Ni+PUAVPLH2SZGCggX5xpsvg6Ctv//nQ -FB+iP6dkcgEGkBikhrpYN6Dv0iKTer/QkSwxuwCqYJXLny9vVN4srzy3xWPtDaIe -5G8n3bZRc5jAY8VGViHxq9PSzTI5IYprdpl/luciaLfyd3eXrlcpn1UxdDDHkwSl -e7o4pmnEK0bw2rfAGj8OE1wBAZBDTqgV94BlORAhDZgdUITsyEYgkXjUvx2FZ8M2 -JwIBEQ== ------END PUBLIC KEY----- diff --git a/samples/keys/epri.pem b/samples/keys/epri.pem new file mode 100644 index 0000000..2c3aa91 --- /dev/null +++ b/samples/keys/epri.pem @@ -0,0 +1,30 @@ +-----BEGIN RSA PRIVATE KEY----- +Proc-Type: 4,ENCRYPTED +DEK-Info: AES-256-CBC,6B8E2DD75947B469A061FC706E612069 + +hLhpVBm/5npaIhoNDE4m6gLNpwUKd0PCPTq7s9BhPS6+2+vTS2XqrVQn/rH1qehZ +Cqv8LQEiRVNQpZ6BXfr2GDWr/4QyeBVcuFAviAk7euT/l4zkUtqK91gqqvLevDvg +0BQ799w921fu5dGw1MnTnz+PxqLveIzmE3fHa4MlUGXRk7FHn8uDI6CwI3w2fZG/ +LF1R86Dwk8TpZdXk0VUGmLrKrJms18ZD1q1jIUTzZVscWNQQaom1ODr2JVmqKyzP +u06wucrBVyiyouZ4PMKam0R+zOn71kwQV4Y7mW0Ze4J9PNSMu7JZq0gl4uRH5Zvg +387+HH3unb306QjiS2ZDpfLwg3qiGIAqxIrkDr3a5d0bVR3u6cTM0J+1V2aTvNp6 +oBDSkv4JgnPhvSxsI3EIOQs+7v6c3gY5BdykUEc6/1gTiIuixUEz+lqQ5M/8gdU6 +tHWu0JCVFO2iVAYbitWK6PM05ol47FZsQQUNFMOeJGGTG+o5bo7SuOllWVDBCzbt +ZEdGMX42aCM8ZgSDLbHMyWH7MrRDnWZJq5ifE/hD5UP6n+MVNa7zG5OG70Fw1pQA +DTlYPX7mMTO0VvkaGPmrJQ+rbqUhauVQl9ktlpd5d1qnVlWIoA5KiQgO2l9Wm9nm +3ECmM7lNydWw/8/yW5S3DM3LwzGHaVBjBGcK4y5Qb87CPWlV9CeMVI4AfT3VERtl +aoJiVzAj+ASz0/S/bid+9ezwIEnFat0O1wA+j+NIzd/s8zW+8/mJdo5qmIqC4+nX +bcYulM8MgpoDjSYOBiext5aNlHfffCrqcLCdXsxgDHZjv1tHw61QO9VPIbcrLanS +GEwgTGHY8QSmVcCr0nzvkSXyT2Ey5Rq0k+BluaX8nN3350qHw7nxiskysiREj/KB +rkgrvA/ZEUcPSCOYTlcFPiwtTSVlYL7jXtTJlJ811DgBONrFFuFIkrwyr57jogEN +RwCqQfa8pp4bjYKUsYpaRqHAF/tY29mMGqStGGF6/ekkJKLMlC0UCBVq5mxYnJOo +9bqNCz3h5kXM6+k3O6ntp5+64btftNd78NEGiJSrm/LSE7lJNNknmGf0Ew2UqOD8 +6EFlkjoyNU76a3bwAVc+ltWx8gBe2Txlu1jTFa1UuZhIyoLPeKG2sh/bew32YfNm +G9AfTezELYdPpta6aMko1u8tVk8ImbwARe83xgMVuF4OKHkA4KxXcq7hZmMZdinV +wDEkLG9KUD7odFaF1wPnpOTeFlCX3MiXpTWLFDquoXFzfiy9XF24J3oEJ6hnZkuv +r0lpjWjVCqEscHVLCqgvWbEqDhlINHyPXWbnCp3m/0ZmT8MkvOABCM5SDy/x6+qo +fYpWv8gKFyb0aU9Kd3tEK2kNQnCJJrRJPbdy0I9A9Kg8BEaTDsbanpXOirPqgm+s +ZI37WUpqyIbr2TRX8SRg3jbBhIcyfjRS5Qn3KA6UIo9SwN+XWChsdVU6xJ2Tlhy+ +5BPcqX4YWK+sXAvA6R8qgQ+KajZEi1Zutfpx73WSF0duSROdgNRpQvvP8QYNpRPU +8xlHgRmDXeQ4r5Dzdfjh9RfONek+LYL1R4XMe/O6x8NkG6a40PViJrt3L3VUQPMh +-----END RSA PRIVATE KEY----- diff --git a/samples/keys/epub.pem b/samples/keys/epub.pem new file mode 100644 index 0000000..fc65de1 --- /dev/null +++ b/samples/keys/epub.pem @@ -0,0 +1,9 @@ +-----BEGIN PUBLIC KEY----- +MIIBIDANBgkqhkiG9w0BAQEFAAOCAQ0AMIIBCAKCAQEAkVl+xbOQqA8iSXWRNlYu +8Mu1wo9YrGnZUBV52gvR+8QHrYZztDMsgOSjoecvVAPsCbd9h7v2ZJk7sllN/RVs +QQXEjK0zAiASumvL3SWx9mqyFIESWdMNoWvGAQx7qmRv1+AMsgXVK9LrF7QNxXEm +BGZdavRP1MT0E/EvmD5HZvpS329CaBXjTYTYSCH8N4kmh/WAzlVSfwI8Hs/Fh7cJ +/7slaf2aHOi0ulhwXiM+Zt85/pbyAv77Zuysu/U3LJT0kNTTXZWEVWuBGX5Hazf9 +dUF9UQNatZjSiHvD7nzXOjvxAoP1cN3MfGhxF0nHBrpy6pJg93jcpsay5HmbdzTP +SwIBEQ== +-----END PUBLIC KEY----- diff --git a/samples/residue.conf.json b/samples/residue.conf.json index 6806f0f..66fc750 100644 --- a/samples/residue.conf.json +++ b/samples/residue.conf.json @@ -36,7 +36,7 @@ }, { "client_id": "muflihun00102031", - "public_key": "$RESIDUE_HOME/samples/keys/encrypted-pubkey.pem", + "public_key": "$RESIDUE_HOME/samples/keys/epub.pem", "loggers": ["residue"] } ], @@ -48,7 +48,8 @@ }, { "logger_id": "default", - "configuration_file": "$RESIDUE_HOME/samples/configurations/default-Logger.conf" + "configuration_file": "$RESIDUE_HOME/samples/configurations/default-Logger.conf", + "rotation_freq": "hourly" }, { "logger_id": "sample-app", diff --git a/src/cli/clients.cc b/src/cli/clients.cc index b62e654..41f0068 100644 --- a/src/cli/clients.cc +++ b/src/cli/clients.cc @@ -23,7 +23,6 @@ #include "core/configuration.h" #include "core/registry.h" -#include "logging/residue-log-dispatcher.h" #include "tasks/client-integrity-task.h" #include "utils/utils.h" @@ -31,8 +30,8 @@ using namespace residue; Clients::Clients(Registry* registry) : Command("clients", - "List, add or remove clients from the server configuration", - "clients [list] [clean]", + "List clients available on the server (dead or alive)", + "clients [list] [clean] [--with-key] [--with-token]", registry) { } @@ -43,7 +42,7 @@ void Clients::execute(std::vector&& params, std::ostringstream& res result << "Clients: " << registry()->clients().size() << std::endl; } if (hasParam(params, "list")) { - list(result, hasParam(params, "--with-key")); + list(result, ¶ms); } else if (hasParam(params, "clean")) { if (registry()->clientIntegrityTask()->isExecuting()) { result << "\nAlready running, please try again later" << std::endl;; @@ -54,27 +53,10 @@ void Clients::execute(std::vector&& params, std::ostringstream& res registry()->clientIntegrityTask()->kickOff(); result << "\nFinished client integrity task" << std::endl; } - } else if (hasParam(params, "checkdyn")) { - // check dynamic buffer - ResidueLogDispatcher* dispatcher = el::Helpers::logDispatchCallback("ResidueLogDispatcher"); - if (dispatcher != nullptr) { - if (dispatcher->m_dynamicBuffer.empty()) { - result << "Dynamic buffer is empty"; - } else { - result << "Dynamic buffer information:\n"; - for (auto& pair : dispatcher->m_dynamicBuffer) { - result << "Logger: " << pair.second.logger->id() << "\t"; - result << "Filename: " << pair.first << "\t"; - result << "Items: " << pair.second.lines.size() << "\n"; - } - } - } else { - result << "Could not extract dispatcher"; - } } } -void Clients::list(std::ostringstream& result, bool withKey) const +void Clients::list(std::ostringstream& result, const std::vector* paramsPtr) const { int i = 1; for (auto& c : registry()->clients()) { @@ -83,11 +65,15 @@ void Clients::list(std::ostringstream& result, bool withKey) const << ", Age: " << (Utils::now() - c.second.dateCreated()) << "s" << ", Status: " << (!c.second.isAlive() ? "DEAD" : "ALIVE " + std::to_string(c.second.age() - (Utils::now() - c.second.dateCreated())) + "s"); - if (withKey) { - result << ", Key: " << c.second.key(); + if (hasParam(*paramsPtr, "--with-token")) { + result << ", Token: " << c.second.token(); } - if (!c.second.backupKey().empty()) { - result << ", Backup Key: " << c.second.backupKey(); + if (hasParam(*paramsPtr, "--with-key")) { + result << ", Key: " << c.second.key(); + + if (!c.second.backupKey().empty()) { + result << ", Backup Key: " << c.second.backupKey(); + } } result << std::endl; } diff --git a/src/cli/clients.h b/src/cli/clients.h index d19fc66..37c6583 100644 --- a/src/cli/clients.h +++ b/src/cli/clients.h @@ -39,7 +39,7 @@ class Clients final : public Command virtual void execute(std::vector&&, std::ostringstream&, bool) const override; private: - void list(std::ostringstream&, bool withKey) const; + void list(std::ostringstream&, const std::vector*) const; }; } diff --git a/src/cli/stats.cc b/src/cli/stats.cc index daa0a49..a4ec1b5 100644 --- a/src/cli/stats.cc +++ b/src/cli/stats.cc @@ -21,16 +21,20 @@ #include "cli/stats.h" +#include + #include "core/client.h" #include "core/registry.h" +#include "logging/log-request-handler.h" +#include "logging/residue-log-dispatcher.h" #include "utils/utils.h" using namespace residue; Stats::Stats(Registry* registry) : Command("stats", - "Displays current session details e.g, active sessions etc", - "stats [list] [--client-id ]", + "Displays current session details e.g, active sessions, queue and buffer info etc", + "stats [list] [dyn] [queue] [--client-id ]", registry) { } @@ -72,5 +76,60 @@ void Stats::execute(std::vector&& params, std::ostringstream& resul result << ": " << registry()->activeSessions().size() << std::endl; } result << tmpR.str(); + } else if (hasParam(params, "dyn")) { + // check dynamic buffer + ResidueLogDispatcher* dispatcher = el::Helpers::logDispatchCallback("ResidueLogDispatcher"); + if (dispatcher != nullptr) { + if (dispatcher->m_dynamicBuffer.empty()) { + result << "Dynamic buffer is empty"; + } else { + result << "Dynamic buffer information:\n"; + for (auto& pair : dispatcher->m_dynamicBuffer) { + result << "Logger: " << pair.second.logger->id() << "\t"; + result << "Filename: " << pair.first << "\t"; + result << "Items: " << pair.second.lines.size() << "\n"; + } + } + } else { + result << "Could not extract dispatcher"; + } + } else if (hasParam(params, "queue")) { + auto displayQueueStat = [&](const std::string& clientId) { + auto pos = registry()->logRequestHandler()->m_queueProcessor.find(clientId); + if (pos == registry()->logRequestHandler()->m_queueProcessor.end()) { + result << "ERR: Client not registered in processor"; + } else { + const ClientQueueProcessor* processor = registry()->logRequestHandler()->m_queueProcessor.at(clientId).get(); + result << "Queue For: " << std::setw(20) << std::left << clientId << " "; + result << "Active:" << std::setw(6) << std::right << processor->m_queue.size() << " "; + result << "Backlog:" << std::setw(6) << std::right << processor->m_queue.backlogSize(); + if (hasParam(params, "sampling")) { + std::string sampleCount = getParamValue(params, "sampling"); + if (sampleCount.empty()) { + sampleCount = "3"; + } + int sc = atoi(sampleCount.c_str()); + if (sc < 1 || sc > 10) { + sc = 3; + } + auto size = processor->m_queue.size(); + if (size > 0) { + std::this_thread::sleep_for(std::chrono::seconds(sc)); + auto newSize = processor->m_queue.size(); + result << " Speed:" << std::setw(3) << std::right << (size - newSize) / sc << " items/s (incl. bulk)"; + } + } + result << "\n"; + } + }; + std::string clientId = getParamValue(params, "--client-id"); + if (clientId.empty()) { + for (auto& pair : registry()->logRequestHandler()->m_queueProcessor) { + clientId = pair.first; + displayQueueStat(clientId); + } + } else { + displayQueueStat(clientId); + } } } diff --git a/src/connect/connection-request-handler.cc b/src/connect/connection-request-handler.cc index 44d5880..a564248 100644 --- a/src/connect/connection-request-handler.cc +++ b/src/connect/connection-request-handler.cc @@ -135,6 +135,7 @@ void ConnectionRequestHandler::connect(ConnectionRequest* request, const std::sh // To reduce size of the data clonedClient.setAge(0); clonedClient.setDateCreated(0); + clonedClient.removeToken(); ConnectionResponse response(&clonedClient); std::string output; response.serialize(output); diff --git a/src/connect/connection-response.cc b/src/connect/connection-response.cc index 52f84e2..77c7eff 100644 --- a/src/connect/connection-response.cc +++ b/src/connect/connection-response.cc @@ -35,6 +35,7 @@ ConnectionResponse::ConnectionResponse(const Client* client, const Configuration m_loggingPort(0), m_key(client->key()), m_clientId(client->id()), + m_clientToken(client->token()), m_clientAge(client->age()), m_clientDateCreated(client->dateCreated()), m_isAcknowledged(client->acknowledged()) @@ -77,18 +78,21 @@ void ConnectionResponse::serialize(std::string& output) const doc.addValue("flags", static_cast(m_configuration->flag())) .addValue("max_bulk_size", static_cast(m_configuration->maxItemsInBulk())) .startObject("server_info") - .addValue("version", ss.str().c_str()) + .addValue("version", ss.str()) .endObject(); } if (!m_errorText.empty()) { - doc.addValue("error_text", m_errorText.c_str()); + doc.addValue("error_text", m_errorText); } if (!m_key.empty()) { doc.addValue("key", m_key.c_str()); } if (!m_clientId.empty()) { - doc.addValue("client_id", m_clientId.c_str()); + doc.addValue("client_id", m_clientId); + } + if (!m_clientToken.empty()) { + doc.addValue("token", m_clientToken); } if (m_loggingPort != 0) { doc.addValue("logging_port", m_loggingPort); diff --git a/src/connect/connection-response.h b/src/connect/connection-response.h index d2ce68f..03f9a8c 100644 --- a/src/connect/connection-response.h +++ b/src/connect/connection-response.h @@ -57,6 +57,7 @@ class ConnectionResponse final : public Response int m_loggingPort; std::string m_key; std::string m_clientId; + std::string m_clientToken; unsigned int m_clientAge; types::Time m_clientDateCreated; bool m_isAcknowledged; diff --git a/src/core/client.cc b/src/core/client.cc index a2cc548..d5e9d45 100644 --- a/src/core/client.cc +++ b/src/core/client.cc @@ -44,6 +44,7 @@ Client::Client(const ConnectionRequest* request): { m_key = AES::generateKey(request->keySize()); resetDateCreated(); + m_token = AES::generateKey(128); } Client::~Client() diff --git a/src/core/client.h b/src/core/client.h index ffc4497..8005a2c 100644 --- a/src/core/client.h +++ b/src/core/client.h @@ -35,12 +35,12 @@ class Registry; /// /// \brief Single client object known to the server /// -class Client +class Client final { public: explicit Client(const ConnectionRequest* request); - virtual ~Client(); + ~Client(); inline bool operator==(const std::string& id) const { @@ -57,6 +57,16 @@ class Client return m_id; } + inline const std::string& token() const + { + return m_token; + } + + inline void removeToken() + { + m_token = ""; + } + inline bool acknowledged() const { return m_acknowledged; @@ -152,6 +162,8 @@ class Client std::string m_key; int m_keySize; + std::string m_token; + bool m_acknowledged; bool m_isManaged; diff --git a/src/core/configuration.cc b/src/core/configuration.cc index f99a8e4..82e2678 100644 --- a/src/core/configuration.cc +++ b/src/core/configuration.cc @@ -274,8 +274,8 @@ void Configuration::loadFromInput(std::string&& jsonStr) m_dispatchDelay = 1; } m_maxItemsInBulk = m_jsonDoc.get("max_items_in_bulk", 5); - if (m_maxItemsInBulk <= 1 || m_maxItemsInBulk >= 100) { - errorStream << " Invalid value for [max_items_in_bulk]. Please choose between 2-100" << std::endl; + if (m_maxItemsInBulk < 5 || m_maxItemsInBulk > 500) { + errorStream << " Invalid value for [max_items_in_bulk]. Please choose between 5-500" << std::endl; } diff --git a/src/crash-handlers.h b/src/crash-handlers.h index 9050264..62d08fd 100644 --- a/src/crash-handlers.h +++ b/src/crash-handlers.h @@ -23,6 +23,9 @@ #define CrashHandlers_h #include "logging/log.h" +#include "utils/utils.h" + +using namespace residue; static bool s_exitOnInterrupt = false; @@ -39,6 +42,7 @@ void interruptHandler(int) void generalTerminateHandler(int sig, bool showMsg) { if (showMsg) { + std::cerr << Utils::formatTime(Utils::now(), "%d/%M/%Y %h:%m:%s") << " "; std::cerr << "Application abnormally terminated." << std::endl; std::cerr << "Please report it to us on https://github.com/muflihun/residue/issues/ " << std::endl; } diff --git a/src/extensions/dispatch-error-extension.h b/src/extensions/dispatch-error-extension.h index ca3725b..d87fcf4 100644 --- a/src/extensions/dispatch-error-extension.h +++ b/src/extensions/dispatch-error-extension.h @@ -81,6 +81,11 @@ class DispatchErrorExtension : public Extension /// virtual Extension::Result execute(const Data* const data) = 0; + /// + /// \brief Called when previously failed is no longer failing + /// + virtual void reset() {} + private: virtual Extension::Result executeWrapper(void* d) override; }; diff --git a/src/logging/client-queue-processor.cc b/src/logging/client-queue-processor.cc index 7507018..2ec9eeb 100644 --- a/src/logging/client-queue-processor.cc +++ b/src/logging/client-queue-processor.cc @@ -88,9 +88,16 @@ void ClientQueueProcessor::processRequestQueue() #endif m_registry->clientIntegrityTask()->pauseClient(m_clientId); } - +#ifdef RESIDUE_DEBUG + DRVLOG_IF(total > 0, RV_CRAZY) << "Items: " << total; +#endif for (std::size_t i = 0; i < total; ++i) { +#ifdef RESIDUE_HIGH_RESOLUTION_PROFILING + types::Time m_timeTakenByItem; + RESIDUE_PROFILE_START(t_process_item); +#endif + if (m_registry->configuration()->dispatchDelay() > 0) { std::this_thread::sleep_for(std::chrono::milliseconds(m_registry->configuration()->dispatchDelay())); } @@ -104,9 +111,13 @@ void ClientQueueProcessor::processRequestQueue() // get another reference to shared pointer for session std::shared_ptr session = rawRequest.session; + RESIDUE_HIGH_PROFILE_CHECKPOINT_MIS(t_process_item, m_timeTakenByItem, 1, 1); + RequestHandler::handle(std::move(rawRequest), &request, Request::StatusCode::BAD_REQUEST, false, false, compressionEnabled); + RESIDUE_HIGH_PROFILE_CHECKPOINT_MIS(t_process_item, m_timeTakenByItem, 2, 1); + if ((!request.isValid() && !request.isBulk()) || request.statusCode() != Request::StatusCode::OK) { RVLOG(RV_ERROR) << "Failed: " << request.errorText(); @@ -126,6 +137,12 @@ void ClientQueueProcessor::processRequestQueue() DRVLOG(RV_DEBUG) << "Request client: " << request.client(); #endif for (const auto& js : request.jsonObject()) { + +#ifdef RESIDUE_HIGH_RESOLUTION_PROFILING + types::Time m_timeTakenByBulkItem; + RESIDUE_PROFILE_START(t_process_bulk_item); +#endif + if (itemCount == maxItemsInBulk) { RLOG(ERROR) << "Maximum number of bulk requests reached. Ignoring the rest of items in bulk"; break; @@ -138,6 +155,9 @@ void ClientQueueProcessor::processRequestQueue() requestItem.setDateReceived(request.dateReceived()); requestItem.deserialize(std::move(requestItemStr)); + + RESIDUE_HIGH_PROFILE_CHECKPOINT_MIS(t_process_bulk_item, m_timeTakenByBulkItem, 1, 1); + if (requestItem.isValid()) { requestItem.setIpAddr(request.ipAddr()); requestItem.setSessionId(request.sessionId()); @@ -156,6 +176,7 @@ void ClientQueueProcessor::processRequestQueue() } else { RLOG(ERROR) << "Invalid request in bulk."; } + RESIDUE_HIGH_PROFILE_CHECKPOINT_MIS(t_process_bulk_item, m_timeTakenByBulkItem, 2, 1); } } else { RLOG(ERROR) << "Bulk requests are not allowed"; @@ -204,8 +225,12 @@ void ClientQueueProcessor::processRequestQueue() float timeTakenInSec = static_cast(m_timeTaken / 1000.0f); RLOG_IF(total > 0, DEBUG) << "Took " << timeTakenInSec << "s to process the queue of " << total << " items (" << totalRequests << " requests). Average: " - << (static_cast(m_timeTaken) / static_cast(total)) << "ms/item [" - << (static_cast(m_timeTaken) / static_cast(totalRequests)) << "ms/request]"; + << (static_cast(m_timeTaken) / static_cast(total)) << "ms/item ≈ " + << (timeTakenInSec > 0 ? std::ceil(static_cast(total) / timeTakenInSec) : -1) << " items/s " + << "[" << (static_cast(m_timeTaken) / static_cast(totalRequests)) << "ms/req" + << " ≈ " << (timeTakenInSec > 0 ? std::ceil(static_cast(totalRequests) / timeTakenInSec) : -1) + << " req/s" + << "]"; #endif m_queue.switchContext(); @@ -213,6 +238,11 @@ void ClientQueueProcessor::processRequestQueue() bool ClientQueueProcessor::processRequest(LogRequest* request, Client** clientRef, bool forceCheck, Session *session) { +#ifdef RESIDUE_HIGH_RESOLUTION_PROFILING + types::Time m_timeTakenProcessRequest; + RESIDUE_PROFILE_START(t_process_request); +#endif + bool bypassChecks = !forceCheck && clientRef != nullptr && *clientRef != nullptr; #ifdef RESIDUE_DEV DRVLOG(RV_DEBUG_2) << "Force check: " << forceCheck << ", clientRef: " << clientRef << ", *clientRef: " @@ -226,6 +256,8 @@ bool ClientQueueProcessor::processRequest(LogRequest* request, Client** clientRe return false; } + RESIDUE_HIGH_PROFILE_CHECKPOINT_NS(t_process_request, m_timeTakenProcessRequest, 1, 1); + if (!bypassChecks && !client->isAlive(request->dateReceived())) { RLOG(ERROR) << "Invalid request. Client is dead"; RLOG(DEBUG) << "Req received: " << request->dateReceived() << ", client created: " << client->dateCreated() << ", age: " << client->age() << ", result: " << client->dateCreated() + client->age(); @@ -240,6 +272,8 @@ bool ClientQueueProcessor::processRequest(LogRequest* request, Client** clientRe session->setClient(client); } + RESIDUE_HIGH_PROFILE_CHECKPOINT_NS(t_process_request, m_timeTakenProcessRequest, 2, 1); + if (!bypassChecks && client->isManaged()) { // take this opportunity to update the user for unmanaged logger @@ -259,7 +293,11 @@ bool ClientQueueProcessor::processRequest(LogRequest* request, Client** clientRe RLOG(WARNING) << "Ignoring log from unauthorized logger [" << request->loggerId() << "]"; return false; } + RESIDUE_HIGH_PROFILE_CHECKPOINT_NS(t_process_request, m_timeTakenProcessRequest, 3, 2); + dispatch(request); + + RESIDUE_HIGH_PROFILE_CHECKPOINT_NS(t_process_request, m_timeTakenProcessRequest, 4, 3); return true; } return false; diff --git a/src/logging/client-queue-processor.h b/src/logging/client-queue-processor.h index ffb0912..fff385c 100644 --- a/src/logging/client-queue-processor.h +++ b/src/logging/client-queue-processor.h @@ -81,6 +81,8 @@ class ClientQueueProcessor final : public RequestHandler std::thread m_worker; JsonDoc m_jsonDocForBulk; + friend class Stats; + //// /// \brief Dispatches the request using custom user message /// diff --git a/src/logging/log-request-handler.h b/src/logging/log-request-handler.h index 5217579..dc57b0f 100644 --- a/src/logging/log-request-handler.h +++ b/src/logging/log-request-handler.h @@ -54,6 +54,8 @@ class LogRequestHandler final : public RequestHandler virtual void handle(RawRequest&&); private: std::unordered_map> m_queueProcessor; + + friend class Stats; }; } #endif /* LogRequestHandler_h */ diff --git a/src/logging/log.h b/src/logging/log.h index d8fadf0..76fddbd 100644 --- a/src/logging/log.h +++ b/src/logging/log.h @@ -63,16 +63,34 @@ #define RESIDUE_UNUSED(x) (void)x #ifdef RESIDUE_PROFILING +# define RESIDUE_PROFILE_CHECKPOINT_TEMPL(id, result, idx, refidx, type, unit) std::chrono::high_resolution_clock::time_point id##chkpnt_3##idx = std::chrono::high_resolution_clock::now();\ + result = std::chrono::duration_cast( id##chkpnt_3##idx - id##1 ).count();\ + {auto resultforcompar = std::chrono::duration_cast( id##chkpnt_3##idx - id##chkpnt_3##refidx ).count();\ + RLOG(DEBUG) << #id << idx << " from base (" << #id << "1) " << result << " " << unit << " (from checkpoint " << #id << refidx << ": " << resultforcompar << " " << unit ")";} # define RESIDUE_PROFILE_START(id) std::chrono::high_resolution_clock::time_point id##1 = std::chrono::high_resolution_clock::now(); RESIDUE_UNUSED(id##1) # define RESIDUE_PROFILE_END(id, result) std::chrono::high_resolution_clock::time_point id##2 = std::chrono::high_resolution_clock::now();\ result = std::chrono::duration_cast( id##2 - id##1 ).count(); -# define RESIDUE_PROFILE_CHECKPOINT(id, result, idx) std::chrono::high_resolution_clock::time_point id##3##idx = std::chrono::high_resolution_clock::now();\ - result = std::chrono::duration_cast( id##3##idx - id##1 ).count();\ - std::cout << idx << " checkpoint at " << result << " ms\n"; +# define RESIDUE_PROFILE_CHECKPOINT(id, result, idx, refidx) RESIDUE_PROFILE_CHECKPOINT_TEMPL(id, result, idx, refidx, std::chrono::milliseconds, "ms") +# define RESIDUE_PROFILE_CHECKPOINT_NS(id, result, idx, refidx) RESIDUE_PROFILE_CHECKPOINT_TEMPL(id, result, idx, refidx, std::chrono::nanoseconds, "nans") +# define RESIDUE_PROFILE_CHECKPOINT_MIS(id, result, idx, refidx) RESIDUE_PROFILE_CHECKPOINT_TEMPL(id, result, idx, refidx, std::chrono::microseconds, "mis") + #else # define RESIDUE_PROFILE_START(id) # define RESIDUE_PROFILE_END(id, result) -# define RESIDUE_PROFILE_CHECKPOINT(id, result, idx) +# define RESIDUE_PROFILE_CHECKPOINT(id, result, idx, refidx) +# define RESIDUE_PROFILE_CHECKPOINT_MIS(id, result, idx, refidx) +# define RESIDUE_PROFILE_CHECKPOINT_NS(id, result, idx, refidx) #endif // RESIDUE_PROFILING +#if defined(RESIDUE_DEV) && defined(RESIDUE_PROFILING) +#define RESIDUE_HIGH_RESOLUTION_PROFILING +# define RESIDUE_HIGH_PROFILE_CHECKPOINT(id, result, idx, refidx) RESIDUE_PROFILE_CHECKPOINT(id, result, idx, refidx) +# define RESIDUE_HIGH_PROFILE_CHECKPOINT_NS(id, result, idx, refidx) RESIDUE_PROFILE_CHECKPOINT_NS(id, result, idx, refidx) +# define RESIDUE_HIGH_PROFILE_CHECKPOINT_MIS(id, result, idx, refidx) RESIDUE_PROFILE_CHECKPOINT_MIS(id, result, idx, refidx) +#else +# define RESIDUE_HIGH_PROFILE_CHECKPOINT(id, result, idx, refidx) +# define RESIDUE_HIGH_PROFILE_CHECKPOINT_NS(id, result, idx, refidx) +# define RESIDUE_HIGH_PROFILE_CHECKPOINT_MIS(id, result, idx, refidx) +#endif + #endif // LOG_H diff --git a/src/logging/logging-queue.h b/src/logging/logging-queue.h index defc253..a881466 100644 --- a/src/logging/logging-queue.h +++ b/src/logging/logging-queue.h @@ -47,22 +47,22 @@ class LoggingQueue final : NonCopyable RawRequest pull(); - inline bool empty() + inline bool empty() const { return m_dispatchQueue->empty(); } - inline std::size_t size() + inline std::size_t size() const { return m_dispatchQueue->size(); } - inline bool backlogEmpty() + inline bool backlogEmpty() const { return m_backlogQueue->empty(); } - inline bool backlogSize() + inline std::size_t backlogSize() const { return m_backlogQueue->size(); } diff --git a/src/logging/residue-log-dispatcher.h b/src/logging/residue-log-dispatcher.h index 943ff68..2732e7f 100644 --- a/src/logging/residue-log-dispatcher.h +++ b/src/logging/residue-log-dispatcher.h @@ -106,6 +106,19 @@ class ResidueLogDispatcher final : public el::LogDispatchCallback, NonCopyable logLine, el::LevelHelper::castToInt(level), errno); + if (logger->id() != RESIDUE_LOGGER_ID) { + // recovery check for dynamic buffer + std::ofstream oftmp(fn.c_str(), std::ios::out | std::ios::app); + if (oftmp.is_open()) { + oftmp << "=== [residue] ==> dynamic buffer recovery check ===\n"; + oftmp.flush(); + if (!oftmp.fail()) { + fs->clear(); + RLOG_IF(logger->id() != RESIDUE_LOGGER_ID, INFO) << "Dynamic buffer recovery check passed for [" << fn << "]"; + } + oftmp.close(); + } + } } else { if (ELPP->hasFlag(el::LoggingFlag::ImmediateFlush) || (logger->isFlushNeeded(level))) { logger->flush(level, fs); @@ -113,6 +126,10 @@ class ResidueLogDispatcher final : public el::LogDispatchCallback, NonCopyable successfullyWritten = true; dispatchDynamicBuffer(fn, fs, logger); + + if (m_previouslyFailed && logger->id() != RESIDUE_LOGGER_ID) { + resetErrorExtensions(); // this resets m_previouslyFailed as well + } } } else { RLOG_IF(logger->id() != RESIDUE_LOGGER_ID, ERROR) @@ -137,8 +154,9 @@ class ResidueLogDispatcher final : public el::LogDispatchCallback, NonCopyable // map of filename -> FailedLogs std::unordered_map m_dynamicBuffer; std::recursive_mutex m_dynamicBufferLock; + std::atomic m_previouslyFailed; - friend class Clients; + friend class Stats; void execLogExtensions(const el::LogDispatchData* data, const el::base::type::string_t& logLine, @@ -175,7 +193,8 @@ class ResidueLogDispatcher final : public el::LogDispatchCallback, NonCopyable unsigned int level, int errorNo) { - if (m_configuration->dispatchErrorExtensions().empty()) { + if (m_configuration->dispatchErrorExtensions().empty() + || loggerId == RESIDUE_LOGGER_ID) { return; } DispatchErrorExtension::Data d { @@ -188,6 +207,18 @@ class ResidueLogDispatcher final : public el::LogDispatchCallback, NonCopyable for (auto& ext : m_configuration->dispatchErrorExtensions()) { ext->trigger(&d); } + m_previouslyFailed = true; + } + + void resetErrorExtensions() + { + if (m_configuration->dispatchErrorExtensions().empty()) { + return; + } + for (auto& ext : m_configuration->dispatchErrorExtensions()) { + static_cast(ext)->reset(); + } + m_previouslyFailed = false; } void addToDynamicBuffer(el::Logger* logger, const std::string& filename, const std::string& logLine) @@ -224,8 +255,9 @@ class ResidueLogDispatcher final : public el::LogDispatchCallback, NonCopyable // close/open again fs->close(); fs->open(fn, std::ios::out | std::ios::app); - - *fs << "=== [residue] ==> " << list->size() << " log" << (list->size() > 1 ? "s" : "") << " from dynamic buffer ===\n"; + auto size = list->size(); + auto dynamicBufferClearStart = std::chrono::high_resolution_clock::now(); + *fs << "=== [residue] ==> " << size << " log" << (size > 1 ? "s" : "") << " from dynamic buffer ===\n"; for (auto& line : *list) { // process all items fs->write(line.c_str(), line.size()); @@ -235,7 +267,11 @@ class ResidueLogDispatcher final : public el::LogDispatchCallback, NonCopyable } } - *fs << "=== [residue] ==> dynamic buffer cleared ===\n"; + auto dynamicBufferClearEnd = std::chrono::high_resolution_clock::now(); + *fs << "=== [residue] ==> dynamic buffer cleared (" << size << " log" + << (size > 1 ? "s" : "") << " written in " + << std::chrono::duration_cast(dynamicBufferClearEnd - dynamicBufferClearStart).count() + << " ms) ===\n"; fs->flush(); } m_dynamicBuffer.erase(fn); diff --git a/src/logging/user-log-builder.cc b/src/logging/user-log-builder.cc index 4f606d9..b4919aa 100644 --- a/src/logging/user-log-builder.cc +++ b/src/logging/user-log-builder.cc @@ -29,6 +29,11 @@ using namespace residue; el::base::type::string_t UserLogBuilder::build(const el::LogMessage* msg, bool appendNewLine) const { + +#ifdef RESIDUE_HIGH_RESOLUTION_PROFILING + types::Time m_timeTakenLogBuilder; + RESIDUE_PROFILE_START(t_log_builder); +#endif const UserMessage* logMessage = static_cast(msg); el::base::TypedConfigurations* tc = logMessage->logger()->typedConfigurations(); if (tc == nullptr || logMessage->request() == nullptr) { @@ -36,11 +41,13 @@ el::base::type::string_t UserLogBuilder::build(const el::LogMessage* msg, std::cout << "Unexpectedly NULL request!, msg => [" << logMessage->message() << "]" << std::endl; return ""; } + const el::base::LogFormat* logFormat = &(tc->logFormat(logMessage->request()->level())); el::base::type::string_t logLine = logFormat->format(); char buff[el::base::consts::kSourceFilenameMaxLength + el::base::consts::kSourceLineMaxLength] = ""; const char* bufLim = buff + sizeof(buff); + RESIDUE_HIGH_PROFILE_CHECKPOINT_NS(t_log_builder, m_timeTakenLogBuilder, 1, 1); if (logFormat->hasFlag(el::base::FormatFlags::AppName)) { // App name el::base::utils::Str::replaceFirstWithEscape(logLine, el::base::consts::kAppNameFormatSpecifier, @@ -99,6 +106,8 @@ el::base::type::string_t UserLogBuilder::build(const el::LogMessage* msg, el::base::utils::Str::replaceFirstWithEscape(logLine, el::base::consts::kMessageFormatSpecifier, logMessage->request()->msg()); } + RESIDUE_HIGH_PROFILE_CHECKPOINT_NS(t_log_builder, m_timeTakenLogBuilder, 2, 1); + el::base::utils::Str::replaceFirstWithEscape(logLine, "%client_id", logMessage->request()->clientId()); el::base::utils::Str::replaceFirstWithEscape(logLine, "%ip", logMessage->request()->ipAddr()); el::base::utils::Str::replaceFirstWithEscape(logLine, "%session_id", logMessage->request()->sessionId()); @@ -110,6 +119,10 @@ el::base::type::string_t UserLogBuilder::build(const el::LogMessage* msg, el::base::utils::Str::replaceFirstWithEscape(logLine, wcsFormatSpecifier, std::string(cfs.resolver()(logMessage))); } #endif // !defined(ELPP_DISABLE_CUSTOM_FORMAT_SPECIFIERS) - if (appendNewLine) logLine += ELPP_LITERAL("\n"); + + if (appendNewLine) { + logLine += ELPP_LITERAL("\n"); + } + RESIDUE_HIGH_PROFILE_CHECKPOINT_NS(t_log_builder, m_timeTakenLogBuilder, 3, 2); return logLine; } diff --git a/src/main.cc b/src/main.cc index 18e166a..a8b29e7 100644 --- a/src/main.cc +++ b/src/main.cc @@ -33,7 +33,6 @@ #include #include -#include "net/asio.h" #include "ripe/Ripe.h" #include "admin/admin-request-handler.h" @@ -243,30 +242,27 @@ int main(int argc, char* argv[]) // admin server threads.push_back(std::thread([&]() { el::Helpers::setThreadName("AdminHandler"); - net::io_service io_service; AdminRequestHandler newAdminRequestHandler(®istry, &commandHandler); - Server svr(io_service, config.adminPort(), &newAdminRequestHandler); - io_service.run(); + Server svr(config.adminPort(), &newAdminRequestHandler); + svr.start(); })); // connect server threads.push_back(std::thread([&]() { el::Helpers::setThreadName("ConnectionHandler"); - net::io_service io_service; ConnectionRequestHandler newConnectionRequestHandler(®istry); - Server svr(io_service, config.connectPort(), &newConnectionRequestHandler); - io_service.run(); + Server svr(config.connectPort(), &newConnectionRequestHandler); + svr.start(); })); // log server threads.push_back(std::thread([&]() { el::Helpers::setThreadName("LogHandler"); - net::io_service io_service; LogRequestHandler logRequestHandler(®istry); logRequestHandler.start(); // Start handling incoming requests registry.setLogRequestHandler(&logRequestHandler); - Server svr(io_service, config.loggingPort(), &logRequestHandler); - io_service.run(); + Server svr(config.loggingPort(), &logRequestHandler); + svr.start(); })); // client integrity task diff --git a/src/net/server.cc b/src/net/server.cc index 9522c67..bb8510c 100644 --- a/src/net/server.cc +++ b/src/net/server.cc @@ -33,9 +33,9 @@ using namespace residue; using net::ip::tcp; -Server::Server(net::io_service& io_service, int port, RequestHandler* requestHandler) : - m_acceptor(io_service, tcp::endpoint(tcp::v4(), port)), - m_socket(io_service), +Server::Server(int port, RequestHandler* requestHandler) : + m_acceptor(m_ioService, tcp::endpoint(tcp::v4(), port)), + m_socket(m_ioService), m_requestHandler(requestHandler) { accept(); @@ -57,3 +57,8 @@ void Server::accept() accept(); }); } + +void Server::start() +{ + m_ioService.run(); +} diff --git a/src/net/server.h b/src/net/server.h index 7b8be40..650b967 100644 --- a/src/net/server.h +++ b/src/net/server.h @@ -38,11 +38,15 @@ class RequestHandler; class Server final : NonCopyable { public: - Server(net::io_service& io_service, int port, RequestHandler* requestHandler); + Server(int port, RequestHandler* requestHandler); ~Server(); + + void start(); + private: void accept(); + net::io_service m_ioService; tcp::acceptor m_acceptor; tcp::socket m_socket; diff --git a/src/net/session.cc b/src/net/session.cc index 576be95..feb0db3 100644 --- a/src/net/session.cc +++ b/src/net/session.cc @@ -67,8 +67,9 @@ void Session::read() net::async_read_until(m_socket, m_streamBuffer, Session::PACKET_DELIMITER, [&, this, self](residue::error_code ec, std::size_t numOfBytes) { -#ifdef RESIDUE_PROFILING +#ifdef RESIDUE_HIGH_RESOLUTION_PROFILING //types::Time m_timeTaken; + //RESIDUE_PROFILE_START(t_read); #endif if (!ec) { RESIDUE_PROFILE_START(t_read); @@ -80,9 +81,9 @@ void Session::read() std::istream is(&m_streamBuffer); std::string buffer((std::istreambuf_iterator(is)), std::istreambuf_iterator()); buffer.erase(numOfBytes - Session::PACKET_DELIMITER_SIZE); - //RESIDUE_PROFILE_CHECKPOINT(t_read, m_timeTaken, 1); + //RESIDUE_HIGH_PROFILE_CHECKPOINT(t_read, m_timeTaken, 1, 1); sendToHandler(std::move(buffer)); - //RESIDUE_PROFILE_CHECKPOINT(t_read, m_timeTaken, 2); + //RESIDUE_HIGH_PROFILE_CHECKPOINT(t_read, m_timeTaken, 2, 1); if (m_requestHandler->registry()->configuration()->hasFlag(Configuration::ENABLE_CLI)) { #ifdef RESIDUE_DEV DRVLOG(RV_TRACE) << "Adding bytes"; @@ -166,6 +167,7 @@ void Session::write(const std::string& s) void Session::write(const char* data, std::size_t length) { + auto self(shared_from_this()); if (m_requestHandler->registry()->configuration()->hasFlag(Configuration::ENABLE_CLI)) { Utils::bigAdd(m_bytesSent, std::to_string(length)); m_requestHandler->registry()->addBytesSent(length); @@ -175,7 +177,8 @@ void Session::write(const char* data, DRVLOG(RV_DEBUG_2) << "Sending " << data; #endif net::async_write(m_socket, net::buffer(data, length), - [&, this](residue::error_code ec, std::size_t) { + [&, this, self](residue::error_code ec, std::size_t) { + // for 'self' - see https://www.youtube.com/watch?v=D-lTwGJRx0o?t=40m if (ec) { #ifdef RESIDUE_DEBUG DRVLOG(RV_DEBUG) << "Failed to send. " << ec.message(); diff --git a/src/tasks/log-rotator.cc b/src/tasks/log-rotator.cc index a7f8091..1948ecf 100644 --- a/src/tasks/log-rotator.cc +++ b/src/tasks/log-rotator.cc @@ -296,9 +296,11 @@ void LogRotator::rotate(const std::string& loggerId) if (doneList.find(fn) != doneList.end()) { return; } - fs->close(); - fs->open(fn, std::fstream::out | std::fstream::trunc); - Utils::updateFilePermissions(fn.data(), logger, m_registry->configuration()); + if (fs && fs->is_open() && !fs->fail()) { + fs->close(); + fs->open(fn, std::fstream::out | std::fstream::trunc); + Utils::updateFilePermissions(fn.data(), logger, m_registry->configuration()); + } doneList.insert(fn); }; diff --git a/src/utils/utils.h b/src/utils/utils.h index 4f82d20..e3d8759 100644 --- a/src/utils/utils.h +++ b/src/utils/utils.h @@ -113,7 +113,7 @@ class Utils final : StaticBase static types::Time nowUtc(); - static std::string formatTime(types::Time time, const char* format = "%h:%m:%s"); + static std::string formatTime(types::Time time, const char* format = "%H:%m:%s"); static std::tm timeToTm(types::Time epochInSec); diff --git a/tools/doxygen/conf b/tools/doxygen/conf index 4967afb..24abecb 100644 --- a/tools/doxygen/conf +++ b/tools/doxygen/conf @@ -1,7 +1,7 @@ PROJECT_NAME="Residue Extensions API" PROJECT_BRIEF="Extend residue server functionality" OUTPUT_DIRECTORY=build/doc -INPUT=tools/doxygen/README_DOXYGEN.md src/extensions/extension.h src/extensions/log-extension.h src/extensions/pre-archive-extension.h src/extensions/post-archive-extension.h src/core/json-doc.h +INPUT=tools/doxygen/README_DOXYGEN.md src/extensions/extension.h src/extensions/log-extension.h src/extensions/pre-archive-extension.h src/extensions/post-archive-extension.h src/extensions/dispatch-error-extension.h src/core/json-doc.h USE_MDFILE_AS_MAINPAGE=tools/doxygen/README_DOXYGEN.md HTML_EXTRA_STYLESHEET=tools/doxygen/customdoxygen.css LAYOUT_FILE=tools/doxygen/DoxygenLayout.xml diff --git a/tools/package.sh b/tools/package.sh index a22323f..aed88bb 100644 --- a/tools/package.sh +++ b/tools/package.sh @@ -48,7 +48,7 @@ if [ "$EXTENSIONS" = "ON" ];then fi -cmake -DCMAKE_BUILD_TYPE=Release -Duse_mine=OFF -Denable_extensions=$EXTENSIONS -Ddebug=$DEBUG_VERSION -Dprofiling=OFF -Dproduction=ON .. +cmake -DCMAKE_BUILD_TYPE=Release -Duse_mine=OFF -Denable_extensions=$EXTENSIONS -Ddebug=$DEBUG_VERSION -Dprofiling=$DEBUG_VERSION -Dproduction=ON .. make -j4 echo "Creating $PACK.tar.gz ..."