Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
jakobod committed Oct 2, 2020
1 parent f8050af commit 4264e10
Show file tree
Hide file tree
Showing 14 changed files with 107 additions and 102 deletions.
60 changes: 30 additions & 30 deletions libcaf_net/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,28 @@ file(GLOB_RECURSE CAF_NET_HEADERS "caf/*.hpp")
# -- add consistency checks for enum to_string implementations -----------------

caf_incubator_add_enum_consistency_check("caf/net/basp/ec.hpp"
"src/basp/ec_strings.cpp")
"src/basp/ec_strings.cpp")
caf_incubator_add_enum_consistency_check("caf/net/basp/message_type.hpp"
"src/basp/message_type_strings.cpp")
"src/basp/message_type_strings.cpp")
caf_incubator_add_enum_consistency_check("caf/net/operation.hpp"
"src/basp/operation_strings.cpp")
"src/basp/operation_strings.cpp")

# -- utility function for setting default properties ---------------------------

function(caf_net_set_default_properties)
foreach (target ${ARGN})
caf_incubator_set_default_properties(${target})
# Make sure we find our headers plus the the generated export header.
target_include_directories(${target} PRIVATE
"${CMAKE_CURRENT_SOURCE_DIR}"
"${CMAKE_BINARY_DIR}")
target_compile_definitions(${target} PRIVATE libcaf_net_EXPORTS)
# Pull in public dependencies.
target_link_libraries(${target} PUBLIC CAF::core)
if (MSVC)
target_link_libraries(${target} PUBLIC ws2_32 iphlpapi)
endif ()
endforeach ()
foreach(target ${ARGN})
caf_incubator_set_default_properties(${target})
# Make sure we find our headers plus the the generated export header.
target_include_directories(${target} PRIVATE
"${CMAKE_CURRENT_SOURCE_DIR}"
"${CMAKE_BINARY_DIR}")
target_compile_definitions(${target} PRIVATE libcaf_net_EXPORTS)
# Pull in public dependencies.
target_link_libraries(${target} PUBLIC CAF::core)
if(MSVC)
target_link_libraries(${target} PUBLIC ws2_32 iphlpapi)
endif()
endforeach()
endfunction()

# -- add library targets -------------------------------------------------------
Expand Down Expand Up @@ -66,27 +66,27 @@ add_library(libcaf_net_obj OBJECT ${CAF_NET_HEADERS}
)

add_library(libcaf_net "${PROJECT_SOURCE_DIR}/cmake/dummy.cpp"
$<TARGET_OBJECTS:libcaf_net_obj>)
$<TARGET_OBJECTS:libcaf_net_obj>)

generate_export_header(libcaf_net
EXPORT_MACRO_NAME CAF_NET_EXPORT
EXPORT_FILE_NAME "${CMAKE_BINARY_DIR}/caf/detail/net_export.hpp")
EXPORT_MACRO_NAME CAF_NET_EXPORT
EXPORT_FILE_NAME "${CMAKE_BINARY_DIR}/caf/detail/net_export.hpp")

set_property(TARGET libcaf_net_obj PROPERTY POSITION_INDEPENDENT_CODE ON)

caf_net_set_default_properties(libcaf_net_obj libcaf_net)

target_include_directories(libcaf_net INTERFACE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
$<BUILD_INTERFACE:${CMAKE_BINARY_DIR}>)
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
$<BUILD_INTERFACE:${CMAKE_BINARY_DIR}>)

add_library(CAF::net ALIAS libcaf_net)

set_target_properties(libcaf_net PROPERTIES
EXPORT_NAME net
SOVERSION ${CAF_VERSION}
VERSION ${CAF_LIB_VERSION}
OUTPUT_NAME caf_net)
EXPORT_NAME net
SOVERSION ${CAF_VERSION}
VERSION ${CAF_LIB_VERSION}
OUTPUT_NAME caf_net)

# -- install library and header files ------------------------------------------

Expand All @@ -106,13 +106,13 @@ install(DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/caf"

# -- build unit tests ----------------------------------------------------------

if (NOT CAF_INC_ENABLE_TESTING)
return()
endif ()
if(NOT CAF_INC_ENABLE_TESTING)
return()
endif()

add_executable(caf-net-test
test/net-test.cpp
$<TARGET_OBJECTS:libcaf_net_obj>)
test/net-test.cpp
$<TARGET_OBJECTS:libcaf_net_obj>)

caf_net_set_default_properties(caf-net-test)

Expand Down
1 change: 0 additions & 1 deletion libcaf_net/caf/net/actor_proxy_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

#include "caf/actor_proxy.hpp"
#include "caf/net/consumer.hpp"
#include "caf/net/socket_manager.hpp"

