12#include <log/macros.h>
16#include "log_admindb_host_reservation_importer.h"
// Names of the prepared statements used by this file. Each constant is the
// identifier passed to exec_prepared()/exec_prepared1() further down.
// NOTE(review): STMT_NAME_T is declared elsewhere (presumably in the libpqxx
// compatibility header) -- confirm its exact type there.

// Incremental host-update query; it is executed with the last returned update
// id as its only parameter.
21STMT_NAME_T STMT_FETCH_HOST_UPDATES{
"kch-ahri-fetch-host-updates"};
// Query that reads the complete DHCP table (t_ip, t_mac, t_subnet_id) from
// admindb_view_kea_dhcp().
22STMT_NAME_T STMT_FETCH_FULL_DHCP_TABLE{
"kch-ahri-fetch-full-dhcp-table"};
// Single-row query executed via exec_prepared1(); its first column is read as
// the highest available update id.
23STMT_NAME_T STMT_FETCH_MAX_UPDATE_ID{
"kch-ahri-fetch-max-update-id"};
// Cleanup query calling admindb_dhcp_updates_cleanup_before(); it takes one
// timestamp-with-offset string parameter.
24STMT_NAME_T STMT_PRUNE_OLD_HOST_UPDATES{
"kch-ahri-prune-old-host-updates"};
34 : pqxx::notification_receiver(conn,
AdminDBClient::NOTIFICATION_CHANNEL_NAME)
35 , callback(std::move(cb))
38 void operator()([[maybe_unused]]
const std::string& payload,
39 [[maybe_unused]]
int backend_pid)
final
70#if KCH_PQXX_MAJOR_VERSION < 7
90 STMT_FETCH_HOST_UPDATES,
91 "SELECT u.*, r.t_id * 10000 + r.t_vlan_id AS t_kea_subnet_id "
92 "FROM admindb_dhcp_updates_get() u "
93 "INNER JOIN admindb_get_all_vlan_iprange() r ON u.t_subnet_id = r.t_id "
95 "ORDER BY u.t_id ASC;");
97 STMT_FETCH_FULL_DHCP_TABLE,
98 "SELECT v.t_ip, v.t_mac, v.t_subnet_id * 10000 + v.t_vlan_id as t_subnet_id "
99 "FROM admindb_view_kea_dhcp() v;");
101 "SELECT MAX(t_id) FROM admindb_dhcp_updates_get();");
103 STMT_PRUNE_OLD_HOST_UPDATES,
104 "SELECT admindb_dhcp_updates_cleanup_before(($1 :: TIMESTAMPTZ) :: TIMESTAMP)");
114 std::make_unique<AdminDBClientNotificationHandler>(*
psql_connection, cb);
124 std::this_thread::sleep_for(std::chrono::minutes(1));
126 LOG_INFO(
logger, AHRI_CONNECTION_RESTORED).arg(
"AdminDB");
129 catch (pqxx::broken_connection& e) {
130 LOG_WARN(
logger, AHRI_CONNECTION_FAILURE).arg(
"AdminDB").arg(e.what());
132 catch (pqxx::failure& e) {
136 LOG_FATAL(
logger, AHRI_PQXX_FAILURE_HALT).arg(
"AdminDB reconnect").arg(e.what());
155 LOG_DEBUG(
logger, 10, AHRI_DEBUG_NOTIFICATION_RECEIVER_STARTED);
160 std::chrono::duration_cast<std::chrono::seconds>(
167 catch (pqxx::broken_connection& e) {
168 LOG_WARN(
logger, AHRI_BROKEN_CONNECTION).arg(
"AdminDB");
173 catch (pqxx::failure& e) {
176 LOG_FATAL(
logger, AHRI_PQXX_FAILURE_HALT)
177 .arg(
"AdminDB notification receiver")
198 std::string_view password, std::string_view database,
199 size_t last_processed_host_update_id)
201 std::ostringstream connstr;
202 connstr <<
"postgresql://" << user <<
':' << password <<
'@' << host <<
':' << port <<
'/'
204 p_impl = std::make_unique<AdminDBClientPrivate>(connstr.str(), last_processed_host_update_id);
209 p_impl.reset(
nullptr);
214 std::unique_lock lck{p_impl->psql_connection_mtx};
215 p_impl->handle_notifications_int(cb);
220 const auto prune_date = std::chrono::system_clock::now() - prune_before;
221 const std::time_t prune_date_time_t = std::chrono::system_clock::to_time_t(prune_date);
223 gmtime_r(&prune_date_time_t, &t);
224 char prune_date_str[] =
"yyyy-mm-dd 88:88:88";
225 const std::size_t written = strftime(prune_date_str,
sizeof(prune_date_str),
"%F %T", &t);
226 assert(written + 1 ==
sizeof(prune_date_str));
228 std::lock_guard lck{p_impl->psql_connection_mtx};
229 pqxx::work transaction{*p_impl->psql_connection};
231 transaction.exec_prepared1(STMT_PRUNE_OLD_HOST_UPDATES,
232 prune_date_str + std::string{
" +00:00"});
233 transaction.commit();
238 std::unique_lock lck{p_impl->psql_connection_mtx};
239 LOG_DEBUG(p_impl->logger, 50, AHRI_FULL_SYNC_START_TRANSACTION);
240 pqxx::work transaction{*p_impl->psql_connection};
242 LOG_DEBUG(p_impl->logger, 50, AHRI_FULL_SYNC_START_QUERY);
243 auto fetch_res = transaction.exec_prepared(STMT_FETCH_FULL_DHCP_TABLE);
244 std::vector<HostUpdate> table;
245 auto max_incremental_update_id = transaction.exec_prepared1(STMT_FETCH_MAX_UPDATE_ID)
249 LOG_DEBUG(p_impl->logger, 50, AHRI_FULL_SYNC_BUILD_RESERVATIONS);
250 for (
const auto& row : fetch_res) {
252 auto ip = isc::asiolink::IOAddress(row.at(
"t_ip").as<std::string>());
253 auto mac = isc::dhcp::HWAddr::fromText(row.at(
"t_mac").as<std::string>());
257 LOG_DEBUG(p_impl->logger, 70, AHRI_DEBUG_IGNORE_HOST_RESERVATION)
258 .arg(mac.toText(
false))
260 .arg(
"IPv6 address");
266 {update_type, std::move(ip), std::move(mac), subnet_id, max_incremental_update_id});
269 p_impl->last_returned_update_id = max_incremental_update_id;
271 LOG_DEBUG(p_impl->logger, 50, AHRI_FULL_SYNC_COMMIT_TRANSACTION);
272 transaction.commit();
278 std::unique_lock lck{p_impl->psql_connection_mtx};
279 pqxx::work transaction{*p_impl->psql_connection};
282 transaction.exec_prepared(STMT_FETCH_HOST_UPDATES, p_impl->last_returned_update_id);
283 std::vector<HostUpdate> updates;
284 size_t last_update_id = p_impl->last_returned_update_id;
286 for (
const auto& row : fetch_res) {
287 auto update_id = row.at(
"t_id").as<
size_t>();
290 auto ip = isc::asiolink::IOAddress(row.at(
"t_ip").as<std::string>());
291 auto mac = isc::dhcp::HWAddr::fromText(row.at(
"t_mac").as<std::string>());
292 auto subnet_id = row.at(
"t_kea_subnet_id").as<
size_t>();
295 LOG_DEBUG(p_impl->logger, 70, AHRI_DEBUG_IGNORE_HOST_RESERVATION)
296 .arg(mac.toText(
false))
298 .arg(
"IPv6 address");
303 assert(update_id > last_update_id);
304 updates.push_back({update_type, std::move(ip), std::move(mac), subnet_id, update_id});
305 last_update_id = update_id;
308 p_impl->last_returned_update_id = last_update_id;
310 transaction.commit();
316 std::unique_lock lck{p_impl->psql_connection_mtx};
317 pqxx::work transaction{*p_impl->psql_connection};
318 auto res = transaction.exec_prepared1(STMT_FETCH_MAX_UPDATE_ID);
320 transaction.commit();
321 return res.at(0).as<
size_t>();
void operator()(const std::string &payload, int backend_pid) final
AdminDBClientNotificationHandler(pqxx::connection &conn, AdminDBClient::dhcp_update_callback_t cb)
Client for interaction with a single AdminDB server.
TEST_VIRTUAL std::vector< HostUpdate > fetch_pending_host_updates()
Fetch all not-yet-processed host updates.
static constexpr const size_t UNDEFINED_SUBNET_ID
Subnet ID for an entry where the subnet ID is NULL.
static constexpr const size_t BELOW_LOWEST_INCREMENTAL_UPDATE_ID
TEST_VIRTUAL size_t fetch_latest_available_update_id()
Fetch the maximum available incremental update identifier.
std::function< void()> dhcp_update_callback_t
static constexpr const std::chrono::seconds NOTIFICATION_RECEIVER_TIMEOUT
static HostUpdate::Type host_update_type_from_string(std::string_view type_str)
TEST_VIRTUAL std::vector< HostUpdate > fetch_all_host_reservations()
Fetch a complete set of all host reservations from the AdminDB server.
static constexpr const std::chrono::milliseconds NOTIFICATION_RECEIVER_SLEEP
TEST_VIRTUAL void handle_notifications(const dhcp_update_callback_t &cb)
Handle notifications for the DHCP server by invoking the given function.
TEST_VIRTUAL void prune_old_host_updates(std::chrono::days prune_before=DEFAULT_PRUNE_BEFORE)
Prune old host updates from the AdminDB host update table.
TEST_VIRTUAL ~AdminDBClient()
AdminDBClient(std::string_view host, std::string_view port, std::string_view user, std::string_view password, std::string_view database, size_t last_processed_host_update_id=0)
Create the client and connect to the AdminDB server with the given connection settings.
Collection of common variables used throughout the library.
Compatibility macros and utilities for libpqxx < 7.
AdminDBClientPrivate & operator=(AdminDBClientPrivate &&)=delete
std::unique_ptr< pqxx::connection > psql_connection
AdminDBClientPrivate & operator=(const AdminDBClientPrivate &)=delete
std::recursive_mutex psql_connection_mtx
size_t last_returned_update_id
void notification_receiver_worker()
AdminDBClientPrivate(AdminDBClientPrivate &&)=delete
std::unique_ptr< std::thread > notification_receiver_thread
void handle_notifications_int(const AdminDBClient::dhcp_update_callback_t &cb)
AdminDBClientPrivate(const AdminDBClientPrivate &)=delete
bool run_notification_receiver
void start_notifcation_receiver_thread()
const std::string psql_connstring
std::unique_ptr< AdminDBClientNotificationHandler > active_notification_handler
AdminDBClientPrivate(std::string psql_connstring, size_t initial_last_processed_update_id)