Kea 3.0.3-git
unix_command_mgr.cc
Go to the documentation of this file.
1// Copyright (C) 2015-2026 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#include <config.h>
8
11#include <asiolink/io_service.h>
15#include <config/command_mgr.h>
18#include <cc/data.h>
20#include <cc/json_feed.h>
21#include <dhcp/iface_mgr.h>
22#include <config/config_log.h>
23#include <config/timeouts.h>
24#include <util/filesystem.h>
25#include <util/watch_socket.h>
26#include <boost/enable_shared_from_this.hpp>
27#include <array>
28#include <functional>
29#include <unistd.h>
30#include <sys/file.h>
31
32using namespace isc;
33using namespace isc::asiolink;
34using namespace isc::config;
35using namespace isc::data;
36using namespace isc::dhcp;
37namespace ph = std::placeholders;
38
39namespace {
40
42const size_t BUF_SIZE = 32768;
43
44class ConnectionPool;
45
46class Connection;
47
49typedef boost::shared_ptr<Connection> ConnectionPtr;
50
55class Connection : public boost::enable_shared_from_this<Connection> {
56public:
57
76 Connection(const IOServicePtr& io_service,
77 const boost::shared_ptr<UnixDomainSocket>& socket,
78 ConnectionPool& connection_pool,
79 const long timeout,
80 bool use_external)
81 : io_service_(io_service), socket_(socket), timeout_timer_(io_service_),
82 timeout_(timeout), buf_(), response_(),
83 connection_pool_(connection_pool), feed_(), watch_socket_(),
84 use_external_(use_external), defer_shutdown_(false) {
85
87 .arg(socket_->getNative());
88
89 // Callback value of 0 is used to indicate that callback function is
90 // not installed.
91 if (use_external_) {
92 watch_socket_.reset(new util::WatchSocket());
93 IfaceMgr::instance().addExternalSocket(watch_socket_->getSelectFd(), 0);
94 IfaceMgr::instance().addExternalSocket(socket_->getNative(), 0);
95 }
96
97 // Initialize state model for receiving and preparsing commands.
98 feed_.initModel();
99
100 // Start timer for detecting timeouts.
101 scheduleTimer();
102 }
103
107 ~Connection() {
108 timeout_timer_.cancel();
109 }
110
112 void scheduleTimer() {
113 timeout_timer_.setup(std::bind(&Connection::timeoutHandler, this),
114 timeout_, IntervalTimer::ONE_SHOT);
115 }
116
123 void stop() {
124 if (defer_shutdown_) {
125 io_service_->post(std::bind([](ConnectionPtr c) { c->stop(); }, shared_from_this()));
126 return;
127 }
128
130 .arg(socket_->getNative());
131
132 if (use_external_) {
133 IfaceMgr::instance().deleteExternalSocket(watch_socket_->getSelectFd());
134 IfaceMgr::instance().deleteExternalSocket(socket_->getNative());
135
136 // Close watch socket and log errors if occur.
137 std::string watch_error;
138 if (!watch_socket_->closeSocket(watch_error)) {
140 .arg(watch_error);
141 }
142 }
143
144 socket_->close();
145 timeout_timer_.cancel();
146 }
147
152 void terminate();
153
159 void doReceive() {
160 socket_->asyncReceive(&buf_[0], sizeof(buf_),
161 std::bind(&Connection::receiveHandler,
162 shared_from_this(), ph::_1, ph::_2));
163 }
164
172 void doSend() {
173 size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
174 socket_->asyncSend(&response_[0], chunk_size,
175 std::bind(&Connection::sendHandler, shared_from_this(), ph::_1, ph::_2));
176
177 if (use_external_) {
178 // Asynchronous send has been scheduled and we need to indicate this
179 // to break the synchronous select(). The handler should clear this
180 // status when invoked.
181 try {
182 watch_socket_->markReady();
183 } catch (const std::exception& ex) {
185 .arg(ex.what());
186 }
187 }
188 }
189
198 //
202 void receiveHandler(const boost::system::error_code& ec,
203 size_t bytes_transferred);
204
213 void sendHandler(const boost::system::error_code& ec,
214 size_t bytes_transferred);
215
220 void timeoutHandler();
221
222private:
223
225 IOServicePtr io_service_;
226
228 boost::shared_ptr<UnixDomainSocket> socket_;
229
231 IntervalTimer timeout_timer_;
232
234 long timeout_;
235
237 std::array<char, BUF_SIZE> buf_;
238
240 std::string response_;
241
243 ConnectionPool& connection_pool_;
244
247 JSONFeed feed_;
248
251 util::WatchSocketPtr watch_socket_;
252
254 bool use_external_;
255
258 bool defer_shutdown_;
259};
260
262class ConnectionPool {
263public:
264
268 void start(const ConnectionPtr& connection) {
269 connection->doReceive();
270 connections_.insert(connection);
271 }
272
276 void stop(const ConnectionPtr& connection) {
277 try {
278 connection->stop();
279 connections_.erase(connection);
280 } catch (const std::exception& ex) {
282 .arg(ex.what());
283 }
284 }
285
287 void stopAll() {
288 for (auto const& conn : connections_) {
289 conn->stop();
290 }
291 connections_.clear();
292 }
293
294private:
295
297 std::set<ConnectionPtr> connections_;
298
299};
300
301void
302Connection::terminate() {
303 try {
304 socket_->shutdown();
305
306 } catch (const std::exception& ex) {
308 .arg(ex.what());
309 }
310}
311
312void
313Connection::receiveHandler(const boost::system::error_code& ec,
314 size_t bytes_transferred) {
315 if (ec) {
316 if (ec.value() == boost::asio::error::eof) {
317 std::stringstream os;
318 if (feed_.getProcessedText().empty()) {
319 os << "no input data to discard";
320 } else {
321 os << "discarding partial command of "
322 << feed_.getProcessedText().size() << " bytes";
323 }
324
325 // Foreign host has closed the connection. We should remove it from the
326 // connection pool.
328 .arg(socket_->getNative()).arg(os.str());
329 } else if (ec.value() != boost::asio::error::operation_aborted) {
331 .arg(ec.value()).arg(socket_->getNative());
332 }
333
334 connection_pool_.stop(shared_from_this());
335 return;
336
337 } else if (bytes_transferred == 0) {
338 // Nothing received. Close the connection.
339 connection_pool_.stop(shared_from_this());
340 return;
341 }
342
344 .arg(bytes_transferred).arg(socket_->getNative());
345
346 // Reschedule the timer because the transaction is ongoing.
347 scheduleTimer();
348
349 ConstElementPtr cmd;
350 ConstElementPtr rsp;
351
352 try {
353 // Received some data over the socket. Append them to the JSON feed
354 // to see if we have reached the end of command.
355 feed_.postBuffer(&buf_[0], bytes_transferred);
356 feed_.poll();
357 // If we haven't yet received the full command, continue receiving.
358 if (feed_.needData()) {
359 doReceive();
360 return;
361 }
362
363 // Received entire command. Parse the command into JSON.
364 if (feed_.feedOk()) {
365 cmd = feed_.toElement();
366
367 defer_shutdown_ = true;
368
369 std::unique_ptr<Connection, void(*)(Connection*)> p(this, [](Connection* p) { p->defer_shutdown_ = false; });
370
371 // Cancel the timer to make sure that long lasting command
372 // processing doesn't cause the timeout.
373 timeout_timer_.cancel();
374
375 // If successful, then process it as a command.
377
378 } else {
379 // Failed to parse command as JSON or process the received command.
380 // This exception will be caught below and the error response will
381 // be sent.
382 isc_throw(BadValue, feed_.getErrorMessage());
383 }
384
385 } catch (const Exception& ex) {
387 rsp = createAnswer(CONTROL_RESULT_ERROR, std::string(ex.what()));
388 }
389
390 // No response generated. Connection will be closed.
391 if (!rsp) {
393 .arg(cmd ? cmd->str() : "unknown");
395 "internal server error: no response generated");
396
397 } else {
398
399 // Reschedule the timer as it may be either canceled or need to be
400 // updated to not timeout before we manage to the send the reply.
401 scheduleTimer();
402
403 // Let's convert JSON response to text. Note that at this stage
404 // the rsp pointer is always set.
405 response_ = rsp->str();
406
407 doSend();
408 return;
409 }
410
411 // Close the connection if we have sent the entire response.
412 connection_pool_.stop(shared_from_this());
413}
414
415void
416Connection::sendHandler(const boost::system::error_code& ec,
417 size_t bytes_transferred) {
418 if (use_external_) {
419 // Clear the watch socket so as the future send operation can mark it
420 // again to interrupt the synchronous select() call.
421 try {
422 watch_socket_->clearReady();
423 } catch (const std::exception& ex) {
425 .arg(ex.what());
426 }
427 }
428
429 if (ec) {
430 // If an error occurred, log this error and stop the connection.
431 if (ec.value() != boost::asio::error::operation_aborted) {
433 .arg(socket_->getNative()).arg(ec.message());
434 }
435
436 } else {
437
438 // Reschedule the timer because the transaction is ongoing.
439 scheduleTimer();
440
441 // No error. We are in a process of sending a response. Need to
442 // remove the chunk that we have managed to sent with the previous
443 // attempt.
444 response_.erase(0, bytes_transferred);
445
447 .arg(bytes_transferred).arg(response_.size())
448 .arg(socket_->getNative());
449
450 // Check if there is any data left to be sent and sent it.
451 if (!response_.empty()) {
452 doSend();
453 return;
454 }
455
456 // Gracefully shutdown the connection and close the socket if
457 // we have sent the whole response.
458 terminate();
459 }
460
461 // All data sent or an error has occurred. Close the connection.
462 connection_pool_.stop(shared_from_this());
463}
464
465void
466Connection::timeoutHandler() {
468 .arg(socket_->getNative());
469
470 try {
471 socket_->cancel();
472
473 } catch (const std::exception& ex) {
475 .arg(socket_->getNative())
476 .arg(ex.what());
477 }
478
479 std::stringstream os;
480 os << "Connection over control channel timed out";
481 if (!feed_.getProcessedText().empty()) {
482 os << ", discarded partial command of "
483 << feed_.getProcessedText().size() << " bytes";
484 }
485
487 response_ = rsp->str();
488 doSend();
489}
490
491}
492
493namespace isc {
494namespace config {
495
498public:
499
504
511
520
525
529 void closeCommandSockets(bool remove = true);
530
535
544
548
550 ConnectionPool connection_pool_;
551
553 std::map<std::string, UnixSocketInfoPtr> sockets_;
554
557
560};
561
562void
564 if (!config) {
565 isc_throw(BadSocketInfo, "Missing config parameters, can't create socket.");
566 }
567
568 if (config->getType() != Element::list) {
569 isc_throw(DhcpConfigError, "expected list type ("
570 << config->getPosition() << ")");
571 }
572
573 for (auto const& socket : config->listValue()) {
574 openCommandSocket(socket);
575 }
576
577 auto copy = sockets_;
578 for (auto const& data : copy) {
579 if (data.second->usable_) {
580 // If the connection can be used (just created) or reused, keep it
581 // in the list and clear the flag. It will be marked again on next
582 // configuration event if needed.
583 data.second->usable_ = false;
584 } else {
585 // If the connection can not be reused, stop it and remove it from the list.
586 closeCommandSocket(data.second);
587 }
588 }
589}
590
591void
593 if (!config) {
594 isc_throw(BadSocketInfo, "Missing config parameters, can't create socket.");
595 }
596
598
599 // Search for the specific connection and reuse the existing one if found.
600 auto it = sockets_.find(cmd_config->getSocketName());
601 if (it != sockets_.end()) {
602 // If the connection can be reused, mark it as usable.
603 it->second->usable_ = true;
604 return;
605 }
606
607 // Connection not found so it needs to be created.
608 // First let's open lock file.
609 std::string lock_name = cmd_config->getLockName();
610 int lock_fd = open(lock_name.c_str(), O_RDONLY | O_CREAT, 0600);
611 if (lock_fd == -1) {
612 std::string errmsg = strerror(errno);
613 isc_throw(SocketError, "cannot create socket lockfile, "
614 << lock_name << ", : " << errmsg);
615 }
616
617 // Try to acquire lock. If we can't somebody else is actively
618 // using it.
619 int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
620 if (ret != 0) {
621 std::string errmsg = strerror(errno);
622 close(lock_fd);
623 isc_throw(SocketError, "cannot lock socket lockfile, "
624 << lock_name << ", : " << errmsg);
625 }
626
627 // We have the lock, so let's remove the pre-existing socket
628 // file if it exists.
629 static_cast<void>(::remove(cmd_config->getSocketName().c_str()));
630
632 .arg(cmd_config->getSocketName());
633
634 UnixSocketInfoPtr socket_info(new UnixSocketInfo());
635 socket_info->config_ = cmd_config;
636 socket_info->lock_fd_ = lock_fd;
637
638 try {
639 // Start asynchronous acceptor service.
640 socket_info->acceptor_.reset(new UnixDomainSocketAcceptor(io_service_));
641 UnixDomainSocketEndpoint endpoint(cmd_config->getSocketName());
642 socket_info->acceptor_->open(endpoint);
643 // Kea umask (0027) is too restrictive: the bind() system call
644 // creates the socket file with 0777 (or on some systems fchmod
645 // provided value) masked by the current umask. The 'connect'
646 // system call requires write access to socket file.
647 // So relaxing the umask to allow group write access.
648 {
650 socket_info->acceptor_->bind(endpoint);
651 }
652 socket_info->acceptor_->listen();
653 if (use_external_) {
654 // Install this socket in Interface Manager.
655 IfaceMgr::instance().addExternalSocket(socket_info->acceptor_->getNative(), 0);
656 }
657
658 doAccept(socket_info);
659
660 } catch (const std::exception& ex) {
661 close(lock_fd);
662 static_cast<void>(::remove(cmd_config->getLockName().c_str()));
663 isc_throw(SocketError, ex.what());
664 }
665
666 sockets_[cmd_config->getSocketName()] = socket_info;
667}
668
669void
671 if (info) {
672 // Close acceptor if the acceptor is open.
673 if (info->acceptor_ && info->acceptor_->isOpen()) {
674 if (use_external_) {
675 IfaceMgr::instance().deleteExternalSocket(info->acceptor_->getNative());
676 }
677 info->acceptor_->close();
678 static_cast<void>(::remove(info->config_->getSocketName().c_str()));
679 static_cast<void>(::remove(info->config_->getLockName().c_str()));
680 }
681
682 // Stop all connections which can be closed. The only connection that won't
683 // be closed is the one over which we have received a request to reconfigure
684 // the server. This connection will be held until the UnixCommandMgr
685 // responds to such request.
686 connection_pool_.stopAll();
687 if (info->lock_fd_ != -1) {
688 close(info->lock_fd_);
689 info->lock_fd_ = -1;
690 }
691 auto it = sockets_.find(info->config_->getSocketName());
692 if (it != sockets_.end()) {
693 sockets_.erase(it);
694 }
695 try {
696 io_service_->pollOne();
697 } catch (...) {
698 }
699 } else {
700 closeCommandSockets(false);
701 }
702}
703
704void
706 auto copy = sockets_;
707 for (auto const& data : copy) {
708 closeCommandSocket(data.second);
709 }
710 if (remove) {
711 sockets_.clear();
712 }
713}
714
715void
717 // Create a socket into which the acceptor will accept new connection.
718 info->socket_.reset(new UnixDomainSocket(io_service_));
719 info->acceptor_->asyncAccept(*info->socket_,
720 [this, info](const boost::system::error_code& ec) {
721 if (!ec) {
722 // New connection is arriving. Start asynchronous transmission.
723 ConnectionPtr connection(new Connection(io_service_, info->socket_,
724 connection_pool_,
725 timeout_,
726 use_external_));
727 connection_pool_.start(connection);
728
729 } else if (ec.value() != boost::asio::error::operation_aborted) {
730 LOG_ERROR(command_logger, COMMAND_SOCKET_ACCEPT_FAIL)
731 .arg(info->acceptor_->getNative()).arg(ec.message());
732 }
733
734 // Unless we're stopping the service, start accepting connections again.
735 if (ec.value() != boost::asio::error::operation_aborted) {
736 doAccept(info);
737 }
738 });
739}
740
741int
743 // Return the most recent listener or null.
744 if (info) {
745 auto const& it = sockets_.find(info->config_->getSocketName());
746 if (it != sockets_.end()) {
747 return (it->second->acceptor_->getNative());
748 }
749 } else if (sockets_.size()) {
750 return (sockets_.begin()->second->acceptor_->getNative());
751 }
752 return (-1);
753}
754
755UnixCommandMgr::UnixCommandMgr() : impl_(new UnixCommandMgrImpl()) {
756}
757
758void
762
763void
767
768void
770 impl_->closeCommandSocket(info);
771}
772
773void
775 impl_->closeCommandSockets();
776}
777
778int
780 return (impl_->getControlSocketFD(info));
781}
782
785 static UnixCommandMgr cmd_mgr;
786 return (cmd_mgr);
787}
788
789void
791 impl_->io_service_ = io_service;
792}
793
794void
796 impl_->timeout_ = timeout;
797}
798
799void
801 impl_->use_external_ = use_external;
802}
803
804} // end of isc::config
805} // end of isc
if(!(yy_init))
@ list
Definition data.h:159
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
An exception indicating that specified socket parameters are invalid.
Definition command_mgr.h:17
virtual isc::data::ConstElementPtr processCommand(const isc::data::ConstElementPtr &cmd)
Triggers command processing.
static CommandMgr & instance()
CommandMgr is a singleton class.
An exception indicating a problem with socket operation.
UNIX command config aka UNIX control socket info class.
Implementation of the UnixCommandMgr.
void closeCommandSocket(UnixSocketInfoPtr info)
Shuts down any open unix control sockets.
void openCommandSockets(const isc::data::ConstElementPtr config)
Opens acceptor service allowing the control clients to connect.
long timeout_
Connection timeout.
ConnectionPool connection_pool_
Pool of connections.
bool use_external_
Use external sockets flag.
void doAccept(UnixSocketInfoPtr info)
Asynchronously accepts next connection.
void closeCommandSockets(bool remove=true)
Shuts down any open unix control sockets.
std::map< std::string, UnixSocketInfoPtr > sockets_
The UNIX socket data (configuration, acceptor, etc.).
void openCommandSocket(const isc::data::ConstElementPtr config)
Opens acceptor service allowing the control clients to connect.
IOServicePtr io_service_
Pointer to the IO service used by the server process for running asynchronous tasks.
int getControlSocketFD(UnixSocketInfoPtr info)
Returns unix control socket descriptor.
Unix Commands Manager implementation for the Kea servers.
static UnixCommandMgr & instance()
UnixCommandMgr is a singleton class.
void setIOService(const asiolink::IOServicePtr &io_service)
Sets IO service to be used by the unix command manager.
int getControlSocketFD(UnixSocketInfoPtr info=UnixSocketInfoPtr())
Returns unix control socket descriptor.
void openCommandSockets(const isc::data::ConstElementPtr config)
Opens unix control socket with parameters specified in socket_info (required parameters: socket-type:...
void closeCommandSockets()
Shuts down any open unix control sockets.
void addExternalSockets(bool use_external=true)
Use external sockets flag.
void closeCommandSocket(UnixSocketInfoPtr info=UnixSocketInfoPtr())
Shuts down any open unix control sockets.
void setConnectionTimeout(const long timeout)
Override default connection timeout.
void openCommandSocket(const isc::data::ConstElementPtr config)
Opens unix control socket with parameters specified in socket_info (required parameters: socket-type:...
To be removed. Please use ConfigError instead.
void deleteExternalSocket(int socketfd)
Deletes external socket.
Definition iface_mgr.cc:352
static IfaceMgr & instance()
IfaceMgr is a singleton class.
Definition iface_mgr.cc:54
void addExternalSocket(int socketfd, SocketCallback callback)
Adds external socket and a callback.
Definition iface_mgr.cc:329
RAII device to relax umask (adding group write for sockets).
Definition filesystem.h:85
This file contains several functions and constants that are used for handling commands and responses ...
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition macros.h:32
#define LOG_INFO(LOGGER, MESSAGE)
Macro to conveniently test info output and log it.
Definition macros.h:20
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
Definition macros.h:26
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
Definition macros.h:14
const isc::log::MessageID COMMAND_PROCESS_ERROR1
boost::shared_ptr< UnixSocketInfo > UnixSocketInfoPtr
Pointer to a UnixSocketInfo object.
const isc::log::MessageID COMMAND_SOCKET_READ_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSED
boost::shared_ptr< UnixCommandConfig > UnixCommandConfigPtr
Pointer to a UnixCommandConfig object.
const int CONTROL_RESULT_ERROR
Status code indicating a general failure.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CANCEL_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_OPENED
const isc::log::MessageID COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_TIMEOUT
const isc::log::MessageID COMMAND_ACCEPTOR_START
ConstElementPtr createAnswer()
Creates a standard config/command level success answer message (i.e.
const isc::log::MessageID COMMAND_RESPONSE_ERROR
constexpr long TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND
Timeout for the DHCP server to receive command over the unix domain socket.
Definition timeouts.h:17
const isc::log::MessageID COMMAND_SOCKET_WRITE_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSE_FAIL
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLOSE_ERROR
const int DBG_COMMAND
Definition config_log.h:24
const isc::log::MessageID COMMAND_SOCKET_READ
isc::log::Logger command_logger("commands")
Command processing Logger.
Definition config_log.h:21
const isc::log::MessageID COMMAND_WATCH_SOCKET_MARK_READY_ERROR
const isc::log::MessageID COMMAND_SOCKET_WRITE
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLEAR_ERROR
ElementPtr copy(ConstElementPtr from, unsigned level)
Copy the data up to a nesting level.
Definition data.cc:1517
boost::shared_ptr< const Element > ConstElementPtr
Definition data.h:30
@ info
Definition db_log.h:120
boost::shared_ptr< WatchSocket > WatchSocketPtr
Defines a smart pointer to an instance of a WatchSocket.
Defines the logger used by the top-level component of kea-lfc.
Structure used to store UNIX connection data.
Defines the class, WatchSocket.