namespace caf::net {

Expand Down
55 changes: 27 additions & 28 deletions libcaf_net/caf/net/basp/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#pragma once

#include <cstdint>
#include <iterator>
#include <memory>
#include <type_traits>
#include <unordered_map>
Expand All @@ -28,7 +27,6 @@

#include "caf/actor.hpp"
#include "caf/actor_addr.hpp"
#include "caf/actor_clock.hpp"
#include "caf/actor_system.hpp"
#include "caf/actor_system_config.hpp"
#include "caf/binary_deserializer.hpp"
Expand All @@ -40,9 +38,6 @@
#include "caf/detail/worker_hub.hpp"
#include "caf/error.hpp"
#include "caf/fwd.hpp"
#include "caf/intrusive/drr_queue.hpp"
#include "caf/intrusive/fifo_inbox.hpp"
#include "caf/intrusive/singly_linked.hpp"
#include "caf/mailbox_element.hpp"
#include "caf/net/actor_proxy_impl.hpp"
#include "caf/net/basp/constants.hpp"
Expand All @@ -53,18 +48,15 @@
#include "caf/net/basp/worker.hpp"
#include "caf/net/consumer.hpp"
#include "caf/net/consumer_queue.hpp"
#include "caf/net/multiplexer.hpp"
#include "caf/net/receive_policy.hpp"
#include "caf/net/socket_manager.hpp"
#include "caf/node_id.hpp"
#include "caf/policy/normal_messages.hpp"
#include "caf/proxy_registry.hpp"
#include "caf/response_promise.hpp"
#include "caf/scoped_execution_unit.hpp"
#include "caf/send.hpp"
#include "caf/tag/message_oriented.hpp"
#include "caf/unit.hpp"
#include "caf/variant.hpp"

namespace caf::net::basp {

Expand Down Expand Up @@ -96,34 +88,35 @@ class CAF_NET_EXPORT application : public consumer {
error init(socket_manager* owner, LowerLayerPtr down, const settings& cfg) {
// Initialize member variables.
owner_ = owner;
system_ = &owner->mpx().system();
system_ = &owner->system();
executor_.system_ptr(system_);
executor_.proxy_registry_ptr(&proxies_);
max_throughput_ = get_or(cfg, "caf.scheduler.max-throughput",
defaults::scheduler::max_throughput);
auto workers = get_or<size_t>(
cfg, "caf.middleman.workers",
std::min(3u, std::thread::hardware_concurrency() / 4u) + 1);
max_throughput_ = get_or(system().config(), "caf.scheduler.max-throughput",
defaults::scheduler::max_throughput);
for (size_t i = 0; i < workers; ++i)
hub_->add_new_worker(*queue_, proxies_);
// Write handshake.
return write_message(
down, header{message_type::handshake, version}, system().node(),
get_or(system().config(), "caf.middleman.app-identifiers",
application::default_app_ids()));
auto app_ids = get_or(cfg, "caf.middleman.app-identifiers",
application::default_app_ids());
return write_message(down, header{message_type::handshake, version},
system().node(), app_ids);
}

template <class LowerLayerPtr>
bool prepare_send(LowerLayerPtr& down) {
CAF_LOG_TRACE("");
if (!handshake_complete())
return true;
if (auto err = dequeue_events(down)) {
CAF_LOG_ERROR("handle_events failed: " << CAF_ARG(err));
CAF_LOG_ERROR("dequeue_events failed: " << CAF_ARG(err));
down->abort_reason(err);
return false;
}
if (auto err = dequeue_messages(down)) {
CAF_LOG_ERROR("handle_messages failed: " << CAF_ARG(err));
CAF_LOG_ERROR("dequeue_messages failed: " << CAF_ARG(err));
down->abort_reason(err);
return false;
}
Expand All @@ -132,6 +125,7 @@ class CAF_NET_EXPORT application : public consumer {

template <class LowerLayerPtr>
ptrdiff_t consume(LowerLayerPtr& down, byte_span buffer) {
CAF_LOG_TRACE(CAF_ARG2("buffer.size", buffer.size()));
if (auto err = handle(down, buffer)) {
CAF_LOG_ERROR("could not handle message: " << CAF_ARG(err));
down->abort_reason(err);
Expand All @@ -142,13 +136,15 @@ class CAF_NET_EXPORT application : public consumer {

template <class LowerLayerPtr>
bool done_sending(LowerLayerPtr&) {
CAF_LOG_TRACE("");
if (mailbox_.blocked())
return true;
return (mailbox_.empty() && mailbox_.try_block());
}

template <class LowerLayerPtr>
void abort(LowerLayerPtr&, const error&) {
CAF_LOG_TRACE("");
// nop
}

Expand All @@ -163,15 +159,12 @@ class CAF_NET_EXPORT application : public consumer {
/// Writes a message to the message buffer of `down`.
template <class LowerLayerPtr, class... Ts>
error write_message(LowerLayerPtr& down, header hdr, Ts&&... xs) {
CAF_LOG_TRACE(CAF_ARG(hdr));
down->begin_message();
auto& buf = down->message_buffer();
binary_serializer sink{&executor_, buf};
if (!sink.apply_object(hdr))
if (!sink.apply_objects(hdr, xs...))
return sink.get_error();
if constexpr (sizeof...(xs) >= 1) {
if (!sink.apply_objects(xs...))
return sink.get_error();
}
down->end_message();
return none;
}
Expand Down Expand Up @@ -212,6 +205,7 @@ class CAF_NET_EXPORT application : public consumer {

template <class LowerLayerPtr>
error dequeue_events(LowerLayerPtr& down) {
CAF_LOG_TRACE("");
if (!mailbox_.blocked()) {
mailbox_.fetch_more();
auto& q = std::get<0>(mailbox_.queue().queues());
Expand All @@ -220,14 +214,14 @@ class CAF_NET_EXPORT application : public consumer {
for (auto ptr = q.next(); ptr != nullptr; ptr = q.next()) {
auto f = detail::make_overload(
[&](consumer_queue::event::resolve_request& x) {
write_resolve_request(down, x.locator, x.listener);
write_resolve_request(down, x.path, x.listener);
},
[&](consumer_queue::event::new_proxy& x) { new_proxy(down, x.id); },
[&](consumer_queue::event::local_actor_down& x) {
local_actor_down(down, x.id, std::move(x.reason));
},
[&](consumer_queue::event::timeout& x) {
timeout(down, x.type, x.id);
timeout(down, std::move(x.type), x.id);
});
visit(f, ptr->value);
}
Expand All @@ -250,27 +244,31 @@ class CAF_NET_EXPORT application : public consumer {
}

template <class LowerLayerPtr>
void new_proxy(LowerLayerPtr& down, actor_id id) {
void new_proxy(LowerLayerPtr& down, actor_id aid) {
CAF_LOG_TRACE(CAF_ARG(aid));
if (auto err = write_message(down, header{message_type::monitor_message,
static_cast<uint64_t>(id)}))
static_cast<uint64_t>(aid)}))
down->abort_reason(err);
}

template <class LowerLayerPtr>
void local_actor_down(LowerLayerPtr& down, actor_id id, error reason) {
void local_actor_down(LowerLayerPtr& down, actor_id aid, error reason) {
CAF_LOG_TRACE(CAF_ARG(aid) << CAF_ARG(reason));
if (auto err = write_message(
down, header{message_type::down_message, static_cast<uint64_t>(id)},
down, header{message_type::down_message, static_cast<uint64_t>(aid)},
reason))
down->abort_reason(err);
}

template <class LowerLayerPtr>
void timeout(LowerLayerPtr& down, std::string type, uint64_t id) {
CAF_LOG_TRACE(CAF_ARG(type) << CAF_ARG(id));
down->timeout(std::move(type), id);
}

template <class LowerLayerPtr>
error dequeue_messages(LowerLayerPtr& down) {
CAF_LOG_TRACE("");
for (size_t count = 0; count < max_throughput_; ++count) {
auto ptr = next_message();
if (ptr == nullptr)
Expand Down Expand Up @@ -376,6 +374,7 @@ class CAF_NET_EXPORT application : public consumer {

template <class LowerLayerPtr>
error handle_actor_message(LowerLayerPtr&, header hdr, byte_span payload) {
CAF_LOG_TRACE(CAF_ARG(hdr) << CAF_ARG2("payload.size", payload.size()));
auto worker = hub_->pop();
if (worker != nullptr) {
CAF_LOG_DEBUG("launch BASP worker for deserializing an actor_message");
Expand Down
4 changes: 2 additions & 2 deletions libcaf_net/caf/net/consumer_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class CAF_NET_EXPORT consumer_queue {
class event final : public element {
public:
struct resolve_request {
std::string locator;
std::string path;
actor listener;
};

Expand All @@ -79,7 +79,7 @@ class CAF_NET_EXPORT consumer_queue {
uint64_t id;
};

event(std::string locator, actor listener);
event(std::string path, actor listener);

event(actor_id proxy_id);

Expand Down
2 changes: 1 addition & 1 deletion libcaf_net/caf/net/endpoint_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class endpoint_manager_impl : public endpoint_manager {
for (auto ptr = q.next(); ptr != nullptr; ptr = q.next()) {
auto f = detail::make_overload(
[&](consumer_queue::event::resolve_request& x) {
transport_.resolve(*this, x.locator, x.listener);
transport_.resolve(*this, x.path, x.listener);
},
[&](consumer_queue::event::new_proxy& x) {
transport_.new_proxy(*this, x.id);
Expand Down
Loading

0 comments on commit 4264e10

Please sign in to comment.