diff --git a/NEWS b/NEWS index 6d9a646d5..9d11cd0ff 100644 --- a/NEWS +++ b/NEWS @@ -1,5 +1,6 @@ 7.10.0 - Deprecate `errorhandler`; replace with lambda-friendly "notice handlers." + - Deprecate `notification_receiver`; replace with "notification handlers" - Bump minimum CMake version to 3.28. (#874) - Fixed error message on clashing transaction focuses. (#879) - Don't call`/bin/true`; macOS doesn't have it. Just call `true`. (#885) diff --git a/config/Makefile.in b/config/Makefile.in index 51586bda2..9b1454986 100644 --- a/config/Makefile.in +++ b/config/Makefile.in @@ -124,7 +124,7 @@ am__can_run_installinfo = \ esac am__tagged_files = $(HEADERS) $(SOURCES) $(TAGS_FILES) $(LISP) am__DIST_COMMON = $(srcdir)/Makefile.in compile config.guess \ - config.sub install-sh ltmain.sh missing mkinstalldirs + config.sub depcomp install-sh ltmain.sh missing mkinstalldirs DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST) ACLOCAL = @ACLOCAL@ AMTAR = @AMTAR@ diff --git a/configure b/configure index 24ad5d85a..5cfcc4260 100755 --- a/configure +++ b/configure @@ -17160,7 +17160,6 @@ then -Wattribute-alias=2 \ -Wlogical-op \ -Wmismatched-tags \ - -Wnoexcept \ -Wredundant-tags \ -Wrestrict \ -Wstringop-overflow \ @@ -18690,7 +18689,7 @@ if test "$found_fslib" != "yes" then as_fn_error $? " There seems to be support, but I could not figure out now to make -it work. You'll have to add set your own build options for this. +it work. You'll have to set your own build options for this. " "$LINENO" 5 fi { printf "%s\n" "$as_me:${as_lineno-$LINENO}: result: $result_msg" >&5 diff --git a/configure.ac b/configure.ac index a4ee470bf..1704b9b38 100644 --- a/configure.ac +++ b/configure.ac @@ -182,7 +182,6 @@ then -Wattribute-alias=2 \ -Wlogical-op \ -Wmismatched-tags \ - -Wnoexcept \ -Wredundant-tags \ -Wrestrict \ -Wstringop-overflow \ diff --git a/doc/requirements.txt b/doc/requirements.txt index 7f84026dc..3194064b8 100644 --- a/doc/requirements.txt +++ b/doc/requirements.txt @@ -1,4 +1,5 @@ -# XXX: Pin versions. +# TODO: Pin versions. +# TODO: Do we even still need this file? dia sphinx-rtd-theme myst-parser diff --git a/include/pqxx/connection.hxx b/include/pqxx/connection.hxx index 801736187..d54d4cad1 100644 --- a/include/pqxx/connection.hxx +++ b/include/pqxx/connection.hxx @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -119,6 +120,54 @@ class const_connection_largeobject; namespace pqxx { +/// An incoming notification. +/** PostgreSQL extends SQL with a "message bus" using the `LISTEN` and `NOTIFY` + * commands. In libpqxx you use @ref connection::listen() and (optionally) + * @ref transaction_base::notify(). + * + * When you receive a notification for which you have been listening, your + * handler receives it in the form of a `notification` object. + * + * @warning These structs are meant for extremely short lifespans: the fields + * reference memory that may become invalid as soon as your handler has been + * called. + */ +struct notification +{ + /// The connection which received the notification. + /** There will be no _backend_ transaction active on the connection when your + * handler gets called, but there may be a @ref nontransaction. (This is a + * special transaction type in libpqxx which does not start a transaction on + * the backend.) + */ + connection &conn; + + /// Channel name. + /** The notification logic will only pass the notification to a handler which + * was registered to listen on this exact name. + */ + zview channel; + + /// Optional payload text. + /** If the notification did not carry a payload, the string will be empty. + */ + zview payload; + + /// Process ID of the backend that sent the notification. + /** This can be useful in situations where a multiple clients are listening + * on the same channel, and also send notifications on it. + * + * In those situations, it often makes sense for a client to ignore its own + * incoming notifications, but handle all others on the same channel in some + * way. + * + * To check for that, compare this process ID to the return value of the + * connection's `backendpid()`. + */ + int backend_pid; +}; + + /// Flags for skipping initialisation of SSL-related libraries. /** When a running process makes its first SSL connection to a database through * libpqxx, libpq automatically initialises the OpenSSL and libcrypto @@ -246,9 +295,9 @@ public: /// Move constructor. /** Moving a connection is not allowed if it has an open transaction, or has - * error handlers or notification receivers registered on it. In those - * situations, other objects may hold references to the old object which - * would become invalid and might produce hard-to-diagnose bugs. + * error handlers or is listening for notifications. In those situations, + * other objects may hold references to the old object which would become + * invalid and might produce hard-to-diagnose bugs. */ connection(connection &&rhs); @@ -283,9 +332,10 @@ public: {} } + // TODO: Once we drop notification_receiver/errorhandler, move is easier. /// Move assignment. - /** Neither connection can have an open transaction, or registered - * notification receivers. + /** Neither connection can have an open transaction, `errorhandler`, or + * `notification_receiver`. */ connection &operator=(connection &&rhs); @@ -341,7 +391,7 @@ public: /// Process ID for backend process, or 0 if inactive. [[nodiscard]] int PQXX_PURE backendpid() const & noexcept; - /// Socket currently used for connection, or -1 for none. Use with care! + /// Socket currently used for connection, or -1 for none. /** Query the current socket number. This is intended for event loops based * on functions such as select() or poll(), where you're waiting for any of * multiple file descriptors to become ready for communication. @@ -477,34 +527,122 @@ public: /** * @name Notifications and Receivers + * + * This is PostgreSQL-specific extension that goes beyond standard SQL. It's + * a communications mechanism between clients on a database, akin to a + * transactional message bus. + * + * A notification happens on a _channel,_ identified by a name. You can set + * a connection to _listen_ for notifications on the channel, using the + * connection's @ref listen() function. (Internally this will issue a + * `LISTEN` SQL command). Any client on the database can send a + * notification on that channel by executing a `NOTIFY` SQL command. The + * transaction classes implement a convenience function for this, called + * @ref transaction_base::notify(). + * + * Notifications can carry an optional _payload_ string. This is free-form + * text which carries additional information to the receiver. + * + * @warning There are a few pitfalls with the channel names: case sensitivity + * and encodings. They are not too hard to avoid, but the safest thing to do + * is use only lower-case ASCII names. + * + * + * ### Case sensitivity + * + * Channel names are _case-sensitive._ By default, however, PostgreSQL does + * convert the channel name in a `NOTIFY` or `LISTEN` command to lower-case, + * to give the impression that it is _not_ case-sensitive while keeping the + * performance cost low. + * + * Thus, a `LISTEN Hello` will pick up a notification from `NOTIFY Hello` but + * also one from `NOTIFY hello`, because the database converts `Hello` into + * `hello` going in either direction. + * + * You can prevent this conversion by putting the name in double quotes, as + * @ref quote_name() does. This is what libpqxx's notification functions do. + * If you use libpqxx to lisen on `Hello` but raw SQL to notify `Hello`, the + * notification will not arrive because the notification actually uses the + * string `hello` instead. + * + * Confused? Safest thing to do is to use only lower-case letters in the + * channel names! + * + * + * ### Transactions + * + * Both listening and notifying are _transactional_ in the backend: they + * only take effect once the back-end transaction in which you do them is + * committed. + * + * For an outgoing notification, this means that the transaction holds on to + * the outgoing message until you commit. (A @ref nontransaction does not + * start a backend transaction, so if that's the transaction type you're + * using, the message does go out immediately.) + * + * For listening to incoming notifications, it gets a bit more complicated. + * To avoid complicating its internal bookkeeping, libpqxx only lets you + * start listening while no transaction is open. + * + * No notifications will come in while you're in a transaction... again + * unless it's a @ref nontransaction of course, because that does not open a + * transaction on the backend. + * + * + * ### Exceptions + * + * If your handler throws an exception, that will simply propagate up the + * call chain to wherever you were when you received it. + * + * This is differnt from the old `notification_receiver` mechanism which + * logged exceptions but did not propagate them. + * + * + * ### Encoding + * + * When a client sends a notification, it does so in its client encoding. If + * necessary, the back-end converts them to its internal encoding. And then + * when a client receives the notification, the database converts it to the + * receiver's client encoding. + * + * Simple enough, right? + * + * However if you should _change_ your connection's client encoding after you + * start listening on a channel, then any notifications you receive may have + * different channel names than the ones for which you are listening. + * + * If this could be a problem in your scenario, stick to names in pure + * ASCII. Those will look the same in all the encodings postgres supports. */ //@{ /// Check for pending notifications and take appropriate action. /** This does not block. To wait for incoming notifications, either call - * await_notification() (it calls this function); or wait for incoming data - * on the connection's socket (i.e. wait to read), and then call this + * @ref await_notification() (it calls this function); or wait for incoming + * data on the connection's socket (i.e. wait to read), and then call this * function repeatedly until it returns zero. After that, there are no more - * pending notifications so you may want to wait again. + * pending notifications so you may want to wait again, or move on and do + * other work. * * If any notifications are pending when you call this function, it - * processes them by finding any receivers that match the notification string - * and invoking those. If no receivers match, there is nothing to invoke but - * we do consider the notification processed. + * processes them by checking for a matching notification handler, and if it + * finds one, invoking it. If there is no matching handler, nothing happens. * - * If any of the client-registered receivers throws an exception, the - * function will report it using the connection's notice handler. It does - * not re-throw the exceptions. + * If your notifcation handler throws an exception, `get_notifs()` will just + * propagate it back to you. (This is different from the old + * `notification_receiver` mechanism, which would merely log them.) * * @return Number of notifications processed. */ int get_notifs(); /// Wait for a notification to come in. - /** There are other events that will also terminate the wait, such as the - * backend failing. It will also wake up periodically. + /** There are other events that will also cancel the wait, such as the + * backend failing. Also, _the function will wake up by itself from time to + * time._ Your code must be ready to handle this; don't assume after waking + * up that there will always be a pending notifiation. * - * If a notification comes in, the call will process it, along with any other - * notifications that may have been pending. + * If a notification comes in, the call to this function will process it, + * along with any other notifications that may have been pending. * * To wait for notifications into your own event loop instead, wait until * there is incoming data on the connection's socket to be read, then call @@ -515,8 +653,8 @@ public: int await_notification(); /// Wait for a notification to come in, or for given timeout to pass. - /** There are other events that will also terminate the wait, such as the - * backend failing, or timeout expiring. + /** There are other events that will also cancel the wait, such as the + * backend failing, some kinds of signal coming in, or timeout expiring. * * If a notification comes in, the call will process it, along with any other * notifications that may have been pending. @@ -525,9 +663,48 @@ public: * there is incoming data on the connection's socket to be read, then call * @ref get_notifs repeatedly until it returns zero. * - * @return Number of notifications processed + * If your notifcation handler throws an exception, `get_notifs()` will just + * propagate it back to you. (This is different from the old + * `notification_receiver` mechanism, which would merely log them.) + * + * @return Number of notifications processed. + */ + int await_notification(std::time_t seconds, long microseconds = 0); + + /// A handler callback for incoming notifications on a given channel. + /** Your callback must accept a @ref notification object. This object can + * and will exist only for the duration of the handling of that one incoming + * notification. + * + * The handler can be "empty," i.e. contain no code. Setting an empty + * handler on a channel disables listening on that channel. + */ + using notification_handler = std::function; + + /// Attach a handler to a notification channel. + /** Issues a `LISTEN` SQL command for channel `channel`, and stores `handler` + * as the callback for when a notification comes in on that channel. + * + * The handler is a `std::function` (see @ref notification_handler), but you + * can simply pass in a lambda with the right parameters, or a function, or + * an object of a type you define that happens to implemnt the right function + * call operator. + * + * Your handler probably needs to interact with your application's data; the + * simple way to get that working is to pass a lambda with a closure + * referencing the data items you need. + * + * If the handler is empty (the default), then that stops the connection + * listening on the channel. It cancels your subscription, so to speak. + * You can do that as many times as you like, even when you never started + * listening to that channel in the first place. + * + * A connection can only have one handler per channel, so if you register two + * different handlers on the same channel, then the second overwrites the + * first. */ - int await_notification(std::time_t seconds, long microseconds); + void listen(std::string_view channel, notification_handler handler = {}); + //@} /** @@ -772,7 +949,7 @@ public: #endif // TODO: Make "into buffer" variant to eliminate a string allocation. - /// Unescape binary data, e.g. from a table field or notification payload. + /// Unescape binary data, e.g. from a `bytea` field. /** Takes a binary string as escaped by PostgreSQL, and returns a restored * copy of the original binary data. * @@ -892,7 +1069,7 @@ public: return esc(std::string_view{text, maxlen}); } - /// Unescape binary data, e.g. from a table field or notification payload. + /// Unescape binary data, e.g. from a `bytea` field. /** Takes a binary string as escaped by PostgreSQL, and returns a restored * copy of the original binary data. */ @@ -904,7 +1081,7 @@ public: #include "pqxx/internal/ignore-deprecated-post.hxx" } - /// Unescape binary data, e.g. from a table field or notification payload. + /// Unescape binary data, e.g. from a `bytea` field. /** Takes a binary string as escaped by PostgreSQL, and returns a restored * copy of the original binary data. */ @@ -964,7 +1141,6 @@ public: */ void set_notice_handler(std::function handler) { -// XXX: Express noexcept on handler in code, somehow. m_notice_waiters->notice_handler = std::move(handler); } @@ -1154,12 +1330,23 @@ private: /// 9.0: Replace with just notice handler. std::shared_ptr m_notice_waiters; + // TODO: Remove these when we retire notification_receiver. // TODO: Can we make these movable? using receiver_list = std::multimap; /// Notification receivers. receiver_list m_receivers; + /// Notification handlers. + /** These are the functions we call when notifications come in. Each + * corresponds to a `LISTEN` we have executed. + * + * The map does not contain any `std::function` which are empty. If the + * caller registers an empty function, that simply cancels any subscription + * to that channel. + */ + std::map m_notification_handlers; + /// Unique number to use as suffix for identifiers (see adorn_name()). int m_unique_id = 0; }; diff --git a/include/pqxx/notification.hxx b/include/pqxx/notification.hxx index 7dd3befe1..d1e843632 100644 --- a/include/pqxx/notification.hxx +++ b/include/pqxx/notification.hxx @@ -61,10 +61,13 @@ public: * @param cx Connnection to operate on. * @param channel Name of the notification to listen for. */ + [[deprecated("Use pqxx::connection::listen() instead.")]] notification_receiver(connection &cx, std::string_view channel); /// Register the receiver with a connection. + [[deprecated("Use pqxx::connection::listen() instead.")]] notification_receiver(notification_receiver const &) = delete; /// Register the receiver with a connection. + [[deprecated("Use pqxx::connection::listen() instead.")]] notification_receiver &operator=(notification_receiver const &) = delete; /// Deregister the receiver. virtual ~notification_receiver(); @@ -72,7 +75,6 @@ public: /// The channel that this receiver listens on. [[nodiscard]] std::string const &channel() const & { return m_channel; } - // TODO: Change API to take payload as zview instead of string ref. /// Overridable: action to invoke when notification arrives. /** * @param payload An optional string that may have been passed to the NOTIFY diff --git a/include/pqxx/transaction_base.hxx b/include/pqxx/transaction_base.hxx index f7d837cb3..d2fe6b9fa 100644 --- a/include/pqxx/transaction_base.hxx +++ b/include/pqxx/transaction_base.hxx @@ -213,7 +213,7 @@ public: return conn().esc_raw(std::forward(args)...); } - /// Unescape binary data, e.g. from a table field or notification payload. + /// Unescape binary data, e.g. from a `bytea` field. /** Takes a binary string as escaped by PostgreSQL, and returns a restored * copy of the original binary data. */ @@ -225,13 +225,13 @@ public: #include "pqxx/internal/ignore-deprecated-post.hxx" } - /// Unescape binary data, e.g. from a table field or notification payload. + /// Unescape binary data, e.g. from a `bytea` field. /** Takes a binary string as escaped by PostgreSQL, and returns a restored * copy of the original binary data. */ [[nodiscard]] bytes unesc_bin(zview text) { return conn().unesc_bin(text); } - /// Unescape binary data, e.g. from a table field or notification payload. + /// Unescape binary data, e.g. from a `bytea` field. /** Takes a binary string as escaped by PostgreSQL, and returns a restored * copy of the original binary data. */ @@ -243,7 +243,7 @@ public: #include "pqxx/internal/ignore-deprecated-post.hxx" } - /// Unescape binary data, e.g. from a table field or notification payload. + /// Unescape binary data, e.g. from a `bytea` field. /** Takes a binary string as escaped by PostgreSQL, and returns a restored * copy of the original binary data. */ @@ -892,6 +892,24 @@ public: { exec(query, parms).for_each(std::forward(func)); } + + /// Send a notification. + /** Convenience shorthand for executing a "NOTIFY" command. Most of the + * logic for handling _incoming_ notifications is in @ref pqxx::connection + * (particularly @ref pqxx::connection::listen), but _outgoing_ + * notifications happen here. + * + * Unless this transaction is a nontransaction, the actual notification only + * goes out once the outer transaction is committed. + * + * @param channel Name of the "channel" on which clients will need to be + * listening in order to receive this notification. + * + * @param payload Optional argument string which any listeners will also + * receive. If you leave this out, they will receive an empty string as the + * payload. + */ + void notify(std::string_view channel, std::string_view payload = {}); //@} /// Execute a prepared statement, with optional arguments. diff --git a/src/connection.cxx b/src/connection.cxx index 7bfc5109f..3d3cfd191 100644 --- a/src/connection.cxx +++ b/src/connection.cxx @@ -419,6 +419,50 @@ void PQXX_COLD pqxx::connection::add_receiver(pqxx::notification_receiver *n) } +void pqxx::connection::listen( + std::string_view channel, notification_handler handler) +{ + if (m_trans != nullptr) + throw usage_error{pqxx::internal::concat( + "Attempting to listen for notifications on '", channel, + "' while transaction is active.")}; + + std::string str_name{channel}; + + auto const + pos{m_notification_handlers.lower_bound(str_name)}, + handlers_end{std::end(m_notification_handlers)}; + + if (handler) + { + // Setting a handler. + if ((pos != handlers_end) and (pos->first == channel)) + { + // Overwrite existing handler. + m_notification_handlers.insert_or_assign( + pos, std::move(str_name), std::move(handler)); + } + else + { + // We had no handler installed for this name. Start listening. + exec(pqxx::internal::concat("LISTEN ", quote_name(channel))).no_rows(); + m_notification_handlers.emplace_hint(pos, channel, std::move(handler)); + } + } + else + { + // Installing an empty handler. That's equivalent to removing whatever + // handler may have been installed previously. + if (pos != handlers_end) + { + // Yes, we had a handler for this name. Remove it. + exec(pqxx::internal::concat("UNLISTEN ", quote_name(channel))).no_rows(); + m_notification_handlers.erase(pos); + } + } +} + + void PQXX_COLD pqxx::connection::remove_receiver(pqxx::notification_receiver *T) noexcept { @@ -564,16 +608,20 @@ int pqxx::connection::get_notifs() // Even if somehow we receive notifications during our transaction, don't // deliver them. - if (m_trans) + if (m_trans != nullptr) PQXX_UNLIKELY return 0; int notifs = 0; + + // Old mechanism. This is going away. for (auto N{get_notif(m_conn)}; N.get(); N = get_notif(m_conn)) { notifs++; - auto const Hit{m_receivers.equal_range(std::string{N->relname})}; + std::string const channel{N->relname}; + + auto const Hit{m_receivers.equal_range(channel)}; if (Hit.second != Hit.first) { std::string const payload{N->extra}; @@ -605,8 +653,14 @@ int pqxx::connection::get_notifs() } } + auto const handler{m_notification_handlers.find(N->relname)}; + // C++20: Use "dot notation" to initialise struct fields. + if (handler != std::end(m_notification_handlers)) + (handler->second)(notification{*this, channel, N->extra, N->be_pid}); + N.reset(); } + return notifs; } diff --git a/src/result.cxx b/src/result.cxx index 6de441623..7f0e239d9 100644 --- a/src/result.cxx +++ b/src/result.cxx @@ -357,7 +357,9 @@ std::string pqxx::result::status_error() const char const *pqxx::result::cmd_status() const noexcept { - // TODO: PQcmdStatus() could totally take a pointer to const. + // PQcmdStatus() can't take a PGresult const * because it returns a non-const + // pointer into the PGresult's data, and that can't be changed without + // breaking compatibility. return PQcmdStatus(const_cast(m_data.get())); } @@ -379,7 +381,9 @@ pqxx::oid pqxx::result::inserted_oid() const pqxx::result::size_type pqxx::result::affected_rows() const { - // TODO: PQcmdTuples() could take a pointer to const. + // PQcmdTuples() can't take a PGresult const * because it returns a non-const + // pointer into the PGresult's data, and that can't be changed without + // breaking compatibility. auto const rows_str{ PQcmdTuples(const_cast(m_data.get()))}; return (rows_str[0] == '\0') ? 0 : size_type(atoi(rows_str)); diff --git a/src/transaction.cxx b/src/transaction.cxx index dbddf8d58..022b9ba3e 100644 --- a/src/transaction.cxx +++ b/src/transaction.cxx @@ -71,7 +71,7 @@ void pqxx::internal::basic_transaction::do_commit() process_notice(internal::concat(e.what(), "\n")); std::string msg{internal::concat( - "WARNING: Commit of transaction '", name(), + "WARNING: Commit status of transaction '", name(), "' is unknown. " "There is no way to tell whether the transaction succeeded " "or was aborted except to check manually.\n")}; diff --git a/src/transaction_base.cxx b/src/transaction_base.cxx index 3adaa0004..99a39f11b 100644 --- a/src/transaction_base.cxx +++ b/src/transaction_base.cxx @@ -302,6 +302,16 @@ pqxx::result pqxx::transaction_base::internal_exec_params( } +void pqxx::transaction_base::notify( + std::string_view channel, std::string_view payload) +{ + // For some reason, NOTIFY does not work as a parameterised statement, + // even just for the payload (which is supposed to be a normal string). + // Luckily, pg_notify() does. + exec("SELECT pg_notify($1, $2)", params{channel, payload}).one_row(); +} + + void pqxx::transaction_base::set_variable( std::string_view var, std::string_view value) { diff --git a/test/test04.cxx b/test/test04.cxx index 96c03a3e8..9fe0b1552 100644 --- a/test/test04.cxx +++ b/test/test04.cxx @@ -9,57 +9,34 @@ #include -#include #include #include #include "test_helpers.hxx" -using namespace pqxx; - // Example program for libpqxx. Send notification to self. namespace { -int Backend_PID{0}; - - -// Sample implementation of notification receiver. -class TestListener final : public notification_receiver -{ - bool m_done; - -public: - explicit TestListener(connection &cx) : - notification_receiver(cx, "listen"), m_done(false) - {} - - virtual void operator()(std::string const &, int be_pid) override - { - m_done = true; - PQXX_CHECK_EQUAL( - be_pid, Backend_PID, "Notification came from wrong backend process."); - } - - bool done() const { return m_done; } -}; - - void test_004() { - connection cx; + auto const channel{"pqxx_test_notif"}; + pqxx::connection cx; + int backend_pid{0}; + cx.listen( + channel, + [&backend_pid](pqxx::notification n) noexcept + { backend_pid = n.backend_pid; }); - TestListener L{cx}; // Trigger our notification receiver. - perform([&cx, &L] { - work tx(cx); - tx.exec("NOTIFY " + cx.quote_name(L.channel())).no_rows(); - Backend_PID = cx.backendpid(); + pqxx::perform([&cx, &channel] { + pqxx::work tx(cx); + tx.notify(channel); tx.commit(); }); int notifs{0}; - for (int i{0}; (i < 20) and not L.done(); ++i) + for (int i{0}; (i < 20) and (backend_pid == 0); ++i) { PQXX_CHECK_EQUAL(notifs, 0, "Got unexpected notifications."); // Sleep for one second. I'm not proud of this, but how does one inject @@ -68,7 +45,10 @@ void test_004() notifs = cx.get_notifs(); } - PQXX_CHECK_NOT_EQUAL(L.done(), false, "No notification received."); + PQXX_CHECK_EQUAL( + backend_pid, + cx.backendpid(), + "Did not get our notification from our own backend."); PQXX_CHECK_EQUAL(notifs, 1, "Got too many notifications."); } diff --git a/test/test78.cxx b/test/test78.cxx index afc1030fe..2ef1732be 100644 --- a/test/test78.cxx +++ b/test/test78.cxx @@ -2,8 +2,7 @@ #include #include -#include -#include +#include #include #include "test_helpers.hxx" @@ -13,46 +12,22 @@ // notification name with unusal characters, and without polling. namespace { -// Sample implementation of notification receiver. -class TestListener : public pqxx::notification_receiver -{ - bool m_done; - -public: - explicit TestListener(pqxx::connection &cx, std::string const &Name) : - pqxx::notification_receiver(cx, Name), m_done(false) - {} - - void operator()(std::string const &, int be_pid) override - { - m_done = true; - PQXX_CHECK_EQUAL( - be_pid, conn().backendpid(), - "Got notification from wrong backend process."); - - std::cout << "Received notification: " << channel() << " pid=" << be_pid - << std::endl; - } - - bool done() const { return m_done; } -}; - - void test_078() { pqxx::connection cx; + bool done{false}; - std::string const NotifName{"my listener"}; - TestListener L{cx, NotifName}; + std::string const channel{"my listener"}; + cx.listen(channel, [&done](pqxx::notification) noexcept { done = true; }); - pqxx::perform([&cx, &L] { - pqxx::work tx{cx}; - tx.exec("NOTIFY " + tx.quote_name(L.channel())).no_rows(); + pqxx::perform([&cx, &channel] { + pqxx::nontransaction tx{cx}; + tx.notify(channel); tx.commit(); }); int notifs{0}; - for (int i{0}; (i < 20) and not L.done(); ++i) + for (int i{0}; (i < 20) and not done; ++i) { PQXX_CHECK_EQUAL(notifs, 0, "Got unexpected notifications."); std::cout << "."; @@ -60,7 +35,7 @@ void test_078() } std::cout << std::endl; - PQXX_CHECK(L.done(), "No notification received."); + PQXX_CHECK(done, "No notification received."); PQXX_CHECK_EQUAL(notifs, 1, "Got unexpected number of notifications."); } } // namespace diff --git a/test/test79.cxx b/test/test79.cxx index 870f6a55a..73cd0e902 100644 --- a/test/test79.cxx +++ b/test/test79.cxx @@ -2,7 +2,6 @@ #include #include -#include #include #include @@ -12,48 +11,29 @@ // Example program for libpqxx. Test waiting for notification with timeout. namespace { -// Sample implementation of notification receiver. -class TestListener final : public pqxx::notification_receiver -{ - bool m_done; - -public: - explicit TestListener(pqxx::connection &cx, std::string const &Name) : - pqxx::notification_receiver(cx, Name), m_done(false) - {} - - void operator()(std::string const &, int be_pid) override - { - m_done = true; - PQXX_CHECK_EQUAL( - be_pid, conn().backendpid(), "Notification came from wrong backend."); - - std::cout << "Received notification: " << channel() << " pid=" << be_pid - << std::endl; - } - - bool done() const { return m_done; } -}; - - void test_079() { pqxx::connection cx; - std::string const NotifName{"mylistener"}; - TestListener L(cx, NotifName); + std::string const channel{"mylistener"}; + int backend_pid{0}; + + cx.listen( + channel, + [&backend_pid](pqxx::notification n) noexcept + { backend_pid = n.backend_pid; }); // First see if the timeout really works: we're not expecting any notifs int notifs{cx.await_notification(0, 1)}; PQXX_CHECK_EQUAL(notifs, 0, "Got unexpected notification."); - pqxx::perform([&cx, &L] { + pqxx::perform([&cx, &channel] { pqxx::work tx{cx}; - tx.exec("NOTIFY " + L.channel()).no_rows(); + tx.notify(channel); tx.commit(); }); - for (int i{0}; (i < 20) and not L.done(); ++i) + for (int i{0}; (i < 20) and (backend_pid == 0); ++i) { PQXX_CHECK_EQUAL(notifs, 0, "Got notifications, but no handler called."); std::cout << "."; @@ -61,7 +41,7 @@ void test_079() } std::cout << std::endl; - PQXX_CHECK(L.done(), "No notifications received."); + PQXX_CHECK_EQUAL(backend_pid, cx.backendpid(), "Wrong backend."); PQXX_CHECK_EQUAL(notifs, 1, "Got unexpected notifications."); } } // namespace diff --git a/test/test87.cxx b/test/test87.cxx index 61caed3e4..6635c542a 100644 --- a/test/test87.cxx +++ b/test/test87.cxx @@ -11,7 +11,6 @@ #include -#include #include #include @@ -24,46 +23,26 @@ // multiple sockets. namespace { -// Sample implementation of notification receiver. -class TestListener final : public pqxx::notification_receiver -{ - bool m_done; - -public: - explicit TestListener(pqxx::connection &cx, std::string Name) : - pqxx::notification_receiver(cx, Name), m_done(false) - {} - - void operator()(std::string const &, int be_pid) override - { - m_done = true; - PQXX_CHECK_EQUAL( - be_pid, conn().backendpid(), - "Notification came from wrong backend process."); - - std::cout << "Received notification: " << channel() << " pid=" << be_pid - << std::endl; - } - - bool done() const { return m_done; } -}; - - void test_087() { pqxx::connection cx; - std::string const NotifName{"my notification"}; - TestListener L{cx, NotifName}; + std::string const channel{"my notification"}; + int backend_pid{0}; + + cx.listen( + channel, + [&backend_pid](pqxx::notification n) noexcept + { backend_pid = n.backend_pid; }); - pqxx::perform([&cx, &L] { + pqxx::perform([&cx, &channel] { pqxx::work tx{cx}; - tx.exec("NOTIFY " + tx.quote_name(L.channel())).no_rows(); + tx.notify(channel); tx.commit(); }); int notifs{0}; - for (int i{0}; (i < 20) and not L.done(); ++i) + for (int i{0}; (i < 20) and (backend_pid == 0); ++i) { PQXX_CHECK_EQUAL(notifs, 0, "Got unexpected notifications."); @@ -74,7 +53,8 @@ void test_087() } std::cout << std::endl; - PQXX_CHECK(L.done(), "No notification received."); + PQXX_CHECK_EQUAL( + backend_pid, cx.backendpid(), "Notification came from wrong backend."); PQXX_CHECK_EQUAL(notifs, 1, "Got unexpected number of notifications."); } } // namespace diff --git a/test/unit/test_notification.cxx b/test/unit/test_notification.cxx index 344022d96..5af65ef5d 100644 --- a/test/unit/test_notification.cxx +++ b/test/unit/test_notification.cxx @@ -8,11 +8,13 @@ #include #include +#include #include "../test_helpers.hxx" namespace { +#include class TestReceiver final : public pqxx::notification_receiver { public: @@ -34,7 +36,7 @@ class TestReceiver final : public pqxx::notification_receiver }; -void test_receive( +void test_receive_classic( pqxx::transaction_base &tx, std::string const &channel, char const payload[] = nullptr) { @@ -68,19 +70,370 @@ void test_receive( } -void test_notification() +void test_notification_classic() { pqxx::connection cx; TestReceiver receiver(cx, "mychannel"); PQXX_CHECK_EQUAL(receiver.channel(), "mychannel", "Bad channel."); pqxx::work tx{cx}; - test_receive(tx, "channel1"); + test_receive_classic(tx, "channel1"); pqxx::nontransaction u(cx); - test_receive(u, "channel2", "payload"); + test_receive_classic(u, "channel2", "payload"); } +#include -PQXX_REGISTER_TEST(test_notification); +void test_notification_to_self_arrives_after_commit() +{ + pqxx::connection cx; + + auto const channel{"pqxx_test_channel"}; + int notifications{0}; + pqxx::connection *conn; + std::string incoming, payload; + int pid{0}; + + cx.listen( + channel, + [¬ifications, &conn, &incoming, &payload, &pid](pqxx::notification n){ + ++notifications; + conn = &n.conn; + incoming = n.channel; + pid = n.backend_pid; + payload = n.payload; + }); + + cx.get_notifs(); + + // No notifications so far. + PQXX_CHECK_EQUAL(notifications, 0, "Unexpected notification."); + + pqxx::work tx{cx}; + tx.notify(channel); + int received{cx.await_notification(0, 300)}; + + // Notification has not been delivered yet, since transaction has not yet + // been committed. + PQXX_CHECK_EQUAL(received, 0, "Notification went out before commit."); + PQXX_CHECK_EQUAL(notifications, 0, "Received uncounted notification."); + + tx.commit(); + + received = cx.await_notification(3); + PQXX_CHECK_EQUAL(received, 1, "Did not receive 1 notification from self."); + PQXX_CHECK_EQUAL(notifications, 1, "Miscounted notifcations."); + PQXX_CHECK(conn == &cx, "Wrong connection on notification from self."); + PQXX_CHECK_EQUAL( + pid, cx.backendpid(), "Notification from self came from wrong connection."); + PQXX_CHECK_EQUAL(incoming, channel, "Notification is on wrong channel."); + PQXX_CHECK_EQUAL(payload, "", "Unexpected payload."); +} + + +void test_notification_has_payload() +{ + pqxx::connection cx; + + auto const channel{"pqxx-ichan"}, payload{"two dozen eggs"}; + int notifications{0}; + std::string received; + + cx.listen( + channel, + [¬ifications, &received](pqxx::notification n) { + ++notifications; + received = n.payload; + }); + + pqxx::work tx{cx}; + tx.notify(channel, payload); + tx.commit(); + + cx.await_notification(3); + + PQXX_CHECK_EQUAL(notifications, 1, "Expeccted 1 self-notification."); + PQXX_CHECK_EQUAL(received, payload, "Unexpected payload."); +} + + +// Functor-shaped notification handler. +struct notify_test_listener +{ + int &received; + notify_test_listener(int &r) : received{r} {} + void operator()(pqxx::notification) { ++received; } +}; + + +void test_listen_supports_different_types_of_callable() +{ + auto const chan{"pqxx-test-listen"}; + pqxx::connection cx; + int received; + + // Using a functor as a handler. + received = 0; + notify_test_listener l(received); + cx.listen(chan, l); + pqxx::work tx1{cx}; + tx1.notify(chan); + tx1.commit(); + cx.await_notification(3); + PQXX_CHECK_EQUAL(received, 1, "Notification did not arrive."); + + // Using a handler that takes a const reference to the notification. + received = 0; + cx.listen(chan, [&received](pqxx::notification const &) { ++received; }); + pqxx::work tx2{cx}; + tx2.notify(chan); + tx2.commit(); + cx.await_notification(3); + PQXX_CHECK_EQUAL(received, 1, "Const ref did not receive notification."); + + // Using a handler that takes an rvalue reference. + received = 0; + cx.listen(chan, [&received](pqxx::notification &&) { ++received; }); + pqxx::work tx3{cx}; + tx3.notify(chan); + tx3.commit(); + cx.await_notification(3); + PQXX_CHECK_EQUAL(received, 1, "Revalue ref did not receive notification."); +} + + +void test_abort_cancels_notification() +{ + auto const chan{"pqxx-test-channel"}; + pqxx::connection cx; + bool received{false}; + cx.listen(chan, [&received](pqxx::notification){ received = true; }); + + pqxx::work tx{cx}; + tx.notify(chan); + tx.abort(); + + cx.await_notification(3); + PQXX_CHECK(not received, "Abort did not cancel notification."); +} + + +void test_notification_channels_are_case_sensitive() +{ + pqxx::connection cx; + std::string in; + cx.listen("pqxx-AbC", [&in](pqxx::notification n){ in = n.channel; }); + + pqxx::work tx{cx}; + tx.notify("pqxx-AbC"); + tx.notify("pqxx-ABC"); + tx.notify("pqxx-abc"); + tx.commit(); + + cx.await_notification(3); + + PQXX_CHECK_EQUAL(in, "pqxx-AbC", "Channel is not case-insensitive."); +} + + +void test_notification_channels_may_contain_weird_chars() +{ + auto const chan{"pqxx-A_#&*!"}; + pqxx::connection cx; + std::string got; + cx.listen(chan, [&got](pqxx::notification n){ got = n.channel; }); + pqxx::work tx{cx}; + tx.notify(chan); + tx.commit(); + cx.await_notification(3); + PQXX_CHECK_EQUAL( + got, chan, "Channel name with weird characters got distorted."); +} + + +/// In a nontransaction, a notification goes out even if you abort. +void test_nontransaction_sends_notification() +{ + auto const chan{"pqxx-test-chan"}; + pqxx::connection cx; + bool got{false}; + cx.listen(chan, [&got](pqxx::notification){ got = true; }); + + pqxx::nontransaction tx{cx}; + tx.notify(chan); + tx.abort(); + + cx.await_notification(3); + PQXX_CHECK(got, "Notification from nontransaction did not arrive."); +} + + +void test_subtransaction_sends_notification() +{ + auto const chan{"pqxx-test-chan6301"}; + pqxx::connection cx; + bool got{false}; + cx.listen(chan, [&got](pqxx::notification){ got = true; }); + + pqxx::work tx{cx}; + pqxx::subtransaction sx{tx}; + sx.notify(chan); + sx.commit(); + tx.commit(); + + cx.await_notification(3); + PQXX_CHECK(got, "Notification from subtransaction did not arrive."); +} + + +void test_subtransaction_abort_cancels_notification() +{ + auto const chan{"pqxx-test-chan123278w"}; + pqxx::connection cx; + bool got{false}; + cx.listen(chan, [&got](pqxx::notification){ got = true; }); + + pqxx::work tx{cx}; + pqxx::subtransaction sx{tx}; + sx.notify(chan); + sx.abort(); + tx.commit(); + + cx.await_notification(3); + PQXX_CHECK(not got, "Subtransaction rollback did not cancel notification."); +} + + +void test_cannot_listen_during_transaction() +{ + pqxx::connection cx; + // Listening while a transaction is active is an error, even when it's just + // a nontransaction. + pqxx::nontransaction tx{cx}; + PQXX_CHECK_THROWS( + cx.listen("pqxx-test-chan02756", [](pqxx::notification){}), + pqxx::usage_error, + "Expected usage_error when listening during transaction."); +} + + +void test_notifications_cross_connections() +{ + auto const chan{"pqxx-chan7529"}; + pqxx::connection cx_listen, cx_notify; + int sender_pid{0}; + cx_listen.listen( + chan, + [&sender_pid](pqxx::notification n){ sender_pid = n.backend_pid; }); + + pqxx::work tx{cx_notify}; + tx.notify(chan); + tx.commit(); + + cx_listen.await_notification(3); + PQXX_CHECK_EQUAL( + sender_pid, cx_notify.backendpid(), "Sender pid mismatch."); +} + + +void test_notification_goes_to_right_handler() +{ + pqxx::connection cx; + std::string got; + int count{0}; + + cx.listen( + "pqxx-chanX", + [&got, &count](pqxx::notification){ got = "chanX"; ++count; }); + cx.listen( + "pqxx-chanY", + [&got, &count](pqxx::notification){ got = "chanY"; ++count; }); + cx.listen( + "pqxx-chanZ", + [&got, &count](pqxx::notification){ got = "chanZ"; ++count; }); + + pqxx::work tx{cx}; + tx.notify("pqxx-chanY"); + tx.commit(); + cx.await_notification(3); + + PQXX_CHECK_EQUAL(got, "chanY", "Wrong handler got called."); + PQXX_CHECK_EQUAL(count, 1, "Wrong number of handler calls."); +} + + +void test_listen_on_same_channel_overwrites() +{ + auto const chan{"pqxx-chan84710"}; + pqxx::connection cx; + std::string got; + int count{0}; + + cx.listen( + chan, [&got, &count](pqxx::notification){ got = "first"; ++count; }); + cx.listen( + chan, [&got, &count](pqxx::notification){ got = "second"; ++count; }); + cx.listen( + chan, [&got, &count](pqxx::notification){ got = "third"; ++count; }); + + pqxx::work tx{cx}; + tx.notify(chan); + tx.commit(); + cx.await_notification(3); + + PQXX_CHECK_EQUAL(count, 1, "Expected 1 notification despite overwrite."); + PQXX_CHECK_EQUAL(got,"third", "Wrong handler called."); +} + + +void test_empty_notification_handler_disables() +{ + auto const chan{"pqxx-chan812710"}; + pqxx::connection cx; + bool got{false}; + cx.listen(chan, [&got](pqxx::notification){ got = true; }); + cx.listen(chan); + pqxx::work tx{cx}; + tx.notify(chan); + tx.commit(); + PQXX_CHECK(not got, "Disabling a notification handler did not work."); +} + + +void test_notifications_do_not_come_in_until_commit() +{ + auto const chan{"pqxx-chan95017834"}; + pqxx::connection cx; + bool got{false}; + cx.listen(chan, [&got](pqxx::notification){ got = true; }); + + // This applies even during a nontransaction. Another test verifies that + // a notification goes _out_ even if we abort the nontransaction, because + // it goes out immediately, not at commit time. What we're establishing + // here is that the notification does not come _in_ during a transaction, + // even if it's a nontransaction. + pqxx::nontransaction tx{cx}; + tx.notify(chan); + cx.await_notification(3); + PQXX_CHECK(not got, "Notification came in during nontransaction."); +} + + +PQXX_REGISTER_TEST(test_notification_classic); +PQXX_REGISTER_TEST(test_notification_to_self_arrives_after_commit); +PQXX_REGISTER_TEST(test_notification_has_payload); +PQXX_REGISTER_TEST(test_listen_supports_different_types_of_callable); +PQXX_REGISTER_TEST(test_abort_cancels_notification); +PQXX_REGISTER_TEST(test_notification_channels_are_case_sensitive); +PQXX_REGISTER_TEST(test_notification_channels_may_contain_weird_chars); +PQXX_REGISTER_TEST(test_nontransaction_sends_notification); +PQXX_REGISTER_TEST(test_subtransaction_sends_notification); +PQXX_REGISTER_TEST(test_subtransaction_abort_cancels_notification); +PQXX_REGISTER_TEST(test_cannot_listen_during_transaction); +PQXX_REGISTER_TEST(test_notifications_cross_connections); +PQXX_REGISTER_TEST(test_notification_goes_to_right_handler); +PQXX_REGISTER_TEST(test_listen_on_same_channel_overwrites); +PQXX_REGISTER_TEST(test_empty_notification_handler_disables); +PQXX_REGISTER_TEST(test_notifications_do_not_come_in_until_commit); } // namespace