diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index 085165e9e02..8f2e50ce6d3 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -47,7 +47,7 @@ use lightning::ln::functional_test_utils::*; use lightning::ln::inbound_payment::ExpandedKey; use lightning::ln::outbound_payment::{RecipientOnionFields, Retry}; use lightning::ln::peer_handler::{ - IgnoringMessageHandler, MessageHandler, PeerManager, SocketDescriptor, + IgnoringMessageHandler, MessageHandler, PeerManager, SocketDescriptor, CHUNK_SIZE, }; use lightning::ln::script::ShutdownScript; use lightning::ln::types::ChannelId; @@ -201,6 +201,15 @@ struct Peer<'a> { } impl<'a> SocketDescriptor for Peer<'a> { fn send_data(&mut self, data: &[u8], _continue_read: bool) -> usize { + // After the Noise handshake, all writes must be exactly CHUNK_SIZE bytes + // (or 0 for empty force-writes). The only non-chunk write is 50 bytes for + // the handshake act_two sent by an inbound peer before encryption is ready. + assert!( + data.len() == 0 || data.len() == 50 || data.len() == CHUNK_SIZE, + "Unexpected send_data size: {} (expected 0, 50, or {})", + data.len(), + CHUNK_SIZE, + ); data.len() } fn disconnect_socket(&mut self) { diff --git a/fuzz/src/peer_crypt.rs b/fuzz/src/peer_crypt.rs index b01aa02400b..15d59c598bd 100644 --- a/fuzz/src/peer_crypt.rs +++ b/fuzz/src/peer_crypt.rs @@ -79,9 +79,11 @@ pub fn do_test(data: &[u8]) { let mut buf = [0; 65536 + 16]; loop { if get_slice!(1)[0] == 0 { - crypter.encrypt_buffer(MessageBuf::from_encoded(&get_slice!(slice_to_be16( - get_slice!(2) - )))); + let mut dest = Vec::new(); + crypter.encrypt_buffer_into( + &mut dest, + MessageBuf::from_encoded(&get_slice!(slice_to_be16(get_slice!(2)))), + ); } else { let len = match crypter.decrypt_length_header(get_slice!(16 + 2)) { Ok(len) => len, diff --git a/lightning/src/ln/peer_channel_encryptor.rs b/lightning/src/ln/peer_channel_encryptor.rs index 5554c5a8c19..96d61b7f891 100644 --- a/lightning/src/ln/peer_channel_encryptor.rs +++ b/lightning/src/ln/peer_channel_encryptor.rs @@ -28,7 +28,7 @@ use bitcoin::secp256k1::{PublicKey, SecretKey}; use crate::crypto::chacha20poly1305rfc::ChaCha20Poly1305RFC; use crate::crypto::utils::hkdf_extract_expand_twice; -use crate::util::ser::VecWriter; +use crate::util::ser::Writer; /// Maximum Lightning message data length according to /// [BOLT-8](https://github.com/lightning/bolts/blob/v1.0/08-transport.md#lightning-message-specification) @@ -501,16 +501,16 @@ impl PeerChannelEncryptor { Ok(self.their_node_id.unwrap().clone()) } - /// Builds sendable bytes for a message. + /// Builds sendable bytes for a message at the given offset within a buffer. /// - /// `msgbuf` must begin with 16 + 2 dummy/0 bytes, which will be filled with the encrypted - /// message length and its MAC. It should then be followed by the message bytes themselves - /// (including the two byte message type). + /// The buffer at `buf[offset..]` must begin with 16 + 2 dummy/0 bytes, which will be filled + /// with the encrypted message length and its MAC. It should then be followed by the message + /// bytes themselves (including the two byte message type). /// - /// For effeciency, the [`Vec::capacity`] should be at least 16 bytes larger than the + /// For efficiency, the [`Vec::capacity`] should be at least 16 bytes larger than the /// [`Vec::len`], to avoid reallocating for the message MAC, which will be appended to the vec. - fn encrypt_message_with_header_0s(&mut self, msgbuf: &mut Vec) { - let msg_len = msgbuf.len() - 16 - 2; + pub(crate) fn encrypt_with_header_0s_at(&mut self, buf: &mut Vec, offset: usize) { + let msg_len = buf.len() - offset - 16 - 2; if msg_len > LN_MAX_MSG_LEN { panic!("Attempted to encrypt message longer than 65535 bytes!"); } @@ -525,7 +525,7 @@ impl PeerChannelEncryptor { } Self::encrypt_with_ad( - &mut msgbuf[0..16 + 2], + &mut buf[offset..offset + 16 + 2], *sn, sk, &[0; 0], @@ -533,34 +533,41 @@ impl PeerChannelEncryptor { ); *sn += 1; - Self::encrypt_in_place_with_ad(msgbuf, 16 + 2, *sn, sk, &[0; 0]); + Self::encrypt_in_place_with_ad(buf, offset + 16 + 2, *sn, sk, &[0; 0]); *sn += 1; }, _ => panic!("Tried to encrypt a message prior to noise handshake completion"), } } - /// Encrypts the given pre-serialized message, returning the encrypted version. - /// panics if msg.len() > 65535 or Noise handshake has not finished. - pub fn encrypt_buffer(&mut self, mut msg: MessageBuf) -> Vec { - self.encrypt_message_with_header_0s(&mut msg.0); - msg.0 + /// Encrypts the given message directly into the destination buffer, appending encrypted bytes. + pub fn encrypt_message_into( + &mut self, dest: &mut Vec, message: wire::Message, + ) { + let offset = dest.len(); + // Reserve 16+2 header bytes, then serialize the message type and body. + dest.resize(offset + 16 + 2, 0); + + let mut writer = BorrowedVecWriter(dest); + message + .type_id() + .write(&mut writer) + .expect("In-memory messages must never fail to serialize"); + message.write(&mut writer).expect("In-memory messages must never fail to serialize"); + let dest = writer.0; + + self.encrypt_with_header_0s_at(dest, offset); } - /// Encrypts the given message, returning the encrypted version. - /// panics if the length of `message`, once encoded, is greater than 65535 or if the Noise - /// handshake has not finished. - pub fn encrypt_message(&mut self, message: wire::Message) -> Vec { - // Allocate a buffer with 2KB, fitting most common messages. Reserve the first 16+2 bytes - // for the 2-byte message type prefix and its MAC. - let mut res = VecWriter(Vec::with_capacity(MSG_BUF_ALLOC_SIZE)); - res.0.resize(16 + 2, 0); - - message.type_id().write(&mut res).expect("In-memory messages must never fail to serialize"); - message.write(&mut res).expect("In-memory messages must never fail to serialize"); - - self.encrypt_message_with_header_0s(&mut res.0); - res.0 + /// Encrypts the given pre-serialized gossip [`MessageBuf`] directly into the destination + /// buffer, appending encrypted bytes. + pub fn encrypt_buffer_into(&mut self, dest: &mut Vec, msg: MessageBuf) { + let offset = dest.len(); + let encoded = &msg.0[16 + 2..]; + // Write the header placeholder + encoded message bytes into dest. + dest.resize(offset + 16 + 2, 0); + dest.extend_from_slice(encoded); + self.encrypt_with_header_0s_at(dest, offset); } /// Decrypts a message length header from the remote peer. @@ -624,6 +631,16 @@ impl PeerChannelEncryptor { } } +/// A [`Writer`] adapter that borrows a `Vec` rather than owning it. +struct BorrowedVecWriter<'a>(&'a mut Vec); +impl<'a> Writer for BorrowedVecWriter<'a> { + #[inline] + fn write_all(&mut self, buf: &[u8]) -> Result<(), crate::io::Error> { + self.0.extend_from_slice(buf); + Ok(()) + } +} + /// A buffer which stores an encoded message (including the two message-type bytes) with some /// padding to allow for future encryption/MACing. pub struct MessageBuf(Vec); @@ -1002,7 +1019,8 @@ mod tests { for i in 0..1005 { let msg = [0x68, 0x65, 0x6c, 0x6c, 0x6f]; - let mut res = outbound_peer.encrypt_buffer(MessageBuf::from_encoded(&msg)); + let mut res = Vec::new(); + outbound_peer.encrypt_buffer_into(&mut res, MessageBuf::from_encoded(&msg)); assert_eq!(res.len(), 5 + 2 * 16 + 2); let len_header = res[0..2 + 16].to_vec(); @@ -1047,7 +1065,8 @@ mod tests { fn max_message_len_encryption() { let mut outbound_peer = get_outbound_peer_for_initiator_test_vectors(); let msg = [4u8; LN_MAX_MSG_LEN + 1]; - outbound_peer.encrypt_buffer(MessageBuf::from_encoded(&msg)); + let mut dest = Vec::new(); + outbound_peer.encrypt_buffer_into(&mut dest, MessageBuf::from_encoded(&msg)); } #[test] diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 759a1e7d887..7c37052efb9 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -741,11 +741,27 @@ enum MessageBatchImpl { CommitmentSigned(Vec), } -/// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until -/// we have fewer than this many messages in the outbound buffer again. -/// We also use this as the target number of outbound gossip messages to keep in the write buffer, -/// refilled as we send bytes. -const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 12; +/// The fixed chunk size (in bytes) for all outbound traffic. All data written to the socket +/// will be in increments of this size, with Ping messages used as padding filler. +/// +/// Sized to fit in a single TCP/IP packet on a standard 1500-byte Ethernet MTU: +/// 1500 (MTU) - 40 (IPv6 header) - 32 (TCP header with timestamps) = 1428 bytes available. +/// 1400 provides a 28-byte margin that also survives PPPoE tunnels (MTU 1492). +#[cfg(fuzzing)] +pub const CHUNK_SIZE: usize = 1400; +#[cfg(not(fuzzing))] +const CHUNK_SIZE: usize = 1400; + +/// The minimum size of an encrypted Ping message: 6-byte plaintext (2-byte type + 2-byte ponglen +/// + 2-byte CollectionLength) + 34 bytes encryption overhead (18-byte header + 16-byte body MAC). +const MIN_ENCRYPTED_PING_SIZE: usize = 40; + +/// When the outbound chunk buffer has at least this many pending bytes, we'll stop reading bytes +/// from the peer until we have fewer pending bytes again. +/// +/// Set to 10 chunks (~14 KB), roughly equivalent to the old 12-message limit given that typical +/// encrypted messages are a few hundred bytes each. +const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10 * CHUNK_SIZE; /// If we've sent a ping, and are still awaiting a response, we may need to churn our way through /// the socket receive buffer before receiving the ping. @@ -783,6 +799,179 @@ const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32; /// the equivalent maximum buffer size for gossip backfill is zero. const OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP: usize = 64 * 1024 * 2; +/// A message queue that accumulates encrypted bytes and writes them to the socket in fixed-size +/// chunks of [`CHUNK_SIZE`] bytes. Any remaining space in a chunk is padded with encrypted Ping +/// messages, so that an observer sees only constant-size writes on the wire. +struct ChunkedMessageQueue { + /// Contiguous buffer of encrypted bytes. Bytes `[send_offset..len]` are pending. + buffer: Vec, + /// Next byte to send; bytes before this have already been written to the socket. + send_offset: usize, + /// Tracks the number of unsent bytes in `buffer` that correspond to actual encrypted + /// messages (i.e., NOT Ping padding). Used for gossip drop threshold calculations so that + /// Ping padding overhead doesn't artificially reduce the gossip budget. + pending_msg_bytes: usize, + /// Pre-serialized (unencrypted) gossip messages, ready for encryption. + gossip_broadcast_buffer: VecDeque, +} + +impl ChunkedMessageQueue { + fn new() -> Self { + ChunkedMessageQueue { + buffer: Vec::with_capacity(2 * CHUNK_SIZE), + send_offset: 0, + pending_msg_bytes: 0, + gossip_broadcast_buffer: VecDeque::new(), + } + } + + /// Returns the number of unsent bytes in the buffer. + fn pending_bytes(&self) -> usize { + self.buffer.len() - self.send_offset + } + + /// Returns whether a full chunk is ready to send. + fn has_full_chunk(&self) -> bool { + self.pending_bytes() >= CHUNK_SIZE + } + + /// Encrypts and appends a message to the buffer. + fn encrypt_and_push_message( + &mut self, encryptor: &mut PeerChannelEncryptor, message: wire::Message, + ) { + let before = self.buffer.len(); + encryptor.encrypt_message_into(&mut self.buffer, message); + self.pending_msg_bytes += self.buffer.len() - before; + } + + /// Encrypts and appends a pre-serialized gossip [`MessageBuf`] to the buffer. + fn encrypt_and_push_gossip(&mut self, encryptor: &mut PeerChannelEncryptor, msg: MessageBuf) { + let before = self.buffer.len(); + encryptor.encrypt_buffer_into(&mut self.buffer, msg); + self.pending_msg_bytes += self.buffer.len() - before; + } + + /// Appends raw (pre-encryption) bytes to the buffer. Used for handshake act bytes. + fn push_raw(&mut self, data: &[u8]) { + self.buffer.extend_from_slice(data); + self.pending_msg_bytes += data.len(); + } + + /// Fills remaining space in the current chunk with Ping padding, then encrypts the padding. + /// + /// If the remaining space is less than [`MIN_ENCRYPTED_PING_SIZE`], the Ping overflows into + /// the next chunk. In that case, a second Ping is used to pad the remainder of the second + /// chunk. At most two Pings are ever needed because `CHUNK_SIZE` >> `MIN_ENCRYPTED_PING_SIZE`. + fn pad_and_finalize_chunk(&mut self, encryptor: &mut PeerChannelEncryptor) { + let pending = self.pending_bytes(); + if pending == 0 { + return; + } + + let remainder = CHUNK_SIZE - (pending % CHUNK_SIZE); + if remainder == CHUNK_SIZE { + // Already chunk-aligned. + return; + } + + if remainder >= MIN_ENCRYPTED_PING_SIZE { + // Enough space for a single Ping to fill the remainder exactly. + self.push_ping_padding(encryptor, remainder); + } else { + // Not enough space for even a minimal Ping. Send a minimal Ping that overflows + // into the next chunk, then pad the rest of that next chunk with a second Ping. + self.push_ping_padding(encryptor, MIN_ENCRYPTED_PING_SIZE); + let pending = self.pending_bytes(); + let remainder2 = CHUNK_SIZE - (pending % CHUNK_SIZE); + debug_assert_ne!(remainder2, 0); + debug_assert!(remainder2 >= MIN_ENCRYPTED_PING_SIZE); + self.push_ping_padding(encryptor, remainder2); + } + + debug_assert_eq!(self.pending_bytes() % CHUNK_SIZE, 0); + } + + /// Writes encrypted Ping padding of exactly `size` encrypted bytes into the buffer. + /// + /// The Ping is constructed with `ponglen = 65533` so the counterparty does NOT respond with a + /// Pong (per BOLT-1: "if `ponglen` is less than 65532 it MUST respond ..."). + fn push_ping_padding(&mut self, encryptor: &mut PeerChannelEncryptor, size: usize) { + debug_assert!(size >= MIN_ENCRYPTED_PING_SIZE); + + // Encryption overhead: 18-byte header (2-byte encrypted length + 16-byte MAC) + // + 16-byte body MAC = 34 bytes + // Ping plaintext: 2-byte type (0x0012) + 2-byte ponglen + 2-byte CollectionLength + zeros + let plaintext_len = size - 34; // total - overhead + let byteslen = plaintext_len - 2 - 2 - 2; // minus type, ponglen, CollectionLength + + let offset = self.buffer.len(); + // Reserve space: 18-byte header + plaintext + 16-byte body MAC + self.buffer.resize(offset + 18 + plaintext_len, 0); + + // Write plaintext starting after the 18-byte header. + let plaintext_start = offset + 18; + // Message type: Ping = 18 (0x0012) + self.buffer[plaintext_start..plaintext_start + 2].copy_from_slice(&18u16.to_be_bytes()); + // ponglen: 65533 to suppress Pong response + self.buffer[plaintext_start + 2..plaintext_start + 4] + .copy_from_slice(&65533u16.to_be_bytes()); + // CollectionLength (byteslen as u16, since byteslen < 0xffff) + self.buffer[plaintext_start + 4..plaintext_start + 6] + .copy_from_slice(&(byteslen as u16).to_be_bytes()); + // Remaining bytes are already zero from resize. + + encryptor.encrypt_with_header_0s_at(&mut self.buffer, offset); + } + + /// Sends exactly one chunk of [`CHUNK_SIZE`] bytes to the descriptor. + /// + /// Returns the number of bytes actually written. If fewer than `CHUNK_SIZE` bytes were written, + /// this indicates the socket is full and we should set `awaiting_write_event`. + fn send_chunk(&mut self, descriptor: &mut impl SocketDescriptor, continue_read: bool) -> usize { + debug_assert!(self.pending_bytes() >= CHUNK_SIZE); + let chunk_end = self.send_offset + CHUNK_SIZE; + let data = &self.buffer[self.send_offset..chunk_end]; + let sent = descriptor.send_data(data, continue_read); + self.send_offset += sent; + // Reduce pending_msg_bytes proportionally: a chunk may contain both real messages and + // padding, but once fully sent those message bytes are gone. Use saturating_sub since + // after padding, pending_msg_bytes < pending_bytes. + self.pending_msg_bytes = self.pending_msg_bytes.saturating_sub(sent); + self.maybe_compact(); + sent + } + + /// Sends raw (non-chunked) bytes from the buffer, used during handshake before encryption is + /// established. Returns the number of bytes sent. + fn send_raw(&mut self, descriptor: &mut impl SocketDescriptor, continue_read: bool) -> usize { + let data = &self.buffer[self.send_offset..]; + if data.is_empty() { + return 0; + } + let sent = descriptor.send_data(data, continue_read); + self.send_offset += sent; + self.pending_msg_bytes = self.pending_msg_bytes.saturating_sub(sent); + self.maybe_compact(); + sent + } + + /// Drains already-sent bytes from the front of the buffer when `send_offset` is large enough. + fn maybe_compact(&mut self) { + if self.send_offset >= CHUNK_SIZE { + self.buffer.drain(..self.send_offset); + self.send_offset = 0; + } + } + + /// Returns the total buffered bytes of actual messages (pending encrypted messages, excluding + /// Ping padding, plus pending gossip broadcast estimate). + /// Used for determining when to drop gossip broadcasts. + fn total_buffered_bytes(&self) -> usize { + self.pending_msg_bytes + + self.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::() + } +} + struct Peer { channel_encryptor: PeerChannelEncryptor, /// We cache a `NodeId` here to avoid serializing peers' keys every time we forward gossip @@ -797,13 +986,10 @@ struct Peer { their_features: Option, their_socket_address: Option, - pending_outbound_buffer: VecDeque>, - pending_outbound_buffer_first_msg_offset: usize, - /// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily - /// prioritize channel messages over them. - /// - /// Note that these messages are *not* encrypted/MAC'd, and are only serialized. - gossip_broadcast_buffer: VecDeque, + /// Chunked message queue that replaces the old `pending_outbound_buffer` and + /// `gossip_broadcast_buffer`. Accumulates encrypted bytes and writes them in fixed-size + /// chunks to the socket, with Ping padding to fill any remaining space. + message_queue: ChunkedMessageQueue, awaiting_write_event: bool, /// Set to true if the last call to [`SocketDescriptor::send_data`] for this peer had the /// `should_read` flag unset, indicating we've told the driver to stop reading from this peer. @@ -886,15 +1072,15 @@ impl Peer { if !gossip_processing_backlogged { self.received_channel_announce_since_backlogged = false; } - self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE + self.message_queue.pending_bytes() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && (!gossip_processing_backlogged || !self.received_channel_announce_since_backlogged) } /// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's /// outbound buffer. This is checked every time the peer's buffer may have been drained. fn should_buffer_gossip_backfill(&self) -> bool { - self.pending_outbound_buffer.is_empty() - && self.gossip_broadcast_buffer.is_empty() + !self.message_queue.has_full_chunk() + && self.message_queue.gossip_broadcast_buffer.is_empty() && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK && self.handshake_complete() } @@ -902,7 +1088,7 @@ impl Peer { /// Determines if we should push an onion message onto a peer's outbound buffer. This is checked /// every time the peer's buffer may have been drained. fn should_buffer_onion_message(&self) -> bool { - self.pending_outbound_buffer.is_empty() + !self.message_queue.has_full_chunk() && self.handshake_complete() && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK } @@ -910,18 +1096,14 @@ impl Peer { /// Determines if we should push additional gossip broadcast messages onto a peer's outbound /// buffer. This is checked every time the peer's buffer may have been drained. fn should_buffer_gossip_broadcast(&self) -> bool { - self.pending_outbound_buffer.is_empty() + !self.message_queue.has_full_chunk() && self.handshake_complete() && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK } /// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts. fn buffer_full_drop_gossip_broadcast(&self) -> bool { - let total_outbound_buffered: usize = - self.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::() - + self.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::(); - - total_outbound_buffered > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP + self.message_queue.total_buffered_bytes() > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP } fn set_their_node_id(&mut self, node_id: PublicKey) { @@ -1426,9 +1608,7 @@ impl< their_features: None, their_socket_address: remote_network_address, - pending_outbound_buffer: VecDeque::new(), - pending_outbound_buffer_first_msg_offset: 0, - gossip_broadcast_buffer: VecDeque::new(), + message_queue: ChunkedMessageQueue::new(), awaiting_write_event: false, sent_pause_read: false, @@ -1487,9 +1667,7 @@ impl< their_features: None, their_socket_address: remote_network_address, - pending_outbound_buffer: VecDeque::new(), - pending_outbound_buffer_first_msg_offset: 0, - gossip_broadcast_buffer: VecDeque::new(), + message_queue: ChunkedMessageQueue::new(), awaiting_write_event: false, sent_pause_read: false, @@ -1534,7 +1712,36 @@ impl< // indicating whether or not reads are paused. Do this by forcing a write with the desired // `continue_read` flag set, even if no outbound messages are currently queued. force_one_write |= self.should_read_from(peer) == peer.sent_pause_read; + + // If encryption isn't ready yet (handshake in progress), send raw bytes without chunking. + if !peer.channel_encryptor.is_ready_for_encryption() { + if peer.message_queue.pending_bytes() > 0 || force_one_write { + let should_read = self.should_read_from(peer); + peer.message_queue.send_raw(descriptor, should_read); + peer.sent_pause_read = !should_read; + if peer.message_queue.pending_bytes() > 0 { + peer.awaiting_write_event = true; + } + } + return; + } + while force_one_write || !peer.awaiting_write_event { + // First, try to send any full chunks already in the buffer. + if peer.message_queue.has_full_chunk() { + let should_read = self.should_read_from(peer); + let sent = peer.message_queue.send_chunk(descriptor, should_read); + peer.sent_pause_read = !should_read; + force_one_write = false; + if sent < CHUNK_SIZE { + peer.awaiting_write_event = true; + } + continue; + } + + // Buffer more messages (in priority order): onion -> gossip broadcast -> backfill. + let pending_before = peer.message_queue.pending_bytes(); + if peer.should_buffer_onion_message() { if let Some((peer_node_id, _)) = peer.their_node_id { let handler = &self.message_handler.onion_message_handler; @@ -1547,10 +1754,9 @@ impl< } } if peer.should_buffer_gossip_broadcast() { - if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() { + if let Some(msg) = peer.message_queue.gossip_broadcast_buffer.pop_front() { peer.msgs_sent_since_pong += 1; - peer.pending_outbound_buffer - .push_back(peer.channel_encryptor.encrypt_buffer(msg)); + peer.message_queue.encrypt_and_push_gossip(&mut peer.channel_encryptor, msg); } } if peer.should_buffer_gossip_backfill() { @@ -1606,37 +1812,27 @@ impl< self.maybe_send_extra_ping(peer); } - let should_read = self.should_read_from(peer); - let next_buff = match peer.pending_outbound_buffer.front() { - None => { - if force_one_write { - let data_sent = descriptor.send_data(&[], should_read); - debug_assert_eq!(data_sent, 0, "Can't write more than no data"); - peer.sent_pause_read = !should_read; - } - return; - }, - Some(buff) => buff, - }; - force_one_write = false; - - let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..]; - let data_sent = descriptor.send_data(pending, should_read); - peer.sent_pause_read = !should_read; - peer.pending_outbound_buffer_first_msg_offset += data_sent; - if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { - peer.pending_outbound_buffer_first_msg_offset = 0; - peer.pending_outbound_buffer.pop_front(); - const VEC_SIZE: usize = ::core::mem::size_of::>(); - let large_capacity = peer.pending_outbound_buffer.capacity() > 4096 / VEC_SIZE; - let lots_of_slack = peer.pending_outbound_buffer.len() - < peer.pending_outbound_buffer.capacity() / 2; - if large_capacity && lots_of_slack { - peer.pending_outbound_buffer.shrink_to_fit(); - } - } else { - peer.awaiting_write_event = true; + let added_messages = peer.message_queue.pending_bytes() > pending_before; + if added_messages { + // We added messages. If we have a full chunk now, loop back to send it. + continue; + } + + // No new messages were added. If we have a partial buffer, pad + send it. + if peer.message_queue.pending_bytes() > 0 { + peer.message_queue.pad_and_finalize_chunk(&mut peer.channel_encryptor); + // The buffer is now chunk-aligned; loop back to send the chunk(s). + continue; } + + // Nothing pending at all. + if force_one_write { + let should_read = self.should_read_from(peer); + let data_sent = descriptor.send_data(&[], should_read); + debug_assert_eq!(data_sent, 0, "Can't write more than no data"); + peer.sent_pause_read = !should_read; + } + return; } } @@ -1716,7 +1912,7 @@ impl< debug_assert!(false, "node_id should be set by the time we send a message"); } peer.msgs_sent_since_pong += 1; - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(message)); + peer.message_queue.encrypt_and_push_message(&mut peer.channel_encryptor, message); } fn do_read_event( @@ -1847,7 +2043,7 @@ impl< &self.secp_ctx, ); let act_two = try_potential_handleerror!(peer, res).to_vec(); - peer.pending_outbound_buffer.push_back(act_two); + peer.message_queue.push_raw(&act_two); peer.pending_read_buffer = [0; 66].to_vec(); // act three is 66 bytes long }, NextNoiseStep::ActTwo => { @@ -1855,7 +2051,7 @@ impl< .channel_encryptor .process_act_two(&peer.pending_read_buffer[..], &self.node_signer); let (act_three, their_node_id) = try_potential_handleerror!(peer, res); - peer.pending_outbound_buffer.push_back(act_three.to_vec()); + peer.message_queue.push_raw(&act_three); peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes peer.pending_read_is_header = true; @@ -2662,7 +2858,7 @@ impl< continue; } let encoded_message = MessageBuf::from_encoded(&encoded_msg); - peer.gossip_broadcast_buffer.push_back(encoded_message); + peer.message_queue.gossip_broadcast_buffer.push_back(encoded_message); } }, BroadcastGossipMessage::NodeAnnouncement(msg) => { @@ -2708,7 +2904,7 @@ impl< continue; } let encoded_message = MessageBuf::from_encoded(&encoded_msg); - peer.gossip_broadcast_buffer.push_back(encoded_message); + peer.message_queue.gossip_broadcast_buffer.push_back(encoded_message); } }, BroadcastGossipMessage::ChannelUpdate { msg, node_id_1, node_id_2 } => { @@ -2748,7 +2944,7 @@ impl< continue; } let encoded_message = MessageBuf::from_encoded(&encoded_msg); - peer.gossip_broadcast_buffer.push_back(encoded_message); + peer.message_queue.gossip_broadcast_buffer.push_back(encoded_message); } }, } @@ -4283,7 +4479,8 @@ mod tests { let not_init_msg = msgs::Ping { ponglen: 4, byteslen: 0 }; let msg: Message<()> = Message::Ping(not_init_msg); - let msg_bytes = dup_encryptor.encrypt_message(msg); + let mut msg_bytes = Vec::new(); + dup_encryptor.encrypt_message_into(&mut msg_bytes, msg); assert!(peers[0].read_event(&mut fd_dup, &msg_bytes).is_err()); } @@ -4479,8 +4676,8 @@ mod tests { { let peer_lock = peers[1].peers.read().unwrap(); let peer = peer_lock.get(&fd_b).unwrap().lock().unwrap(); - assert_eq!(peer.pending_outbound_buffer.len(), 1); - assert_eq!(peer.gossip_broadcast_buffer.len(), 0); + assert!(peer.message_queue.pending_bytes() > 0); + assert!(peer.message_queue.gossip_broadcast_buffer.is_empty()); } // At this point we should have sent channel announcements up to roughly SCID 150. Now @@ -4509,10 +4706,10 @@ mod tests { { let peer_lock = peers[1].peers.read().unwrap(); let peer = peer_lock.get(&fd_b).unwrap().lock().unwrap(); - assert_eq!(peer.pending_outbound_buffer.len(), 1); - assert_eq!(peer.gossip_broadcast_buffer.len(), 1); + assert!(peer.message_queue.pending_bytes() > 0); + assert_eq!(peer.message_queue.gossip_broadcast_buffer.len(), 1); - let pending_msg = &peer.gossip_broadcast_buffer[0]; + let pending_msg = &peer.message_queue.gossip_broadcast_buffer[0]; let msg: Message<()> = Message::ChannelUpdate(msg_100); let expected = encode_message(msg); assert_eq!(expected, pending_msg.fetch_encoded_msg_with_type_pfx()); @@ -4751,8 +4948,7 @@ mod tests { { let peer_a_lock = peers[0].peers.read().unwrap(); let peer = peer_a_lock.get(&fd_a).unwrap().lock().unwrap(); - let buf_len = peer.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::() - + peer.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::(); + let buf_len = peer.message_queue.total_buffered_bytes(); assert!(buf_len > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP - encoded_size); assert!(buf_len < OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP); } @@ -4765,8 +4961,7 @@ mod tests { { let peer_a_lock = peers[0].peers.read().unwrap(); let peer = peer_a_lock.get(&fd_a).unwrap().lock().unwrap(); - let buf_len = peer.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::() - + peer.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::(); + let buf_len = peer.message_queue.total_buffered_bytes(); assert!(buf_len > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP); assert!(buf_len < OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP + encoded_size); } @@ -4782,8 +4977,14 @@ mod tests { drain_queues!(); { let peer_a_lock = peers[0].peers.read().unwrap(); - let empty = - peer_a_lock.get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.is_empty(); + let empty = peer_a_lock + .get(&fd_a) + .unwrap() + .lock() + .unwrap() + .message_queue + .gossip_broadcast_buffer + .is_empty(); assert!(empty); } @@ -4793,6 +4994,162 @@ mod tests { ); } + /// Helper: completes a noise handshake and returns the outbound encryptor ready for encryption. + fn get_test_encryptor() -> PeerChannelEncryptor { + let secp_ctx = Secp256k1::new(); + // Inbound peer identity (the "responder"). + let inbound_secret = SecretKey::from_slice(&[42; 32]).unwrap(); + let inbound_pubkey = + bitcoin::secp256k1::PublicKey::from_secret_key(&secp_ctx, &inbound_secret); + let inbound_signer = crate::util::test_utils::TestNodeSigner::new(inbound_secret); + + // Outbound peer identity (the "initiator"). + let outbound_secret = SecretKey::from_slice(&[43; 32]).unwrap(); + let outbound_signer = crate::util::test_utils::TestNodeSigner::new(outbound_secret); + + let outbound_ephemeral = SecretKey::from_slice(&[44; 32]).unwrap(); + let inbound_ephemeral = SecretKey::from_slice(&[45; 32]).unwrap(); + + let mut outbound = PeerChannelEncryptor::new_outbound(inbound_pubkey, outbound_ephemeral); + let mut inbound = PeerChannelEncryptor::new_inbound(&&inbound_signer); + + let act_one = outbound.get_act_one(&secp_ctx); + let act_two = inbound + .process_act_one_with_keys(&act_one, &&inbound_signer, inbound_ephemeral, &secp_ctx) + .unwrap(); + let (act_three, _) = outbound.process_act_two(&act_two, &&outbound_signer).unwrap(); + let _ = inbound.process_act_three(&act_three).unwrap(); + + outbound + } + + #[test] + fn test_chunked_message_queue_ping_padding() { + // Tests that Ping padding correctly fills the remainder of a chunk. + let mut encryptor = get_test_encryptor(); + + // Test various remainder sizes to ensure padding works correctly. + for msg_size in [40, 100, 500, 1000, 5000, 30000, 65535] { + let mut queue = ChunkedMessageQueue::new(); + // Push a raw blob of msg_size bytes to simulate encrypted message data. + let fake_data = vec![0u8; msg_size]; + queue.buffer.extend_from_slice(&fake_data); + queue.pending_msg_bytes += msg_size; + + queue.pad_and_finalize_chunk(&mut encryptor); + assert_eq!( + queue.pending_bytes() % CHUNK_SIZE, + 0, + "Buffer not chunk-aligned after padding for msg_size={}", + msg_size + ); + assert!( + queue.pending_bytes() >= CHUNK_SIZE, + "Buffer should be at least one chunk for msg_size={}", + msg_size + ); + } + } + + #[test] + fn test_chunked_message_queue_small_remainder_overflow() { + // Tests the edge case where remainder < MIN_ENCRYPTED_PING_SIZE, requiring two Pings. + let mut encryptor = get_test_encryptor(); + + // Test remainders from 1 to MIN_ENCRYPTED_PING_SIZE-1 (the overflow cases). + for remainder in 1..MIN_ENCRYPTED_PING_SIZE { + let mut queue = ChunkedMessageQueue::new(); + // Fill buffer so that exactly `remainder` bytes are left in the current chunk. + let fill_size = CHUNK_SIZE - remainder; + queue.buffer.resize(fill_size, 0); + queue.pending_msg_bytes = fill_size; + + queue.pad_and_finalize_chunk(&mut encryptor); + assert_eq!( + queue.pending_bytes() % CHUNK_SIZE, + 0, + "Buffer not chunk-aligned for remainder={}", + remainder + ); + // Should overflow into exactly 2 chunks. + assert_eq!( + queue.pending_bytes(), + 2 * CHUNK_SIZE, + "Expected 2 chunks for small remainder={}", + remainder + ); + } + } + + #[test] + fn test_chunked_message_queue_chunk_alignment() { + // Tests that after multiple messages the buffer stays correctly aligned after padding. + let mut encryptor = get_test_encryptor(); + let mut queue = ChunkedMessageQueue::new(); + + // Encrypt several Ping messages of various sizes. + for pong_len in [0u16, 64, 256, 1024] { + let ping = msgs::Ping { ponglen: pong_len, byteslen: 64 }; + let msg: wire::Message<()> = wire::Message::Ping(ping); + queue.encrypt_and_push_message(&mut encryptor, msg); + } + + let pending_before_pad = queue.pending_bytes(); + assert!(pending_before_pad > 0); + + queue.pad_and_finalize_chunk(&mut encryptor); + + assert_eq!(queue.pending_bytes() % CHUNK_SIZE, 0); + assert!(queue.pending_bytes() >= pending_before_pad); + } + + #[test] + fn test_chunked_message_queue_buffer_compaction() { + // Tests that maybe_compact drains sent bytes appropriately. + let mut queue = ChunkedMessageQueue::new(); + + // Fill with 2 chunks worth of data. + queue.buffer.resize(2 * CHUNK_SIZE, 0xAB); + queue.pending_msg_bytes = 2 * CHUNK_SIZE; + assert_eq!(queue.pending_bytes(), 2 * CHUNK_SIZE); + + // Simulate sending the first chunk. + queue.send_offset = CHUNK_SIZE; + queue.pending_msg_bytes = CHUNK_SIZE; + queue.maybe_compact(); + + // After compaction, send_offset should be 0 and buffer should be one chunk. + assert_eq!(queue.send_offset, 0); + assert_eq!(queue.buffer.len(), CHUNK_SIZE); + assert_eq!(queue.pending_bytes(), CHUNK_SIZE); + } + + #[test] + fn test_chunked_message_queue_pending_msg_bytes_tracking() { + // Tests that pending_msg_bytes correctly tracks message bytes vs padding bytes. + let mut encryptor = get_test_encryptor(); + let mut queue = ChunkedMessageQueue::new(); + + // Encrypt a small message. + let ping = msgs::Ping { ponglen: 0, byteslen: 64 }; + let msg: wire::Message<()> = wire::Message::Ping(ping); + queue.encrypt_and_push_message(&mut encryptor, msg); + + let msg_bytes = queue.pending_msg_bytes; + assert!(msg_bytes > 0); + assert_eq!(msg_bytes, queue.pending_bytes()); + + // After padding, pending_bytes increases but pending_msg_bytes stays the same. + queue.pad_and_finalize_chunk(&mut encryptor); + + assert_eq!(queue.pending_msg_bytes, msg_bytes); + assert!(queue.pending_bytes() > msg_bytes); + assert_eq!(queue.pending_bytes() % CHUNK_SIZE, 0); + + // total_buffered_bytes should use pending_msg_bytes, not pending_bytes. + assert_eq!(queue.total_buffered_bytes(), msg_bytes); + } + #[test] fn test_filter_addresses() { // Tests the filter_addresses function.