Merge branch 'main' into idpe-17789/rename-internal-variables

pull/24376/head
wiedld 2023-07-28 14:25:16 -07:00 committed by GitHub
commit cfcef35680
13 changed files with 886 additions and 131 deletions

Cargo.lock generated
View File

@ -4906,18 +4906,18 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4"
[[package]]
name = "serde"
version = "1.0.176"
version = "1.0.177"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76dc28c9523c5d70816e393136b86d48909cfb27cecaa902d338c19ed47164dc"
checksum = "63ba2516aa6bf82e0b19ca8b50019d52df58455d3cf9bdaf6315225fdd0c560a"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.176"
version = "1.0.177"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4e7b8c5dc823e3b90651ff1d3808419cd14e5ad76de04feaf37da114e7a306f"
checksum = "401797fe7833d72109fedec6bfcbe67c0eed9b99772f26eb8afd261f0abc6fd3"
dependencies = [
"proc-macro2",
"quote",
@ -5830,9 +5830,9 @@ dependencies = [
[[package]]
name = "tikv-jemalloc-ctl"
version = "0.5.0"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e37706572f4b151dff7a0146e040804e9c26fe3a3118591112f05cf12a4216c1"
checksum = "619bfed27d807b54f7f776b9430d4f8060e66ee138a28632ca898584d462c31c"
dependencies = [
"libc",
"paste",
@ -5841,9 +5841,9 @@ dependencies = [
[[package]]
name = "tikv-jemalloc-sys"
version = "0.5.3+5.3.0-patched"
version = "0.5.4+5.3.0-patched"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a678df20055b43e57ef8cddde41cdfda9a3c1a060b67f4c5836dfb1d78543ba8"
checksum = "9402443cb8fd499b6f327e40565234ff34dbda27460c5b47db0db77443dd85d1"
dependencies = [
"cc",
"libc",

View File

@ -386,7 +386,7 @@ pub struct Table {
}
/// Column definitions for a table
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableSchema {
/// the table id
pub id: TableId,

View File

@ -24,7 +24,27 @@ message FrameMessage {
}
message Ping {}
message Pong {}
// A response to a PING, containing the set of peers known to the sender.
//
// A sequence of ping/pong frames acts as a peer-exchange mechanism between
// peers.
message Pong {
// A set of peers known to the sender.
//
// Some of these peers may already be unreachable, and the receiver should not
// add them to their peer list without validating liveness.
repeated Peer peers = 1;
}
message Peer {
// A unique identifier (UUID) self-assigned by this peer as raw BE bytes.
bytes identity = 1;
// A socket (IP & port) address in the form "ip:port", as discovered by the
// PONG sender.
string address = 2;
}
// An application payload from the caller of the gossip library.
message UserPayload {

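The `identity` field above is simply the 16 raw bytes of a v4 UUID. A minimal sketch (not part of this diff) of that round trip, using the `uuid` crate the gossip code already depends on:

```rust
use uuid::Uuid;

fn main() {
    // A peer self-assigns a random v4 UUID as its identity...
    let id = Uuid::new_v4();

    // ...and puts its 16 raw big-endian bytes into the `identity` field.
    let wire_bytes = id.as_bytes().to_vec();

    // The receiver reconstructs the UUID; this mirrors what the crate's
    // `TryFrom<Vec<u8>> for Identity` impl does via `Uuid::from_slice`.
    let decoded = Uuid::from_slice(&wire_bytes).expect("invalid identity bytes");
    assert_eq!(decoded, id);
}
```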
View File

@ -26,9 +26,11 @@ impl Dispatcher for tokio::sync::mpsc::Sender<Bytes> {
}
}
// A no-op dispatcher.
#[cfg(test)]
/// A no-op [`Dispatcher`].
#[derive(Debug, Default, Clone, Copy)]
pub struct NopDispatcher;
#[async_trait::async_trait]
impl Dispatcher for () {
impl Dispatcher for NopDispatcher {
async fn dispatch(&self, _payload: crate::Bytes) {}
}
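Beyond the `tokio::sync::mpsc::Sender<Bytes>` and `NopDispatcher` implementations above, callers can plug in their own dispatcher. A hypothetical sketch, assuming only the trait shape shown above (it would live inside the gossip crate, hence `crate::Bytes`):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// A hypothetical dispatcher that simply counts received payloads.
#[derive(Debug, Default)]
struct CountingDispatcher(AtomicUsize);

#[async_trait::async_trait]
impl Dispatcher for CountingDispatcher {
    async fn dispatch(&self, _payload: crate::Bytes) {
        // Record that another payload was observed; the bytes are dropped.
        self.0.fetch_add(1, Ordering::Relaxed);
    }
}
```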

View File

@ -1,13 +1,22 @@
//! A work-in-progress, simple gossip primitive for metadata distribution
//! A simple gossip & broadcast primitive for best-effort metadata distribution
//! between IOx nodes.
//!
//! # Peers
//!
//! Peers are uniquely identified by their self-reported "identity" UUID. A
//! unique UUID is generated for each gossip instance, ensuring the identity
//! changes across restarts of the underlying node.
//!
//! An identity is associated with an immutable socket address used for peer
//! communication.
//!
//! # Transport
//!
//! Prefer small payloads where possible, and expect loss of some messages -
//! this primitive provides *best effort* delivery.
//!
//! This implementation sends unicast UDP frames between peers, with support for
//! both control frames & user payloads. The maximum message size is 65,507
//! both control frames & user payloads. The maximum UDP message size is 65,507
//! bytes ([`MAX_USER_PAYLOAD_BYTES`] for application-level payloads), but a
//! packet this large is fragmented into smaller (at most MTU-sized) packets and
//! is at greater risk of being dropped due to a lost fragment.
@ -20,6 +29,80 @@
//!
//! The security model of this implementation expects the peers to be running in
//! a trusted environment, secure from malicious users.
//!
//! # Peer Exchange
//!
//! When a gossip instance is initialised, it advertises itself to the set of
//! user-provided "seed" peers - other gossip instances with fixed, known
//! addresses. The peer then bootstraps the peer list from these seed peers.
//!
//! Peers are discovered through PONG messages from peers, which contain the
//! list of peers the sender has successfully communicated with.
//!
//! On receipt of a PONG frame, a node will send PING frames to all newly
//! discovered peers without adding the peer to its local peer list. Once the
//! discovered peer responds with a PONG, the peer is added to the peer list.
//! This acts as a liveness check, ensuring a node only adds peers it can
//! communicate with to its peer list.
//!
//! ```text
//! ┌──────────┐
//! │ Seed │
//! └──────────┘
//! ▲ │
//! │ │
//! (1) │ │ (2)
//! PING │ │ PONG
//! │ │ (contains Peer A)
//! │ ▼
//! ┌──────────┐
//! │ Local │
//! └──────────┘
//! ▲ │
//! │ │
//! (4) │ │ (3)
//! PONG │ │ PING
//! │ │
//! │ ▼
//! ┌──────────┐
//! │ Peer A │
//! └──────────┘
//! ```
//!
//! The above illustrates this process when the "local" node joins:
//!
//! 1. Send PING messages to all configured seeds
//! 2. Receive a PONG response containing the list of all known peers
//! 3. Send PING frames to all discovered peers - do not add to peer list
//! 4. Receive PONG frames from discovered peers - add to peer list
//!
//! The peer addresses sent during PEX rounds contain the advertised peer
//! identity and the socket address the PONG sender discovered.
//!
//! # Dead Peer Removal
//!
//! All peers are periodically sent a PING frame, and a per-peer counter is
//! incremented. If a message of any sort is received (including the PONG
//! response to the soliciting PING), the peer's counter is reset to 0.
//!
//! Once a peer's counter reaches [`MAX_PING_UNACKED`], indicating a number of
//! PINGs have been sent without receiving any response, the peer is removed
//! from the node's peer list.
//!
//! Dead peers age out of the cluster once all nodes perform the above routine.
//! If a peer dies, it is still sent in PONG messages as part of PEX until it is
//! removed from the sender's peer list, but the receiver of the PONG will not
//! add it to its own peer list unless it successfully communicates, ensuring
//! dead peers are not propagated.
//!
//! Ageing out dead peers is strictly an optimisation (and not for correctness).
//! A dead peer consumes a tiny amount of RAM, but will also have frames
//! dispatched to it - over time, as the number of dead peers accumulates, this
//! would cause the number of UDP frames sent per broadcast to increase,
//! needlessly increasing gossip traffic.
//!
//! This process is heavily biased towards reliability/deliverability and is too
//! slow for use as a general peer health check.
#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
#![warn(
@ -34,6 +117,7 @@
unused_crate_dependencies,
missing_docs
)]
#![allow(clippy::default_constructed_unit_structs)]
mod builder;
mod dispatcher;
@ -60,7 +144,13 @@ pub use handle::*;
const RESOLVE_TIMEOUT: Duration = Duration::from_secs(5);
/// Defines the interval between PING frames sent to all configured seed peers.
const SEED_PING_INTERVAL: std::time::Duration = Duration::from_secs(15);
const SEED_PING_INTERVAL: Duration = Duration::from_secs(60);
/// How often a PING frame should be sent to a known peer.
///
/// This value and [`MAX_PING_UNACKED`] defines the approximate duration of time
/// until a dead peer is removed from the peer list.
const PEER_PING_INTERVAL: Duration = Duration::from_secs(30);
/// The maximum payload size allowed.
///
@ -78,6 +168,18 @@ const USER_PAYLOAD_OVERHEAD: usize = 22;
/// dropped. Smaller is always better for UDP transports!
pub const MAX_USER_PAYLOAD_BYTES: usize = MAX_FRAME_BYTES - USER_PAYLOAD_OVERHEAD;
/// The number of PING messages sent to a peer without a response (of any kind)
/// before the peer is considered dead and removed from the peer list.
///
/// Increasing this value does not affect correctness - messages will be sent to
/// this peer for longer before being marked as dead, and the (small amount of)
/// RAM used by this peer will be held for longer.
///
/// This value should be large so that the occasional dropped frame does not
/// cause a peer to be spuriously marked as "dead" - doing so would cause it to
/// miss broadcast frames until it is re-discovered.
const MAX_PING_UNACKED: usize = 10;
#[cfg(test)]
#[allow(clippy::assertions_on_constants)]
mod tests {

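Taken together, `PEER_PING_INTERVAL` and `MAX_PING_UNACKED` above bound how long a dead peer lingers: one PING per interval, removal once more than the allowed number go unanswered. A rough, illustrative calculation (not part of the diff):

```rust
fn main() {
    // Values as defined above.
    const PEER_PING_INTERVAL_SECS: u64 = 30;
    const MAX_PING_UNACKED: u64 = 10;

    // A peer is removed on the first gc sweep where its unacked count
    // exceeds MAX_PING_UNACKED - roughly the 11th sweep after it last
    // responded.
    let removal_secs = PEER_PING_INTERVAL_SECS * (MAX_PING_UNACKED + 1);
    assert_eq!(removal_secs, 330); // ~5.5 minutes

    println!("a dead peer is removed after roughly {removal_secs}s");
}
```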
View File

@ -5,12 +5,12 @@ use hashbrown::{hash_map::RawEntryMut, HashMap};
use metric::U64Counter;
use prost::bytes::Bytes;
use tokio::net::UdpSocket;
use tracing::{trace, warn};
use tracing::{info, trace, warn};
use uuid::Uuid;
use crate::{
metric::{SentBytes, SentFrames},
MAX_FRAME_BYTES,
MAX_FRAME_BYTES, MAX_PING_UNACKED,
};
/// A unique generated identity containing 128 bits of randomness (V4 UUID).
@ -53,6 +53,15 @@ impl TryFrom<Bytes> for Identity {
}
}
impl TryFrom<Vec<u8>> for Identity {
type Error = uuid::Error;
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
let uuid = Uuid::from_slice(&value)?;
Ok(Self(Bytes::from(value), uuid))
}
}
impl Identity {
/// Generate a new random identity.
pub(crate) fn new() -> Self {
@ -70,6 +79,12 @@ impl Identity {
pub(crate) struct Peer {
identity: Identity,
addr: SocketAddr,
/// The number of PING messages sent to this peer since the last message
/// observed from it.
///
/// This value is reset to 0 each time a message from this peer is received.
unacked_ping_count: usize,
}
impl Peer {
@ -100,7 +115,7 @@ impl Peer {
Ok(n_bytes) => {
frames_sent.inc(1);
bytes_sent.inc(*n_bytes);
trace!(identity=%self.identity, n_bytes, peer_addr=%self.addr, "send frame")
trace!(identity=%self.identity, n_bytes, peer_addr=%self.addr, "socket write")
}
Err(e) => {
warn!(error=%e, identity=%self.identity, peer_addr=%self.addr, "frame send error")
@ -108,6 +123,26 @@ impl Peer {
}
ret
}
/// Record that a PING has been sent to this peer, incrementing the unacked
/// PING count and returning the new value.
pub(crate) fn record_ping(&mut self) -> usize {
self.unacked_ping_count += 1;
self.unacked_ping_count
}
pub(crate) fn mark_observed(&mut self) {
self.unacked_ping_count = 0;
}
}
impl From<&Peer> for crate::proto::Peer {
fn from(p: &Peer) -> Self {
Self {
identity: p.identity.as_bytes().clone(), // Ref-count
address: p.addr.to_string(),
}
}
}
/// The set of currently active/known peers.
@ -115,24 +150,36 @@ impl Peer {
pub(crate) struct PeerList {
list: HashMap<Identity, Peer>,
/// The number of known, believed-to-be-healthy peers.
metric_peer_count: metric::U64Counter,
/// The number of peers discovered.
metric_peer_discovered_count: metric::U64Counter,
/// The number of peers removed due to health checks.
metric_peer_removed_count: metric::U64Counter,
// The difference between these two metrics will always match the number of
// entries in the peer list (ignoring races & lack of thread
// synchronisation).
}
impl PeerList {
/// Initialise the [`PeerList`] with capacity for `cap` number of [`Peer`]
/// instances.
pub(crate) fn with_capacity(cap: usize, metrics: &metric::Registry) -> Self {
let metric_peer_count = metrics
let metric_peer_discovered_count = metrics
.register_metric::<U64Counter>(
"gossip_known_peers",
"number of likely healthy peers known to this node",
"gossip_peers_discovered",
"number of peers discovered by this node",
)
.recorder(&[]);
let metric_peer_removed_count = metrics
.register_metric::<U64Counter>(
"gossip_peers_removed",
"number of peers removed due to health check failures",
)
.recorder(&[]);
Self {
list: HashMap::with_capacity(cap),
metric_peer_count,
metric_peer_discovered_count,
metric_peer_removed_count,
}
}
@ -141,17 +188,29 @@ impl PeerList {
self.list.keys().map(|v| **v).collect()
}
/// Returns an iterator of all known peers in the peer list.
pub(crate) fn peers(&self) -> impl Iterator<Item = &'_ Peer> {
self.list.values()
}
/// Returns true if `identity` is already in the peer list.
pub(crate) fn contains(&self, identity: &Identity) -> bool {
self.list.contains_key(identity)
}
/// Upsert a peer identified by `identity` to the peer list, associating it
/// with the provided `peer_addr`.
pub(crate) fn upsert(&mut self, identity: &Identity, peer_addr: SocketAddr) -> &mut Peer {
let p = match self.list.raw_entry_mut().from_key(identity) {
RawEntryMut::Vacant(v) => {
self.metric_peer_count.inc(1);
info!(%identity, %peer_addr, "discovered new peer");
self.metric_peer_discovered_count.inc(1);
v.insert(
identity.to_owned(),
Peer {
addr: peer_addr,
identity: identity.to_owned(),
unacked_ping_count: 0,
},
)
.1
@ -184,6 +243,36 @@ impl PeerList {
})
.await
}
/// Send PING frames to all known nodes, and remove any that have not
/// responded after [`MAX_PING_UNACKED`] attempts.
pub(crate) async fn ping_gc(
&mut self,
ping_frame: &[u8],
socket: &UdpSocket,
frames_sent: &SentFrames,
bytes_sent: &SentBytes,
) {
// First broadcast the PING frames.
//
// If a peer has exceeded the allowed maximum, it will be removed next,
// but if it responds to this PING, it will be re-added.
self.broadcast(ping_frame, socket, frames_sent, bytes_sent)
.await;
// Increment the PING counts and remove all peers that have not
// responded to more than the allowed number of PINGs.
let dead = self.list.extract_if(|_ident, p| {
// Increment the counter and grab the incremented value.
let missed = p.record_ping();
missed > MAX_PING_UNACKED
});
for (identity, p) in dead {
warn!(%identity, addr=%p.addr, "removed unreachable peer");
self.metric_peer_removed_count.inc(1);
}
}
}
#[cfg(test)]

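The removal in `ping_gc()` above leans on `hashbrown`'s `extract_if()`: entries for which the closure returns `true` are removed from the map and yielded, everything else stays put. A small standalone sketch of that behaviour (hypothetical peer names, plain counters in place of `Peer`):

```rust
use hashbrown::HashMap;

const MAX_PING_UNACKED: usize = 10;

fn main() {
    // Unacked PING counts keyed by a (hypothetical) peer name.
    let mut unacked: HashMap<&str, usize> =
        HashMap::from_iter([("peer-a", 2), ("peer-b", 11)]);

    // Remove (and collect) every peer that exceeded the limit.
    let dead: Vec<_> = unacked
        .extract_if(|_ident, missed| *missed > MAX_PING_UNACKED)
        .collect();

    assert_eq!(dead, vec![("peer-b", 11)]);
    assert!(unacked.contains_key("peer-a")); // the healthy peer is retained
}
```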
View File

@ -4,6 +4,7 @@ use prost::{bytes::BytesMut, Message};
use tokio::{
net::UdpSocket,
sync::mpsc::{self},
time,
};
use tracing::{debug, error, info, trace, warn};
@ -12,7 +13,7 @@ use crate::{
peers::{Identity, PeerList},
proto::{self, frame_message::Payload, FrameMessage},
seed::{seed_ping_task, Seed},
Dispatcher, Request, MAX_FRAME_BYTES,
Dispatcher, Request, MAX_FRAME_BYTES, PEER_PING_INTERVAL,
};
#[derive(Debug)]
@ -50,7 +51,7 @@ impl Drop for AbortOnDrop {
}
}
/// An event loop for gossip frames processing.
/// An event loop actor for gossip frame processing.
///
/// This actor task is responsible for driving peer discovery, managing the set
/// of known peers and exchanging gossip frames between peers.
@ -67,6 +68,8 @@ pub(crate) struct Reactor<T> {
/// A cached wire frame, used to generate outgoing messages.
cached_frame: proto::Frame,
/// A cached, immutable ping frame.
cached_ping_frame: Arc<[u8]>,
/// A re-used buffer for serialising outgoing messages into.
serialisation_buf: Vec<u8>,
@ -129,7 +132,7 @@ where
&mut serialisation_buf,
)
.unwrap();
serialisation_buf.clone()
Arc::from(serialisation_buf.clone())
};
// Initialise the various metrics with wrappers to help distinguish
@ -145,7 +148,7 @@ where
let seed_ping_task = AbortOnDrop(tokio::spawn(seed_ping_task(
Arc::clone(&seed_list),
Arc::clone(&socket),
cached_ping_frame,
Arc::clone(&cached_ping_frame),
metric_frames_sent.clone(),
metric_bytes_sent.clone(),
)));
@ -154,6 +157,7 @@ where
dispatch,
identity,
cached_frame,
cached_ping_frame,
serialisation_buf,
peer_list: PeerList::with_capacity(seed_list.len(), metrics),
seed_list,
@ -166,6 +170,10 @@ where
}
}
/// Execute the reactor event loop, handing requests from the
/// [`GossipHandle`] over `rx`.
///
/// [`GossipHandle`]: crate::GossipHandle
pub(crate) async fn run(mut self, mut rx: mpsc::Receiver<Request>) {
info!(
identity = %self.identity,
@ -173,9 +181,15 @@ where
"gossip reactor started",
);
// Start a timer to periodically perform health check PINGs and remove
// dead nodes.
let mut gc_interval = time::interval(PEER_PING_INTERVAL);
gc_interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
loop {
tokio::select! {
msg = self.read() => {
// Process a message received from a peer.
match msg {
Ok(()) => {},
Err(Error::NoPayload { peer, addr }) => {
@ -201,6 +215,7 @@ where
}
}
op = rx.recv() => {
// Process an operation from the handle.
match op {
None => {
info!("stopping gossip reactor");
@ -228,6 +243,16 @@ where
}
}
}
_ = gc_interval.tick() => {
// Perform a periodic PING & prune dead peers.
debug!("peer ping & gc sweep");
self.peer_list.ping_gc(
&self.cached_ping_frame,
&self.socket,
&self.metric_frames_sent,
&self.metric_bytes_sent
).await;
}
};
}
}
@ -260,10 +285,7 @@ where
return Ok(());
}
// Find or create the peer in the peer list.
let peer = self.peer_list.upsert(&identity, peer_addr);
let mut out_messages = Vec::with_capacity(1);
let mut out_messages = Vec::new();
for msg in frame.messages {
// Extract the payload from the frame message
let payload = msg.payload.ok_or_else(|| Error::NoPayload {
@ -272,15 +294,28 @@ where
})?;
// Handle the frame message from the peer, optionally returning a
// response frame.
// response frame to be sent back.
let response = match payload {
Payload::Ping(_) => Some(Payload::Pong(proto::Pong {})),
Payload::Pong(_) => {
debug!(%identity, %peer_addr, "pong");
Payload::Ping(_) => Some(Payload::Pong(proto::Pong {
peers: self.peer_list.peers().map(proto::Peer::from).collect(),
})),
Payload::Pong(pex) => {
debug!(
%identity,
%peer_addr,
pex_nodes=pex.peers.len(),
"pong"
);
// If handling the PEX frame fails, the sender sent a bad
// frame and is not added to the peer list / marked as
// healthy.
self.handle_pex(pex).await?;
None
}
Payload::UserData(data) => {
self.dispatch.dispatch(data.payload).await;
let data = data.payload;
debug!(%identity, %peer_addr, n_bytes=data.len(), "dispatch payload");
self.dispatch.dispatch(data).await;
None
}
};
@ -290,6 +325,12 @@ where
}
}
// Find or create the peer in the peer list.
let peer = self.peer_list.upsert(&identity, peer_addr);
// Track that this peer has been observed as healthy.
peer.mark_observed();
// Sometimes no message will be returned to the peer - there's no need
// to send an empty frame.
if out_messages.is_empty() {
@ -314,6 +355,60 @@ where
Ok(())
}
/// The PONG response to a PING contains the set of peers known to the sender
/// - this is the peer exchange mechanism.
async fn handle_pex(&mut self, pex: proto::Pong) -> Result<(), Error> {
// Process the peers from the remote, ignoring the local node and known
// peers.
for p in pex.peers {
let pex_identity = match Identity::try_from(p.identity) {
// Ignore any references to this node's identity, or known
// peers.
Ok(v) if v == self.identity => continue,
Ok(v) if self.peer_list.contains(&v) => continue,
Ok(v) => v,
Err(e) => {
warn!(
error=%e,
"received invalid identity via PEX",
);
continue;
}
};
let pex_addr = match p.address.parse() {
Ok(v) => v,
Err(e) => {
warn!(
%pex_identity,
error=%e,
"received invalid peer address via PEX",
);
continue;
}
};
// Send a PING frame to this peer without adding it to the local
// peer list.
//
// The peer will be added to the local peer list if it responds to
// this solicitation (or otherwise communicates with the local
// node), ensuring only live and reachable peers are added.
//
// Immediately ping this newly discovered peer (a fast UDP send).
ping(
&self.cached_ping_frame,
&self.socket,
pex_addr,
&self.metric_frames_sent,
&self.metric_bytes_sent,
)
.await;
}
Ok(())
}
/// Return the randomised identity assigned to this instance.
pub(crate) fn identity(&self) -> &Identity {
&self.identity
@ -371,11 +466,11 @@ fn populate_frame(
// messages must be shorter than this value.
if frame.encoded_len() > MAX_FRAME_BYTES {
error!(
n_bytes=buf.len(),
n_bytes=frame.encoded_len(),
n_max=%MAX_FRAME_BYTES,
"attempted to send frame larger than configured maximum"
);
return Err(Error::MaxSize(buf.len()));
return Err(Error::MaxSize(frame.encoded_len()));
}
buf.clear();
@ -399,19 +494,21 @@ pub(crate) async fn ping(
sent_frames: &SentFrames,
sent_bytes: &SentBytes,
) -> usize {
// Check the payload length as a primitive/best-effort way of ensuring only
// ping frames are sent via this mechanism.
//
// Normal messaging should be performed through a Peer's send() method.
debug_assert_eq!(ping_frame.len(), 22);
match socket.send_to(ping_frame, &addr).await {
Ok(n_bytes) => {
debug!(addr = %addr, "ping");
debug!(n_bytes, %addr, "ping");
sent_frames.inc(1);
sent_bytes.inc(n_bytes);
n_bytes
}
Err(e) => {
warn!(
error=%e,
addr = %addr,
"ping failed"
);
warn!(error=%e, %addr, "ping failed");
0
}
}

View File

@ -63,10 +63,15 @@ async fn resolve(addr: &str) -> Option<SocketAddr> {
///
/// This method immediately pings all the seeds, and then pings periodically at
/// [`SEED_PING_INTERVAL`].
///
/// Seeds must be periodically pinged to ensure they're discovered when they
/// come online - if seeds were not pinged continuously, this node could become
/// isolated from all peers, mark all known peers as dead, and never rejoin
/// the cluster.
pub(super) async fn seed_ping_task(
seeds: Arc<[Seed]>,
socket: Arc<UdpSocket>,
ping_frame: Vec<u8>,
ping_frame: Arc<[u8]>,
sent_frames: SentFrames,
sent_bytes: SentBytes,
) {

View File

@ -1,10 +1,31 @@
use std::{sync::Arc, time::Duration};
#![allow(clippy::default_constructed_unit_structs)]
use std::{
net::SocketAddr,
sync::Arc,
time::{Duration, Instant},
};
use test_helpers::{maybe_start_logging, timeout::FutureTimeout};
use tokio::{net::UdpSocket, sync::mpsc};
use gossip::*;
// How long to wait for various time-limited test loops to complete.
const TIMEOUT: Duration = Duration::from_secs(5);
/// Bind a UDP socket on a random port and return it alongside the socket
/// address.
async fn random_udp() -> (UdpSocket, SocketAddr) {
// Bind a UDP socket to a random port
let socket = UdpSocket::bind("127.0.0.1:0")
.await
.expect("failed to bind UDP socket");
let addr = socket.local_addr().expect("failed to read local addr");
(socket, addr)
}
/// Assert that starting up a reactor performs the initial peer discovery
/// from a set of seeds, resulting in both peers knowing of one another.
#[tokio::test]
@ -13,20 +34,8 @@ async fn test_payload_exchange() {
let metrics = Arc::new(metric::Registry::default());
// How long to wait for peer discovery to complete.
const TIMEOUT: Duration = Duration::from_secs(5);
// Bind a UDP socket to a random port
let a_socket = UdpSocket::bind("127.0.0.1:0")
.await
.expect("failed to bind UDP socket");
let a_addr = a_socket.local_addr().expect("failed to read local addr");
// And a socket for the second reactor
let b_socket = UdpSocket::bind("127.0.0.1:0")
.await
.expect("failed to bind UDP socket");
let b_addr = b_socket.local_addr().expect("failed to read local addr");
let (a_socket, a_addr) = random_udp().await;
let (b_socket, b_addr) = random_udp().await;
// Initialise the dispatchers for the reactors
let (a_tx, mut a_rx) = mpsc::channel(5);
@ -81,3 +90,358 @@ async fn test_payload_exchange() {
.expect("reactor stopped");
assert_eq!(got, a_payload);
}
/// Construct a set of peers such that peer exchange has to occur for them to
/// know the full set of peers.
///
/// This configures the following topology:
///
/// ```
/// A <--> B <--> C
/// ```
///
/// Where only node B knows of both A & C - in order for A to discover C, it has
/// to perform PEX with B, and the same for C to discover A.
///
/// To further drive the discovery mechanism, B is not informed of any peer - it
/// discovers A & C through their announcement PINGs at startup.
#[tokio::test]
async fn test_peer_exchange() {
maybe_start_logging();
let metrics = Arc::new(metric::Registry::default());
let (a_socket, _a_addr) = random_udp().await;
let (b_socket, b_addr) = random_udp().await;
let (c_socket, _c_addr) = random_udp().await;
let (a_tx, mut a_rx) = mpsc::channel(5);
let a = Builder::new(vec![b_addr.to_string()], a_tx, Arc::clone(&metrics)).build(a_socket);
let b = Builder::new(vec![], NopDispatcher::default(), Arc::clone(&metrics)).build(b_socket);
let c = Builder::new(
vec![b_addr.to_string()],
NopDispatcher::default(),
Arc::clone(&metrics),
)
.build(c_socket);
// At this point, A is configured with B as a seed, B knows of no other
// peers, and C knows of B.
// Wait for peer exchange to occur
async {
loop {
if a.get_peers().await.len() == 2 {
break;
}
}
}
.with_timeout_panic(TIMEOUT)
.await;
let a_peers = a.get_peers().await;
assert!(a_peers.contains(&b.identity()));
assert!(a_peers.contains(&c.identity()));
assert!(!a_peers.contains(&a.identity()));
let b_peers = b.get_peers().await;
assert!(b_peers.contains(&a.identity()));
assert!(b_peers.contains(&c.identity()));
assert!(!b_peers.contains(&b.identity()));
let c_peers = c.get_peers().await;
assert!(c_peers.contains(&a.identity()));
assert!(c_peers.contains(&b.identity()));
assert!(!c_peers.contains(&c.identity()));
// Prove that C has discovered A, which would only be possible through PEX
// with B.
let payload = Bytes::from_static(b"bananas");
c.broadcast(payload.clone()).await.unwrap();
let got = a_rx
.recv()
.with_timeout_panic(TIMEOUT)
.await
.expect("reactor stopped");
assert_eq!(got, payload);
}
/// Construct a set of peers such that a delayed peer exchange has to occur for
/// them to know the full set of peers.
///
/// This configures the following topology:
///
/// ```
/// A <--> B
/// ```
///
/// And once PEX has completed between them, a new node C is introduced with B
/// as a seed:
///
/// ```
/// A <--> B <-- C
/// ```
///
/// At which point A will discover C when C sends PING messages to all peers it
/// discovers from B.
#[tokio::test]
async fn test_delayed_peer_exchange() {
maybe_start_logging();
let metrics = Arc::new(metric::Registry::default());
let (a_socket, a_addr) = random_udp().await;
let (b_socket, b_addr) = random_udp().await;
let (c_socket, _c_addr) = random_udp().await;
let a = Builder::new(
vec![b_addr.to_string()],
NopDispatcher::default(),
Arc::clone(&metrics),
)
.build(a_socket);
let b = Builder::new(
vec![a_addr.to_string()],
NopDispatcher::default(),
Arc::clone(&metrics),
)
.build(b_socket);
// At this point, A is configured with B as a seed and B with A as a seed;
// C has not yet been started.
// Wait for peer exchange to occur between A and B
async {
loop {
if a.get_peers().await.len() == 1 {
break;
}
}
}
.with_timeout_panic(TIMEOUT)
.await;
let a_peers = a.get_peers().await;
assert!(a_peers.contains(&b.identity()));
let b_peers = b.get_peers().await;
assert!(b_peers.contains(&a.identity()));
// Start C now that the initial PEX is complete, seeding it with the address of
// B.
let c = Builder::new(
vec![b_addr.to_string()],
NopDispatcher::default(),
Arc::clone(&metrics),
)
.build(c_socket);
// C will perform PEX with B, learning of A.
async {
loop {
if c.get_peers().await.len() == 2 {
break;
}
}
}
.with_timeout_panic(TIMEOUT)
.await;
let c_peers = c.get_peers().await;
assert!(c_peers.contains(&a.identity()));
assert!(c_peers.contains(&b.identity()));
// And eventually A should discover C through B.
async {
loop {
if a.get_peers().await.len() == 2 {
break;
}
}
}
.with_timeout_panic(TIMEOUT)
.await;
let a_peers = a.get_peers().await;
assert!(a_peers.contains(&b.identity()));
assert!(a_peers.contains(&c.identity()));
}
/// Initialise a pair of nodes A & B with an unreachable third node C listed as
/// a seed.
///
/// Ensure this seed C is not added to the local peer list.
#[tokio::test]
async fn test_seed_health_check() {
maybe_start_logging();
let metrics = Arc::new(metric::Registry::default());
let (a_socket, a_addr) = random_udp().await;
let (b_socket, b_addr) = random_udp().await;
let (_c_socket, c_addr) = random_udp().await;
let seeds = vec![a_addr.to_string(), b_addr.to_string(), c_addr.to_string()];
let a = Builder::new(
seeds.to_vec(),
NopDispatcher::default(),
Arc::clone(&metrics),
)
.build(a_socket);
let b = Builder::new(seeds, NopDispatcher::default(), Arc::clone(&metrics)).build(b_socket);
// Wait for peer exchange to occur between A and B
async {
loop {
if !a.get_peers().await.is_empty() {
break;
}
}
}
.with_timeout_panic(TIMEOUT)
.await;
// Assert that only the live peers were added.
let a_peers = a.get_peers().await;
assert_eq!(a_peers.len(), 1);
assert!(a_peers.contains(&b.identity()));
let b_peers = b.get_peers().await;
assert_eq!(b_peers.len(), 1);
assert!(b_peers.contains(&a.identity()));
}
/// Initialise a pair of nodes A & B, kill B after PEX, and then introduce a new
/// node C.
///
/// This test ensures that dead peers are not added to the joiner's local peer
/// list, causing them to age out of the cluster and not be propagated forever.
#[tokio::test]
async fn test_discovery_health_check() {
maybe_start_logging();
let metrics = Arc::new(metric::Registry::default());
let (a_socket, a_addr) = random_udp().await;
let (b_socket, b_addr) = random_udp().await;
let a = Builder::new(
vec![b_addr.to_string()],
NopDispatcher::default(),
Arc::clone(&metrics),
)
.build(a_socket);
let b = Builder::new(
vec![a_addr.to_string()],
NopDispatcher::default(),
Arc::clone(&metrics),
)
.build(b_socket);
// Wait for peer exchange to occur between A and B
async {
loop {
if a.get_peers().await.len() == 1 {
break;
}
}
}
.with_timeout_panic(TIMEOUT)
.await;
// Stop B.
let b_identity = b.identity();
drop(b);
// Introduce C to the cluster, seeding it with A
let (c_socket, _c_addr) = random_udp().await;
let c = Builder::new(
vec![a_addr.to_string()],
NopDispatcher::default(),
Arc::clone(&metrics),
)
.build(c_socket);
// Wait for peer exchange to occur between C and A
async {
loop {
if !c.get_peers().await.is_empty() {
break;
}
}
}
.with_timeout_panic(TIMEOUT)
.await;
// Assert that only the live peer A was added.
let c_peers = c.get_peers().await;
assert_eq!(c_peers.len(), 1);
assert!(c_peers.contains(&a.identity()));
assert!(!c_peers.contains(&b_identity));
}
/// Drive peer removal / age-out.
///
/// Start two nodes, wait for PEX to complete, kill one node, and wait for it to
/// eventually remove the dead peer. Because this is a periodic/time-based
/// action, this test uses tokio's "auto-advance time" test functionality.
#[tokio::test]
async fn test_peer_removal() {
maybe_start_logging();
let metrics = Arc::new(metric::Registry::default());
let (a_socket, a_addr) = random_udp().await;
let (b_socket, b_addr) = random_udp().await;
let a = Builder::new(
vec![b_addr.to_string()],
NopDispatcher::default(),
Arc::clone(&metrics),
)
.build(a_socket);
let b = Builder::new(
vec![a_addr.to_string()],
NopDispatcher::default(),
Arc::clone(&metrics),
)
.build(b_socket);
// Wait for peer exchange to occur between A and B
async {
loop {
let peers = a.get_peers().await;
if peers.len() == 1 {
assert!(peers.contains(&b.identity()));
break;
}
}
}
.with_timeout_panic(TIMEOUT)
.await;
// Stop B.
drop(b);
// Make time tick as fast as necessary to advance to the next timer event.
tokio::time::pause();
for _ in 0..50 {
tokio::time::advance(Duration::from_secs(35)).await;
}
// Use the stdlib time to avoid tokio's paused time and bound the loop time.
let started_at = Instant::now();
loop {
let peers = a.get_peers().await;
if peers.is_empty() {
break;
}
if Instant::now().duration_since(started_at) > TIMEOUT {
panic!("timeout waiting for peer removal");
}
}
}
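For reference, the paused-clock pattern used by `test_peer_removal` above relies on tokio's test-time facilities: `pause()` freezes the runtime clock and `advance()` moves it forward instantly. A self-contained sketch (not from this PR; requires tokio's `test-util` feature, which the tests above already rely on):

```rust
#[tokio::test]
async fn paused_time_drives_interval() {
    tokio::time::pause();

    let mut ticker = tokio::time::interval(std::time::Duration::from_secs(30));
    ticker.tick().await; // the first tick always completes immediately

    // Advance the frozen clock past the next deadline; the tick becomes
    // ready without any real-time waiting.
    tokio::time::advance(std::time::Duration::from_secs(35)).await;
    ticker.tick().await;
}
```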

View File

@ -67,19 +67,19 @@ libc = { version = "0.2" }
num_cpus = "1.16.0"
once_cell = { version = "1.18", features = ["parking_lot"] }
rustyline = { version = "12.0", default-features = false, features = ["with-file-history"]}
serde = "1.0.176"
serde = "1.0.177"
serde_json = "1.0.104"
snafu = "0.7"
tempfile = "3.7.0"
thiserror = "1.0.44"
tikv-jemalloc-ctl = { version = "0.5.0", optional = true }
tikv-jemalloc-ctl = { version = "0.5.4", optional = true }
tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time", "io-std"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7.8", features = ["compat"] }
tonic = { workspace = true }
uuid = { version = "1", features = ["v4"] }
# jemalloc-sys with unprefixed_malloc_on_supported_platforms feature and heappy are mutually exclusive
tikv-jemalloc-sys = { version = "0.5.3", optional = true, features = ["unprefixed_malloc_on_supported_platforms"] }
tikv-jemalloc-sys = { version = "0.5.4", optional = true, features = ["unprefixed_malloc_on_supported_platforms"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
parking_lot = "0.12.1"

View File

@ -11,10 +11,10 @@ pub mod metrics;
mod read_through_cache;
pub use read_through_cache::*;
use std::{error::Error, fmt::Debug, sync::Arc};
use std::{collections::BTreeMap, error::Error, fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceName, NamespaceSchema};
use data_types::{ColumnsByName, NamespaceName, NamespaceSchema, TableSchema};
/// An abstract cache of [`NamespaceSchema`].
#[async_trait]
@ -50,11 +50,15 @@ pub trait NamespaceCache: Debug + Send + Sync {
/// associated [`NamespaceCache::put_schema()`] call.
#[derive(Debug, PartialEq, Eq)]
pub struct ChangeStats {
/// The number of tables added to the cache.
pub(crate) new_tables: usize,
/// The new tables added to the cache, keyed by table name.
pub(crate) new_tables: BTreeMap<String, TableSchema>,
/// The number of columns added to the cache (across all tables).
pub(crate) new_columns: usize,
/// The new columns added to the cache for all pre-existing tables, keyed by
/// the table name.
pub(crate) new_columns_per_table: BTreeMap<String, ColumnsByName>,
/// The number of new columns added across new and existing tables.
pub(crate) num_new_columns: usize,
/// Indicates whether the change took place when an entry already
/// existed.

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceName, NamespaceSchema};
use data_types::{ColumnsByName, NamespaceName, NamespaceSchema};
use hashbrown::HashMap;
use parking_lot::RwLock;
use thiserror::Error;
@ -56,12 +56,11 @@ impl NamespaceCache for Arc<MemoryNamespaceCache> {
Some(old) => merge_schema_additive(schema, old),
None => {
let change_stats = ChangeStats {
new_columns: schema
.tables
.values()
.map(|v| v.column_count())
.sum::<usize>(),
new_tables: schema.tables.len(),
new_tables: schema.tables.clone(),
// There are no pre-existing tables for columns to be added
// to, so there's no need to build another map.
new_columns_per_table: Default::default(),
num_new_columns: schema.tables.values().map(|v| v.column_count()).sum(),
did_update: false,
};
(schema, change_stats)
@ -86,11 +85,14 @@ fn merge_schema_additive(
// invariant: Namespace partition template override should never change for a given name
assert_eq!(old_ns.partition_template, new_ns.partition_template);
let old_table_count = old_ns.tables.len();
let mut old_column_count = 0;
let mut new_columns_per_table: BTreeMap<String, ColumnsByName> = Default::default();
let mut num_new_columns = 0;
// Table schema missing from the new schema are added from the old. If the
// table exists in both the new and the old namespace schema then any column
// schema missing from the new table schema are added from the old.
// schema missing from the new table schema are added from the old, while
// any columns present in the new table schema but not in the old are placed
// in the `new_columns_per_table` map, to be included in the returned [`ChangeStats`].
//
// This code performs get_mut() & insert() operations to populate `new_ns`,
// instead of using the BTreeMap's entry() API. This allows this loop to
@ -102,14 +104,32 @@ fn merge_schema_additive(
// to 0 as the schemas become fully populated, leaving the common path free
// of overhead.
for (old_table_name, old_table) in &old_ns.tables {
old_column_count += old_table.column_count();
match new_ns.tables.get_mut(old_table_name) {
Some(new_table) => {
for (column_name, column) in old_table.columns.iter() {
if !new_table.contains_column_name(column_name) {
new_table.add_column_schema(column_name.to_string(), *column);
// Insert old columns missing from the new table schema
for (old_column_name, old_column) in old_table.columns.iter() {
if !new_table.contains_column_name(old_column_name) {
new_table.add_column_schema(old_column_name.clone(), *old_column);
}
}
// Then take note of any columns added to the new table schema
// that are not present in the previous
let new_columns_in_table = new_table
.columns
.iter()
.filter_map(|(new_column_name, new_column_schema)| {
if old_table.contains_column_name(new_column_name) {
None
} else {
Some((new_column_name.clone(), *new_column_schema))
}
})
.collect::<BTreeMap<_, _>>();
if !new_columns_in_table.is_empty() {
num_new_columns += new_columns_in_table.len();
new_columns_per_table
.insert(old_table_name.clone(), new_columns_in_table.into());
}
}
None => {
new_ns
@ -119,17 +139,28 @@ fn merge_schema_additive(
}
}
// Work out the set of new tables added to the namespace schema and capture
// their schema in the [`ChangeStats`].
let new_tables = new_ns
.tables
.iter()
.filter_map(|(new_table_name, new_table_schema)| {
if old_ns.tables.contains_key(new_table_name) {
None
} else {
num_new_columns += new_table_schema.column_count();
Some((new_table_name.clone(), new_table_schema.clone()))
}
})
.collect();
// To compute the change stats for the merge it is still necessary to iterate
// over the tables present in the new schema. The new schema may have
// introduced additional tables that won't be visited by the merge loop above.
let change_stats = ChangeStats {
new_tables: new_ns.tables.len() - old_table_count,
new_columns: new_ns
.tables
.values()
.map(|v| v.column_count())
.sum::<usize>()
- old_column_count,
new_tables,
new_columns_per_table,
num_new_columns,
did_update: true,
};
(new_ns, change_stats)
@ -172,7 +203,7 @@ mod tests {
};
assert_matches!(cache.put_schema(ns.clone(), schema1.clone()), (new, s) => {
assert_eq!(*new, schema1);
assert_eq!(s.new_tables, 0);
assert!(s.new_tables.is_empty());
});
assert_eq!(
*cache.get_schema(&ns).await.expect("lookup failure"),
@ -190,7 +221,7 @@ mod tests {
assert_matches!(cache.put_schema(ns.clone(), schema2.clone()), (new, s) => {
assert_eq!(*new, schema2);
assert_eq!(s.new_tables, 0);
assert!(s.new_tables.is_empty());
});
assert_eq!(
*cache.get_schema(&ns).await.expect("lookup failure"),
@ -239,21 +270,21 @@ mod tests {
let schema_update_1 = NamespaceSchema {
id: NamespaceId::new(42),
tables: BTreeMap::from([(String::from(table_name), first_write_table_schema)]),
tables: BTreeMap::from([(String::from(table_name), first_write_table_schema.clone())]),
max_columns_per_table: 50,
max_tables: 24,
retention_period_ns: None,
partition_template: Default::default(),
};
let schema_update_2 = NamespaceSchema {
tables: BTreeMap::from([(String::from(table_name), second_write_table_schema)]),
tables: BTreeMap::from([(String::from(table_name), second_write_table_schema.clone())]),
..schema_update_1.clone()
};
let want_namespace_schema = {
let mut want_table_schema = empty_table_schema(table_id);
want_table_schema.add_column(column_1);
want_table_schema.add_column(column_2);
want_table_schema.add_column(column_1.clone());
want_table_schema.add_column(column_2.clone());
NamespaceSchema {
tables: BTreeMap::from([(String::from(table_name), want_table_schema)]),
..schema_update_1.clone()
@ -275,13 +306,21 @@ mod tests {
assert_eq!(*new_schema, schema_update_1);
assert_eq!(
new_stats,
ChangeStats { new_tables: 1, new_columns: 1, did_update: false }
ChangeStats { new_tables: schema_update_1.tables.clone(), new_columns_per_table: Default::default(), num_new_columns: schema_update_1.tables.values().map(|v| v.column_count()).sum(), did_update: false }
);
}
);
assert_matches!(cache.put_schema(ns.clone(), schema_update_2), (new_schema, new_stats) => {
assert_eq!(*new_schema, want_namespace_schema);
assert_eq!(new_stats, ChangeStats{ new_tables: 0, new_columns: 1, did_update: true});
let want_new_columns = [(
String::from(table_name),
[(
column_2.name.clone(),
*second_write_table_schema.columns.get(column_2.name.as_str()).expect("should have column 2")
)].into_iter().collect::<BTreeMap<_,_>>().into(),
)].into_iter().collect::<BTreeMap<_,_>>();
assert_eq!(new_stats, ChangeStats{ new_tables: Default::default(), new_columns_per_table: want_new_columns.clone(), num_new_columns: want_new_columns.values().map(|v| v.column_count()).sum(), did_update: true});
});
let got_namespace_schema = cache
@ -304,26 +343,29 @@ mod tests {
// Each table has been given a column to assert the table merge logic
// produces the correct metrics.
let mut table_1 = empty_table_schema(TableId::new(1));
table_1.add_column(Column {
let column_1 = Column {
id: ColumnId::new(1),
table_id: TableId::new(1),
name: "column_a".to_string(),
column_type: ColumnType::String,
});
};
table_1.add_column(column_1);
let mut table_2 = empty_table_schema(TableId::new(2));
table_2.add_column(Column {
let column_2 = Column {
id: ColumnId::new(2),
table_id: TableId::new(2),
name: "column_b".to_string(),
column_type: ColumnType::String,
});
};
table_2.add_column(column_2);
let mut table_3 = empty_table_schema(TableId::new(3));
table_3.add_column(Column {
let column_3 = Column {
id: ColumnId::new(3),
table_id: TableId::new(3),
name: "column_c".to_string(),
column_type: ColumnType::String,
});
};
table_3.add_column(column_3);
let schema_update_1 = NamespaceSchema {
id: NamespaceId::new(42),
@ -346,9 +388,9 @@ mod tests {
let want_namespace_schema = NamespaceSchema {
tables: BTreeMap::from([
(String::from("table_1"), table_1),
(String::from("table_2"), table_2),
(String::from("table_3"), table_3),
(String::from("table_1"), table_1.clone()),
(String::from("table_2"), table_2.clone()),
(String::from("table_3"), table_3.clone()),
]),
..schema_update_1.clone()
};
@ -368,13 +410,24 @@ mod tests {
assert_eq!(*new_schema, schema_update_1);
assert_eq!(
new_stats,
ChangeStats { new_tables: 2, new_columns: 2, did_update: false }
ChangeStats {
new_tables: schema_update_1.tables.clone(),
new_columns_per_table: Default::default(),
num_new_columns: schema_update_1.tables.values().map(|v| v.column_count()).sum(),
did_update: false,
}
);
}
);
assert_matches!(cache.put_schema(ns.clone(), schema_update_2), (new_schema, new_stats) => {
assert_eq!(*new_schema, want_namespace_schema);
assert_eq!(new_stats, ChangeStats{ new_tables: 1, new_columns: 1, did_update: true});
let want_new_tables = [(String::from("table_3"), table_3)].into_iter().collect::<BTreeMap<_, _>>();
assert_eq!(new_stats, ChangeStats{
new_tables: want_new_tables.clone(),
new_columns_per_table: Default::default(),
num_new_columns: want_new_tables.values().map(|v| v.column_count()).sum(),
did_update: true,
});
});
let got_namespace_schema = cache
@ -443,10 +496,10 @@ mod tests {
}
}
/// Reduce `ns` into a set of `(table_name, column_name)` for all tables &
/// Reduce `ns_tables` into a set of `(table_name, column_name)` for all tables &
/// columns.
fn into_set(ns: &NamespaceSchema) -> HashSet<(String, String)> {
ns.tables
fn into_set(ns_tables: &BTreeMap<String, TableSchema>) -> HashSet<(String, String)> {
ns_tables
.iter()
.flat_map(|(table_name, col_set)| {
// Build a set of tuples in the form (table_name, column_name)
@ -459,6 +512,28 @@ mod tests {
.collect()
}
/// Construct a set of `(table_name, column_name)` from a set of table schema and
/// table-associated column schema.
fn into_set_with_columns(
new_tables: &BTreeMap<String, TableSchema>,
new_columns: &BTreeMap<String, ColumnsByName>,
) -> HashSet<(String, String)> {
let new_table_set = into_set(new_tables);
let new_column_set = new_columns
.iter()
.flat_map(|(table_name, col_set)| {
col_set
.names()
.into_iter()
.map(|col_name| (table_name.to_string(), col_name.to_string()))
})
.collect();
new_table_set
.union(&new_column_set)
.map(|v| v.to_owned())
.collect::<HashSet<_>>()
}
proptest! {
#[test]
fn prop_schema_merge(
@ -466,8 +541,8 @@ mod tests {
b in arbitrary_namespace_schema()
) {
// Convert inputs into sets
let known_a = into_set(&a);
let known_b = into_set(&b);
let known_a = into_set(&a.tables);
let known_b = into_set(&b.tables);
// Compute the union set of the input schema sets.
//
@ -479,12 +554,18 @@ mod tests {
let cache = Arc::new(MemoryNamespaceCache::default());
let (got, stats_1) = cache.put_schema(name.clone(), a.clone());
assert_eq!(*got, a); // The new namespace should be unchanged
assert_eq!(stats_1.new_tables, a.tables);
// Drive the merging logic
let (got, stats_2) = cache.put_schema(name, b.clone());
// Check the change stats return the difference
let want_change_stat_set = known_b.difference(&known_a).map(|v| v.to_owned()).collect::<HashSet<_>>();
let got_change_stat_set = into_set_with_columns(&stats_2.new_tables, &stats_2.new_columns_per_table);
assert_eq!(got_change_stat_set, want_change_stat_set);
// Reduce the merged schema into a comparable set.
let got_set = into_set(&got);
let got_set = into_set(&got.tables);
// Assert the table/column sets merged by the known good hashset
// union implementation, and the cache merging logic are the same.
@ -495,13 +576,6 @@ mod tests {
assert_eq!(got.max_columns_per_table, b.max_columns_per_table);
assert_eq!(got.max_tables, b.max_tables);
assert_eq!(got.retention_period_ns, b.retention_period_ns);
// Finally, assert the reported "newly added" statistics sum to the
// total of the inputs.
assert_eq!(stats_1.new_columns + stats_2.new_columns, want.len());
let tables = a.tables.keys().chain(b.tables.keys()).collect::<HashSet<_>>();
assert_eq!(stats_1.new_tables + stats_2.new_tables, tables.len());
}
}
}
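The additive merge above can be summarised with plain `std` collections: the merged result is the union of the old and new schemas, and the change stats describe only what the new schema added on top of the old. An illustrative sketch (std types only, not the crate's `TableSchema`/`ColumnsByName`):

```rust
use std::collections::{BTreeMap, BTreeSet};

fn main() {
    // Old cached schema: table "t1" with column "a".
    let old = BTreeMap::from([("t1", BTreeSet::from(["a"]))]);
    // Incoming schema: "t1" gained column "b", and table "t2" is brand new.
    let new = BTreeMap::from([("t1", BTreeSet::from(["b"])), ("t2", BTreeSet::from(["c"]))]);

    // The merged schema is the union of both.
    let mut merged = new.clone();
    for (table, cols) in &old {
        merged.entry(*table).or_default().extend(cols);
    }
    assert_eq!(merged["t1"], BTreeSet::from(["a", "b"]));
    assert_eq!(merged["t2"], BTreeSet::from(["c"]));

    // The change stats describe only the additions relative to `old`:
    //   new_tables            = { "t2" }
    //   new_columns_per_table = { "t1": { "b" } }
    //   num_new_columns       = 2 ("b" added to t1, plus "c" in the new table t2)
}
```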

View File

@ -113,11 +113,9 @@ where
};
}
// Figure out the difference between the new namespace and the
// evicted old namespace
// Adjust the metrics to reflect the change
self.table_count.inc(change_stats.new_tables as u64);
self.column_count.inc(change_stats.new_columns as u64);
// Adjust the metrics to reflect the change in table and column counts
self.table_count.inc(change_stats.new_tables.len() as u64);
self.column_count.inc(change_stats.num_new_columns as u64);
(result, change_stats)
}