@@ -1138,9 +1138,15 @@ Status TabletServer::SetTserverCatalogMessageList(
1138
1138
1139
1139
int shm_index = -1 ;
1140
1140
InvalidationMessagesQueue *db_message_lists = nullptr ;
1141
- auto scope_exit = ScopeExit ([this , &shm_index, &db_message_lists, db_oid, new_catalog_version] {
1141
+ auto scope_exit = ScopeExit ([this , &shm_index, &db_message_lists, db_oid, is_breaking_change,
1142
+ new_catalog_version] {
1142
1143
if (shm_index >= 0 ) {
1143
1144
shared_object ()->SetYsqlDbCatalogVersion (static_cast <size_t >(shm_index), new_catalog_version);
1145
+ if (FLAGS_log_ysql_catalog_versions) {
1146
+ LOG_WITH_FUNC (INFO) << " set db " << db_oid
1147
+ << " catalog version: " << new_catalog_version
1148
+ << " is_breaking_change: " << is_breaking_change;
1149
+ }
1144
1150
InvalidatePgTableCache ({{db_oid, new_catalog_version}} /* db_oids_updated */ ,
1145
1151
{} /* db_oids_deleted */ );
1146
1152
}
@@ -1461,6 +1467,9 @@ void TabletServer::SetYsqlDBCatalogVersionsUnlocked(
1461
1467
// debugging the shared memory array db_catalog_versions_ (e.g., when we can dump
1462
1468
// the shared memory file to examine its contents).
1463
1469
shared_object ()->SetYsqlDbCatalogVersion (static_cast <size_t >(shm_index), 0 );
1470
+ if (FLAGS_log_ysql_catalog_versions) {
1471
+ LOG_WITH_FUNC (INFO) << " reset deleted db " << db_oid << " catalog version to 0" ;
1472
+ }
1464
1473
} else {
1465
1474
++it;
1466
1475
}
@@ -1512,6 +1521,7 @@ void TabletServer::SetYsqlDBCatalogVersionsWithInvalMessages(
1512
1521
void TabletServer::SetYsqlDBCatalogInvalMessagesUnlocked (
1513
1522
const master::DBCatalogInvalMessagesDataPB& db_catalog_inval_messages_data) {
1514
1523
if (db_catalog_inval_messages_data.db_catalog_inval_messages_size () == 0 ) {
1524
+ LOG (INFO) << " empty db_catalog_inval_messages" ;
1515
1525
return ;
1516
1526
}
1517
1527
uint32_t current_db_oid = 0 ;
@@ -1604,12 +1614,14 @@ void TabletServer::MergeInvalMessagesIntoQueueUnlocked(
1604
1614
// We do need to perform a merge, because both the queue and the incoming messages are sorted by
1605
1615
// version, it is similar to a merge sort strategy.
1606
1616
DoMergeInvalMessagesIntoQueueUnlocked (
1607
- db_catalog_inval_messages_data, start_index, end_index, &it->second .queue );
1617
+ db_oid, db_catalog_inval_messages_data, start_index, end_index, &it->second .queue );
1608
1618
}
1609
1619
1610
1620
void TabletServer::DoMergeInvalMessagesIntoQueueUnlocked (
1621
+ uint32_t db_oid,
1611
1622
const master::DBCatalogInvalMessagesDataPB& db_catalog_inval_messages_data,
1612
1623
int start_index, int end_index, InvalidationMessagesQueue *db_message_lists) {
1624
+ bool changed = false ;
1613
1625
auto it = db_message_lists->begin ();
1614
1626
// Scan through each incoming pair, and insert it into the queue in the right position if
1615
1627
// it does not already exist in the queue.
@@ -1637,16 +1649,22 @@ void TabletServer::DoMergeInvalMessagesIntoQueueUnlocked(
1637
1649
++it;
1638
1650
++start_index;
1639
1651
} else if (incoming_version < existing_version) {
1640
- // The incoming version is lower, insert it before it.
1641
- VLOG (2 ) << " inserting version " << incoming_version;
1652
+ std::string msg_info =
1653
+ incoming_message_list.has_value () ? std::to_string (incoming_message_list.value ().size ())
1654
+ : " nullopt" ;
1655
+ // The incoming version is lower, insert before the iterator.
1656
+ LOG (INFO) << " inserting version " << incoming_version << " , incoming_message_list: "
1657
+ << msg_info << " before existing version " << existing_version
1658
+ << " , db " << db_oid;
1642
1659
it = db_message_lists->insert (it, std::make_pair (incoming_version, incoming_message_list));
1660
+ changed = true ;
1643
1661
// After insertion, it points to the newly inserted incoming version, advance it to the
1644
1662
// original existing version.
1645
1663
++it;
1646
1664
++start_index;
1647
1665
DCHECK_EQ (it->first , existing_version);
1648
1666
} else {
1649
- // The incoming version is higher, move it to the next existing slot in the queue.
1667
+ // The incoming version is higher, move iterator to the next existing slot in the queue.
1650
1668
// Keep start_index unchanged so that it can be compared with the next slot in the queue.
1651
1669
VLOG (2 ) << " existing version: " << existing_version
1652
1670
<< " , higher incoming version: " << incoming_version;
@@ -1660,13 +1678,19 @@ void TabletServer::DoMergeInvalMessagesIntoQueueUnlocked(
1660
1678
const uint64_t current_version = db_inval_messages.current_version ();
1661
1679
const std::optional<std::string>& message_list = db_inval_messages.has_message_list () ?
1662
1680
std::optional<std::string>(db_inval_messages.message_list ()) : std::nullopt;
1663
- VLOG (2 ) << " appending version " << current_version;
1681
+ std::string msg_info = message_list.has_value () ? std::to_string (message_list.value ().size ())
1682
+ : " nullopt" ;
1683
+ LOG (INFO) << " appending version " << current_version << " , message_list: " << msg_info
1684
+ << " , db " << db_oid;
1664
1685
db_message_lists->emplace_back (std::make_pair (current_version, message_list));
1686
+ changed = true ;
1665
1687
}
1666
- VLOG (2 ) << " queue size: " << db_message_lists->size ();
1667
- // We may have added more messages to the queue that exceeded the max size.
1668
- while (db_message_lists->size () > FLAGS_ysql_max_invalidation_message_queue_size) {
1669
- db_message_lists->pop_front ();
1688
+ if (changed) {
1689
+ LOG (INFO) << " queue size: " << db_message_lists->size ();
1690
+ // We may have added more messages to the queue that exceeded the max size.
1691
+ while (db_message_lists->size () > FLAGS_ysql_max_invalidation_message_queue_size) {
1692
+ db_message_lists->pop_front ();
1693
+ }
1670
1694
}
1671
1695
}
1672
1696
0 commit comments