diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 188bee166..baac8ec8b 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -81,6 +81,14 @@ jobs: - name: Check release build with UniFFI support on Rust ${{ matrix.toolchain }} if: matrix.build-uniffi run: cargo check --release --features uniffi --verbose --color always + - name: Ban unwrap in library code on Rust ${{ matrix.toolchain }} + if: matrix.build-uniffi + env: + RUSTFLAGS: "" + run: | + rustup component add clippy + cargo clippy --lib --verbose --color always -- -A warnings -D clippy::unwrap_used -A clippy::tabs_in_doc_comments + cargo clippy --lib --features uniffi --verbose --color always -- -A warnings -D clippy::unwrap_used -A clippy::tabs_in_doc_comments - name: Test on Rust ${{ matrix.toolchain }} if: "matrix.platform != 'windows-latest'" run: | diff --git a/build.rs b/build.rs index f011148e7..2e080ddcd 100644 --- a/build.rs +++ b/build.rs @@ -7,5 +7,6 @@ fn main() { #[cfg(feature = "uniffi")] - uniffi::generate_scaffolding("bindings/ldk_node.udl").unwrap(); + uniffi::generate_scaffolding("bindings/ldk_node.udl") + .expect("the checked-in UniFFI UDL should always generate scaffolding"); } diff --git a/src/balance.rs b/src/balance.rs index 6c6ad946d..2339c83e1 100644 --- a/src/balance.rs +++ b/src/balance.rs @@ -232,7 +232,9 @@ impl LightningBalance { inbound_htlc_rounded_msat, } => { // unwrap safety: confirmed_balance_candidate_index is guaranteed to index into balance_candidates - let balance = balance_candidates.get(confirmed_balance_candidate_index).unwrap(); + let balance = balance_candidates + .get(confirmed_balance_candidate_index) + .expect("LDK should provide a valid confirmed balance candidate index"); Self::ClaimableOnChannelClose { channel_id, diff --git a/src/builder.rs b/src/builder.rs index cd8cc184f..350dbfcc8 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -80,6 +80,7 @@ use crate::types::{ GossipSync, Graph, KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, SyncAndAsyncKVStore, }; +use crate::util::locks::{MutexExt, RwLockExt}; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; use crate::{Node, NodeMetrics}; @@ -189,6 +190,8 @@ pub enum BuildError { WalletSetupFailed, /// We failed to setup the logger. LoggerSetupFailed, + /// We failed to setup the configured chain source. + ChainSourceSetupFailed, /// The given network does not match the node's previously configured network. NetworkMismatch, /// The role of the node in an asynchronous payments context is not compatible with the current configuration. @@ -216,6 +219,7 @@ impl fmt::Display for BuildError { Self::KVStoreSetupFailed => write!(f, "Failed to setup KVStore."), Self::WalletSetupFailed => write!(f, "Failed to setup onchain wallet."), Self::LoggerSetupFailed => write!(f, "Failed to setup the logger."), + Self::ChainSourceSetupFailed => write!(f, "Failed to setup the chain source."), Self::InvalidNodeAlias => write!(f, "Given node alias is invalid."), Self::NetworkMismatch => { write!(f, "Given network does not match the node's previously configured network.") @@ -861,7 +865,7 @@ impl ArcedNodeBuilder { pub fn set_chain_source_esplora( &self, server_url: String, sync_config: Option, ) { - self.inner.write().unwrap().set_chain_source_esplora(server_url, sync_config); + self.inner.wlck().set_chain_source_esplora(server_url, sync_config); } /// Configures the [`Node`] instance to source its chain data from the given Esplora server. @@ -875,11 +879,7 @@ impl ArcedNodeBuilder { &self, server_url: String, headers: HashMap, sync_config: Option, ) { - self.inner.write().unwrap().set_chain_source_esplora_with_headers( - server_url, - headers, - sync_config, - ); + self.inner.wlck().set_chain_source_esplora_with_headers(server_url, headers, sync_config); } /// Configures the [`Node`] instance to source its chain data from the given Electrum server. @@ -889,7 +889,7 @@ impl ArcedNodeBuilder { pub fn set_chain_source_electrum( &self, server_url: String, sync_config: Option, ) { - self.inner.write().unwrap().set_chain_source_electrum(server_url, sync_config); + self.inner.wlck().set_chain_source_electrum(server_url, sync_config); } /// Configures the [`Node`] instance to connect to a Bitcoin Core node via RPC. @@ -903,12 +903,7 @@ impl ArcedNodeBuilder { pub fn set_chain_source_bitcoind_rpc( &self, rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, ) { - self.inner.write().unwrap().set_chain_source_bitcoind_rpc( - rpc_host, - rpc_port, - rpc_user, - rpc_password, - ); + self.inner.wlck().set_chain_source_bitcoind_rpc(rpc_host, rpc_port, rpc_user, rpc_password); } /// Configures the [`Node`] instance to synchronize chain data from a Bitcoin Core REST endpoint. @@ -924,7 +919,7 @@ impl ArcedNodeBuilder { &self, rest_host: String, rest_port: u16, rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, ) { - self.inner.write().unwrap().set_chain_source_bitcoind_rest( + self.inner.wlck().set_chain_source_bitcoind_rest( rest_host, rest_port, rpc_host, @@ -937,20 +932,20 @@ impl ArcedNodeBuilder { /// Configures the [`Node`] instance to source its gossip data from the Lightning peer-to-peer /// network. pub fn set_gossip_source_p2p(&self) { - self.inner.write().unwrap().set_gossip_source_p2p(); + self.inner.wlck().set_gossip_source_p2p(); } /// Configures the [`Node`] instance to source its gossip data from the given RapidGossipSync /// server. pub fn set_gossip_source_rgs(&self, rgs_server_url: String) { - self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url); + self.inner.wlck().set_gossip_source_rgs(rgs_server_url); } /// Configures the [`Node`] instance to source its external scores from the given URL. /// /// The external scores are merged into the local scoring system to improve routing. pub fn set_pathfinding_scores_source(&self, url: String) { - self.inner.write().unwrap().set_pathfinding_scores_source(url); + self.inner.wlck().set_pathfinding_scores_source(url); } /// Configures the [`Node`] instance to source inbound liquidity from the given @@ -964,7 +959,7 @@ impl ArcedNodeBuilder { pub fn set_liquidity_source_lsps1( &self, node_id: PublicKey, address: SocketAddress, token: Option, ) { - self.inner.write().unwrap().set_liquidity_source_lsps1(node_id, address, token); + self.inner.wlck().set_liquidity_source_lsps1(node_id, address, token); } /// Configures the [`Node`] instance to source just-in-time inbound liquidity from the given @@ -978,7 +973,7 @@ impl ArcedNodeBuilder { pub fn set_liquidity_source_lsps2( &self, node_id: PublicKey, address: SocketAddress, token: Option, ) { - self.inner.write().unwrap().set_liquidity_source_lsps2(node_id, address, token); + self.inner.wlck().set_liquidity_source_lsps2(node_id, address, token); } /// Configures the [`Node`] instance to provide an [LSPS2] service, issuing just-in-time @@ -988,12 +983,12 @@ impl ArcedNodeBuilder { /// /// [LSPS2]: https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md pub fn set_liquidity_provider_lsps2(&self, service_config: LSPS2ServiceConfig) { - self.inner.write().unwrap().set_liquidity_provider_lsps2(service_config); + self.inner.wlck().set_liquidity_provider_lsps2(service_config); } /// Sets the used storage directory path. pub fn set_storage_dir_path(&self, storage_dir_path: String) { - self.inner.write().unwrap().set_storage_dir_path(storage_dir_path); + self.inner.wlck().set_storage_dir_path(storage_dir_path); } /// Configures the [`Node`] instance to write logs to the filesystem. @@ -1012,29 +1007,29 @@ impl ArcedNodeBuilder { pub fn set_filesystem_logger( &self, log_file_path: Option, log_level: Option, ) { - self.inner.write().unwrap().set_filesystem_logger(log_file_path, log_level); + self.inner.wlck().set_filesystem_logger(log_file_path, log_level); } /// Configures the [`Node`] instance to write logs to the [`log`](https://crates.io/crates/log) facade. pub fn set_log_facade_logger(&self) { - self.inner.write().unwrap().set_log_facade_logger(); + self.inner.wlck().set_log_facade_logger(); } /// Configures the [`Node`] instance to write logs to the provided custom [`LogWriter`]. pub fn set_custom_logger(&self, log_writer: Arc) { - self.inner.write().unwrap().set_custom_logger(log_writer); + self.inner.wlck().set_custom_logger(log_writer); } /// Sets the Bitcoin network used. pub fn set_network(&self, network: Network) { - self.inner.write().unwrap().set_network(network); + self.inner.wlck().set_network(network); } /// Sets the IP address and TCP port on which [`Node`] will listen for incoming network connections. pub fn set_listening_addresses( &self, listening_addresses: Vec, ) -> Result<(), BuildError> { - self.inner.write().unwrap().set_listening_addresses(listening_addresses).map(|_| ()) + self.inner.wlck().set_listening_addresses(listening_addresses).map(|_| ()) } /// Sets the IP address and TCP port which [`Node`] will announce to the gossip network that it accepts connections on. @@ -1045,7 +1040,7 @@ impl ArcedNodeBuilder { pub fn set_announcement_addresses( &self, announcement_addresses: Vec, ) -> Result<(), BuildError> { - self.inner.write().unwrap().set_announcement_addresses(announcement_addresses).map(|_| ()) + self.inner.wlck().set_announcement_addresses(announcement_addresses).map(|_| ()) } /// Configures the [`Node`] instance to use a Tor SOCKS proxy for outbound connections to peers with OnionV3 addresses. @@ -1054,7 +1049,7 @@ impl ArcedNodeBuilder { /// /// **Note**: If unset, connecting to peer OnionV3 addresses will fail. pub fn set_tor_config(&self, tor_config: TorConfig) -> Result<(), BuildError> { - self.inner.write().unwrap().set_tor_config(tor_config).map(|_| ()) + self.inner.wlck().set_tor_config(tor_config).map(|_| ()) } /// Sets the node alias that will be used when broadcasting announcements to the gossip @@ -1062,14 +1057,14 @@ impl ArcedNodeBuilder { /// /// The provided alias must be a valid UTF-8 string and no longer than 32 bytes in total. pub fn set_node_alias(&self, node_alias: String) -> Result<(), BuildError> { - self.inner.write().unwrap().set_node_alias(node_alias).map(|_| ()) + self.inner.wlck().set_node_alias(node_alias).map(|_| ()) } /// Sets the role of the node in an asynchronous payments context. pub fn set_async_payments_role( &self, role: Option, ) -> Result<(), BuildError> { - self.inner.write().unwrap().set_async_payments_role(role).map(|_| ()) + self.inner.wlck().set_async_payments_role(role).map(|_| ()) } /// Configures the [`Node`] to resync chain data from genesis on first startup, recovering any @@ -1078,13 +1073,13 @@ impl ArcedNodeBuilder { /// This should only be set on first startup when importing an older wallet from a previously /// used [`NodeEntropy`]. pub fn set_wallet_recovery_mode(&self) { - self.inner.write().unwrap().set_wallet_recovery_mode(); + self.inner.wlck().set_wallet_recovery_mode(); } /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: Arc) -> Result, BuildError> { - self.inner.read().unwrap().build(*node_entropy).map(Arc::new) + self.inner.rlck().build(*node_entropy).map(Arc::new) } /// Builds a [`Node`] instance with a [`FilesystemStore`] backend and according to the options @@ -1092,7 +1087,7 @@ impl ArcedNodeBuilder { pub fn build_with_fs_store( &self, node_entropy: Arc, ) -> Result, BuildError> { - self.inner.read().unwrap().build_with_fs_store(*node_entropy).map(Arc::new) + self.inner.rlck().build_with_fs_store(*node_entropy).map(Arc::new) } /// Builds a [`Node`] instance with a [VSS] backend and according to the options @@ -1117,8 +1112,7 @@ impl ArcedNodeBuilder { fixed_headers: HashMap, ) -> Result, BuildError> { self.inner - .read() - .unwrap() + .rlck() .build_with_vss_store(*node_entropy, vss_url, store_id, fixed_headers) .map(Arc::new) } @@ -1150,8 +1144,7 @@ impl ArcedNodeBuilder { lnurl_auth_server_url: String, fixed_headers: HashMap, ) -> Result, BuildError> { self.inner - .read() - .unwrap() + .rlck() .build_with_vss_store_and_lnurl_auth( *node_entropy, vss_url, @@ -1179,8 +1172,7 @@ impl ArcedNodeBuilder { fixed_headers: HashMap, ) -> Result, BuildError> { self.inner - .read() - .unwrap() + .rlck() .build_with_vss_store_and_fixed_headers(*node_entropy, vss_url, store_id, fixed_headers) .map(Arc::new) } @@ -1202,8 +1194,7 @@ impl ArcedNodeBuilder { ) -> Result, BuildError> { let adapter = Arc::new(crate::ffi::VssHeaderProviderAdapter::new(header_provider)); self.inner - .read() - .unwrap() + .rlck() .build_with_vss_store_and_header_provider(*node_entropy, vss_url, store_id, adapter) .map(Arc::new) } @@ -1214,7 +1205,7 @@ impl ArcedNodeBuilder { pub fn build_with_store( &self, node_entropy: Arc, kv_store: S, ) -> Result, BuildError> { - self.inner.read().unwrap().build_with_store(*node_entropy, kv_store).map(Arc::new) + self.inner.rlck().build_with_store(*node_entropy, kv_store).map(Arc::new) } } @@ -1310,6 +1301,7 @@ fn build_with_store_internal( Arc::clone(&logger), Arc::clone(&node_metrics), ) + .map_err(|()| BuildError::ChainSourceSetupFailed)? }, Some(ChainDataSourceConfig::Electrum { server_url, sync_config }) => { let sync_config = sync_config.unwrap_or(ElectrumSyncConfig::default()); @@ -1379,6 +1371,7 @@ fn build_with_store_internal( Arc::clone(&logger), Arc::clone(&node_metrics), ) + .map_err(|()| BuildError::ChainSourceSetupFailed)? }, }; let chain_source = Arc::new(chain_source); @@ -1610,7 +1603,7 @@ fn build_with_store_internal( // Restore external pathfinding scores from cache if possible. match external_scores_res { Ok(external_scores) => { - scorer.lock().unwrap().merge(external_scores, cur_time); + scorer.lck().merge(external_scores, cur_time); log_trace!(logger, "External scores from cache merged successfully"); }, Err(e) => { @@ -1763,7 +1756,7 @@ fn build_with_store_internal( // Reset the RGS sync timestamp in case we somehow switch gossip sources { - let mut locked_node_metrics = node_metrics.write().unwrap(); + let mut locked_node_metrics = node_metrics.wlck(); locked_node_metrics.latest_rgs_snapshot_timestamp = None; write_node_metrics(&*locked_node_metrics, &*kv_store, Arc::clone(&logger)) .map_err(|e| { @@ -1775,7 +1768,7 @@ fn build_with_store_internal( }, GossipSourceConfig::RapidGossipSync(rgs_server) => { let latest_sync_timestamp = - node_metrics.read().unwrap().latest_rgs_snapshot_timestamp.unwrap_or(0); + node_metrics.rlck().latest_rgs_snapshot_timestamp.unwrap_or(0); Arc::new(GossipSource::new_rgs( rgs_server.clone(), latest_sync_timestamp, diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 26924d8af..06a316055 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -42,6 +42,7 @@ use crate::fee_estimator::{ use crate::io::utils::write_node_metrics; use crate::logger::{log_bytes, log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; +use crate::util::locks::{MutexExt, RwLockExt}; use crate::{Error, NodeMetrics}; const CHAIN_POLLING_INTERVAL_SECS: u64 = 2; @@ -132,7 +133,7 @@ impl BitcoindChainSource { // First register for the wallet polling status to make sure `Node::sync_wallets` calls // wait on the result before proceeding. { - let mut status_lock = self.wallet_polling_status.lock().unwrap(); + let mut status_lock = self.wallet_polling_status.lck(); if status_lock.register_or_subscribe_pending_sync().is_some() { debug_assert!(false, "Sync already in progress. This should never happen."); } @@ -194,15 +195,16 @@ impl BitcoindChainSource { { Ok(chain_tip) => { { + let elapsed_ms = now.elapsed().map(|d| d.as_millis()).unwrap_or(0); log_info!( self.logger, "Finished synchronizing listeners in {}ms", - now.elapsed().unwrap().as_millis() + elapsed_ms ); - *self.latest_chain_tip.write().unwrap() = Some(chain_tip); + *self.latest_chain_tip.wlck() = Some(chain_tip); let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; locked_node_metrics.latest_onchain_wallet_sync_timestamp = @@ -262,7 +264,7 @@ impl BitcoindChainSource { } // Now propagate the initial result to unblock waiting subscribers. - self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(Ok(())); + self.wallet_polling_status.lck().propagate_result_to_subscribers(Ok(())); let mut chain_polling_interval = tokio::time::interval(Duration::from_secs(CHAIN_POLLING_INTERVAL_SECS)); @@ -346,7 +348,7 @@ impl BitcoindChainSource { match validate_res { Ok(tip) => { - *self.latest_chain_tip.write().unwrap() = Some(tip); + *self.latest_chain_tip.wlck() = Some(tip); Ok(tip) }, Err(e) => { @@ -361,7 +363,7 @@ impl BitcoindChainSource { chain_monitor: Arc, output_sweeper: Arc, ) -> Result<(), Error> { let receiver_res = { - let mut status_lock = self.wallet_polling_status.lock().unwrap(); + let mut status_lock = self.wallet_polling_status.lck(); status_lock.register_or_subscribe_pending_sync() }; @@ -383,7 +385,7 @@ impl BitcoindChainSource { ) .await; - self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); + self.wallet_polling_status.lck().propagate_result_to_subscribers(res); res } @@ -392,7 +394,7 @@ impl BitcoindChainSource { &self, onchain_wallet: Arc, channel_manager: Arc, chain_monitor: Arc, output_sweeper: Arc, ) -> Result<(), Error> { - let latest_chain_tip_opt = self.latest_chain_tip.read().unwrap().clone(); + let latest_chain_tip_opt = self.latest_chain_tip.rlck().clone(); let chain_tip = if let Some(tip) = latest_chain_tip_opt { tip } else { self.poll_chain_tip().await? }; @@ -410,12 +412,9 @@ impl BitcoindChainSource { let now = SystemTime::now(); match spv_client.poll_best_tip().await { Ok((ChainTip::Better(tip), true)) => { - log_trace!( - self.logger, - "Finished polling best tip in {}ms", - now.elapsed().unwrap().as_millis() - ); - *self.latest_chain_tip.write().unwrap() = Some(tip); + let elapsed_ms = now.elapsed().map(|d| d.as_millis()).unwrap_or(0); + log_trace!(self.logger, "Finished polling best tip in {}ms", elapsed_ms); + *self.latest_chain_tip.wlck() = Some(tip); }, Ok(_) => {}, Err(e) => { @@ -434,12 +433,13 @@ impl BitcoindChainSource { .await { Ok((unconfirmed_txs, evicted_txids)) => { + let elapsed_ms = now.elapsed().map(|d| d.as_millis()).unwrap_or(0); log_trace!( self.logger, "Finished polling mempool of size {} and {} evicted transactions in {}ms", unconfirmed_txs.len(), evicted_txids.len(), - now.elapsed().unwrap().as_millis() + elapsed_ms ); onchain_wallet.apply_mempool_txs(unconfirmed_txs, evicted_txids).unwrap_or_else( |e| { @@ -455,7 +455,7 @@ impl BitcoindChainSource { let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; @@ -570,7 +570,7 @@ impl BitcoindChainSource { let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index 7b08c3845..d862c7433 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -34,6 +34,7 @@ use crate::io::utils::write_node_metrics; use crate::logger::{log_bytes, log_debug, log_error, log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; +use crate::util::locks::{MutexExt, RwLockExt}; use crate::NodeMetrics; const BDK_ELECTRUM_CLIENT_BATCH_SIZE: usize = 5; @@ -76,7 +77,7 @@ impl ElectrumChainSource { } pub(super) fn start(&self, runtime: Arc) -> Result<(), Error> { - self.electrum_runtime_status.write().unwrap().start( + self.electrum_runtime_status.wlck().start( self.server_url.clone(), self.sync_config.clone(), Arc::clone(&runtime), @@ -86,14 +87,14 @@ impl ElectrumChainSource { } pub(super) fn stop(&self) { - self.electrum_runtime_status.write().unwrap().stop(); + self.electrum_runtime_status.wlck().stop(); } pub(crate) async fn sync_onchain_wallet( &self, onchain_wallet: Arc, ) -> Result<(), Error> { let receiver_res = { - let mut status_lock = self.onchain_wallet_sync_status.lock().unwrap(); + let mut status_lock = self.onchain_wallet_sync_status.lck(); status_lock.register_or_subscribe_pending_sync() }; if let Some(mut sync_receiver) = receiver_res { @@ -107,14 +108,14 @@ impl ElectrumChainSource { let res = self.sync_onchain_wallet_inner(onchain_wallet).await; - self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + self.onchain_wallet_sync_status.lck().propagate_result_to_subscribers(res); res } async fn sync_onchain_wallet_inner(&self, onchain_wallet: Arc) -> Result<(), Error> { let electrum_client: Arc = - if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() { + if let Some(client) = self.electrum_runtime_status.rlck().client().as_ref() { Arc::clone(client) } else { debug_assert!( @@ -126,7 +127,7 @@ impl ElectrumChainSource { // If this is our first sync, do a full scan with the configured gap limit. // Otherwise just do an incremental sync. let incremental_sync = - self.node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some(); + self.node_metrics.rlck().latest_onchain_wallet_sync_timestamp.is_some(); let apply_wallet_update = |update_res: Result, now: Instant| match update_res { @@ -141,7 +142,7 @@ impl ElectrumChainSource { let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; write_node_metrics( @@ -184,7 +185,7 @@ impl ElectrumChainSource { output_sweeper: Arc, ) -> Result<(), Error> { let receiver_res = { - let mut status_lock = self.lightning_wallet_sync_status.lock().unwrap(); + let mut status_lock = self.lightning_wallet_sync_status.lck(); status_lock.register_or_subscribe_pending_sync() }; if let Some(mut sync_receiver) = receiver_res { @@ -199,7 +200,7 @@ impl ElectrumChainSource { let res = self.sync_lightning_wallet_inner(channel_manager, chain_monitor, output_sweeper).await; - self.lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + self.lightning_wallet_sync_status.lck().propagate_result_to_subscribers(res); res } @@ -218,7 +219,7 @@ impl ElectrumChainSource { ]; let electrum_client: Arc = - if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() { + if let Some(client) = self.electrum_runtime_status.rlck().client().as_ref() { Arc::clone(client) } else { debug_assert!( @@ -234,7 +235,7 @@ impl ElectrumChainSource { let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } @@ -245,7 +246,7 @@ impl ElectrumChainSource { pub(crate) async fn update_fee_rate_estimates(&self) -> Result<(), Error> { let electrum_client: Arc = if let Some(client) = - self.electrum_runtime_status.read().unwrap().client().as_ref() + self.electrum_runtime_status.rlck().client().as_ref() { Arc::clone(client) } else { @@ -267,7 +268,7 @@ impl ElectrumChainSource { let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } @@ -277,7 +278,7 @@ impl ElectrumChainSource { pub(crate) async fn process_broadcast_package(&self, package: Vec) { let electrum_client: Arc = - if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() { + if let Some(client) = self.electrum_runtime_status.rlck().client().as_ref() { Arc::clone(client) } else { debug_assert!(false, "We should have started the chain source before broadcasting"); @@ -292,10 +293,10 @@ impl ElectrumChainSource { impl Filter for ElectrumChainSource { fn register_tx(&self, txid: &Txid, script_pubkey: &Script) { - self.electrum_runtime_status.write().unwrap().register_tx(txid, script_pubkey) + self.electrum_runtime_status.wlck().register_tx(txid, script_pubkey) } fn register_output(&self, output: lightning::chain::WatchedOutput) { - self.electrum_runtime_status.write().unwrap().register_output(output) + self.electrum_runtime_status.wlck().register_output(output) } } diff --git a/src/chain/esplora.rs b/src/chain/esplora.rs index 245db72f6..649f2c8c1 100644 --- a/src/chain/esplora.rs +++ b/src/chain/esplora.rs @@ -25,6 +25,7 @@ use crate::fee_estimator::{ use crate::io::utils::write_node_metrics; use crate::logger::{log_bytes, log_debug, log_error, log_trace, LdkLogger, Logger}; use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; +use crate::util::locks::{MutexExt, RwLockExt}; use crate::{Error, NodeMetrics}; pub(super) struct EsploraChainSource { @@ -45,7 +46,7 @@ impl EsploraChainSource { server_url: String, headers: HashMap, sync_config: EsploraSyncConfig, fee_estimator: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, - ) -> Self { + ) -> Result { let mut client_builder = esplora_client::Builder::new(&server_url); client_builder = client_builder.timeout(sync_config.timeouts_config.per_request_timeout_secs as u64); @@ -54,13 +55,15 @@ impl EsploraChainSource { client_builder = client_builder.header(header_name, header_value); } - let esplora_client = client_builder.build_async().unwrap(); + let esplora_client = client_builder.build_async().map_err(|e| { + log_error!(logger, "Failed to build Esplora client: {}", e); + })?; let tx_sync = Arc::new(EsploraSyncClient::from_client(esplora_client.clone(), Arc::clone(&logger))); let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); let lightning_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); - Self { + Ok(Self { sync_config, esplora_client, onchain_wallet_sync_status, @@ -71,14 +74,14 @@ impl EsploraChainSource { config, logger, node_metrics, - } + }) } pub(super) async fn sync_onchain_wallet( &self, onchain_wallet: Arc, ) -> Result<(), Error> { let receiver_res = { - let mut status_lock = self.onchain_wallet_sync_status.lock().unwrap(); + let mut status_lock = self.onchain_wallet_sync_status.lck(); status_lock.register_or_subscribe_pending_sync() }; if let Some(mut sync_receiver) = receiver_res { @@ -92,7 +95,7 @@ impl EsploraChainSource { let res = self.sync_onchain_wallet_inner(onchain_wallet).await; - self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + self.onchain_wallet_sync_status.lck().propagate_result_to_subscribers(res); res } @@ -101,7 +104,7 @@ impl EsploraChainSource { // If this is our first sync, do a full scan with the configured gap limit. // Otherwise just do an incremental sync. let incremental_sync = - self.node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some(); + self.node_metrics.rlck().latest_onchain_wallet_sync_timestamp.is_some(); macro_rules! get_and_apply_wallet_update { ($sync_future: expr) => {{ @@ -121,7 +124,7 @@ impl EsploraChainSource { .ok() .map(|d| d.as_secs()); { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; write_node_metrics( &*locked_node_metrics, @@ -207,7 +210,7 @@ impl EsploraChainSource { output_sweeper: Arc, ) -> Result<(), Error> { let receiver_res = { - let mut status_lock = self.lightning_wallet_sync_status.lock().unwrap(); + let mut status_lock = self.lightning_wallet_sync_status.lck(); status_lock.register_or_subscribe_pending_sync() }; if let Some(mut sync_receiver) = receiver_res { @@ -222,7 +225,7 @@ impl EsploraChainSource { let res = self.sync_lightning_wallet_inner(channel_manager, chain_monitor, output_sweeper).await; - self.lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + self.lightning_wallet_sync_status.lck().propagate_result_to_subscribers(res); res } @@ -259,7 +262,7 @@ impl EsploraChainSource { let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; @@ -344,7 +347,7 @@ impl EsploraChainSource { let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 49c011a78..3f0843d4c 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -27,6 +27,7 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; +use crate::util::locks::MutexExt; use crate::{Error, NodeMetrics}; pub(crate) enum WalletSyncStatus { @@ -101,7 +102,7 @@ impl ChainSource { fee_estimator: Arc, tx_broadcaster: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, - ) -> (Self, Option) { + ) -> Result<(Self, Option), ()> { let esplora_chain_source = EsploraChainSource::new( server_url, headers, @@ -111,10 +112,10 @@ impl ChainSource { config, Arc::clone(&logger), node_metrics, - ); + )?; let kind = ChainSourceKind::Esplora(esplora_chain_source); let registered_txids = Mutex::new(Vec::new()); - (Self { kind, registered_txids, tx_broadcaster, logger }, None) + Ok((Self { kind, registered_txids, tx_broadcaster, logger }, None)) } pub(crate) fn new_electrum( @@ -215,7 +216,7 @@ impl ChainSource { } pub(crate) fn registered_txids(&self) -> Vec { - self.registered_txids.lock().unwrap().clone() + self.registered_txids.lck().clone() } pub(crate) fn is_transaction_based(&self) -> bool { @@ -472,7 +473,7 @@ impl ChainSource { impl Filter for ChainSource { fn register_tx(&self, txid: &Txid, script_pubkey: &Script) { - self.registered_txids.lock().unwrap().push(*txid); + self.registered_txids.lck().push(*txid); match &self.kind { ChainSourceKind::Esplora(esplora_chain_source) => { esplora_chain_source.register_tx(txid, script_pubkey) diff --git a/src/connection.rs b/src/connection.rs index a1d24e36d..799dc056d 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -16,6 +16,7 @@ use lightning::ln::msgs::SocketAddress; use crate::config::TorConfig; use crate::logger::{log_debug, log_error, log_info, LdkLogger}; use crate::types::{KeysManager, PeerManager}; +use crate::util::locks::MutexExt; use crate::Error; pub(crate) struct ConnectionManager @@ -238,7 +239,7 @@ where fn register_or_subscribe_pending_connection( &self, node_id: &PublicKey, ) -> Option>> { - let mut pending_connections_lock = self.pending_connections.lock().unwrap(); + let mut pending_connections_lock = self.pending_connections.lck(); match pending_connections_lock.entry(*node_id) { hash_map::Entry::Occupied(mut entry) => { let (tx, rx) = tokio::sync::oneshot::channel(); @@ -254,7 +255,7 @@ where fn propagate_result_to_subscribers(&self, node_id: &PublicKey, res: Result<(), Error>) { // Send the result to any other tasks that might be waiting on it by now. - let mut pending_connections_lock = self.pending_connections.lock().unwrap(); + let mut pending_connections_lock = self.pending_connections.lck(); if let Some(connection_ready_senders) = pending_connections_lock.remove(node_id) { for sender in connection_ready_senders { let _ = sender.send(res).map_err(|e| { diff --git a/src/data_store.rs b/src/data_store.rs index ac5c78fb7..3634571a2 100644 --- a/src/data_store.rs +++ b/src/data_store.rs @@ -14,6 +14,7 @@ use lightning::util::ser::{Readable, Writeable}; use crate::logger::{log_error, LdkLogger}; use crate::types::DynStore; +use crate::util::locks::MutexExt; use crate::Error; pub(crate) trait StorableObject: Clone + Readable + Writeable { @@ -65,7 +66,7 @@ where } pub(crate) fn insert(&self, object: SO) -> Result { - let mut locked_objects = self.objects.lock().unwrap(); + let mut locked_objects = self.objects.lck(); self.persist(&object)?; let updated = locked_objects.insert(object.id(), object).is_some(); @@ -73,7 +74,7 @@ where } pub(crate) fn insert_or_update(&self, object: SO) -> Result { - let mut locked_objects = self.objects.lock().unwrap(); + let mut locked_objects = self.objects.lck(); let updated; match locked_objects.entry(object.id()) { @@ -95,7 +96,7 @@ where } pub(crate) fn remove(&self, id: &SO::Id) -> Result<(), Error> { - let removed = self.objects.lock().unwrap().remove(id).is_some(); + let removed = self.objects.lck().remove(id).is_some(); if removed { let store_key = id.encode_to_hex_str(); KVStoreSync::remove( @@ -121,11 +122,11 @@ where } pub(crate) fn get(&self, id: &SO::Id) -> Option { - self.objects.lock().unwrap().get(id).cloned() + self.objects.lck().get(id).cloned() } pub(crate) fn update(&self, update: SO::Update) -> Result { - let mut locked_objects = self.objects.lock().unwrap(); + let mut locked_objects = self.objects.lck(); if let Some(object) = locked_objects.get_mut(&update.id()) { let updated = object.update(update); @@ -141,7 +142,7 @@ where } pub(crate) fn list_filter bool>(&self, f: F) -> Vec { - self.objects.lock().unwrap().values().filter(f).cloned().collect::>() + self.objects.lck().values().filter(f).cloned().collect::>() } fn persist(&self, object: &SO) -> Result<(), Error> { @@ -169,7 +170,7 @@ where } pub(crate) fn contains_key(&self, id: &SO::Id) -> bool { - self.objects.lock().unwrap().contains_key(id) + self.objects.lck().contains_key(id) } } diff --git a/src/event.rs b/src/event.rs index f06d701bc..65920775b 100644 --- a/src/event.rs +++ b/src/event.rs @@ -56,6 +56,7 @@ use crate::runtime::Runtime; use crate::types::{ CustomTlvRecord, DynStore, KeysManager, OnionMessenger, PaymentStore, Sweeper, Wallet, }; +use crate::util::locks::MutexExt; use crate::{ hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore, UserChannelId, @@ -370,21 +371,21 @@ where pub(crate) async fn add_event(&self, event: Event) -> Result<(), Error> { let data = { - let mut locked_queue = self.queue.lock().unwrap(); + let mut locked_queue = self.queue.lck(); locked_queue.push_back(event); EventQueueSerWrapper(&locked_queue).encode() }; self.persist_queue(data).await?; - if let Some(waker) = self.waker.lock().unwrap().take() { + if let Some(waker) = self.waker.lck().take() { waker.wake(); } Ok(()) } pub(crate) fn next_event(&self) -> Option { - let locked_queue = self.queue.lock().unwrap(); + let locked_queue = self.queue.lck(); locked_queue.front().cloned() } @@ -394,14 +395,14 @@ where pub(crate) async fn event_handled(&self) -> Result<(), Error> { let data = { - let mut locked_queue = self.queue.lock().unwrap(); + let mut locked_queue = self.queue.lck(); locked_queue.pop_front(); EventQueueSerWrapper(&locked_queue).encode() }; self.persist_queue(data).await?; - if let Some(waker) = self.waker.lock().unwrap().take() { + if let Some(waker) = self.waker.lck().take() { waker.wake(); } Ok(()) @@ -485,10 +486,10 @@ impl Future for EventFuture { fn poll( self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, ) -> core::task::Poll { - if let Some(event) = self.event_queue.lock().unwrap().front() { + if let Some(event) = self.event_queue.lck().front() { Poll::Ready(event.clone()) } else { - *self.waker.lock().unwrap() = Some(cx.waker().clone()); + *self.waker.lck() = Some(cx.waker().clone()); Poll::Pending } } @@ -1091,11 +1092,14 @@ where }; self.payment_store.get(&payment_id).map(|payment| { + let amount_msat = payment.amount_msat.expect( + "outbound payments should record their amount before they can succeed", + ); log_info!( self.logger, "Successfully sent payment of {}msat{} from \ payment hash {:?} with preimage {:?}", - payment.amount_msat.unwrap(), + amount_msat, if let Some(fee) = fee_paid_msat { format!(" (fee {} msat)", fee) } else { @@ -1256,7 +1260,9 @@ where } let user_channel_id: u128 = u128::from_ne_bytes( - self.keys_manager.get_secure_random_bytes()[..16].try_into().unwrap(), + self.keys_manager.get_secure_random_bytes()[..16] + .try_into() + .expect("a 16-byte slice should convert into a [u8; 16]"), ); let allow_0conf = self.config.trusted_peers_0conf.contains(&counterparty_node_id); let mut channel_override_config = None; @@ -1446,10 +1452,14 @@ where counterparty_node_id, ); + let former_temporary_channel_id = former_temporary_channel_id.expect( + "LDK Node has only ever persisted ChannelPending events from rust-lightning 0.0.115 or later", + ); + let event = Event::ChannelPending { channel_id, user_channel_id: UserChannelId(user_channel_id), - former_temporary_channel_id: former_temporary_channel_id.unwrap(), + former_temporary_channel_id, counterparty_node_id, funding_txo, }; diff --git a/src/fee_estimator.rs b/src/fee_estimator.rs index b787ecd33..ad73d0d58 100644 --- a/src/fee_estimator.rs +++ b/src/fee_estimator.rs @@ -14,6 +14,8 @@ use lightning::chain::chaininterface::{ FEERATE_FLOOR_SATS_PER_KW, }; +use crate::util::locks::RwLockExt; + #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] pub(crate) enum ConfirmationTarget { /// The default target for onchain payments. @@ -48,7 +50,7 @@ impl OnchainFeeEstimator { pub(crate) fn set_fee_rate_cache( &self, fee_rate_cache_update: HashMap, ) -> bool { - let mut locked_fee_rate_cache = self.fee_rate_cache.write().unwrap(); + let mut locked_fee_rate_cache = self.fee_rate_cache.wlck(); if fee_rate_cache_update != *locked_fee_rate_cache { *locked_fee_rate_cache = fee_rate_cache_update; true @@ -60,7 +62,7 @@ impl OnchainFeeEstimator { impl FeeEstimator for OnchainFeeEstimator { fn estimate_fee_rate(&self, confirmation_target: ConfirmationTarget) -> FeeRate { - let locked_fee_rate_cache = self.fee_rate_cache.read().unwrap(); + let locked_fee_rate_cache = self.fee_rate_cache.rlck(); let fallback_sats_kwu = get_fallback_rate_for_target(confirmation_target); diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 5a1420882..4a0fbfa32 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -917,7 +917,9 @@ uniffi::custom_type!(PaymentHash, String, { } }, lower: |obj| { - Sha256::from_slice(&obj.0).unwrap().to_string() + Sha256::from_slice(&obj.0) + .expect("PaymentHash should always contain exactly 32 bytes") + .to_string() }, }); diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs index 94e8360fc..c3990a07a 100644 --- a/src/io/sqlite_store/mod.rs +++ b/src/io/sqlite_store/mod.rs @@ -21,6 +21,7 @@ use lightning_types::string::PrintableString; use rusqlite::{named_params, Connection}; use crate::io::utils::check_namespace_key_validity; +use crate::util::locks::MutexExt; mod migrations; @@ -288,7 +289,10 @@ impl SqliteStoreInner { })?; let sql = format!("SELECT user_version FROM pragma_user_version"); - let version_res: u16 = connection.query_row(&sql, [], |row| row.get(0)).unwrap(); + let version_res: u16 = connection.query_row(&sql, [], |row| row.get(0)).map_err(|e| { + let msg = format!("Failed to read PRAGMA user_version: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; if version_res == 0 { // New database, set our SCHEMA_USER_VERSION and continue @@ -364,7 +368,7 @@ impl SqliteStoreInner { } fn get_inner_lock_ref(&self, locking_key: String) -> Arc> { - let mut outer_lock = self.write_version_locks.lock().unwrap(); + let mut outer_lock = self.write_version_locks.lck(); Arc::clone(&outer_lock.entry(locking_key).or_default()) } @@ -373,7 +377,7 @@ impl SqliteStoreInner { ) -> io::Result> { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; - let locked_conn = self.connection.lock().unwrap(); + let locked_conn = self.connection.lck(); let sql = format!("SELECT value FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", self.kv_table_name); @@ -423,7 +427,7 @@ impl SqliteStoreInner { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; self.execute_locked_write(inner_lock_ref, locking_key, version, || { - let locked_conn = self.connection.lock().unwrap(); + let locked_conn = self.connection.lck(); let sort_order = self.next_sort_order.fetch_add(1, Ordering::Relaxed); @@ -467,7 +471,7 @@ impl SqliteStoreInner { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?; self.execute_locked_write(inner_lock_ref, locking_key, version, || { - let locked_conn = self.connection.lock().unwrap(); + let locked_conn = self.connection.lck(); let sql = format!("DELETE FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", self.kv_table_name); @@ -500,7 +504,7 @@ impl SqliteStoreInner { ) -> io::Result> { check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?; - let locked_conn = self.connection.lock().unwrap(); + let locked_conn = self.connection.lck(); let sql = format!( "SELECT key FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace", @@ -546,7 +550,7 @@ impl SqliteStoreInner { "list_paginated", )?; - let locked_conn = self.connection.lock().unwrap(); + let locked_conn = self.connection.lck(); // Fetch one extra row beyond PAGE_SIZE to determine whether a next page exists. let fetch_limit = (PAGE_SIZE + 1) as i64; @@ -644,7 +648,7 @@ impl SqliteStoreInner { &self, inner_lock_ref: Arc>, locking_key: String, version: u64, callback: F, ) -> Result<(), lightning::io::Error> { let res = { - let mut last_written_version = inner_lock_ref.lock().unwrap(); + let mut last_written_version = inner_lock_ref.lck(); // Check if we already have a newer version written/removed. This is used in async contexts to realize eventual // consistency. @@ -670,7 +674,7 @@ impl SqliteStoreInner { // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already // counted. - let mut outer_lock = self.write_version_locks.lock().unwrap(); + let mut outer_lock = self.write_version_locks.lck(); let strong_count = Arc::strong_count(&inner_lock_ref); debug_assert!(strong_count >= 2, "Unexpected SqliteStore strong count"); diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 2f7a689b2..527cf10bc 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -45,6 +45,7 @@ use vss_client::util::storable_builder::{EntropySource, StorableBuilder}; use crate::entropy::NodeEntropy; use crate::io::utils::check_namespace_key_validity; use crate::lnurl_auth::LNURL_AUTH_HARDENED_CHILD_INDEX; +use crate::util::locks::MutexExt; type CustomRetryPolicy = FilteredRetryPolicy< JitteredRetryPolicy< @@ -110,7 +111,9 @@ impl VssStore { .worker_threads(INTERNAL_RUNTIME_WORKERS) .max_blocking_threads(INTERNAL_RUNTIME_WORKERS) .build() - .unwrap(); + .map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("Failed to build VSS runtime: {}", e)) + })?; let (data_encryption_key, obfuscation_master_key) = derive_data_encryption_and_obfuscation_keys(&vss_seed); @@ -419,7 +422,7 @@ impl VssStoreInner { } fn get_inner_lock_ref(&self, locking_key: String) -> Arc> { - let mut outer_lock = self.locks.lock().unwrap(); + let mut outer_lock = self.locks.lck(); Arc::clone(&outer_lock.entry(locking_key).or_default()) } @@ -526,7 +529,10 @@ impl VssStoreInner { // unwrap safety: resp.value must be always present for a non-erroneous VSS response, otherwise // it is an API-violation which is converted to [`VssError::InternalServerError`] in [`VssClient`] - let storable = Storable::decode(&resp.value.unwrap().value[..]).map_err(|e| { + let storable = Storable::decode( + &resp.value.expect("successful VSS reads should include a value payload").value[..], + ) + .map_err(|e| { let msg = format!( "Failed to decode data read from key {}/{}/{}: {}", primary_namespace, secondary_namespace, key, e @@ -672,7 +678,7 @@ impl VssStoreInner { // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already // counted. - let mut outer_lock = self.locks.lock().unwrap(); + let mut outer_lock = self.locks.lck(); let strong_count = Arc::strong_count(&inner_lock_ref); debug_assert!(strong_count >= 2, "Unexpected VssStore strong count"); @@ -739,7 +745,10 @@ async fn determine_and_write_schema_version( // unwrap safety: resp.value must be always present for a non-erroneous VSS response, otherwise // it is an API-violation which is converted to [`VssError::InternalServerError`] in [`VssClient`] - let storable = Storable::decode(&resp.value.unwrap().value[..]).map_err(|e| { + let storable = Storable::decode( + &resp.value.expect("successful VSS reads should include a value payload").value[..], + ) + .map_err(|e| { let msg = format!("Failed to decode schema version: {}", e); Error::new(ErrorKind::Other, msg) })?; diff --git a/src/lib.rs b/src/lib.rs index 2ac4697e8..c13a9bdd3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -105,6 +105,7 @@ mod runtime; mod scoring; mod tx_broadcaster; mod types; +mod util; mod wallet; use std::default::Default; @@ -180,6 +181,7 @@ pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, SyncAndAsyncKVStor pub use vss_client; use crate::scoring::setup_background_pathfinding_scores_sync; +use crate::util::locks::RwLockExt; use crate::wallet::FundingAmount; #[cfg(feature = "uniffi")] @@ -253,7 +255,7 @@ impl Node { /// a thread-safe manner. pub fn start(&self) -> Result<(), Error> { // Acquire a run lock and hold it until we're setup. - let mut is_running_lock = self.is_running.write().unwrap(); + let mut is_running_lock = self.is_running.wlck(); if *is_running_lock { return Err(Error::AlreadyRunning); } @@ -321,7 +323,7 @@ impl Node { now.elapsed().as_millis() ); { - let mut locked_node_metrics = gossip_node_metrics.write().unwrap(); + let mut locked_node_metrics = gossip_node_metrics.wlck(); locked_node_metrics.latest_rgs_snapshot_timestamp = Some(updated_timestamp); write_node_metrics(&*locked_node_metrics, &*gossip_sync_store, Arc::clone(&gossip_sync_logger)) .unwrap_or_else(|e| { @@ -419,13 +421,27 @@ impl Node { break; } res = listener.accept() => { - let tcp_stream = res.unwrap().0; + let tcp_stream = match res { + Ok((tcp_stream, _)) => tcp_stream, + Err(e) => { + log_error!(logger, "Failed to accept inbound connection: {}", e); + continue; + }, + }; let peer_mgr = Arc::clone(&peer_mgr); + let logger = Arc::clone(&logger); runtime.spawn_cancellable_background_task(async move { + let tcp_stream = match tcp_stream.into_std() { + Ok(tcp_stream) => tcp_stream, + Err(e) => { + log_error!(logger, "Failed to convert inbound connection: {}", e); + return; + }, + }; lightning_net_tokio::setup_inbound( Arc::clone(&peer_mgr), - tcp_stream.into_std().unwrap(), - ) + tcp_stream, + ) .await; }); } @@ -497,7 +513,7 @@ impl Node { return; } _ = interval.tick() => { - let skip_broadcast = match bcast_node_metrics.read().unwrap().latest_node_announcement_broadcast_timestamp { + let skip_broadcast = match bcast_node_metrics.rlck().latest_node_announcement_broadcast_timestamp { Some(latest_bcast_time_secs) => { // Skip if the time hasn't elapsed yet. let next_bcast_unix_time = SystemTime::UNIX_EPOCH + Duration::from_secs(latest_bcast_time_secs) + NODE_ANN_BCAST_INTERVAL; @@ -538,7 +554,7 @@ impl Node { let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); { - let mut locked_node_metrics = bcast_node_metrics.write().unwrap(); + let mut locked_node_metrics = bcast_node_metrics.wlck(); locked_node_metrics.latest_node_announcement_broadcast_timestamp = unix_time_secs_opt; write_node_metrics(&*locked_node_metrics, &*bcast_store, Arc::clone(&bcast_logger)) .unwrap_or_else(|e| { @@ -645,7 +661,13 @@ impl Node { Some(background_scorer), sleeper, true, - || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap()), + || { + Some( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("current time should not be earlier than the Unix epoch"), + ) + }, ) .await .unwrap_or_else(|e| { @@ -683,7 +705,7 @@ impl Node { /// /// After this returns most API methods will return [`Error::NotRunning`]. pub fn stop(&self) -> Result<(), Error> { - let mut is_running_lock = self.is_running.write().unwrap(); + let mut is_running_lock = self.is_running.wlck(); if !*is_running_lock { return Err(Error::NotRunning); } @@ -747,9 +769,9 @@ impl Node { /// Returns the status of the [`Node`]. pub fn status(&self) -> NodeStatus { - let is_running = *self.is_running.read().unwrap(); + let is_running = *self.is_running.rlck(); let current_best_block = self.channel_manager.current_best_block().into(); - let locked_node_metrics = self.node_metrics.read().unwrap(); + let locked_node_metrics = self.node_metrics.rlck(); let latest_lightning_wallet_sync_timestamp = locked_node_metrics.latest_lightning_wallet_sync_timestamp; let latest_onchain_wallet_sync_timestamp = @@ -1078,7 +1100,7 @@ impl Node { pub fn connect( &self, node_id: PublicKey, address: SocketAddress, persist: bool, ) -> Result<(), Error> { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -1108,7 +1130,7 @@ impl Node { /// Will also remove the peer from the peer store, i.e., after this has been called we won't /// try to reconnect on restart. pub fn disconnect(&self, counterparty_node_id: PublicKey) -> Result<(), Error> { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -1130,7 +1152,7 @@ impl Node { push_to_counterparty_msat: Option, channel_config: Option, announce_for_forwarding: bool, ) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -1193,7 +1215,9 @@ impl Node { let push_msat = push_to_counterparty_msat.unwrap_or(0); let user_channel_id: u128 = u128::from_ne_bytes( - self.keys_manager.get_secure_random_bytes()[..16].try_into().unwrap(), + self.keys_manager.get_secure_random_bytes()[..16] + .try_into() + .expect("a 16-byte slice should convert into a [u8; 16]"), ); match self.channel_manager.create_channel( @@ -1641,7 +1665,7 @@ impl Node { /// /// [`EsploraSyncConfig::background_sync_config`]: crate::config::EsploraSyncConfig::background_sync_config pub fn sync_wallets(&self) -> Result<(), Error> { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } diff --git a/src/liquidity.rs b/src/liquidity.rs index 485da941c..b9c32b315 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -45,6 +45,7 @@ use crate::runtime::Runtime; use crate::types::{ Broadcaster, ChannelManager, DynStore, KeysManager, LiquidityManager, PeerManager, Wallet, }; +use crate::util::locks::{MutexExt, RwLockExt}; use crate::{total_anchor_channels_reserve_sats, Config, Error}; const LIQUIDITY_REQUEST_TIMEOUT_SECS: u64 = 5; @@ -302,7 +303,7 @@ where L::Target: LdkLogger, { pub(crate) fn set_peer_manager(&self, peer_manager: Weak) { - *self.peer_manager.write().unwrap() = Some(peer_manager); + *self.peer_manager.wlck() = Some(peer_manager); } pub(crate) fn liquidity_manager(&self) -> Arc { @@ -404,11 +405,8 @@ where return; } - if let Some(sender) = lsps1_client - .pending_opening_params_requests - .lock() - .unwrap() - .remove(&request_id) + if let Some(sender) = + lsps1_client.pending_opening_params_requests.lck().remove(&request_id) { let response = LSPS1OpeningParamsResponse { supported_options }; @@ -460,11 +458,8 @@ where return; } - if let Some(sender) = lsps1_client - .pending_create_order_requests - .lock() - .unwrap() - .remove(&request_id) + if let Some(sender) = + lsps1_client.pending_create_order_requests.lck().remove(&request_id) { let response = LSPS1OrderStatus { order_id, @@ -518,11 +513,8 @@ where return; } - if let Some(sender) = lsps1_client - .pending_check_order_status_requests - .lock() - .unwrap() - .remove(&request_id) + if let Some(sender) = + lsps1_client.pending_check_order_status_requests.lck().remove(&request_id) { let response = LSPS1OrderStatus { order_id, @@ -642,7 +634,9 @@ where }; let user_channel_id: u128 = u128::from_ne_bytes( - self.keys_manager.get_secure_random_bytes()[..16].try_into().unwrap(), + self.keys_manager.get_secure_random_bytes()[..16] + .try_into() + .expect("a 16-byte slice should convert into a [u8; 16]"), ); let intercept_scid = self.channel_manager.get_intercept_scid(); @@ -717,7 +711,7 @@ where }; let init_features = if let Some(Some(peer_manager)) = - self.peer_manager.read().unwrap().as_ref().map(|weak| weak.upgrade()) + self.peer_manager.rlck().as_ref().map(|weak| weak.upgrade()) { // Fail if we're not connected to the prospective channel partner. if let Some(peer) = peer_manager.peer_by_node_id(&their_network_key) { @@ -828,7 +822,7 @@ where } if let Some(sender) = - lsps2_client.pending_fee_requests.lock().unwrap().remove(&request_id) + lsps2_client.pending_fee_requests.lck().remove(&request_id) { let response = LSPS2FeeResponse { opening_fee_params_menu }; @@ -880,7 +874,7 @@ where } if let Some(sender) = - lsps2_client.pending_buy_requests.lock().unwrap().remove(&request_id) + lsps2_client.pending_buy_requests.lck().remove(&request_id) { let response = LSPS2BuyResponse { intercept_scid, cltv_expiry_delta }; @@ -930,7 +924,7 @@ where let (request_sender, request_receiver) = oneshot::channel(); { let mut pending_opening_params_requests_lock = - lsps1_client.pending_opening_params_requests.lock().unwrap(); + lsps1_client.pending_opening_params_requests.lck(); let request_id = client_handler.request_supported_options(lsps1_client.lsp_node_id); pending_opening_params_requests_lock.insert(request_id, request_sender); } @@ -1013,7 +1007,7 @@ where let request_id; { let mut pending_create_order_requests_lock = - lsps1_client.pending_create_order_requests.lock().unwrap(); + lsps1_client.pending_create_order_requests.lck(); request_id = client_handler.create_order( &lsps1_client.lsp_node_id, order_params.clone(), @@ -1059,7 +1053,7 @@ where let (request_sender, request_receiver) = oneshot::channel(); { let mut pending_check_order_status_requests_lock = - lsps1_client.pending_check_order_status_requests.lock().unwrap(); + lsps1_client.pending_check_order_status_requests.lck(); let request_id = client_handler.check_order_status(&lsps1_client.lsp_node_id, order_id); pending_check_order_status_requests_lock.insert(request_id, request_sender); } @@ -1200,7 +1194,7 @@ where let (fee_request_sender, fee_request_receiver) = oneshot::channel(); { - let mut pending_fee_requests_lock = lsps2_client.pending_fee_requests.lock().unwrap(); + let mut pending_fee_requests_lock = lsps2_client.pending_fee_requests.lck(); let request_id = client_handler .request_opening_params(lsps2_client.lsp_node_id, lsps2_client.token.clone()); pending_fee_requests_lock.insert(request_id, fee_request_sender); @@ -1233,7 +1227,7 @@ where let (buy_request_sender, buy_request_receiver) = oneshot::channel(); { - let mut pending_buy_requests_lock = lsps2_client.pending_buy_requests.lock().unwrap(); + let mut pending_buy_requests_lock = lsps2_client.pending_buy_requests.lck(); let request_id = client_handler .select_opening_params(lsps2_client.lsp_node_id, amount_msat, opening_fee_params) .map_err(|e| { diff --git a/src/lnurl_auth.rs b/src/lnurl_auth.rs index 1a0def47c..0fef8dd72 100644 --- a/src/lnurl_auth.rs +++ b/src/lnurl_auth.rs @@ -182,7 +182,9 @@ fn linking_key_path(hashing_key: &[u8; 32], domain_name: &str) -> Vec>>, } @@ -17,7 +19,7 @@ impl OnionMessageMailbox { } pub(crate) fn onion_message_intercepted(&self, peer_node_id: PublicKey, message: OnionMessage) { - let mut map = self.map.lock().unwrap(); + let mut map = self.map.lck(); let queue = map.entry(peer_node_id).or_insert_with(VecDeque::new); if queue.len() >= Self::MAX_MESSAGES_PER_PEER { @@ -27,8 +29,11 @@ impl OnionMessageMailbox { // Enforce a peers limit. If exceeded, evict the peer with the longest queue. if map.len() > Self::MAX_PEERS { - let peer_to_remove = - map.iter().max_by_key(|(_, queue)| queue.len()).map(|(peer, _)| *peer).unwrap(); + let peer_to_remove = map + .iter() + .max_by_key(|(_, queue)| queue.len()) + .map(|(peer, _)| *peer) + .expect("a peer must exist when the mailbox exceeds its peer limit"); map.remove(&peer_to_remove); } @@ -37,7 +42,7 @@ impl OnionMessageMailbox { pub(crate) fn onion_message_peer_connected( &self, peer_node_id: PublicKey, ) -> Vec { - let mut map = self.map.lock().unwrap(); + let mut map = self.map.lck(); if let Some(queue) = map.remove(&peer_node_id) { queue.into() @@ -48,7 +53,7 @@ impl OnionMessageMailbox { #[cfg(test)] pub(crate) fn is_empty(&self) -> bool { - let map = self.map.lock().unwrap(); + let map = self.map.lck(); map.is_empty() } } diff --git a/src/payment/asynchronous/static_invoice_store.rs b/src/payment/asynchronous/static_invoice_store.rs index cd0e2ebd2..0e9753db8 100644 --- a/src/payment/asynchronous/static_invoice_store.rs +++ b/src/payment/asynchronous/static_invoice_store.rs @@ -22,6 +22,7 @@ use crate::hex_utils; use crate::io::STATIC_INVOICE_STORE_PRIMARY_NAMESPACE; use crate::payment::asynchronous::rate_limiter::RateLimiter; use crate::types::DynStore; +use crate::util::locks::MutexExt; struct PersistedStaticInvoice { invoice: StaticInvoice, @@ -63,7 +64,7 @@ impl StaticInvoiceStore { fn check_rate_limit( limiter: &Mutex, recipient_id: &[u8], ) -> Result<(), lightning::io::Error> { - let mut limiter = limiter.lock().unwrap(); + let mut limiter = limiter.lck(); if !limiter.allow(recipient_id) { Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, "Rate limit exceeded")) } else { diff --git a/src/payment/bolt11.rs b/src/payment/bolt11.rs index f2857e814..7c79685eb 100644 --- a/src/payment/bolt11.rs +++ b/src/payment/bolt11.rs @@ -37,6 +37,7 @@ use crate::payment::store::{ use crate::peer_store::{PeerInfo, PeerStore}; use crate::runtime::Runtime; use crate::types::{ChannelManager, PaymentStore}; +use crate::util::locks::RwLockExt; #[cfg(not(feature = "uniffi"))] type Bolt11Invoice = LdkBolt11Invoice; @@ -241,7 +242,7 @@ impl Bolt11Payment { pub fn send( &self, invoice: &Bolt11Invoice, route_parameters: Option, ) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -275,7 +276,9 @@ impl Bolt11Payment { ) { Ok(()) => { let payee_pubkey = invoice.recover_payee_pub_key(); - let amt_msat = invoice.amount_milli_satoshis().unwrap(); + let amt_msat = invoice + .amount_milli_satoshis() + .expect("zero-amount invoices should be rejected before initiating payment"); log_info!(self.logger, "Initiated sending {}msat to {}", amt_msat, payee_pubkey); let kind = PaymentKind::Bolt11 { @@ -342,7 +345,7 @@ impl Bolt11Payment { &self, invoice: &Bolt11Invoice, amount_msat: u64, route_parameters: Option, ) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -776,7 +779,7 @@ impl Bolt11Payment { pub fn send_probes( &self, invoice: &Bolt11Invoice, route_parameters: Option, ) -> Result<(), Error> { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -831,7 +834,7 @@ impl Bolt11Payment { &self, invoice: &Bolt11Invoice, amount_msat: u64, route_parameters: Option, ) -> Result<(), Error> { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } diff --git a/src/payment/bolt12.rs b/src/payment/bolt12.rs index 980e20696..677abed83 100644 --- a/src/payment/bolt12.rs +++ b/src/payment/bolt12.rs @@ -30,6 +30,7 @@ use crate::ffi::{maybe_deref, maybe_wrap}; use crate::logger::{log_error, log_info, LdkLogger, Logger}; use crate::payment::store::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus}; use crate::types::{ChannelManager, KeysManager, PaymentStore}; +use crate::util::locks::RwLockExt; #[cfg(not(feature = "uniffi"))] type Bolt12Invoice = lightning::offers::invoice::Bolt12Invoice; @@ -89,7 +90,7 @@ impl Bolt12Payment { &self, offer: &Offer, amount_msat: u64, quantity: Option, payer_note: Option, route_parameters: Option, hrn: Option, ) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -207,7 +208,7 @@ impl Bolt12Payment { if let Some(expiry_secs) = expiry_secs { let absolute_expiry = (SystemTime::now() + Duration::from_secs(expiry_secs as u64)) .duration_since(UNIX_EPOCH) - .unwrap(); + .expect("a future expiry should not be earlier than the Unix epoch"); offer_builder = offer_builder.absolute_expiry(absolute_expiry); } @@ -219,7 +220,10 @@ impl Bolt12Payment { log_error!(self.logger, "Failed to create offer: quantity can't be zero."); return Err(Error::InvalidQuantity); } else { - offer = offer.supported_quantity(Quantity::Bounded(NonZeroU64::new(qty).unwrap())) + offer = offer.supported_quantity(Quantity::Bounded( + NonZeroU64::new(qty) + .expect("qty == 0 was rejected before constructing NonZeroU64"), + )) }; }; @@ -262,7 +266,7 @@ impl Bolt12Payment { &self, offer: &Offer, quantity: Option, payer_note: Option, route_parameters: Option, ) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -405,7 +409,7 @@ impl Bolt12Payment { if let Some(expiry_secs) = expiry_secs { let absolute_expiry = (SystemTime::now() + Duration::from_secs(expiry_secs as u64)) .duration_since(UNIX_EPOCH) - .unwrap(); + .expect("a future expiry should not be earlier than the Unix epoch"); offer_builder = offer_builder.absolute_expiry(absolute_expiry); } @@ -425,7 +429,7 @@ impl Bolt12Payment { /// [`Refund`]: lightning::offers::refund::Refund /// [`Bolt12Invoice`]: lightning::offers::invoice::Bolt12Invoice pub fn request_refund_payment(&self, refund: &Refund) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -474,7 +478,7 @@ impl Bolt12Payment { let absolute_expiry = (SystemTime::now() + Duration::from_secs(expiry_secs as u64)) .duration_since(UNIX_EPOCH) - .unwrap(); + .expect("a future expiry should not be earlier than the Unix epoch"); let retry_strategy = Retry::Timeout(LDK_PAYMENT_RETRY_TIMEOUT); let route_parameters = route_parameters.or(self.config.route_parameters).unwrap_or_default(); diff --git a/src/payment/onchain.rs b/src/payment/onchain.rs index cc16690e2..e711353ec 100644 --- a/src/payment/onchain.rs +++ b/src/payment/onchain.rs @@ -16,6 +16,7 @@ use crate::config::Config; use crate::error::Error; use crate::logger::{log_info, LdkLogger, Logger}; use crate::types::{ChannelManager, Wallet}; +use crate::util::locks::RwLockExt; use crate::wallet::OnchainSendAmount; #[cfg(not(feature = "uniffi"))] @@ -80,7 +81,7 @@ impl OnchainPayment { pub fn send_to_address( &self, address: &bitcoin::Address, amount_sats: u64, fee_rate: Option, ) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -110,7 +111,7 @@ impl OnchainPayment { pub fn send_all_to_address( &self, address: &bitcoin::Address, retain_reserves: bool, fee_rate: Option, ) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } diff --git a/src/payment/spontaneous.rs b/src/payment/spontaneous.rs index 74fa84c0e..0f3f41184 100644 --- a/src/payment/spontaneous.rs +++ b/src/payment/spontaneous.rs @@ -23,6 +23,7 @@ use crate::error::Error; use crate::logger::{log_error, log_info, LdkLogger, Logger}; use crate::payment::store::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus}; use crate::types::{ChannelManager, CustomTlvRecord, KeysManager, PaymentStore}; +use crate::util::locks::RwLockExt; // The default `final_cltv_expiry_delta` we apply when not set. const LDK_DEFAULT_FINAL_CLTV_EXPIRY_DELTA: u32 = 144; @@ -56,7 +57,7 @@ impl SpontaneousPayment { route_parameters: Option, custom_tlvs: Option>, preimage: Option, ) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -206,7 +207,7 @@ impl SpontaneousPayment { /// /// [`Bolt11Payment::send_probes`]: crate::payment::Bolt11Payment pub fn send_probes(&self, amount_msat: u64, node_id: PublicKey) -> Result<(), Error> { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } diff --git a/src/peer_store.rs b/src/peer_store.rs index ce8a9810e..5b83f25ab 100644 --- a/src/peer_store.rs +++ b/src/peer_store.rs @@ -20,6 +20,7 @@ use crate::io::{ }; use crate::logger::{log_error, LdkLogger}; use crate::types::DynStore; +use crate::util::locks::RwLockExt; use crate::{Error, SocketAddress}; pub struct PeerStore @@ -41,7 +42,7 @@ where } pub(crate) fn add_peer(&self, peer_info: PeerInfo) -> Result<(), Error> { - let mut locked_peers = self.peers.write().unwrap(); + let mut locked_peers = self.peers.wlck(); if locked_peers.contains_key(&peer_info.node_id) { return Ok(()); @@ -52,18 +53,18 @@ where } pub(crate) fn remove_peer(&self, node_id: &PublicKey) -> Result<(), Error> { - let mut locked_peers = self.peers.write().unwrap(); + let mut locked_peers = self.peers.wlck(); locked_peers.remove(node_id); self.persist_peers(&*locked_peers) } pub(crate) fn list_peers(&self) -> Vec { - self.peers.read().unwrap().values().cloned().collect() + self.peers.rlck().values().cloned().collect() } pub(crate) fn get_peer(&self, node_id: &PublicKey) -> Option { - self.peers.read().unwrap().get(node_id).cloned() + self.peers.rlck().get(node_id).cloned() } fn persist_peers(&self, locked_peers: &HashMap) -> Result<(), Error> { diff --git a/src/runtime.rs b/src/runtime.rs index 39a34ddfe..ddfb38cfd 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -16,6 +16,7 @@ use crate::config::{ BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS, LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS, }; use crate::logger::{log_debug, log_error, log_trace, LdkLogger, Logger}; +use crate::util::locks::MutexExt; pub(crate) struct Runtime { mode: RuntimeMode, @@ -66,7 +67,7 @@ impl Runtime { where F: Future + Send + 'static, { - let mut background_tasks = self.background_tasks.lock().unwrap(); + let mut background_tasks = self.background_tasks.lck(); let runtime_handle = self.handle(); // Since it seems to make a difference to `tokio` (see // https://docs.rs/tokio/latest/tokio/time/fn.timeout.html#panics) we make sure the futures @@ -78,7 +79,7 @@ impl Runtime { where F: Future + Send + 'static, { - let mut cancellable_background_tasks = self.cancellable_background_tasks.lock().unwrap(); + let mut cancellable_background_tasks = self.cancellable_background_tasks.lck(); let runtime_handle = self.handle(); // Since it seems to make a difference to `tokio` (see // https://docs.rs/tokio/latest/tokio/time/fn.timeout.html#panics) we make sure the futures @@ -90,7 +91,7 @@ impl Runtime { where F: Future + Send + 'static, { - let mut background_processor_task = self.background_processor_task.lock().unwrap(); + let mut background_processor_task = self.background_processor_task.lck(); debug_assert!(background_processor_task.is_none(), "Expected no background processor_task"); let runtime_handle = self.handle(); @@ -121,14 +122,14 @@ impl Runtime { } pub fn abort_cancellable_background_tasks(&self) { - let mut tasks = core::mem::take(&mut *self.cancellable_background_tasks.lock().unwrap()); + let mut tasks = core::mem::take(&mut *self.cancellable_background_tasks.lck()); debug_assert!(tasks.len() > 0, "Expected some cancellable background_tasks"); tasks.abort_all(); self.block_on(async { while let Some(_) = tasks.join_next().await {} }) } pub fn wait_on_background_tasks(&self) { - let mut tasks = core::mem::take(&mut *self.background_tasks.lock().unwrap()); + let mut tasks = core::mem::take(&mut *self.background_tasks.lck()); debug_assert!(tasks.len() > 0, "Expected some background_tasks"); self.block_on(async { loop { @@ -160,9 +161,7 @@ impl Runtime { } pub fn wait_on_background_processor_task(&self) { - if let Some(background_processor_task) = - self.background_processor_task.lock().unwrap().take() - { + if let Some(background_processor_task) = self.background_processor_task.lck().take() { let abort_handle = background_processor_task.abort_handle(); // Since it seems to make a difference to `tokio` (see // https://docs.rs/tokio/latest/tokio/time/fn.timeout.html#panics) we make sure the futures diff --git a/src/scoring.rs b/src/scoring.rs index 3ed7b9d1e..89c314558 100644 --- a/src/scoring.rs +++ b/src/scoring.rs @@ -13,6 +13,7 @@ use crate::io::utils::write_external_pathfinding_scores_to_cache; use crate::logger::LdkLogger; use crate::runtime::Runtime; use crate::types::DynStore; +use crate::util::locks::{MutexExt, RwLockExt}; use crate::{write_node_metrics, Logger, NodeMetrics, Scorer}; /// Start a background task that periodically downloads scores via an external url and merges them into the local @@ -82,10 +83,11 @@ async fn sync_external_scores( log_error!(logger, "Failed to persist external scores to cache: {}", e); } - let duration_since_epoch = - SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); - scorer.lock().unwrap().merge(liquidities, duration_since_epoch); - let mut locked_node_metrics = node_metrics.write().unwrap(); + let duration_since_epoch = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("current time should not be earlier than the Unix epoch"); + scorer.lck().merge(liquidities, duration_since_epoch); + let mut locked_node_metrics = node_metrics.wlck(); locked_node_metrics.latest_pathfinding_scores_sync_timestamp = Some(duration_since_epoch.as_secs()); write_node_metrics(&*locked_node_metrics, &*kv_store, logger).unwrap_or_else(|e| { diff --git a/src/types.rs b/src/types.rs index dae315ae0..d90b978d2 100644 --- a/src/types.rs +++ b/src/types.rs @@ -573,9 +573,9 @@ impl From for ChannelDetails { channel_value_sats: value.channel_value_satoshis, unspendable_punishment_reserve: value.unspendable_punishment_reserve, user_channel_id: UserChannelId(value.user_channel_id), - // unwrap safety: This value will be `None` for objects serialized with LDK versions - // prior to 0.0.115. - feerate_sat_per_1000_weight: value.feerate_sat_per_1000_weight.unwrap(), + feerate_sat_per_1000_weight: value + .feerate_sat_per_1000_weight + .expect("ldk-node v0.1.0 already required rust-lightning 0.0.115 feerate details"), outbound_capacity_msat: value.outbound_capacity_msat, inbound_capacity_msat: value.inbound_capacity_msat, confirmations_required: value.confirmations_required, @@ -608,11 +608,14 @@ impl From for ChannelDetails { next_outbound_htlc_limit_msat: value.next_outbound_htlc_limit_msat, next_outbound_htlc_minimum_msat: value.next_outbound_htlc_minimum_msat, force_close_spend_delay: value.force_close_spend_delay, - // unwrap safety: This field is only `None` for objects serialized prior to LDK 0.0.107 - inbound_htlc_minimum_msat: value.inbound_htlc_minimum_msat.unwrap_or(0), + inbound_htlc_minimum_msat: value.inbound_htlc_minimum_msat.expect( + "ldk-node v0.1.0 already required rust-lightning 0.0.115 inbound HTLC minimums", + ), inbound_htlc_maximum_msat: value.inbound_htlc_maximum_msat, - // unwrap safety: `config` is only `None` for LDK objects serialized prior to 0.0.109. - config: value.config.map(|c| c.into()).unwrap(), + config: value + .config + .map(|c| c.into()) + .expect("ldk-node v0.1.0 already required rust-lightning 0.0.115 channel config"), } } } diff --git a/src/util/locks.rs b/src/util/locks.rs new file mode 100644 index 000000000..4afe2d046 --- /dev/null +++ b/src/util/locks.rs @@ -0,0 +1,33 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use std::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; + +pub(crate) trait MutexExt { + fn lck(&self) -> MutexGuard<'_, T>; +} + +impl MutexExt for Mutex { + fn lck(&self) -> MutexGuard<'_, T> { + self.lock().expect("mutex poisoning indicates a broken internal invariant") + } +} + +pub(crate) trait RwLockExt { + fn rlck(&self) -> RwLockReadGuard<'_, T>; + fn wlck(&self) -> RwLockWriteGuard<'_, T>; +} + +impl RwLockExt for RwLock { + fn rlck(&self) -> RwLockReadGuard<'_, T> { + self.read().expect("rwlock poisoning indicates a broken internal invariant") + } + + fn wlck(&self) -> RwLockWriteGuard<'_, T> { + self.write().expect("rwlock poisoning indicates a broken internal invariant") + } +} diff --git a/src/util/mod.rs b/src/util/mod.rs new file mode 100644 index 000000000..3cde8e381 --- /dev/null +++ b/src/util/mod.rs @@ -0,0 +1,8 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +pub(crate) mod locks; diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 0e80a46db..fbaac8a3b 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -60,6 +60,7 @@ use crate::payment::{ PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, PendingPaymentDetails, }; use crate::types::{Broadcaster, PaymentStore, PendingPaymentStore}; +use crate::util::locks::MutexExt; use crate::{ChainSource, Error}; pub(crate) enum OnchainSendAmount { @@ -115,21 +116,20 @@ impl Wallet { } pub(crate) fn get_full_scan_request(&self) -> FullScanRequest { - self.inner.lock().unwrap().start_full_scan().build() + self.inner.lck().start_full_scan().build() } pub(crate) fn get_incremental_sync_request(&self) -> SyncRequest<(KeychainKind, u32)> { - self.inner.lock().unwrap().start_sync_with_revealed_spks().build() + self.inner.lck().start_sync_with_revealed_spks().build() } pub(crate) fn get_cached_txs(&self) -> Vec> { - self.inner.lock().unwrap().tx_graph().full_txs().map(|tx_node| tx_node.tx).collect() + self.inner.lck().tx_graph().full_txs().map(|tx_node| tx_node.tx).collect() } pub(crate) fn get_unconfirmed_txids(&self) -> Vec { self.inner - .lock() - .unwrap() + .lck() .transactions() .filter(|t| t.chain_position.is_unconfirmed()) .map(|t| t.tx_node.txid) @@ -137,12 +137,12 @@ impl Wallet { } pub(crate) fn current_best_block(&self) -> BestBlock { - let checkpoint = self.inner.lock().unwrap().latest_checkpoint(); + let checkpoint = self.inner.lck().latest_checkpoint(); BestBlock { block_hash: checkpoint.hash(), height: checkpoint.height() } } pub(crate) fn apply_update(&self, update: impl Into) -> Result<(), Error> { - let mut locked_wallet = self.inner.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); match locked_wallet.apply_update_events(update) { Ok(events) => { self.update_payment_store(&mut *locked_wallet, events).map_err(|e| { @@ -150,7 +150,7 @@ impl Wallet { Error::PersistenceFailed })?; - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_persister = self.persister.lck(); locked_wallet.persist(&mut locked_persister).map_err(|e| { log_error!(self.logger, "Failed to persist wallet: {}", e); Error::PersistenceFailed @@ -172,7 +172,7 @@ impl Wallet { return Ok(()); } - let mut locked_wallet = self.inner.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); let chain_tip1 = locked_wallet.latest_checkpoint().block_id(); let wallet_txs1 = locked_wallet @@ -203,7 +203,7 @@ impl Wallet { Error::PersistenceFailed })?; - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_persister = self.persister.lck(); locked_wallet.persist(&mut locked_persister).map_err(|e| { log_error!(self.logger, "Failed to persist wallet: {}", e); Error::PersistenceFailed @@ -426,7 +426,7 @@ impl Wallet { ) -> Result { let fee_rate = self.fee_estimator.estimate_fee_rate(confirmation_target); - let mut locked_wallet = self.inner.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); let mut tx_builder = locked_wallet.build_tx(); tx_builder.add_recipient(output_script, amount).fee_rate(fee_rate).nlocktime(locktime); @@ -454,7 +454,7 @@ impl Wallet { }, } - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_persister = self.persister.lck(); locked_wallet.persist(&mut locked_persister).map_err(|e| { log_error!(self.logger, "Failed to persist wallet: {}", e); Error::PersistenceFailed @@ -469,8 +469,8 @@ impl Wallet { } pub(crate) fn get_new_address(&self) -> Result { - let mut locked_wallet = self.inner.lock().unwrap(); - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); + let mut locked_persister = self.persister.lck(); let address_info = locked_wallet.reveal_next_address(KeychainKind::External); locked_wallet.persist(&mut locked_persister).map_err(|e| { @@ -481,8 +481,8 @@ impl Wallet { } pub(crate) fn get_new_internal_address(&self) -> Result { - let mut locked_wallet = self.inner.lock().unwrap(); - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); + let mut locked_persister = self.persister.lck(); let address_info = locked_wallet.next_unused_address(KeychainKind::Internal); locked_wallet.persist(&mut locked_persister).map_err(|e| { @@ -493,8 +493,8 @@ impl Wallet { } pub(crate) fn cancel_tx(&self, tx: &Transaction) -> Result<(), Error> { - let mut locked_wallet = self.inner.lock().unwrap(); - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); + let mut locked_persister = self.persister.lck(); locked_wallet.cancel_tx(tx); locked_wallet.persist(&mut locked_persister).map_err(|e| { @@ -508,7 +508,7 @@ impl Wallet { pub(crate) fn get_balances( &self, total_anchor_channels_reserve_sats: u64, ) -> Result<(u64, u64), Error> { - let balance = self.inner.lock().unwrap().balance(); + let balance = self.inner.lck().balance(); // Make sure `list_confirmed_utxos` returns at least one `Utxo` we could use to spend/bump // Anchors if we have any confirmed amounts. @@ -644,7 +644,7 @@ impl Wallet { pub(crate) fn get_max_funding_amount( &self, cur_anchor_reserve_sats: u64, fee_rate: FeeRate, ) -> Result { - let mut locked_wallet = self.inner.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); // Use a dummy P2WSH script (34 bytes) to match the size of a real funding output. let dummy_p2wsh_script = ScriptBuf::new().to_p2wsh(); @@ -668,7 +668,7 @@ impl Wallet { &self, shared_input: Input, shared_output_script: ScriptBuf, cur_anchor_reserve_sats: u64, fee_rate: FeeRate, ) -> Result { - let mut locked_wallet = self.inner.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); debug_assert!(matches!( locked_wallet.public_descriptor(KeychainKind::External), @@ -712,7 +712,7 @@ impl Wallet { fee_rate.unwrap_or_else(|| self.fee_estimator.estimate_fee_rate(confirmation_target)); let tx = { - let mut locked_wallet = self.inner.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); // Prepare the tx_builder. We properly check the reserve requirements (again) further down. let tx_builder = match send_amount { @@ -834,7 +834,7 @@ impl Wallet { }, } - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_persister = self.persister.lck(); locked_wallet.persist(&mut locked_persister).map_err(|e| { log_error!(self.logger, "Failed to persist wallet: {}", e); Error::PersistenceFailed @@ -888,8 +888,8 @@ impl Wallet { pub(crate) fn select_confirmed_utxos( &self, must_spend: Vec, must_pay_to: &[TxOut], fee_rate: FeeRate, ) -> Result { - let mut locked_wallet = self.inner.lock().unwrap(); - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); + let mut locked_persister = self.persister.lck(); debug_assert!(matches!( locked_wallet.public_descriptor(KeychainKind::External), @@ -964,7 +964,7 @@ impl Wallet { } fn list_confirmed_utxos_inner(&self) -> Result, ()> { - let locked_wallet = self.inner.lock().unwrap(); + let locked_wallet = self.inner.lck(); let mut utxos = Vec::new(); let confirmed_txs: Vec = locked_wallet .transactions() @@ -1058,8 +1058,8 @@ impl Wallet { #[allow(deprecated)] fn get_change_script_inner(&self) -> Result { - let mut locked_wallet = self.inner.lock().unwrap(); - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); + let mut locked_persister = self.persister.lck(); let address_info = locked_wallet.next_unused_address(KeychainKind::Internal); locked_wallet.persist(&mut locked_persister).map_err(|e| { @@ -1071,7 +1071,7 @@ impl Wallet { #[allow(deprecated)] pub(crate) fn sign_owned_inputs(&self, unsigned_tx: Transaction) -> Result { - let locked_wallet = self.inner.lock().unwrap(); + let locked_wallet = self.inner.lck(); let mut psbt = Psbt::from_unsigned_tx(unsigned_tx).map_err(|e| { log_error!(self.logger, "Failed to construct PSBT: {}", e); @@ -1108,7 +1108,7 @@ impl Wallet { #[allow(deprecated)] fn sign_psbt_inner(&self, mut psbt: Psbt) -> Result { - let locked_wallet = self.inner.lock().unwrap(); + let locked_wallet = self.inner.lck(); // While BDK populates both `witness_utxo` and `non_witness_utxo` fields, LDK does not. As // BDK by default doesn't trust the witness UTXO to account for the Segwit bug, we must @@ -1256,7 +1256,7 @@ impl Wallet { }, }; - let mut locked_wallet = self.inner.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); debug_assert!( locked_wallet.tx_details(txid).is_some(), @@ -1319,7 +1319,7 @@ impl Wallet { log_error!( self.logger, "Provided fee rate {} is too low for RBF fee bump of txid {}, required minimum fee rate: {}", - fee_rate.unwrap(), + fee_rate.expect("fee_rate.is_some() was checked above"), txid, required_fee_rate ); @@ -1380,7 +1380,7 @@ impl Wallet { }, } - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_persister = self.persister.lck(); locked_wallet.persist(&mut locked_persister).map_err(|e| { log_error!(self.logger, "Failed to persist wallet after fee bump of {}: {}", txid, e); Error::PersistenceFailed @@ -1431,7 +1431,7 @@ impl Listen for Wallet { } fn block_connected(&self, block: &bitcoin::Block, height: u32) { - let mut locked_wallet = self.inner.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); let pre_checkpoint = locked_wallet.latest_checkpoint(); if pre_checkpoint.height() != height - 1 @@ -1481,7 +1481,7 @@ impl Listen for Wallet { }, }; - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_persister = self.persister.lck(); match locked_wallet.persist(&mut locked_persister) { Ok(_) => (), Err(e) => { @@ -1513,7 +1513,7 @@ impl WalletSource for Wallet { &'a self, outpoint: OutPoint, ) -> impl Future> + Send + 'a { async move { - let locked_wallet = self.inner.lock().unwrap(); + let locked_wallet = self.inner.lck(); locked_wallet .tx_details(outpoint.txid) .map(|tx_details| tx_details.tx.deref().clone()) diff --git a/src/wallet/ser.rs b/src/wallet/ser.rs index c1ad984e6..c6a707bcd 100644 --- a/src/wallet/ser.rs +++ b/src/wallet/ser.rs @@ -94,7 +94,9 @@ impl Readable for ChangeSetDeserWrapper { decode_tlv_stream!(reader, { (0, blocks, required), }); - Ok(Self(BdkLocalChainChangeSet { blocks: blocks.0.unwrap() })) + Ok(Self(BdkLocalChainChangeSet { + blocks: blocks.0.expect("required blocks TLV field should be present"), + })) } } @@ -141,10 +143,10 @@ impl Readable for ChangeSetDeserWrapper> (0, time, required), (2, txid, required), }); - set.insert((time.0.unwrap().0, txid.0.unwrap())); + set.insert(( + time.0.expect("required confirmation time TLV field should be present").0, + txid.0.expect("required txid TLV field should be present"), + )); } Ok(Self(set)) } @@ -205,7 +210,7 @@ impl Readable for ChangeSetDeserWrapper>> { read_tlv_fields!(reader, { (0, tx, required), }); - set.insert(Arc::new(tx.0.unwrap())); + set.insert(Arc::new(tx.0.expect("required transaction TLV field should be present"))); } Ok(Self(set)) } @@ -232,8 +237,10 @@ impl Readable for ChangeSetDeserWrapper { }); Ok(Self(ConfirmationBlockTime { - block_id: block_id.0.unwrap().0, - confirmation_time: confirmation_time.0.unwrap(), + block_id: block_id.0.expect("required block_id TLV field should be present").0, + confirmation_time: confirmation_time + .0 + .expect("required confirmation_time TLV field should be present"), })) } } @@ -257,7 +264,10 @@ impl Readable for ChangeSetDeserWrapper { (2, hash, required), }); - Ok(Self(BlockId { height: height.0.unwrap(), hash: hash.0.unwrap() })) + Ok(Self(BlockId { + height: height.0.expect("required height TLV field should be present"), + hash: hash.0.expect("required hash TLV field should be present"), + })) } } @@ -285,7 +295,10 @@ impl Readable for ChangeSetDeserWrapper { decode_tlv_stream!(reader, { (0, last_revealed, required) }); Ok(Self(BdkIndexerChangeSet { - last_revealed: last_revealed.0.unwrap().0, + last_revealed: last_revealed + .0 + .expect("required last_revealed TLV field should be present") + .0, spk_cache: Default::default(), })) } @@ -317,7 +330,10 @@ impl Readable for ChangeSetDeserWrapper> { (0, descriptor_id, required), (2, last_index, required), }); - set.insert(descriptor_id.0.unwrap().0, last_index.0.unwrap()); + set.insert( + descriptor_id.0.expect("required descriptor_id TLV field should be present").0, + last_index.0.expect("required last_index TLV field should be present"), + ); } Ok(Self(set)) } @@ -336,7 +352,9 @@ impl Readable for ChangeSetDeserWrapper { decode_tlv_stream!(reader, { (0, hash, required) }); - Ok(Self(DescriptorId(hash.0.unwrap().0))) + Ok(Self(DescriptorId( + hash.0.expect("required descriptor hash TLV field should be present").0, + ))) } } @@ -351,6 +369,9 @@ impl Readable for ChangeSetDeserWrapper { use bitcoin::hashes::Hash; let buf: [u8; 32] = Readable::read(reader)?; - Ok(Self(Sha256Hash::from_slice(&buf[..]).unwrap())) + Ok(Self( + Sha256Hash::from_slice(&buf[..]) + .expect("a 32-byte buffer should decode into a sha256 hash"), + )) } }