26#include <boost/enable_shared_from_this.hpp>
37namespace ph = std::placeholders;
42const size_t BUF_SIZE = 32768;
49typedef boost::shared_ptr<Connection> ConnectionPtr;
55class Connection :
public boost::enable_shared_from_this<Connection> {
77 const boost::shared_ptr<UnixDomainSocket>& socket,
78 ConnectionPool& connection_pool,
81 : io_service_(io_service), socket_(socket), timeout_timer_(io_service_),
82 timeout_(timeout), buf_(), response_(),
83 connection_pool_(connection_pool), feed_(), watch_socket_(),
84 use_external_(use_external), defer_shutdown_(false) {
87 .arg(socket_->getNative());
92 watch_socket_.reset(
new util::WatchSocket());
108 timeout_timer_.cancel();
112 void scheduleTimer() {
113 timeout_timer_.setup(std::bind(&Connection::timeoutHandler,
this),
124 if (defer_shutdown_) {
125 io_service_->post(std::bind([](ConnectionPtr c) { c->stop(); }, shared_from_this()));
130 .arg(socket_->getNative());
137 std::string watch_error;
138 if (!watch_socket_->closeSocket(watch_error)) {
145 timeout_timer_.cancel();
160 socket_->asyncReceive(&buf_[0],
sizeof(buf_),
161 std::bind(&Connection::receiveHandler,
162 shared_from_this(), ph::_1, ph::_2));
173 size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
174 socket_->asyncSend(&response_[0], chunk_size,
175 std::bind(&Connection::sendHandler, shared_from_this(), ph::_1, ph::_2));
182 watch_socket_->markReady();
183 }
catch (
const std::exception& ex) {
202 void receiveHandler(
const boost::system::error_code& ec,
203 size_t bytes_transferred);
213 void sendHandler(
const boost::system::error_code& ec,
214 size_t bytes_transferred);
220 void timeoutHandler();
228 boost::shared_ptr<UnixDomainSocket> socket_;
231 IntervalTimer timeout_timer_;
237 std::array<char, BUF_SIZE> buf_;
240 std::string response_;
243 ConnectionPool& connection_pool_;
258 bool defer_shutdown_;
262class ConnectionPool {
268 void start(
const ConnectionPtr& connection) {
269 connection->doReceive();
270 connections_.insert(connection);
276 void stop(
const ConnectionPtr& connection) {
279 connections_.erase(connection);
280 }
catch (
const std::exception& ex) {
288 for (
auto const& conn : connections_) {
291 connections_.clear();
297 std::set<ConnectionPtr> connections_;
302Connection::terminate() {
306 }
catch (
const std::exception& ex) {
313Connection::receiveHandler(
const boost::system::error_code& ec,
314 size_t bytes_transferred) {
316 if (ec.value() == boost::asio::error::eof) {
317 std::stringstream os;
318 if (feed_.getProcessedText().empty()) {
319 os <<
"no input data to discard";
321 os <<
"discarding partial command of "
322 << feed_.getProcessedText().size() <<
" bytes";
328 .arg(socket_->getNative()).arg(os.str());
329 }
else if (ec.value() != boost::asio::error::operation_aborted) {
331 .arg(ec.value()).arg(socket_->getNative());
334 connection_pool_.stop(shared_from_this());
337 }
else if (bytes_transferred == 0) {
339 connection_pool_.stop(shared_from_this());
344 .arg(bytes_transferred).arg(socket_->getNative());
355 feed_.postBuffer(&buf_[0], bytes_transferred);
358 if (feed_.needData()) {
364 if (feed_.feedOk()) {
365 cmd = feed_.toElement();
367 defer_shutdown_ =
true;
369 std::unique_ptr<Connection, void(*)(Connection*)> p(
this, [](Connection* p) { p->defer_shutdown_ =
false; });
373 timeout_timer_.cancel();
382 isc_throw(BadValue, feed_.getErrorMessage());
385 }
catch (
const Exception& ex) {
393 .arg(cmd ? cmd->str() :
"unknown");
395 "internal server error: no response generated");
405 response_ = rsp->str();
412 connection_pool_.stop(shared_from_this());
416Connection::sendHandler(
const boost::system::error_code& ec,
417 size_t bytes_transferred) {
422 watch_socket_->clearReady();
423 }
catch (
const std::exception& ex) {
431 if (ec.value() != boost::asio::error::operation_aborted) {
433 .arg(socket_->getNative()).arg(ec.message());
444 response_.erase(0, bytes_transferred);
447 .arg(bytes_transferred).arg(response_.size())
448 .arg(socket_->getNative());
451 if (!response_.empty()) {
462 connection_pool_.stop(shared_from_this());
466Connection::timeoutHandler() {
468 .arg(socket_->getNative());
473 }
catch (
const std::exception& ex) {
475 .arg(socket_->getNative())
479 std::stringstream os;
480 os <<
"Connection over control channel timed out";
481 if (!feed_.getProcessedText().empty()) {
482 os <<
", discarded partial command of "
483 << feed_.getProcessedText().size() <<
" bytes";
487 response_ = rsp->str();
570 <<
config->getPosition() <<
")");
573 for (
auto const& socket :
config->listValue()) {
579 if (
data.second->usable_) {
583 data.second->usable_ =
false;
600 auto it =
sockets_.find(cmd_config->getSocketName());
603 it->second->usable_ =
true;
609 std::string lock_name = cmd_config->getLockName();
610 int lock_fd = open(lock_name.c_str(), O_RDONLY | O_CREAT, 0600);
612 std::string errmsg = strerror(errno);
614 << lock_name <<
", : " << errmsg);
619 int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
621 std::string errmsg = strerror(errno);
624 << lock_name <<
", : " << errmsg);
629 static_cast<void>(::remove(cmd_config->getSocketName().c_str()));
632 .arg(cmd_config->getSocketName());
635 socket_info->config_ = cmd_config;
636 socket_info->lock_fd_ = lock_fd;
642 socket_info->acceptor_->open(endpoint);
650 socket_info->acceptor_->bind(endpoint);
652 socket_info->acceptor_->listen();
660 }
catch (
const std::exception& ex) {
662 static_cast<void>(::remove(cmd_config->getLockName().c_str()));
666 sockets_[cmd_config->getSocketName()] = socket_info;
673 if (
info->acceptor_ &&
info->acceptor_->isOpen()) {
677 info->acceptor_->close();
678 static_cast<void>(::remove(
info->config_->getSocketName().c_str()));
679 static_cast<void>(::remove(
info->config_->getLockName().c_str()));
687 if (
info->lock_fd_ != -1) {
688 close(
info->lock_fd_);
691 auto it =
sockets_.find(
info->config_->getSocketName());
719 info->acceptor_->asyncAccept(*
info->socket_,
720 [
this,
info](
const boost::system::error_code& ec) {
723 ConnectionPtr connection(new Connection(io_service_, info->socket_,
727 connection_pool_.start(connection);
729 }
else if (ec.value() != boost::asio::error::operation_aborted) {
730 LOG_ERROR(command_logger, COMMAND_SOCKET_ACCEPT_FAIL)
731 .arg(info->acceptor_->getNative()).arg(ec.message());
735 if (ec.value() != boost::asio::error::operation_aborted) {
745 auto const& it =
sockets_.find(
info->config_->getSocketName());
747 return (it->second->acceptor_->getNative());
750 return (
sockets_.begin()->second->acceptor_->getNative());
760 impl_->openCommandSocket(
config);
765 impl_->openCommandSockets(
config);
770 impl_->closeCommandSocket(
info);
775 impl_->closeCommandSockets();
780 return (impl_->getControlSocketFD(
info));
785 static UnixCommandMgr cmd_mgr;
791 impl_->io_service_ = io_service;
796 impl_->timeout_ = timeout;
801 impl_->use_external_ = use_external;
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
Implements acceptor service for UnixDomainSocket.
Endpoint for UnixDomainSocket.
Represents unix domain socket implemented in terms of boost asio.
An exception indicating that specified socket parameters are invalid.
virtual isc::data::ConstElementPtr processCommand(const isc::data::ConstElementPtr &cmd)
Triggers command processing.
static CommandMgr & instance()
CommandMgr is a singleton class.
An exception indicating a problem with socket operation.
UNIX command config aka UNIX control socket info class.
Implementation of the UnixCommandMgr.
void closeCommandSocket(UnixSocketInfoPtr info)
Shuts down any open unix control sockets.
void openCommandSockets(const isc::data::ConstElementPtr config)
Opens acceptor service allowing the control clients to connect.
long timeout_
Connection timeout.
ConnectionPool connection_pool_
Pool of connections.
bool use_external_
Use external sockets flag.
void doAccept(UnixSocketInfoPtr info)
Asynchronously accepts next connection.
void closeCommandSockets(bool remove=true)
Shuts down any open unix control sockets.
std::map< std::string, UnixSocketInfoPtr > sockets_
The UNIX socket data (configuration, acceptor, etc.).
void openCommandSocket(const isc::data::ConstElementPtr config)
Opens acceptor service allowing the control clients to connect.
UnixCommandMgrImpl()
Constructor.
IOServicePtr io_service_
Pointer to the IO service used by the server process for running asynchronous tasks.
int getControlSocketFD(UnixSocketInfoPtr info)
Returns unix control socket descriptor.
Unix Commands Manager implementation for the Kea servers.
static UnixCommandMgr & instance()
UnixCommandMgr is a singleton class.
void setIOService(const asiolink::IOServicePtr &io_service)
Sets IO service to be used by the unix command manager.
int getControlSocketFD(UnixSocketInfoPtr info=UnixSocketInfoPtr())
Returns unix control socket descriptor.
void openCommandSockets(const isc::data::ConstElementPtr config)
Opens unix control socket with parameters specified in socket_info (required parameters: socket-type:...
void closeCommandSockets()
Shuts down any open unix control sockets.
void addExternalSockets(bool use_external=true)
Use external sockets flag.
void closeCommandSocket(UnixSocketInfoPtr info=UnixSocketInfoPtr())
Shuts down any open unix control sockets.
void setConnectionTimeout(const long timeout)
Override default connection timeout.
void openCommandSocket(const isc::data::ConstElementPtr config)
Opens unix control socket with parameters specified in socket_info (required parameters: socket-type:...
To be removed. Please use ConfigError instead.
void deleteExternalSocket(int socketfd)
Deletes external socket.
static IfaceMgr & instance()
IfaceMgr is a singleton class.
void addExternalSocket(int socketfd, SocketCallback callback)
Adds external socket and a callback.
RAII device to relax umask (adding group write for sockets).
This file contains several functions and constants that are used for handling commands and responses ...
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
#define LOG_INFO(LOGGER, MESSAGE)
Macro to conveniently test info output and log it.
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
boost::shared_ptr< IOService > IOServicePtr
Defines a smart pointer to an IOService instance.
const isc::log::MessageID COMMAND_PROCESS_ERROR1
boost::shared_ptr< UnixSocketInfo > UnixSocketInfoPtr
Pointer to a UnixSocketInfo object.
const isc::log::MessageID COMMAND_SOCKET_READ_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSED
boost::shared_ptr< UnixCommandConfig > UnixCommandConfigPtr
Pointer to a UnixCommandConfig object.
const int CONTROL_RESULT_ERROR
Status code indicating a general failure.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CANCEL_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_OPENED
const isc::log::MessageID COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_TIMEOUT
const isc::log::MessageID COMMAND_ACCEPTOR_START
ConstElementPtr createAnswer()
Creates a standard config/command level success answer message.
const isc::log::MessageID COMMAND_RESPONSE_ERROR
constexpr long TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND
Timeout for the DHCP server to receive command over the unix domain socket.
const isc::log::MessageID COMMAND_SOCKET_WRITE_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSE_FAIL
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLOSE_ERROR
const isc::log::MessageID COMMAND_SOCKET_READ
isc::log::Logger command_logger("commands")
Command processing Logger.
const isc::log::MessageID COMMAND_WATCH_SOCKET_MARK_READY_ERROR
const isc::log::MessageID COMMAND_SOCKET_WRITE
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLEAR_ERROR
ElementPtr copy(ConstElementPtr from, unsigned level)
Copy the data up to a nesting level.
boost::shared_ptr< const Element > ConstElementPtr
boost::shared_ptr< WatchSocket > WatchSocketPtr
Defines a smart pointer to an instance of a WatchSocket.
Defines the logger used by the top-level component of kea-lfc.
Structure used to store UNIX connection data.
Defines the class, WatchSocket.