diff --git a/bindings/cpp/session.cpp b/bindings/cpp/session.cpp index b6ef80696..369533796 100644 --- a/bindings/cpp/session.cpp +++ b/bindings/cpp/session.cpp @@ -1896,6 +1896,11 @@ async_backend_control_result session::disable_backend(const address &addr, uint3 return update_backend_status(backend_status_params(*this, addr, backend_id, DNET_BACKEND_DISABLE)); } +async_backend_control_result session::remove_backend(const address &addr, uint32_t backend_id) +{ + return update_backend_status(backend_status_params(*this, addr, backend_id, DNET_BACKEND_REMOVE)); +} + async_backend_control_result session::start_defrag(const address &addr, uint32_t backend_id) { backend_status_params params(*this, addr, backend_id, DNET_BACKEND_START_DEFRAG); diff --git a/bindings/python/dnet_client b/bindings/python/dnet_client index 4d1908776..6a6ea4635 100755 --- a/bindings/python/dnet_client +++ b/bindings/python/dnet_client @@ -129,6 +129,7 @@ def process_backend(): actions = { 'enable': lambda x, y, z: process_backend_control(x, y, z, elliptics.Session.enable_backend), 'disable': lambda x, y, z: process_backend_control(x, y, z, elliptics.Session.disable_backend), + 'remove': lambda x, y, z: process_backend_control(x, y, z, elliptics.Session.remove_backend), 'defrag': lambda x, y, z: process_backend_control(x, y, z, elliptics.Session.start_defrag), 'compact': lambda x, y, z: process_backend_control(x, y, z, elliptics.Session.start_compact), 'stop_defrag': lambda x, y, z: process_backend_control(x, y, z, elliptics.Session.stop_defrag), diff --git a/bindings/python/elliptics_session.cpp b/bindings/python/elliptics_session.cpp index b9efffee7..7ebd46c63 100644 --- a/bindings/python/elliptics_session.cpp +++ b/bindings/python/elliptics_session.cpp @@ -445,6 +445,12 @@ class elliptics_session: public session, public bp::wrapper { return create_result(std::move(session::disable_backend(address(host, port, family), backend_id))); } + python_backend_status_result remove_backend(const std::string &host, int port, int family, uint64_t backend_id) { + return create_result( + session::remove_backend(address(host, port, family), backend_id) + ); + } + python_backend_status_result start_defrag(const std::string &host, int port, int family, uint32_t backend_id) { return create_result(std::move(session::start_defrag(address(host, port, family), backend_id))); } @@ -1431,6 +1437,13 @@ void init_elliptics_session() { " Returns AsyncResult which provides new status of the backend\n\n" " new_status = session.disable_backend(elliptics.Address.from_host_port_family(host='host.com', port=1025, family=AF_INET), 0).get()[0].backends[0]") + .def("remove_backend", &elliptics_session::remove_backend, + (bp::arg("host"), bp::arg("port"), bp::arg("family"), bp::arg("backend_id")), + "remove_backend(host, port, family, backend_id)\n" + " Removes backend @backend_id at node addressed by @host, @port, @family\n" + " Returns AsyncResult which provides new status of the backend\n\n" + " new_status = session.remove_backend(elliptics.Address.from_host_port_family(host='host.com', port=1025, family=AF_INET), 0).get()[0].backends[0]") + .def("start_defrag", &elliptics_session::start_defrag, (bp::arg("host"), bp::arg("port"), bp::arg("family"), bp::arg("backend_id")), "start_defrag(host, port, family, backend_id)\n" diff --git a/bindings/python/src/session.py b/bindings/python/src/session.py index 94ff60cd2..fa4728afc 100644 --- a/bindings/python/src/session.py +++ b/bindings/python/src/session.py @@ -197,6 +197,16 @@ def disable_backend(self, address, backend_id): family=address.family, backend_id=backend_id) + def remove_backend(self, address, backend_id): + """ + Removes backend @backend_id on @address. + Return elliptics.AsyncResult that provides new status of backend. + """ + return super(Session, self).remove_backend(host=address.host, + port=address.port, + family=address.family, + backend_id=backend_id) + def start_defrag(self, address, backend_id): """ Starts defragmentation of backend @backend_id on @address. diff --git a/example/config.cpp b/example/config.cpp index 3dceb3345..6e68d4a9c 100644 --- a/example/config.cpp +++ b/example/config.cpp @@ -311,10 +311,28 @@ void parse_options(config_data *data, const config &options) } } +std::shared_ptr dnet_parse_backend(config_data *data, uint32_t backend_id, const config &backend) +{ + auto info = std::make_shared(data->logger, backend_id); + + info->enable_at_start = backend.at("enable", true); + info->read_only_at_start = backend.at("read_only", false); + info->state = DNET_BACKEND_DISABLED; + info->history = backend.at("history", std::string()); + + if (info->enable_at_start) { + // It's parsed to check configuration at start + // It will be reparsed again at backend's initialization anyway + info->parse(data, backend); + } + + return info; +} + void parse_backends(config_data *data, const config &backends) { std::set backends_ids; - auto &backends_info = data->backends->backends; + auto config_backends = data->backends; for (size_t index = 0; index < backends.size(); ++index) { const config backend = backends.at(index); @@ -327,19 +345,11 @@ void parse_backends(config_data *data, const config &backends) << " duplicates one of previous backend_id"; } - while (backend_id + 1 > backends_info.size()) - backends_info.emplace_back(data->logger, backends_info.size()); - - dnet_backend_info &info = backends_info[backend_id]; - info.enable_at_start = backend.at("enable", true); - info.read_only_at_start = backend.at("read_only", false); - info.state = DNET_BACKEND_DISABLED; - info.history = backend.at("history", std::string()); - - if (info.enable_at_start) { - // It's parsed to check configuration at start - // It will be reparsed again at backend's initialization anyway - info.parse(data, backend); + if (!config_backends->get_backend(backend_id)) { + auto info = dnet_parse_backend(data, backend_id, backend); + if (info) { + config_backends->add_backend(info); + } } } } diff --git a/example/config.hpp b/example/config.hpp index 24fa1a43c..95addddb4 100644 --- a/example/config.hpp +++ b/example/config.hpp @@ -354,7 +354,7 @@ struct config_data : public dnet_config_data std::mutex parser_mutex; std::shared_ptr parser; dnet_time config_timestamp; - dnet_backend_info_list backends_guard; + dnet_backend_info_manager backends_guard; std::string logger_value; ioremap::elliptics::logger_base logger_base; ioremap::elliptics::logger logger; @@ -363,6 +363,8 @@ struct config_data : public dnet_config_data std::unique_ptr monitor_config; }; +std::shared_ptr dnet_parse_backend(config_data *data, uint32_t backend_id, const config &backend); + } } } // namespace ioremap::elliptics::config #endif // CONFIG_HPP diff --git a/include/elliptics/packet.h b/include/elliptics/packet.h index b7f398cbd..6f45d25d3 100644 --- a/include/elliptics/packet.h +++ b/include/elliptics/packet.h @@ -111,7 +111,8 @@ enum dnet_backend_command { DNET_BACKEND_READ_ONLY, DNET_BACKEND_WRITEABLE, DNET_BACKEND_CTL, // change internal parameters like delay - DNET_BACKEND_STOP_DEFRAG + DNET_BACKEND_STOP_DEFRAG, + DNET_BACKEND_REMOVE, // stop, cleanup and forget backend }; enum dnet_backend_state { diff --git a/include/elliptics/session.hpp b/include/elliptics/session.hpp index 02b065c2a..f0b59ae64 100644 --- a/include/elliptics/session.hpp +++ b/include/elliptics/session.hpp @@ -76,8 +76,8 @@ void none(const error_info &error, const std::vector &statuses); /*! * This handler allows to remove couple of replicas in case of bad writing * - * If you write to 3 groups and at least 2 succesfull writings are mandotary and - * in case of fail all succesffully written entries must be removed the + * If you write to 3 groups and at least 2 successful writings are mandatory and + * in case of fail all successfully written entries must be removed the * following code may be used: * * ```cpp @@ -482,7 +482,7 @@ class session /*! * Reads data by \a id and passes it through \a converter. If converter returns the same data - * it's threated as data is already up-to-date, othwerwise low-level write-cas with proper + * it's threated as data is already up-to-date, otherwise low-level write-cas with proper * checksum and \a remote_offset is invoked. * * If server returns -EBADFD data is read and processed again. @@ -584,7 +584,7 @@ class session * Lookups information for key \a id, picks lookup_result_enties by following rules: * 1. If there are quorum lookup_result_enties with the same timestamp, they are the final result * 2. Otherwise the final result is lookup_result_enties with the greatest timestamp - * This method is a wrapper over parallel_lookup and usefull in case of you need to find + * This method is a wrapper over parallel_lookup and useful in case of you need to find * quorum identical replicas * * Returns async_lookup_result. @@ -643,16 +643,64 @@ class session */ async_node_status_result request_node_status(const address &addr); + /*! + * Enables backend with @backend_id at node @addr. + */ async_backend_control_result enable_backend(const address &addr, uint32_t backend_id); + + /*! + * Disables backend with @backend_id at node @addr. + */ async_backend_control_result disable_backend(const address &addr, uint32_t backend_id); + + /*! + * Removes backend with @backend_id at node @addr. + * The backend will be stopped and uninitialized. + */ + async_backend_control_result remove_backend(const address &addr, uint32_t backend_id); + + /*! + * Starts defragmentation at backend with @backend_id at node @addr. + */ async_backend_control_result start_defrag(const address &addr, uint32_t backend_id); + + /*! + * Starts compact (lightweight defragmentation) at backend with @backend_id at node @addr. + */ async_backend_control_result start_compact(const address &addr, uint32_t backend_id); + + /*! + * Stops defragmentation at backend with @backend_id at node @addr. + */ async_backend_control_result stop_defrag(const address &addr, uint32_t backend_id); + + /*! + * Updates ids which backend with @backend_id at node @addr serves. + */ async_backend_control_result set_backend_ids(const address &addr, uint32_t backend_id, const std::vector &ids); + + /*! + * Makes backend with @backend_id at node @addr readonly. + * Backend in readonly mode fails all requests which modify data. + */ async_backend_control_result make_readonly(const address &addr, uint32_t backend_id); + + /*! + * Makes backend with @backend_id at node @addr writeable. + * Turns off read-only mode. + */ async_backend_control_result make_writable(const address &addr, uint32_t backend_id); + + /*! + * Sets delay in milliseconds to backend with @backend_id at node @addr. + * Backend with a delay will sleep @delay milliseconds before executing any request. + */ async_backend_control_result set_delay(const address &addr, uint32_t backend_id, uint32_t delay); + + /*! + * Requests status of all backends at node @addr. + */ async_backend_status_result request_backends_status(const address &addr); /*! @@ -818,7 +866,7 @@ class session * \brief Set \a indexes for object \a id. * * It removes object from all indexes which are not in the list \a indexes. - * All data in existen indexes are replaced by so from \a indexes. + * All data in existing indexes are replaced by so from \a indexes. * * Returns async_set_indexes_result. */ diff --git a/library/backend.cpp b/library/backend.cpp index 01294a191..6f8bef3bc 100644 --- a/library/backend.cpp +++ b/library/backend.cpp @@ -196,29 +196,27 @@ static const char *elapsed(const dnet_time &start) return buffer; } -int dnet_backend_init(struct dnet_node *node, size_t backend_id, int *state) +static int dnet_backend_init(struct dnet_node *node, size_t backend_id) { int ids_num; struct dnet_raw_id *ids; - auto &backends = node->config_data->backends->backends; - if (backends.size() <= backend_id) { + auto backend = node->config_data->backends->get_backend(backend_id); + if (!backend) { dnet_log(node, DNET_LOG_ERROR, "backend_init: backend: %zu, invalid backend id", backend_id); return -EINVAL; } - dnet_backend_info &backend = backends[backend_id]; dnet_time start; dnet_current_time(&start); { - std::lock_guard guard(*backend.state_mutex); - *state = backend.state; - if (backend.state != DNET_BACKEND_DISABLED) { + std::lock_guard guard(*backend->state_mutex); + if (backend->state != DNET_BACKEND_DISABLED) { dnet_log(node, DNET_LOG_ERROR, "backend_init: backend: %zu, " "trying to activate not disabled backend, elapsed: %s", backend_id, elapsed(start)); - switch (*state) { + switch (backend->state) { case DNET_BACKEND_ENABLED: return -EALREADY; case DNET_BACKEND_ACTIVATING: @@ -230,7 +228,7 @@ int dnet_backend_init(struct dnet_node *node, size_t backend_id, int *state) return -EINVAL; } } - backend.state = DNET_BACKEND_ACTIVATING; + backend->state = DNET_BACKEND_ACTIVATING; } dnet_log(node, DNET_LOG_INFO, "backend_init: backend: %zu, initializing", backend_id); @@ -250,7 +248,7 @@ int dnet_backend_init(struct dnet_node *node, size_t backend_id, int *state) const config backend_config = backends_config.at(index); const uint32_t config_backend_id = backend_config.at("backend_id"); if (backend_id == config_backend_id) { - backend.parse(&data, backend_config); + backend->parse(&data, backend_config); found = true; break; } @@ -265,7 +263,7 @@ int dnet_backend_init(struct dnet_node *node, size_t backend_id, int *state) } } catch (std::bad_alloc &) { err = -ENOMEM; - dnet_log(node, DNET_LOG_ERROR, "backend_init: backend: %zu, failed as not enouh memory, elapsed: %s", + dnet_log(node, DNET_LOG_ERROR, "backend_init: backend: %zu, failed as not enough memory, elapsed: %s", backend_id, elapsed(start)); goto err_out_exit; } catch (std::exception &exc) { @@ -275,39 +273,39 @@ int dnet_backend_init(struct dnet_node *node, size_t backend_id, int *state) goto err_out_exit; } - backend.config = backend.config_template; - backend.data.assign(backend.data.size(), '\0'); - backend.config.data = backend.data.data(); - backend.config.log = backend.log.get(); + backend->config = backend->config_template; + backend->data.assign(backend->data.size(), '\0'); + backend->config.data = backend->data.data(); + backend->config.log = backend->log.get(); - backend_io = &node->io->backends[backend_id]; + backend_io = dnet_get_backend_io(node->io, backend_id); backend_io->need_exit = 0; - backend_io->read_only = backend.read_only_at_start; + backend_io->read_only = backend->read_only_at_start; - for (auto it = backend.options.begin(); it != backend.options.end(); ++it) { + for (auto it = backend->options.begin(); it != backend->options.end(); ++it) { const dnet_backend_config_entry &entry = *it; - entry.entry->callback(&backend.config, entry.entry->key, entry.value_template.data()); + entry.entry->callback(&backend->config, entry.entry->key, entry.value_template.data()); } - err = backend.config.init(&backend.config); + err = backend->config.init(&backend->config); if (err) { dnet_log(node, DNET_LOG_ERROR, "backend_init: backend: %zu, failed to init backend: %d, elapsed: %s", backend_id, err, elapsed(start)); goto err_out_exit; } - backend_io->cb = &backend.config.cb; + backend_io->cb = &backend->config.cb; - err = dnet_backend_io_init(node, backend_io, backend.io_thread_num, backend.nonblocking_io_thread_num); + err = dnet_backend_io_init(node, backend_io, backend->io_thread_num, backend->nonblocking_io_thread_num); if (err) { dnet_log(node, DNET_LOG_ERROR, "backend_init: backend: %zu, failed to init io pool, err: %d, elapsed: %s", backend_id, err, elapsed(start)); goto err_out_backend_cleanup; } - if (backend.cache_config) { - backend_io->cache = backend.cache = dnet_cache_init(node, backend_io, backend.cache_config.get()); - if (!backend.cache) { + if (backend->cache_config) { + backend_io->cache = backend->cache = dnet_cache_init(node, backend_io, backend->cache_config.get()); + if (!backend->cache) { err = -ENOMEM; dnet_log(node, DNET_LOG_ERROR, "backend_init: backend: %zu, failed to init cache, err: %d, elapsed: %s", backend_id, err, elapsed(start)); @@ -316,15 +314,15 @@ int dnet_backend_init(struct dnet_node *node, size_t backend_id, int *state) } ids_num = 0; - ids = dnet_ids_init(node, backend.history.c_str(), &ids_num, backend.config.storage_free, node->addrs, backend_id); + ids = dnet_ids_init(node, backend->history.c_str(), &ids_num, backend->config.storage_free, node->addrs, backend_id); if (ids == NULL) { err = -EINVAL; dnet_log(node, DNET_LOG_ERROR, "backend_init: backend: %zu, history path: %s, " "failed to initialize ids, elapsed: %s: %s [%d]", - backend_id, backend.history.c_str(), elapsed(start), strerror(-err), err); + backend_id, backend->history.c_str(), elapsed(start), strerror(-err), err); goto err_out_cache_cleanup; } - err = dnet_route_list_enable_backend(node->route, backend_id, backend.group, ids, ids_num); + err = dnet_route_list_enable_backend(node->route, backend_id, backend->group, ids, ids_num); free(ids); if (err) { @@ -336,53 +334,51 @@ int dnet_backend_init(struct dnet_node *node, size_t backend_id, int *state) dnet_log(node, DNET_LOG_INFO, "backend_init: backend: %zu, initialized, elapsed: %s", backend_id, elapsed(start)); { - std::lock_guard guard(*backend.state_mutex); - dnet_current_time(&backend.last_start); - backend.last_start_err = 0; - backend.state = DNET_BACKEND_ENABLED; + std::lock_guard guard(*backend->state_mutex); + dnet_current_time(&backend->last_start); + backend->last_start_err = 0; + backend->state = DNET_BACKEND_ENABLED; } return 0; dnet_route_list_disable_backend(node->route, backend_id); err_out_cache_cleanup: - if (backend.cache) { + if (backend->cache) { /* Set need_exit to stop cache's threads */ backend_io->need_exit = 1; - dnet_cache_cleanup(backend.cache); - backend.cache = NULL; + dnet_cache_cleanup(backend->cache); + backend->cache = NULL; backend_io->cache = NULL; } err_out_backend_io_cleanup: backend_io->need_exit = 1; dnet_backend_io_cleanup(node, backend_io); - node->io->backends[backend_id].cb = NULL; + backend_io->cb = nullptr; err_out_backend_cleanup: - backend.config.cleanup(&backend.config); + backend->config.cleanup(&backend->config); err_out_exit: { - std::lock_guard guard(*backend.state_mutex); - dnet_current_time(&backend.last_start); - backend.last_start_err = err; - backend.state = DNET_BACKEND_DISABLED; + std::lock_guard guard(*backend->state_mutex); + dnet_current_time(&backend->last_start); + backend->last_start_err = err; + backend->state = DNET_BACKEND_DISABLED; } return err; } -int dnet_backend_cleanup(struct dnet_node *node, size_t backend_id, int *state) +static int dnet_backend_cleanup(struct dnet_node *node, size_t backend_id) { - if (backend_id >= node->config_data->backends->backends.size()) { + auto backend = node->config_data->backends->get_backend(backend_id); + if (!backend) { return -EINVAL; } - dnet_backend_info &backend = node->config_data->backends->backends[backend_id]; - { - std::lock_guard guard(*backend.state_mutex); - *state = backend.state; - if (backend.state != DNET_BACKEND_ENABLED) { + std::lock_guard guard(*backend->state_mutex); + if (backend->state != DNET_BACKEND_ENABLED) { dnet_log(node, DNET_LOG_ERROR, "backend_cleanup: backend: %zu, trying to destroy not activated backend", backend_id); - switch (*state) { + switch (backend->state) { case DNET_BACKEND_DISABLED: return -EALREADY; case DNET_BACKEND_DEACTIVATING: @@ -394,7 +390,7 @@ int dnet_backend_cleanup(struct dnet_node *node, size_t backend_id, int *state) return -EINVAL; } } - backend.state = DNET_BACKEND_DEACTIVATING; + backend->state = DNET_BACKEND_DEACTIVATING; } dnet_log(node, DNET_LOG_INFO, "backend_cleanup: backend: %zu, destroying", backend_id); @@ -402,9 +398,9 @@ int dnet_backend_cleanup(struct dnet_node *node, size_t backend_id, int *state) if (node->route) dnet_route_list_disable_backend(node->route, backend_id); - dnet_backend_io *backend_io = node->io ? &node->io->backends[backend_id] : NULL; + dnet_backend_io *backend_io = node->io ? dnet_get_backend_io(node->io, backend_id) : nullptr; - // set @need_exit to true to force cache lifecheck thread to exit and slru cacge to sync all elements to backend + // set @need_exit to true to force cache lifecheck thread to exit and slru cache to sync all elements to backend // this also leads to IO threads to stop, but since we already removed itself from route table, // and cache syncs data to backend either in lifecheck thread or in destructor context, // it is safe to set @need_exit early @@ -412,8 +408,8 @@ int dnet_backend_cleanup(struct dnet_node *node, size_t backend_id, int *state) backend_io->need_exit = 1; dnet_log(node, DNET_LOG_INFO, "backend_cleanup: backend: %zu: cleaning cache", backend_id); - dnet_cache_cleanup(backend.cache); - backend.cache = NULL; + dnet_cache_cleanup(backend->cache); + backend->cache = NULL; dnet_log(node, DNET_LOG_INFO, "backend_cleanup: backend: %zu: cleaning io: %p", backend_id, backend_io); if (backend_io) { @@ -421,12 +417,12 @@ int dnet_backend_cleanup(struct dnet_node *node, size_t backend_id, int *state) backend_io->cb = NULL; } - backend.config.cleanup(&backend.config); - memset(&backend.config.cb, 0, sizeof(backend.config.cb)); + backend->config.cleanup(&backend->config); + memset(&backend->config.cb, 0, sizeof(backend->config.cb)); { - std::lock_guard guard(*backend.state_mutex); - backend.state = DNET_BACKEND_DISABLED; + std::lock_guard guard(*backend->state_mutex); + backend->state = DNET_BACKEND_DISABLED; } dnet_log(node, DNET_LOG_INFO, "backend_cleanup: backend: %zu, destroyed", backend_id); @@ -434,13 +430,71 @@ int dnet_backend_cleanup(struct dnet_node *node, size_t backend_id, int *state) return 0; } +/* Disable and remove backend */ +static int dnet_backend_remove(struct dnet_node *node, size_t backend_id) { + const int err = dnet_backend_cleanup(node, backend_id); + if (err && err != -EALREADY) { + dnet_log(node, DNET_LOG_INFO, + "backend_remove: backend: %zu, failed to disable backend: %s [%d]", + backend_id, strerror(-err), err); + return err; + } + + node->config_data->backends->remove_backend(backend_id); + + dnet_log(node, DNET_LOG_INFO, "backend_remove: backend: %zu, removed", backend_id); + return 0; +} + + +int dnet_backend_create(struct dnet_node *node, size_t backend_id) +{ + auto backends = node->config_data->backends; + auto backend = backends->get_backend(backend_id); + if (backend) + return 0; + + try { + using namespace ioremap::elliptics::config; + auto data = static_cast(node->config_data); + auto parser = data->parse_config(); + config cfg = parser->root(); + const config backends_config = cfg.at("backends"); + + for (size_t index = 0; index < backends_config.size(); ++index) { + const config backend_config = backends_config.at(index); + + if (backend_id == backend_config.at("backend_id")) { + backend = dnet_parse_backend(data, backend_id, backend_config); + } + } + } catch (std::bad_alloc &) { + dnet_log(node, DNET_LOG_ERROR, "backend_create: backend: %zu, failed as not enough memory", + backend_id); + return -ENOMEM; + } catch (std::exception &exc) { + dnet_log(node, DNET_LOG_ERROR, "backend_create: backend: %zu, failed to read configuration file: %s", + backend_id, exc.what()); + return -EBADF; + } + + if (!backend) + return -ENOENT; + + int err = dnet_server_backend_init(node, backend_id); + if (!err) { + backends->add_backend(backend); + } + + return err; +} + int dnet_backend_init_all(struct dnet_node *node) { int err = 1; bool all_ok = true; - int state = DNET_BACKEND_ENABLED; - auto &backends = node->config_data->backends->backends; + auto backends = node->config_data->backends; using namespace ioremap::elliptics::config; auto &data = *static_cast(node->config_data); auto parser = data.parse_config(); @@ -464,9 +518,9 @@ int dnet_backend_init_all(struct dnet_node *node) for (size_t index = 0; index < backends_config.size(); ++index) { const config backend_config = backends_config.at(index); const uint32_t backend_id = backend_config.at("backend_id"); - dnet_backend_info &backend = backends[backend_id]; - if (!backend.enable_at_start) { - backend.parse(&data, backend_config); + auto backend = backends->get_backend(backend_id); + if (!backend->enable_at_start) { + backend->parse(&data, backend_config); continue; } @@ -485,13 +539,13 @@ int dnet_backend_init_all(struct dnet_node *node) for (size_t index = 0; index < backends_config.size(); ++index) { const config backend_config = backends_config.at(index); const uint32_t backend_id = backend_config.at("backend_id"); - dnet_backend_info &backend = backends[backend_id]; - if (!backend.enable_at_start) { - backend.parse(&data, backend_config); + auto backend = backends->get_backend(backend_id); + if (!backend->enable_at_start) { + backend->parse(&data, backend_config); continue; } - int tmp = dnet_backend_init(node, backend_id, &state); + int tmp = dnet_backend_init(node, backend_id); if (!tmp) { err = 0; } else if (err == 1) { @@ -512,27 +566,21 @@ int dnet_backend_init_all(struct dnet_node *node) return err; } -void dnet_backend_cleanup_all(struct dnet_node *node) -{ - int state = DNET_BACKEND_ENABLED; - - auto &backends = node->config_data->backends->backends; - for (size_t backend_id = 0; backend_id < backends.size(); ++backend_id) { - if (backends[backend_id].state != DNET_BACKEND_DISABLED) - dnet_backend_cleanup(node, backend_id, &state); +void dnet_backend_cleanup_all(struct dnet_node *node) { + for (auto backend : node->config_data->backends->get_all_backends()) { + if (backend->state != DNET_BACKEND_DISABLED) + dnet_backend_cleanup(node, backend->backend_id); } } static int dnet_backend_set_ids(dnet_node *node, uint32_t backend_id, dnet_raw_id *ids, uint32_t ids_count) { - auto &backends = node->config_data->backends->backends; - if (backend_id >= backends.size()) { + auto backend = node->config_data->backends->get_backend(backend_id); + if (!backend) { return -EINVAL; } - dnet_backend_info &backend = backends[backend_id]; - - if (backend.history.empty()) { + if (backend->history.empty()) { dnet_log(node, DNET_LOG_ERROR, "backend_set_ids: backend_id: %u, " "failed to open temporary ids file: history is not specified", backend_id); @@ -541,8 +589,8 @@ static int dnet_backend_set_ids(dnet_node *node, uint32_t backend_id, dnet_raw_i char tmp_ids[1024]; char target_ids[1024]; - snprintf(tmp_ids, sizeof(tmp_ids), "%s/ids_%08x%08x", backend.history.c_str(), rand(), rand()); - snprintf(target_ids, sizeof(target_ids), "%s/ids", backend.history.c_str()); + snprintf(tmp_ids, sizeof(tmp_ids), "%s/ids_%08x%08x", backend->history.c_str(), rand(), rand()); + snprintf(target_ids, sizeof(target_ids), "%s/ids", backend->history.c_str()); int err = 0; std::ofstream out(tmp_ids, std::ofstream::binary | std::ofstream::trunc); @@ -567,14 +615,14 @@ static int dnet_backend_set_ids(dnet_node *node, uint32_t backend_id, dnet_raw_i } else { if (!err) { - std::lock_guard guard(*backend.state_mutex); - switch (backend.state) { + std::lock_guard guard(*backend->state_mutex); + switch (backend->state) { case DNET_BACKEND_ENABLED: err = std::rename(tmp_ids, target_ids); if (err) break; err = dnet_route_list_enable_backend(node->route, - backend_id, backend.group, ids, ids_count); + backend_id, backend->group, ids, ids_count); break; case DNET_BACKEND_DISABLED: err = std::rename(tmp_ids, target_ids); @@ -594,44 +642,82 @@ static int dnet_backend_set_ids(dnet_node *node, uint32_t backend_id, dnet_raw_i return err; } -void backend_fill_status_nolock(struct dnet_node *node, struct dnet_backend_status *status, size_t backend_id) +void backend_fill_status_nolock(struct dnet_node *node, struct dnet_backend_status *status, const struct dnet_backend_info *config_backend) { - if (!status) + if (!status || !config_backend) return; - const auto &backends = node->config_data->backends->backends; - const dnet_backend_info &backend = backends[backend_id]; - const dnet_backend_io &io = node->io->backends[backend_id]; + auto backend_id = config_backend->backend_id; + const dnet_backend_io *io = dnet_get_backend_io(node->io, backend_id); - const auto &cb = backend.config.cb; + const auto &cb = config_backend->config.cb; status->backend_id = backend_id; - status->state = backend.state; - if (backend.state == DNET_BACKEND_ENABLED && cb.defrag_status) + status->state = config_backend->state; + if (config_backend->state == DNET_BACKEND_ENABLED && cb.defrag_status) status->defrag_state = cb.defrag_status(cb.command_private); - status->last_start = backend.last_start; - status->last_start_err = backend.last_start_err; - status->read_only = io.read_only; - status->delay = io.delay; + status->last_start = config_backend->last_start; + status->last_start_err = config_backend->last_start_err; + status->read_only = io->read_only; + status->delay = io->delay; } -void backend_fill_status(dnet_node *node, dnet_backend_status *status, size_t backend_id) +void dnet_backend_info_manager::backend_fill_status(dnet_node *node, dnet_backend_status *status, size_t backend_id) const { - const auto &backends = node->config_data->backends->backends; - const dnet_backend_info &backend = backends[backend_id]; - std::lock_guard guard(*backend.state_mutex); + std::shared_ptr backend; + { + std::lock_guard guard(backends_mutex); + auto it = backends.find(backend_id); + if (it != backends.end()) { + backend = it->second; + } + } + if (backend) + { + std::lock_guard guard(*backend->state_mutex); + backend_fill_status_nolock(node, status, backend.get()); + } +} - backend_fill_status_nolock(node, status, backend_id); +std::shared_ptr dnet_backend_info_manager::get_backend(size_t backend_id) const +{ + std::lock_guard guard(backends_mutex); + auto it = backends.find(backend_id); + if (it != backends.end()) { + return it->second; + } + return std::shared_ptr(); +} + +void dnet_backend_info_manager::add_backend(std::shared_ptr &backend) +{ + std::lock_guard guard(backends_mutex); + backends.insert({backend->backend_id, backend}); +} + +void dnet_backend_info_manager::remove_backend(size_t backend_id) { + std::lock_guard guard(backends_mutex); + backends.erase(backend_id); } static int dnet_cmd_backend_control_dangerous(struct dnet_net_state *st, struct dnet_cmd *cmd, void *data) { + int err = 0; dnet_node *node = st->n; - const auto &backends = node->config_data->backends->backends; - struct dnet_backend_control *control = reinterpret_cast(data); - if (control->backend_id >= backends.size()) { + if (dnet_backend_command(control->command) == DNET_BACKEND_ENABLE) { + err = dnet_backend_create(node, control->backend_id); + if (err) { + dnet_log(node, DNET_LOG_ERROR, "backend_control: backend creation failed: %u, state: %s: %d", + control->backend_id, dnet_state_dump_addr(st), err); + return err; + } + } + + auto backends = node->config_data->backends; + auto backend = backends->get_backend(control->backend_id); + if (!backend) { dnet_log(node, DNET_LOG_ERROR, "backend_control: there is no such backend: %u, state: %s", control->backend_id, dnet_state_dump_addr(st)); return -EINVAL; @@ -646,25 +732,25 @@ static int dnet_cmd_backend_control_dangerous(struct dnet_net_state *st, struct dnet_log(node, DNET_LOG_INFO, "backend_control: received BACKEND_CONTROL: backend_id: %u, command: %u, state: %s", control->backend_id, control->command, dnet_state_dump_addr(st)); - const dnet_backend_info &backend = backends[control->backend_id]; - if (backend.state == DNET_BACKEND_UNITIALIZED) { + if (backend->state == DNET_BACKEND_UNITIALIZED) { dnet_log(node, DNET_LOG_ERROR, "backend_control: there is no such backend: %u, state: %s", control->backend_id, dnet_state_dump_addr(st)); return -EINVAL; } - dnet_backend_io &io = node->io->backends[control->backend_id]; + dnet_backend_io *io = dnet_get_backend_io(node->io, control->backend_id); - int state = DNET_BACKEND_DISABLED; - const dnet_backend_callbacks &cb = backend.config.cb; + const dnet_backend_callbacks &cb = backend->config.cb; - int err = -ENOTSUP; switch (dnet_backend_command(control->command)) { case DNET_BACKEND_ENABLE: - err = dnet_backend_init(node, control->backend_id, &state); + err = dnet_backend_init(node, control->backend_id); break; case DNET_BACKEND_DISABLE: - err = dnet_backend_cleanup(node, control->backend_id, &state); + err = dnet_backend_cleanup(node, control->backend_id); + break; + case DNET_BACKEND_REMOVE: + err = dnet_backend_remove(node, control->backend_id); break; case DNET_BACKEND_START_DEFRAG: if (cb.defrag_start) { @@ -685,25 +771,28 @@ static int dnet_cmd_backend_control_dangerous(struct dnet_net_state *st, struct err = dnet_backend_set_ids(st->n, control->backend_id, control->ids, control->ids_count); break; case DNET_BACKEND_READ_ONLY: - if (io.read_only) { + if (io->read_only) { err = -EALREADY; } else { - io.read_only = 1; + io->read_only = 1; err = 0; } break; case DNET_BACKEND_WRITEABLE: - if (!io.read_only) { + if (!io->read_only) { err = -EALREADY; } else { - io.read_only = 0; + io->read_only = 0; err = 0; } break; case DNET_BACKEND_CTL: - io.delay = control->delay; + io->delay = control->delay; err = 0; break; + default: + err = -ENOTSUP; + break; } char buffer[sizeof(dnet_backend_status_list) + sizeof(dnet_backend_status)]; @@ -713,7 +802,7 @@ static int dnet_cmd_backend_control_dangerous(struct dnet_net_state *st, struct dnet_backend_status *status = reinterpret_cast(list + 1); list->backends_count = 1; - backend_fill_status(node, status, control->backend_id); + backends->backend_fill_status(node, status, control->backend_id); if (err) { dnet_send_reply(st, cmd, list, sizeof(buffer), true); @@ -764,7 +853,12 @@ int dnet_cmd_backend_status(struct dnet_net_state *st, struct dnet_cmd *cmd, voi (void) data; dnet_node *node = st->n; - const auto &backends = node->config_data->backends->backends; + auto backends = node->config_data->backends->get_all_backends(); + auto cmp_backends = [](const std::shared_ptr &lhs, const std::shared_ptr &rhs) -> bool { + return lhs->backend_id < rhs->backend_id; + }; + std::sort(backends.begin(), backends.end(), cmp_backends); + const size_t total_size = sizeof(dnet_backend_status_list) + backends.size() * sizeof(dnet_backend_status); std::unique_ptr @@ -773,18 +867,16 @@ int dnet_cmd_backend_status(struct dnet_net_state *st, struct dnet_cmd *cmd, voi return -ENOMEM; } - list->backends_count = backends.size(); - - size_t j = 0; + size_t i = 0; - for (size_t i = 0; i < backends.size(); ++i) { - dnet_backend_status &status = list->backends[j]; - backend_fill_status(st->n, &status, i); + for (auto &backend : backends) { + dnet_backend_status &status = list->backends[i]; + node->config_data->backends->backend_fill_status(st->n, &status, backend->backend_id); if (status.state != DNET_BACKEND_UNITIALIZED) - ++j; + ++i; } - list->backends_count = j; + list->backends_count = i; cmd->flags &= ~DNET_FLAGS_NEED_ACK; diff --git a/library/backend.h b/library/backend.h index 35ab09c39..5fc0e543a 100644 --- a/library/backend.h +++ b/library/backend.h @@ -37,7 +37,7 @@ struct cache_config * * When backend is being initialized, it calls @entry.callback() function for each config entry * - * Please note that backend initalization copies value into temporal copy, + * Please note that backend initialization copies value into temporal copy, * since @entry.callback() can modify this data. */ struct dnet_backend_config_entry @@ -58,7 +58,8 @@ struct dnet_backend_info dnet_backend_info(dnet_logger &logger, uint32_t backend_id) : log(new dnet_logger(logger, make_attributes(backend_id))), - group(0), cache(NULL), enable_at_start(false), read_only_at_start(false), + backend_id(backend_id), group(0), cache(NULL), + enable_at_start(false), read_only_at_start(false), state_mutex(new std::mutex), state(DNET_BACKEND_UNITIALIZED), io_thread_num(0), nonblocking_io_thread_num(0) { @@ -75,6 +76,7 @@ struct dnet_backend_info config_template(other.config_template), log(std::move(other.log)), options(std::move(other.options)), + backend_id(other.backend_id), group(other.group), cache(other.cache), history(other.history), @@ -97,6 +99,7 @@ struct dnet_backend_info config_template = other.config_template; log = std::move(other.log); options = std::move(other.options); + backend_id = other.backend_id; group = other.group; cache = other.cache; history = other.history; @@ -120,6 +123,7 @@ struct dnet_backend_info dnet_config_backend config_template; std::unique_ptr log; std::vector options; + uint32_t backend_id; uint32_t group; void *cache; std::string history; @@ -139,36 +143,51 @@ struct dnet_backend_info int nonblocking_io_thread_num; }; -struct dnet_backend_info_list +class dnet_backend_info_manager { - std::vector backends; +public: + /* + * Locks backend with \a backend_id state mutex and fills \a status + */ + void backend_fill_status(struct dnet_node *node, struct dnet_backend_status *status, size_t backend_id) const; + + std::vector > get_all_backends() const + { + std::vector > result; + std::lock_guard guard(backends_mutex); + for (auto it : backends) { + result.push_back(it.second); + } + return result; + } + + std::shared_ptr get_backend(size_t backend_id) const; + + void add_backend(std::shared_ptr &backend); + void remove_backend(size_t backend_id); + +private: + std::unordered_map > backends; + mutable std::mutex backends_mutex; }; +void backend_fill_status_nolock(struct dnet_node *node, struct dnet_backend_status *status, const struct dnet_backend_info *config_backend); + extern "C" { #else // __cplusplus -typedef struct dnet_backend_info_list_t dnet_backend_info_list; +typedef struct dnet_backend_info_manager dnet_backend_info_manager; +struct dnet_io; #endif // __cplusplus -int dnet_backend_init(struct dnet_node *n, size_t backend_id, int *state); -int dnet_backend_cleanup(struct dnet_node *n, size_t backend_id, int *state); - int dnet_backend_init_all(struct dnet_node *n); void dnet_backend_cleanup_all(struct dnet_node *n); -size_t dnet_backend_info_list_count(dnet_backend_info_list *backends); +int dnet_get_backend_ids(const dnet_backend_info_manager *backends, size_t **backend_ids, size_t *num_backend_ids); int dnet_cmd_backend_control(struct dnet_net_state *st, struct dnet_cmd *cmd, void *data); int dnet_cmd_backend_status(struct dnet_net_state *st, struct dnet_cmd *cmd, void *data); -/* - * Fills \a status of backend with \a backend_id without any locks - */ -void backend_fill_status_nolock(struct dnet_node *node, struct dnet_backend_status *status, size_t backend_id); - -/* - * Locks backend with \a backend_id state mutex and fills \a status - */ -void backend_fill_status(struct dnet_node *node, struct dnet_backend_status *status, size_t backend_id); +struct dnet_backend_io *dnet_get_backend_io(struct dnet_io *io, size_t backend_id); #ifdef __cplusplus } diff --git a/library/common.cpp b/library/common.cpp index 4826b13c4..68646d222 100644 --- a/library/common.cpp +++ b/library/common.cpp @@ -1,6 +1,32 @@ #include "elliptics.h" -size_t dnet_backend_info_list_count(dnet_backend_info_list *backends) +int dnet_get_backend_ids(const dnet_backend_info_manager *backends, size_t **backend_ids, size_t *num_backend_ids) { - return backends ? backends->backends.size() : 0; + if (!backends) + return -EINVAL; + + auto config_backends = backends->get_all_backends(); + *num_backend_ids = config_backends.size(); + *backend_ids = reinterpret_cast(malloc(*num_backend_ids * sizeof(size_t))); + if (!*backend_ids) + return -ENOMEM; + + for (size_t i = 0; i < *num_backend_ids; ++i) { + (*backend_ids)[i] = config_backends[i]->backend_id; + } + + return 0; +} + +struct dnet_backend_io *dnet_get_backend_io(struct dnet_io *io, size_t backend_id) +{ + struct dnet_backend_io *backend_io = nullptr; + + pthread_rwlock_rdlock(&io->backends_lock); + if (backend_id < io->backends_count) { + backend_io = io->backends[backend_id]; + } + pthread_rwlock_unlock(&io->backends_lock); + + return backend_io; } diff --git a/library/elliptics.h b/library/elliptics.h index 19480cf10..737ea59d6 100644 --- a/library/elliptics.h +++ b/library/elliptics.h @@ -430,6 +430,7 @@ struct dnet_work_io { int thread_index; uint64_t trans; pthread_t tid; + int joined; struct dnet_work_pool *pool; }; @@ -502,9 +503,9 @@ struct dnet_io { struct dnet_net_io *net; - struct dnet_backend_io *backends; + struct dnet_backend_io **backends; size_t backends_count; - pthread_mutex_t backends_lock; + pthread_rwlock_t backends_lock; struct dnet_io_pool pool; @@ -519,6 +520,7 @@ struct dnet_io { int dnet_state_accept_process(struct dnet_net_state *st, struct epoll_event *ev); int dnet_io_init(struct dnet_node *n, struct dnet_config *cfg); void *dnet_io_process(void *data_); +int dnet_server_backend_init(struct dnet_node *n, size_t backend_id); int dnet_server_io_init(struct dnet_node *n); /* Set need_exit flag, stop and join pool threads */ void dnet_io_stop(struct dnet_node *n); @@ -539,7 +541,7 @@ struct dnet_config_data { int daemon_mode; int parallel_start; - dnet_backend_info_list *backends; + dnet_backend_info_manager *backends; }; struct dnet_config_data *dnet_config_data_create(); diff --git a/library/pool.c b/library/pool.c index d8b2ec7df..69e37ed28 100644 --- a/library/pool.c +++ b/library/pool.c @@ -55,7 +55,10 @@ static void dnet_work_pool_stop(struct dnet_work_pool_place *place) for (i = 0; i < place->pool->num; ++i) { wio = &place->pool->wio_list[i]; - pthread_join(wio->tid, NULL); + if (!wio->joined) { + pthread_join(wio->tid, NULL); + wio->joined = 1; + } } pthread_mutex_unlock(&place->lock); @@ -120,6 +123,7 @@ static int dnet_work_pool_grow(struct dnet_node *n, struct dnet_work_pool *pool, wio->thread_index = i; wio->pool = pool; wio->trans = ~0ULL; + wio->joined = 0; INIT_LIST_HEAD(&wio->reply_list); INIT_LIST_HEAD(&wio->request_list); @@ -282,6 +286,7 @@ static void dnet_update_trans_timestamp_network(struct dnet_io_req *r) void dnet_schedule_io(struct dnet_node *n, struct dnet_io_req *r) { + struct dnet_backend_io *backend_io = NULL; struct dnet_work_pool_place *place = NULL; struct dnet_work_pool_place *backend_place = NULL; struct dnet_work_pool *pool = NULL; @@ -314,8 +319,12 @@ void dnet_schedule_io(struct dnet_node *n, struct dnet_io_req *r) else if (dnet_cmd_needs_backend(cmd->cmd)) backend_id = dnet_state_search_backend(n, &cmd->id); - if (backend_id >= 0 && backend_id < (ssize_t)n->io->backends_count) { - io_pool = &n->io->backends[backend_id].pool; + if (backend_id >= 0) { + backend_io = dnet_get_backend_io(n->io, backend_id); + } + + if (backend_io) { + io_pool = &backend_io->pool; if (nonblocking) { place = &io_pool->recv_pool_nb; } else { @@ -822,15 +831,19 @@ static int dnet_check_io(struct dnet_io *io) { uint64_t list_size = 0; uint64_t threads_count = 0; + size_t i; dnet_check_io_pool(&io->pool, &list_size, &threads_count); + pthread_rwlock_rdlock(&io->backends_lock); if (io->backends) { - size_t i; for (i = 0; i < io->backends_count; ++i) { - dnet_check_io_pool(&io->backends[i].pool, &list_size, &threads_count); + if (io->backends[i]) { + dnet_check_io_pool(&io->backends[i]->pool, &list_size, &threads_count); + } } } + pthread_rwlock_unlock(&io->backends_lock); if (list_size <= threads_count * 1000) return 1; @@ -1098,7 +1111,7 @@ int dnet_io_init(struct dnet_node *n, struct dnet_config *cfg) goto err_out_free_mutex; } - err = pthread_mutex_init(&n->io->backends_lock, NULL); + err = pthread_rwlock_init(&n->io->backends_lock, NULL); if (err) { err = -err; goto err_out_free_cond; @@ -1172,7 +1185,7 @@ int dnet_io_init(struct dnet_node *n, struct dnet_config *cfg) err_out_cleanup_recv_place: dnet_work_pool_place_cleanup(&n->io->pool.recv_pool); err_out_free_backends_lock: - pthread_mutex_destroy(&n->io->backends_lock); + pthread_rwlock_destroy(&n->io->backends_lock); err_out_free_cond: pthread_cond_destroy(&n->io->full_wait); err_out_free_mutex: @@ -1184,46 +1197,129 @@ int dnet_io_init(struct dnet_node *n, struct dnet_config *cfg) return err; } +int dnet_server_backend_init(struct dnet_node *n, size_t backend_id) +{ + int err; + const size_t new_count = backend_id + 1; + struct dnet_backend_io **backends_tmp; + struct dnet_backend_io *io; + + pthread_rwlock_wrlock(&n->io->backends_lock); + if (new_count > n->io->backends_count || !n->io->backends[backend_id]) { + io = calloc(1, sizeof(struct dnet_backend_io)); + if (!io) { + err = -ENOMEM; + goto err_out_unlock; + } + + io->backend_id = backend_id; + + err = dnet_work_pool_place_init(&io->pool.recv_pool); + if (err) { + goto err_out_free; + } + + err = dnet_work_pool_place_init(&io->pool.recv_pool_nb); + if (err) { + dnet_work_pool_place_cleanup(&io->pool.recv_pool); + goto err_out_free; + } + + if (new_count > n->io->backends_count) { + backends_tmp = realloc(n->io->backends, new_count * sizeof(struct dnet_backend_io *)); + if (backends_tmp) { + memset(backends_tmp + n->io->backends_count, 0, + (new_count - n->io->backends_count) * sizeof(struct dnet_backend_io *)); + n->io->backends = backends_tmp; + n->io->backends_count = new_count; + } else { + err = -ENOMEM; + goto err_out_free; + } + } + + n->io->backends[backend_id] = io; + } + pthread_rwlock_unlock(&n->io->backends_lock); + + return 0; + +err_out_free: + free(io); +err_out_unlock: + pthread_rwlock_unlock(&n->io->backends_lock); + return err; +} + int dnet_server_io_init(struct dnet_node *n) { int err; size_t j = 0, k = 0; + size_t *backend_ids, num_backend_ids; + size_t backends_count; + + err = dnet_get_backend_ids(n->config_data->backends, &backend_ids, &num_backend_ids); + if (err) { + goto err_out_exit; + } + + backends_count = backend_ids[0]; + for (j = 1; j < num_backend_ids; ++j) { + if (backend_ids[j] > backends_count) + backends_count = backend_ids[j]; + } + ++backends_count; - n->io->backends_count = dnet_backend_info_list_count(n->config_data->backends); - n->io->backends = calloc(n->io->backends_count, sizeof(struct dnet_backend_io)); + n->io->backends_count = backends_count; + n->io->backends = calloc(backends_count, sizeof(struct dnet_backend_io *)); if (!n->io->backends) { err = -ENOMEM; - goto err_out_exit; + goto err_out_free_backend_ids; } - for (j = 0; j < n->io->backends_count; ++j) { - struct dnet_backend_io *io = &n->io->backends[j]; - io->backend_id = j; + for (j = 0; j < num_backend_ids; ++j) { + struct dnet_backend_io *io = calloc(1, sizeof(struct dnet_backend_io)); + if (!io) { + err = -ENOMEM; + goto err_out_free_backends_io; + } + + io->backend_id = backend_ids[j]; err = dnet_work_pool_place_init(&io->pool.recv_pool); if (err) { + free(io); goto err_out_free_backends_io; } err = dnet_work_pool_place_init(&io->pool.recv_pool_nb); if (err) { + free(io); dnet_work_pool_place_cleanup(&io->pool.recv_pool); goto err_out_free_backends_io; } + + n->io->backends[io->backend_id] = io; } + + free(backend_ids); return 0; err_out_free_backends_io: for (k = 0; k < j; ++k) { - struct dnet_backend_io *io = &n->io->backends[k]; - io->need_exit = 1; + struct dnet_backend_io *backend_io = n->io->backends[backend_ids[k]]; + backend_io->need_exit = 1; } for (k = 0; k < j; ++k) { - struct dnet_backend_io *io = &n->io->backends[k]; - dnet_work_pool_exit(&io->pool.recv_pool); - dnet_work_pool_exit(&io->pool.recv_pool_nb); + struct dnet_backend_io *backend_io = n->io->backends[backend_ids[k]]; + dnet_work_pool_exit(&backend_io->pool.recv_pool); + dnet_work_pool_exit(&backend_io->pool.recv_pool_nb); + free(backend_io); } free(n->io->backends); + n->io->backends = NULL; +err_out_free_backend_ids: + free(backend_ids); err_out_exit: return err; } @@ -1237,11 +1333,13 @@ void dnet_io_stop(struct dnet_node *n) dnet_set_need_exit(n); for (j = 0; j < n->io->backends_count; ++j) { - struct dnet_backend_io *backend_io = &n->io->backends[j]; - backend_io->need_exit = 1; + struct dnet_backend_io *backend_io = n->io->backends[j]; + if (backend_io) { + backend_io->need_exit = 1; + } } - for (i=0; inet_thread_num; ++i) { + for (i = 0; i < io->net_thread_num; ++i) { pthread_join(io->net[i].tid, NULL); close(io->net[i].epoll_fd); } @@ -1250,11 +1348,13 @@ void dnet_io_stop(struct dnet_node *n) dnet_work_pool_stop(&io->pool.recv_pool); for (j = 0; j < io->backends_count; ++j) { - struct dnet_backend_io *backend_io = &io->backends[j]; - if (backend_io->pool.recv_pool.pool) - dnet_work_pool_stop(&backend_io->pool.recv_pool); - if (backend_io->pool.recv_pool_nb.pool) - dnet_work_pool_stop(&backend_io->pool.recv_pool_nb); + struct dnet_backend_io *backend_io = io->backends[j]; + if (backend_io) { + if (backend_io->pool.recv_pool.pool) + dnet_work_pool_stop(&backend_io->pool.recv_pool); + if (backend_io->pool.recv_pool_nb.pool) + dnet_work_pool_stop(&backend_io->pool.recv_pool_nb); + } } } @@ -1270,14 +1370,21 @@ void dnet_io_cleanup(struct dnet_node *n) dnet_work_pool_place_cleanup(&io->pool.recv_pool); for (i = 0; i < io->backends_count; ++i) { - struct dnet_backend_io *backend_io = &io->backends[i]; - if (backend_io->pool.recv_pool.pool) - dnet_work_pool_cleanup(&backend_io->pool.recv_pool); - if (backend_io->pool.recv_pool_nb.pool) - dnet_work_pool_cleanup(&backend_io->pool.recv_pool_nb); - dnet_work_pool_place_cleanup(&backend_io->pool.recv_pool_nb); - dnet_work_pool_place_cleanup(&backend_io->pool.recv_pool); + struct dnet_backend_io *backend_io = io->backends[i]; + if (backend_io) { + if (backend_io->pool.recv_pool.pool) + dnet_work_pool_cleanup(&backend_io->pool.recv_pool); + if (backend_io->pool.recv_pool_nb.pool) + dnet_work_pool_cleanup(&backend_io->pool.recv_pool_nb); + dnet_work_pool_place_cleanup(&backend_io->pool.recv_pool_nb); + dnet_work_pool_place_cleanup(&backend_io->pool.recv_pool); + + free(backend_io); + io->backends[i] = NULL; + } } + free(io->backends); + io->backends = NULL; dnet_io_cleanup_states(n); diff --git a/library/server.c b/library/server.c index b8f7ffe99..79c2b097f 100644 --- a/library/server.c +++ b/library/server.c @@ -231,13 +231,13 @@ void dnet_server_node_destroy(struct dnet_node *n) */ dnet_monitor_exit(n); - dnet_node_cleanup_common_resources(n); - dnet_route_list_destroy(n->route); n->route = NULL; dnet_backend_cleanup_all(n); + dnet_node_cleanup_common_resources(n); + dnet_srw_cleanup(n); dnet_counter_destroy(n); diff --git a/monitor/backends_stat_provider.cpp b/monitor/backends_stat_provider.cpp index 8de99e9fc..12dda883c 100644 --- a/monitor/backends_stat_provider.cpp +++ b/monitor/backends_stat_provider.cpp @@ -40,17 +40,17 @@ backends_stat_provider::backends_stat_provider(struct dnet_node *node) */ static void fill_backend_backend(rapidjson::Value &stat_value, rapidjson::Document::AllocatorType &allocator, - const struct dnet_backend_io &backend, - const dnet_backend_info &config_backend) { + const struct dnet_backend_io *backend, + const dnet_backend_info *config_backend) { char *json_stat = NULL; size_t size = 0; - struct dnet_backend_callbacks *cb = backend.cb; + struct dnet_backend_callbacks *cb = backend->cb; if (cb->storage_stat_json) { cb->storage_stat_json(cb->command_private, &json_stat, &size); if (json_stat && size) { rapidjson::Document backend_value(&allocator); backend_value.Parse<0>(json_stat); - backend_value["config"].AddMember("group", config_backend.group, allocator); + backend_value["config"].AddMember("group", config_backend->group, allocator); stat_value.AddMember("backend", static_cast(backend_value), allocator); @@ -69,18 +69,18 @@ static void dump_list_stats(rapidjson::Value &stat, list_stat &list_stats, rapid */ static void fill_backend_io(rapidjson::Value &stat_value, rapidjson::Document::AllocatorType &allocator, - const struct dnet_backend_io &backend) { + const struct dnet_backend_io *backend) { rapidjson::Value io_value(rapidjson::kObjectType); struct list_stat stats; rapidjson::Value blocking_stat(rapidjson::kObjectType); - dnet_get_pool_list_stats(backend.pool.recv_pool.pool, &stats); + dnet_get_pool_list_stats(backend->pool.recv_pool.pool, &stats); dump_list_stats(blocking_stat, stats, allocator); io_value.AddMember("blocking", blocking_stat, allocator); rapidjson::Value nonblocking_stat(rapidjson::kObjectType); - dnet_get_pool_list_stats(backend.pool.recv_pool_nb.pool, &stats); + dnet_get_pool_list_stats(backend->pool.recv_pool_nb.pool, &stats); dump_list_stats(nonblocking_stat, stats, allocator); io_value.AddMember("nonblocking", nonblocking_stat, allocator); @@ -92,9 +92,9 @@ static void fill_backend_io(rapidjson::Value &stat_value, */ static void fill_backend_cache(rapidjson::Value &stat_value, rapidjson::Document::AllocatorType &allocator, - const struct dnet_backend_io &backend) { - if (backend.cache) { - ioremap::cache::cache_manager *cache = (ioremap::cache::cache_manager *)backend.cache; + const struct dnet_backend_io *backend) { + if (backend->cache) { + ioremap::cache::cache_manager *cache = (ioremap::cache::cache_manager *)backend->cache; rapidjson::Document caches_value(&allocator); caches_value.Parse<0>(cache->stat_json().c_str()); stat_value.AddMember("cache", @@ -110,8 +110,8 @@ static void fill_backend_status(rapidjson::Value &stat_value, rapidjson::Document::AllocatorType &allocator, struct dnet_node *node, dnet_backend_status &status, - size_t backend_id) { - backend_fill_status_nolock(node, &status, backend_id); + const dnet_backend_info *config_backend) { + backend_fill_status_nolock(node, &status, config_backend); rapidjson::Value status_value(rapidjson::kObjectType); status_value.AddMember("state", status.state, allocator); @@ -143,20 +143,20 @@ static void fill_backend_status(rapidjson::Value &stat_value, */ static void fill_disabled_backend_config(rapidjson::Value &stat_value, rapidjson::Document::AllocatorType &allocator, - const dnet_backend_info &config_backend) { + const dnet_backend_info *config_backend) { rapidjson::Value backend_value(rapidjson::kObjectType); /* If config template provides API for serializing parsed config values to json - use it */ - if (config_backend.config_template.to_json) { + if (config_backend->config_template.to_json) { char *json_stat = NULL; size_t size = 0; - dnet_config_backend config = config_backend.config_template; + dnet_config_backend config = config_backend->config_template; std::vector data(config.size, '\0'); config.data = data.data(); - config.log = config_backend.log.get(); + config.log = config_backend->log.get(); - for (auto it = config_backend.options.begin(); it != config_backend.options.end(); ++it) { + for (auto it = config_backend->options.begin(); it != config_backend->options.end(); ++it) { const dnet_backend_config_entry &entry = *it; entry.entry->callback(&config, entry.entry->key, entry.value_template.data()); } @@ -165,7 +165,7 @@ static void fill_disabled_backend_config(rapidjson::Value &stat_value, if (json_stat && size) { rapidjson::Document config_value(&allocator); config_value.Parse<0>(json_stat); - config_value.AddMember("group", config_backend.group, allocator); + config_value.AddMember("group", config_backend->group, allocator); backend_value.AddMember("config", static_cast(config_value), allocator); @@ -174,13 +174,13 @@ static void fill_disabled_backend_config(rapidjson::Value &stat_value, config.cleanup(&config); } else { rapidjson::Value config_value(rapidjson::kObjectType); - for (auto it = config_backend.options.begin(); it != config_backend.options.end(); ++it) { + for (auto it = config_backend->options.begin(); it != config_backend->options.end(); ++it) { const dnet_backend_config_entry &entry = *it; rapidjson::Value tmp_val(entry.value_template.data(), allocator); config_value.AddMember(entry.entry->key, tmp_val, allocator); } - config_value.AddMember("group", config_backend.group, allocator); + config_value.AddMember("group", config_backend->group, allocator); backend_value.AddMember("config_template", config_value, allocator); } @@ -194,32 +194,34 @@ static rapidjson::Value& backend_stats_json(uint64_t categories, rapidjson::Value &stat_value, rapidjson::Document::AllocatorType &allocator, struct dnet_node *node, - size_t backend_id) { + const dnet_backend_info *config_backend) { dnet_backend_status status; memset(&status, 0, sizeof(status)); - const auto &config_backend = node->config_data->backends->backends[backend_id]; - + auto backend_id = config_backend->backend_id; stat_value.AddMember("backend_id", backend_id, allocator); - fill_backend_status(stat_value, allocator, node, status, backend_id); + fill_backend_status(stat_value, allocator, node, status, config_backend); + struct dnet_backend_io *backend_io = nullptr; if (status.state == DNET_BACKEND_ENABLED && node->io) { - const struct dnet_backend_io & backend = node->io->backends[backend_id]; + backend_io = dnet_get_backend_io(node->io, backend_id); + } + if (backend_io) { if (categories & DNET_MONITOR_COMMANDS) { - const command_stats *stats = (command_stats *)(backend.command_stats); + const command_stats *stats = (command_stats *)(backend_io->command_stats); rapidjson::Value commands_value(rapidjson::kObjectType); stat_value.AddMember("commands", stats->commands_report(NULL, commands_value, allocator), allocator); } if (categories & DNET_MONITOR_BACKEND) { - fill_backend_backend(stat_value, allocator, backend, config_backend); + fill_backend_backend(stat_value, allocator, backend_io, config_backend); } if (categories & DNET_MONITOR_IO) { - fill_backend_io(stat_value, allocator, backend); + fill_backend_io(stat_value, allocator, backend_io); } if (categories & DNET_MONITOR_CACHE) { - fill_backend_cache(stat_value, allocator, backend); + fill_backend_cache(stat_value, allocator, backend_io); } } else if (categories & DNET_MONITOR_BACKEND) { fill_disabled_backend_config(stat_value, allocator, config_backend); @@ -228,10 +230,6 @@ static rapidjson::Value& backend_stats_json(uint64_t categories, return stat_value; } -static bool backend_check_state_nolock(struct dnet_node *node, size_t backend_id) { - return node->config_data->backends->backends[backend_id].state != DNET_BACKEND_UNITIALIZED; -} - /* * Fills all section of all backends */ @@ -239,16 +237,19 @@ static void backends_stats_json(uint64_t categories, rapidjson::Value &stat_value, rapidjson::Document::AllocatorType &allocator, struct dnet_node *node) { - const auto &backends = node->config_data->backends->backends; - for (size_t i = 0; i < backends.size(); ++i) { - std::lock_guard guard(*node->config_data->backends->backends[i].state_mutex); - if (!backend_check_state_nolock(node, i)) - continue; - rapidjson::Value backend_stat(rapidjson::kObjectType); - stat_value.AddMember(std::to_string(static_cast(i)).c_str(), - allocator, - backend_stats_json(categories, backend_stat, allocator, node, i), - allocator); + auto backends = node->config_data->backends->get_all_backends(); + for (auto &backend_ptr : backends) { + auto config_backend = backend_ptr.get(); + auto backend_id = config_backend->backend_id; + + std::lock_guard guard(*config_backend->state_mutex); + if (config_backend->state != DNET_BACKEND_UNITIALIZED) { + rapidjson::Value backend_stat(rapidjson::kObjectType); + stat_value.AddMember(std::to_string(static_cast(backend_id)).c_str(), + allocator, + backend_stats_json(categories, backend_stat, allocator, node, config_backend), + allocator); + } } } diff --git a/tests/backends_test.cpp b/tests/backends_test.cpp index f42d2e65e..4ed71ccc8 100644 --- a/tests/backends_test.cpp +++ b/tests/backends_test.cpp @@ -46,7 +46,11 @@ static server_config default_value(int group) server.backends[0]("enable", false)("group", group); - server.backends.resize(backends_count, server.backends.front()); + server.backends.resize(backends_count + 1, server.backends.front()); + + auto &hidden_backend = server.backends.back(); + hidden_backend("enable", true); + hidden_backend.set_serializable(false); return server; } @@ -116,20 +120,19 @@ static void test_enable_at_start(session &sess) } } -static void test_enable_backend(session &sess) +static void test_enable_backend(session &sess, uint32_t backend_id) { server_node &node = global_data->nodes[0]; std::string host = node.remote().to_string(); - auto tuple = std::make_tuple(host, 0, 1); + auto tuple = std::make_tuple(host, 0, backend_id); auto unique_hosts = get_unique_hosts(sess); BOOST_REQUIRE_MESSAGE(unique_hosts.find(tuple) == unique_hosts.end(), - "Host must not exist: " + host + ", group: 0, backend: 1"); + "Host must not exist: " + host + ", group: 0, backend: " + std::to_string(static_cast(backend_id))); - ELLIPTICS_REQUIRE(enable_result, sess.enable_backend(node.remote(), 1)); - BOOST_REQUIRE_EQUAL(enable_result.get().size(), 1); + ELLIPTICS_REQUIRE(enable_result, sess.enable_backend(node.remote(), backend_id)); // Wait 0.1 secs to ensure that route list was changed usleep(100 * 1000); @@ -137,7 +140,7 @@ static void test_enable_backend(session &sess) unique_hosts = get_unique_hosts(sess); BOOST_REQUIRE_MESSAGE(unique_hosts.find(tuple) != unique_hosts.end(), - "Host must exist: " + host + ", group: 0, backend: 1"); + "Host must exist: " + host + ", group: 0, backend: " + std::to_string(static_cast(backend_id))); } static void test_backend_status(session &sess) @@ -224,6 +227,46 @@ static void test_enable_backend_at_empty_node(session &sess) "Host must exist: " + host + ", group: 2, backend: 1"); } +static void test_enable_backend_after_config_change(session &sess) +{ + server_node &node = global_data->nodes[0]; + + server_config &config = node.config(); + config_data &hidden_backend = config.backends.back(); + uint32_t backend_id = std::stoi(hidden_backend.string_value("backend_id")); + + ELLIPTICS_REQUIRE_ERROR(enable_result, sess.enable_backend(node.remote(), backend_id), -ENOENT); + + hidden_backend.set_serializable(true); + config.write(node.config_path()); + + test_enable_backend(sess, backend_id); +} + +static void test_remove_backend(session &sess) { + server_node &node = global_data->nodes[0]; + + + server_config &config = node.config(); + config_data &last_backend = config.backends.back(); + const uint32_t backend_id = std::stoi(last_backend.string_value("backend_id")); + + ELLIPTICS_REQUIRE(enable_result, sess.remove_backend(node.remote(), backend_id)); + + /* Request all backends status and check that removed backend is missed */ + ELLIPTICS_REQUIRE(async_status_result, sess.request_backends_status(node.remote())); + sync_backend_status_result result = async_status_result; + + BOOST_REQUIRE_EQUAL(result.size(), 1); + + backend_status_result_entry entry = result.front(); + + for (size_t i = 0; i < backends_count; ++i) { + auto status = entry.backend(i); + BOOST_REQUIRE_NE(status->backend_id, backend_id); + } +} + static void test_direct_backend(session &sess) { const key id = std::string("direct-backend-test"); @@ -444,12 +487,14 @@ static void test_change_group(session &sess) bool register_tests(test_suite *suite, node n) { ELLIPTICS_TEST_CASE(test_enable_at_start, create_session(n, { 1, 2, 3 }, 0, 0)); - ELLIPTICS_TEST_CASE(test_enable_backend, create_session(n, { 1, 2, 3 }, 0, 0)); + ELLIPTICS_TEST_CASE(test_enable_backend, create_session(n, { 1, 2, 3 }, 0, 0), 1); ELLIPTICS_TEST_CASE(test_backend_status, create_session(n, { 1, 2, 3 }, 0, 0)); ELLIPTICS_TEST_CASE(test_enable_backend_again, create_session(n, { 1, 2, 3 }, 0, 0)); ELLIPTICS_TEST_CASE(test_disable_backend, create_session(n, { 1, 2, 3 }, 0, 0)); ELLIPTICS_TEST_CASE(test_disable_backend_again, create_session(n, { 1, 2, 3 }, 0, 0)); ELLIPTICS_TEST_CASE(test_enable_backend_at_empty_node, create_session(n, { 1, 2, 3 }, 0, 0)); + ELLIPTICS_TEST_CASE(test_enable_backend_after_config_change, create_session(n, { 1, 2, 3 }, 0, 0)); + ELLIPTICS_TEST_CASE(test_remove_backend, create_session(n, { 1, 2, 3 }, 0, 0)); ELLIPTICS_TEST_CASE(test_direct_backend, create_session(n, { 0 }, 0, 0)); ELLIPTICS_TEST_CASE(test_set_backend_ids_for_disabled, create_session(n, { 0 }, 0, 0)); ELLIPTICS_TEST_CASE(test_set_backend_ids_for_enabled, create_session(n, { 0 }, 0, 0)); diff --git a/tests/cache_test.cpp b/tests/cache_test.cpp index 7f8ff714f..971ee80b4 100644 --- a/tests/cache_test.cpp +++ b/tests/cache_test.cpp @@ -84,7 +84,8 @@ static void test_cache_timestamp(session &sess) static void test_cache_records_sizes(session &sess) { dnet_node *node = global_data->nodes[0].get_native(); - ioremap::cache::cache_manager *cache = (ioremap::cache::cache_manager*) node->io->backends[0].cache; + dnet_backend_io *backend_io = dnet_get_backend_io(node->io, 0); + ioremap::cache::cache_manager *cache = reinterpret_cast(backend_io->cache); const size_t cache_size = cache->cache_size(); const size_t cache_pages_number = cache->cache_pages_number(); argument_data data("0"); @@ -117,7 +118,8 @@ static void test_cache_records_sizes(session &sess) static void test_cache_overflow(session &sess) { dnet_node *node = global_data->nodes[0].get_native(); - ioremap::cache::cache_manager *cache = (ioremap::cache::cache_manager*) node->io->backends[0].cache; + dnet_backend_io *backend_io = dnet_get_backend_io(node->io, 0); + ioremap::cache::cache_manager *cache = reinterpret_cast(backend_io->cache); const size_t cache_size = cache->cache_size(); const size_t cache_pages_number = cache->cache_pages_number(); argument_data data("0"); @@ -236,7 +238,8 @@ void cache_read_check_lru(session &sess, int id, lru_list_emulator_t& lru_list_e static void test_cache_lru_eviction(session &sess) { dnet_node *node = global_data->nodes[0].get_native(); - ioremap::cache::cache_manager *cache = (ioremap::cache::cache_manager*) node->io->backends[0].cache; + dnet_backend_io *backend_io = dnet_get_backend_io(node->io, 0); + ioremap::cache::cache_manager *cache = reinterpret_cast(backend_io->cache); const size_t cache_size = cache->cache_size(); const size_t cache_pages_number = cache->cache_pages_number(); diff --git a/tests/server_send.cpp b/tests/server_send.cpp index 174662e56..0f8575a1e 100644 --- a/tests/server_send.cpp +++ b/tests/server_send.cpp @@ -289,7 +289,7 @@ static void ssend_test_server_send(session &s, int num, const std::string &id_pr static void ssend_test_set_delay(session &s, const std::vector &groups, uint64_t delay) { struct backend { dnet_addr addr; - int backend_id; + uint32_t backend_id; bool operator<(const backend &other) const { if (auto cmp = dnet_addr_cmp(&addr, &other.addr)) diff --git a/tests/test_base.cpp b/tests/test_base.cpp index ddfe75265..0ab821fc3 100644 --- a/tests/test_base.cpp +++ b/tests/test_base.cpp @@ -105,6 +105,11 @@ void create_directory(const std::string &path) #ifndef NO_SERVER +config_data::config_data() +: m_serializable(true) +{ +} + config_data &config_data::operator() (const std::string &name, const std::vector &value) { return (*this)(name, variant(value)); @@ -168,6 +173,16 @@ std::string config_data::string_value(const std::string &name) const return value ? boost::apply_visitor(stringify_visitor(), *value) : std::string(); } +void config_data::set_serializable(bool serializable) +{ + m_serializable = serializable; +} + +bool config_data::is_serializable() const +{ + return m_serializable; +} + config_data::const_iterator config_data::cbegin() const { return m_data.cbegin(); @@ -380,6 +395,9 @@ void server_config::write(const std::string &path) backends_json.SetArray(); for (auto it = backends.begin(); it != backends.end(); ++it) { + if (!it->is_serializable()) + continue; + rapidjson::Value backend; backend.SetObject(); @@ -562,7 +580,7 @@ std::string server_node::config_path() const return m_path; } -server_config server_node::config() const +server_config &server_node::config() { return m_config; } @@ -816,13 +834,6 @@ nodes_data::ptr start_nodes(start_nodes_config &start_config) { create_directory(server_path); - for (size_t j = 0; j < config.backends.size(); ++j) { - std::string prefix = server_path + "/" + boost::lexical_cast(j); - create_directory(prefix); - create_directory(prefix + "/history"); - create_directory(prefix + "/blob"); - } - std::vector remotes; for (size_t j = 0; j < start_config.configs.size(); ++j) { if (j == i) @@ -871,7 +882,11 @@ nodes_data::ptr start_nodes(start_nodes_config &start_config) { ; for (size_t i = 0; i < config.backends.size(); ++i) { - std::string prefix = server_path + "/" + boost::lexical_cast(i); + std::string prefix = server_path + '/' + std::to_string(i); + create_directory(prefix); + create_directory(prefix + "/history"); + create_directory(prefix + "/blob"); + config.backends[i] ("history", prefix + "/history") ("data", prefix + "/blob") diff --git a/tests/test_base.hpp b/tests/test_base.hpp index e4da52336..296dbcff7 100644 --- a/tests/test_base.hpp +++ b/tests/test_base.hpp @@ -135,6 +135,8 @@ class config_data typedef std::vector > container_t; public: + config_data(); + config_data &operator() (const std::string &name, const std::vector &value); config_data &operator() (const std::string &name, const std::string &value); config_data &operator() (const std::string &name, const char *value); @@ -146,6 +148,9 @@ class config_data bool has_value(const std::string &name) const; std::string string_value(const std::string &name) const; + void set_serializable(bool serializable); + bool is_serializable() const; + typedef container_t::const_iterator const_iterator; const_iterator cbegin() const; const_iterator cend() const; @@ -154,6 +159,8 @@ class config_data config_data &operator() (const std::string &name, const variant &value); const variant *value_impl(const std::string &name) const; + // it is used to prevent serialization of this object to a config (json) + bool m_serializable; container_t m_data; friend class server_config; }; @@ -193,7 +200,7 @@ class server_node bool is_stopped() const; std::string config_path() const; - server_config config() const; + server_config &config(); address remote() const; int monitor_port() const;