kea-custom-hooks
FeM custom hooks libraries for Kea DHCP
AdminDBClient.cpp
Go to the documentation of this file.
1#include <cassert>
2#include <chrono>
3#include <ctime>
4#include <log/logger.h>
5#include <memory>
6#include <sstream>
7#include <thread>
8
9#include <pqxx/pqxx>
10#include <utility>
11
12#include <log/macros.h>
13
14#include "AdminDBClient.hpp"
15#include "common_vars.hpp"
16#include "log_admindb_host_reservation_importer.h"
17#include "util/pqxx_compat.hpp"
18
// File-local names of the PostgreSQL prepared statements used by this client. Each name is
// registered on the connection in AdminDBClientPrivate::reconnect() and later referenced by the
// exec_prepared()/exec_prepared1() calls in this file.
19namespace
20{
// Selects host updates with an id greater than the bound parameter, ordered by ascending id.
21STMT_NAME_T STMT_FETCH_HOST_UPDATES{"kch-ahri-fetch-host-updates"};
// Selects IP, MAC and derived subnet id for every row of admindb_view_kea_dhcp().
22STMT_NAME_T STMT_FETCH_FULL_DHCP_TABLE{"kch-ahri-fetch-full-dhcp-table"};
// Selects the maximum update id currently available from admindb_dhcp_updates_get().
23STMT_NAME_T STMT_FETCH_MAX_UPDATE_ID{"kch-ahri-fetch-max-update-id"};
// Calls admindb_dhcp_updates_cleanup_before() with the bound timestamp.
24STMT_NAME_T STMT_PRUNE_OLD_HOST_UPDATES{"kch-ahri-prune-old-host-updates"};
25} // namespace
26
27namespace ahri
28{
// Notification receiver that forwards every notification arriving on
// AdminDBClient::NOTIFICATION_CHANNEL_NAME to a stored callback.
29class AdminDBClientNotificationHandler : public pqxx::notification_receiver
30{
31public:
// Registers the receiver on `conn` for the AdminDB notification channel and takes ownership of
// the callback by moving it into the `callback` member.
32 AdminDBClientNotificationHandler(pqxx::connection& conn,
34 : pqxx::notification_receiver(conn, AdminDBClient::NOTIFICATION_CHANNEL_NAME)
35 , callback(std::move(cb))
36 {}
37
// Invoked for each received notification. The payload and the backend PID are deliberately
// ignored; the notification only acts as a trigger for the callback.
38 void operator()([[maybe_unused]] const std::string& payload,
39 [[maybe_unused]] int backend_pid) final
40 {
41 callback();
42 }
43
44private:
46
// AdminDBClientPrivate is granted access to the private members of this class.
47 friend struct AdminDBClientPrivate;
48};
49
51{
// Seeds last_returned_update_id with the caller-supplied id and immediately opens the database
// connection via reconnect(), so any exception reconnect() raises propagates out of the
// constructor.
52 AdminDBClientPrivate(std::string psql_connstring, size_t initial_last_processed_update_id)
54 , last_returned_update_id(initial_last_processed_update_id)
55 {
56 reconnect();
57 }
58
63
65 {
67 std::unique_lock lck{psql_connection_mtx};
68 // Better keep this order, it seems to prevent the notification receiver thread not joining
69 // properly.
70#if KCH_PQXX_MAJOR_VERSION < 7
71 psql_connection->disconnect();
72#else
73 psql_connection->close();
74#endif
75 active_notification_handler.reset(nullptr);
78 }
79
80 lck.unlock();
81 }
82
// (Re-)establish the PostgreSQL connection and register all prepared statements on it.
//
// Runs under psql_connection_mtx. The previous connection object, if any, is replaced by a new
// one built from psql_connstring; since prepared statements are registered on the connection
// object, all four are prepared again here.
83 void reconnect()
84 {
85 std::lock_guard lck{psql_connection_mtx};
86 psql_connection = std::make_unique<pqxx::connection>(psql_connstring);
87
// Incremental updates newer than the id bound to $1. The Kea subnet id is derived as
// r.t_id * 10000 + r.t_vlan_id.
88 // Inner join might cause the result to become unordered, so we need to order explicitly
89 psql_connection->prepare(
90 STMT_FETCH_HOST_UPDATES,
91 "SELECT u.*, r.t_id * 10000 + r.t_vlan_id AS t_kea_subnet_id "
92 "FROM admindb_dhcp_updates_get() u "
93 "INNER JOIN admindb_get_all_vlan_iprange() r ON u.t_subnet_id = r.t_id "
94 "WHERE u.t_id > $1 "
95 "ORDER BY u.t_id ASC;");
// Full table of reservations; the subnet id uses the same "* 10000 + vlan id" derivation.
96 psql_connection->prepare(
97 STMT_FETCH_FULL_DHCP_TABLE,
98 "SELECT v.t_ip, v.t_mac, v.t_subnet_id * 10000 + v.t_vlan_id as t_subnet_id "
99 "FROM admindb_view_kea_dhcp() v;");
// Highest update id currently available.
100 psql_connection->prepare(STMT_FETCH_MAX_UPDATE_ID,
101 "SELECT MAX(t_id) FROM admindb_dhcp_updates_get();");
// Cleanup of updates before the timestamp bound to $1 (cast TIMESTAMPTZ -> TIMESTAMP).
102 psql_connection->prepare(
103 STMT_PRUNE_OLD_HOST_UPDATES,
104 "SELECT admindb_dhcp_updates_cleanup_before(($1 :: TIMESTAMPTZ) :: TIMESTAMP)");
105
108 }
109 }
110
112 {
114 std::make_unique<AdminDBClientNotificationHandler>(*psql_connection, cb);
116 }
117
119 {
120 while (true) {
121 try {
122 // Account for failover durations in case one of the AdminDB masters has
123 // problems
124 std::this_thread::sleep_for(std::chrono::minutes(1));
125 reconnect();
126 LOG_INFO(logger, AHRI_CONNECTION_RESTORED).arg("AdminDB");
127 break;
128 }
129 catch (pqxx::broken_connection& e) {
130 LOG_WARN(logger, AHRI_CONNECTION_FAILURE).arg("AdminDB").arg(e.what());
131 }
132 catch (pqxx::failure& e) {
133 // Before we know how to handle every failure properly, resort to
134 // halting operation, keeping the DHCP server operational without
135 // causing further issues
136 LOG_FATAL(logger, AHRI_PQXX_FAILURE_HALT).arg("AdminDB reconnect").arg(e.what());
138 break;
139 }
140 }
141 }
142
144 {
146 return;
147 }
148
150 std::make_unique<std::thread>([this]() { notification_receiver_worker(); });
151 }
152
154 {
155 LOG_DEBUG(logger, 10, AHRI_DEBUG_NOTIFICATION_RECEIVER_STARTED);
157 try {
158 std::unique_lock lck{psql_connection_mtx};
159 psql_connection->await_notification(
160 std::chrono::duration_cast<std::chrono::seconds>(
162 .count(),
163 0);
164 lck.unlock();
165 std::this_thread::sleep_for(AdminDBClient::NOTIFICATION_RECEIVER_SLEEP);
166 }
167 catch (pqxx::broken_connection& e) {
168 LOG_WARN(logger, AHRI_BROKEN_CONNECTION).arg("AdminDB");
171 }
172 }
173 catch (pqxx::failure& e) {
174 // Before we know how to handle every failure properly, resort to halting operation,
175 // keeping the DHCP server operational without causing further issues
176 LOG_FATAL(logger, AHRI_PQXX_FAILURE_HALT)
177 .arg("AdminDB notification receiver")
178 .arg(e.what());
179 break;
180 }
181 }
182 }
183
184 const std::string psql_connstring;
185 std::unique_ptr<pqxx::connection> psql_connection{nullptr};
186 std::recursive_mutex psql_connection_mtx;
187
188 std::unique_ptr<AdminDBClientNotificationHandler> active_notification_handler;
190 std::unique_ptr<std::thread> notification_receiver_thread;
191
193
194 isc::log::Logger logger{"ahri"};
195};
196
197AdminDBClient::AdminDBClient(std::string_view host, std::string_view port, std::string_view user,
198 std::string_view password, std::string_view database,
199 size_t last_processed_host_update_id)
200{
201 std::ostringstream connstr;
202 connstr << "postgresql://" << user << ':' << password << '@' << host << ':' << port << '/'
203 << database;
204 p_impl = std::make_unique<AdminDBClientPrivate>(connstr.str(), last_processed_host_update_id);
205}
206
208{
209 p_impl.reset(nullptr);
210}
211
213{
214 std::unique_lock lck{p_impl->psql_connection_mtx};
215 p_impl->handle_notifications_int(cb);
216}
217
218void AdminDBClient::prune_old_host_updates(std::chrono::days prune_before)
219{
220 const auto prune_date = std::chrono::system_clock::now() - prune_before;
221 const std::time_t prune_date_time_t = std::chrono::system_clock::to_time_t(prune_date);
222 struct tm t = {};
223 gmtime_r(&prune_date_time_t, &t);
224 char prune_date_str[] = "yyyy-mm-dd 88:88:88";
225 const std::size_t written = strftime(prune_date_str, sizeof(prune_date_str), "%F %T", &t);
226 assert(written + 1 == sizeof(prune_date_str));
227
228 std::lock_guard lck{p_impl->psql_connection_mtx};
229 pqxx::work transaction{*p_impl->psql_connection};
230 // Add +00:00 to the timestamp to indicate we are using UTC
231 transaction.exec_prepared1(STMT_PRUNE_OLD_HOST_UPDATES,
232 prune_date_str + std::string{" +00:00"});
233 transaction.commit();
234}
235
// Fetch the complete set of host reservations in one transaction.
//
// Holds the connection mutex for the whole call. Both the full table and the maximum
// incremental update id are read inside the same transaction, and every returned entry is
// tagged with that maximum id. IPv6 rows are logged and skipped. On success
// last_returned_update_id is set to the maximum id, so later incremental fetches start after
// this snapshot.
236std::vector<AdminDBClient::HostUpdate> AdminDBClient::fetch_all_host_reservations()
237{
238 std::unique_lock lck{p_impl->psql_connection_mtx};
239 LOG_DEBUG(p_impl->logger, 50, AHRI_FULL_SYNC_START_TRANSACTION);
240 pqxx::work transaction{*p_impl->psql_connection};
241
242 LOG_DEBUG(p_impl->logger, 50, AHRI_FULL_SYNC_START_QUERY);
243 auto fetch_res = transaction.exec_prepared(STMT_FETCH_FULL_DHCP_TABLE);
244 std::vector<HostUpdate> table;
// Single-row query: the first column holds MAX(t_id) of the incremental update table.
245 auto max_incremental_update_id = transaction.exec_prepared1(STMT_FETCH_MAX_UPDATE_ID)
246 .at(0)
248
249 LOG_DEBUG(p_impl->logger, 50, AHRI_FULL_SYNC_BUILD_RESERVATIONS);
250 for (const auto& row : fetch_res) {
252 auto ip = isc::asiolink::IOAddress(row.at("t_ip").as<std::string>());
253 auto mac = isc::dhcp::HWAddr::fromText(row.at("t_mac").as<std::string>());
// Falls back to UNDEFINED_SUBNET_ID when the subnet id column is NULL.
254 auto subnet_id = row.at("t_subnet_id").as<size_t>(UNDEFINED_SUBNET_ID);
255
// Only IPv4 reservations are imported; IPv6 rows are logged at debug level and dropped.
256 if (ip.isV6()) {
257 LOG_DEBUG(p_impl->logger, 70, AHRI_DEBUG_IGNORE_HOST_RESERVATION)
258 .arg(mac.toText(false))
259 .arg(ip.toText())
260 .arg("IPv6 address");
261 continue;
262 }
263 assert(ip.isV4());
264
265 table.push_back(
266 {update_type, std::move(ip), std::move(mac), subnet_id, max_incremental_update_id});
267 }
268
// Remember the snapshot position before committing; this line is only reached if no exception
// was thrown above.
269 p_impl->last_returned_update_id = max_incremental_update_id;
270
271 LOG_DEBUG(p_impl->logger, 50, AHRI_FULL_SYNC_COMMIT_TRANSACTION);
272 transaction.commit();
273 return table;
274}
275
276std::vector<AdminDBClient::HostUpdate> AdminDBClient::fetch_pending_host_updates()
277{
278 std::unique_lock lck{p_impl->psql_connection_mtx};
279 pqxx::work transaction{*p_impl->psql_connection};
280
281 auto fetch_res =
282 transaction.exec_prepared(STMT_FETCH_HOST_UPDATES, p_impl->last_returned_update_id);
283 std::vector<HostUpdate> updates;
284 size_t last_update_id = p_impl->last_returned_update_id;
285
286 for (const auto& row : fetch_res) {
287 auto update_id = row.at("t_id").as<size_t>();
288 HostUpdate::Type update_type =
289 host_update_type_from_string(row.at("t_action").as<std::string>());
290 auto ip = isc::asiolink::IOAddress(row.at("t_ip").as<std::string>());
291 auto mac = isc::dhcp::HWAddr::fromText(row.at("t_mac").as<std::string>());
292 auto subnet_id = row.at("t_kea_subnet_id").as<size_t>();
293
294 if (ip.isV6()) {
295 LOG_DEBUG(p_impl->logger, 70, AHRI_DEBUG_IGNORE_HOST_RESERVATION)
296 .arg(mac.toText(false))
297 .arg(ip.toText())
298 .arg("IPv6 address");
299 continue;
300 }
301 assert(ip.isV4());
302
303 assert(update_id > last_update_id);
304 updates.push_back({update_type, std::move(ip), std::move(mac), subnet_id, update_id});
305 last_update_id = update_id;
306 }
307
308 p_impl->last_returned_update_id = last_update_id;
309
310 transaction.commit();
311 return updates;
312}
313
315{
316 std::unique_lock lck{p_impl->psql_connection_mtx};
317 pqxx::work transaction{*p_impl->psql_connection};
318 auto res = transaction.exec_prepared1(STMT_FETCH_MAX_UPDATE_ID);
319
320 transaction.commit();
321 return res.at(0).as<size_t>();
322}
323} // namespace ahri
void operator()(const std::string &payload, int backend_pid) final
AdminDBClientNotificationHandler(pqxx::connection &conn, AdminDBClient::dhcp_update_callback_t cb)
Client for interaction with a single AdminDB server.
TEST_VIRTUAL std::vector< HostUpdate > fetch_pending_host_updates()
Fetch all not-yet-processed host updates.
static constexpr const size_t UNDEFINED_SUBNET_ID
Subnet ID for an entry where the subnet ID is NULL.
static constexpr const size_t BELOW_LOWEST_INCREMENTAL_UPDATE_ID
TEST_VIRTUAL size_t fetch_latest_available_update_id()
Fetch the maximum available incremental update identifier.
std::function< void()> dhcp_update_callback_t
static constexpr const std::chrono::seconds NOTIFICATION_RECEIVER_TIMEOUT
static HostUpdate::Type host_update_type_from_string(std::string_view type_str)
TEST_VIRTUAL std::vector< HostUpdate > fetch_all_host_reservations()
Fetch a complete set of all host reservations from the AdminDB server.
static constexpr const std::chrono::milliseconds NOTIFICATION_RECEIVER_SLEEP
TEST_VIRTUAL void handle_notifications(const dhcp_update_callback_t &cb)
Handle notifications for the DHCP server by invoking the given function.
TEST_VIRTUAL void prune_old_host_updates(std::chrono::days prune_before=DEFAULT_PRUNE_BEFORE)
Prune old host updates from the AdminDB host update table.
TEST_VIRTUAL ~AdminDBClient()
AdminDBClient(std::string_view host, std::string_view port, std::string_view user, std::string_view password, std::string_view database, size_t last_processed_host_update_id=0)
Create the client and connect to the AdminDB server with the given connection settings.
Collection of common variables used throughout the library.
Compatibility macros and utilities for libpqxx < 7.
#define STMT_NAME_T
Definition: pqxx_compat.hpp:24
AdminDBClientPrivate & operator=(AdminDBClientPrivate &&)=delete
std::unique_ptr< pqxx::connection > psql_connection
AdminDBClientPrivate & operator=(const AdminDBClientPrivate &)=delete
std::recursive_mutex psql_connection_mtx
AdminDBClientPrivate(AdminDBClientPrivate &&)=delete
std::unique_ptr< std::thread > notification_receiver_thread
void handle_notifications_int(const AdminDBClient::dhcp_update_callback_t &cb)
AdminDBClientPrivate(const AdminDBClientPrivate &)=delete
const std::string psql_connstring
std::unique_ptr< AdminDBClientNotificationHandler > active_notification_handler
AdminDBClientPrivate(std::string psql_connstring, size_t initial_last_processed_update_id)