Skip to content

Commit fc0870e

Browse files
authored
fix: handle resetter channel errors (#22)
Reset signal channels didn't check for errors on receive, and would only stop the sub-proxy if the receive was `Ok`. However, it's possible that the receiver lags, in which case `.recv()` would yield `Err`, causing the resetter task to exit without stopping the web service. Consequently, one of the sub-proxies could remain active while the two others stopped, and the whole proxy would wait endlessly in the `join_all`. This PR fixes the above condition, improves error handling in general, and adds a basic unit test.
2 parents ea7f203 + 9c9b45c commit fc0870e

12 files changed

Lines changed: 1163 additions & 437 deletions

File tree

Cargo.lock

Lines changed: 663 additions & 415 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/rproxy/Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "rproxy"
3-
version = "0.0.10"
3+
version = "0.0.11"
44
edition = "2024"
55
default-run = "rproxy"
66

@@ -70,3 +70,8 @@ uuid = { version = "1.18.1", features = ["v7" ]}
7070
valuable = { version = "0.1.1", features = ["derive"] }
7171
x509-parser = "0.18.0"
7272
zstd = "0.13.3"
73+
74+
[dev-dependencies]
75+
actix-rt = "2.11.0"
76+
jsonrpsee = { version = "0.26.0", features = ["server"] }
77+
mime = "0.3.17"

crates/rproxy/src/jrpc/jrpc_request.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const JRPC_METHOD_FCUV3_WITH_PAYLOAD: Cow<'static, str> =
1414

1515
const EMPTY_PARAMS: &Vec<serde_json::Value> = &Vec::new();
1616

17+
#[derive(Debug)]
1718
pub(crate) struct JrpcRequestMeta {
1819
id: Id,
1920

@@ -54,6 +55,7 @@ impl<'a> Deserialize<'a> for JrpcRequestMeta {
5455
struct JrpcRequestMetaWire {
5556
id: Id,
5657
method: Cow<'static, str>,
58+
#[serde(default)]
5759
params: serde_json::Value,
5860
}
5961

@@ -93,7 +95,7 @@ impl<'a> Deserialize<'a> for JrpcRequestMeta {
9395

9496
const JRPC_METHOD_BATCH: Cow<'static, str> = Cow::Borrowed("batch");
9597

96-
#[derive(Deserialize)]
98+
#[derive(Debug, Deserialize)]
9799
#[serde(untagged)]
98100
pub(crate) enum JrpcRequestMetaMaybeBatch {
99101
Single(JrpcRequestMeta),
@@ -108,3 +110,53 @@ impl JrpcRequestMetaMaybeBatch {
108110
}
109111
}
110112
}
113+
114+
// tests ---------------------------------------------------------------
115+
116+
#[cfg(test)]
117+
mod tests {
118+
use super::*;
119+
120+
#[test]
121+
fn test_jrpc_request_meta_maybe_batch_deserialize() {
122+
let json = r#"[
123+
{
124+
"jsonrpc": "2.0",
125+
"id": 1108954,
126+
"method": "net_version"
127+
},
128+
{
129+
"jsonrpc": "2.0",
130+
"id": "1108955",
131+
"method": "eth_getBlockByNumber",
132+
"params": [
133+
"0x73f151",
134+
true
135+
]
136+
}
137+
]"#;
138+
139+
let result: Result<JrpcRequestMetaMaybeBatch, _> = serde_json::from_str(json);
140+
assert!(result.is_ok(), "{result:?}");
141+
142+
let batch = result.unwrap();
143+
match batch {
144+
JrpcRequestMetaMaybeBatch::Batch(requests) => {
145+
assert_eq!(requests.len(), 2);
146+
147+
// First request
148+
assert_eq!(*requests[0].id(), Id::Number(1108954));
149+
assert_eq!(requests[0].method(), Cow::Borrowed("net_version"));
150+
assert!(requests[0].params().is_empty());
151+
152+
// Second request
153+
assert_eq!(*requests[1].id(), Id::Number(1108955));
154+
assert_eq!(requests[1].method(), Cow::Borrowed("eth_getBlockByNumber"));
155+
assert_eq!(requests[1].params().len(), 2);
156+
}
157+
JrpcRequestMetaMaybeBatch::Single(_) => {
158+
panic!("Expected Batch variant, got Single");
159+
}
160+
}
161+
}
162+
}

crates/rproxy/src/server.rs

Lines changed: 218 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ use crate::{
2828
utils::tls_certificate_validity_timestamps,
2929
};
3030

31-
const MAX_OPEN_FILES: u64 = 10240;
32-
3331
// Proxy ---------------------------------------------------------------
3432

3533
pub struct Server {}
@@ -39,6 +37,14 @@ impl Server {
3937
let canceller = Server::wait_for_shutdown_signal();
4038
let resetter = Server::wait_for_reset_signal(canceller.clone());
4139

40+
Self::_run(config, canceller, resetter).await
41+
}
42+
43+
async fn _run(
44+
config: Config,
45+
canceller: CancellationToken,
46+
resetter: broadcast::Sender<()>,
47+
) -> Result<(), Box<dyn std::error::Error + Send>> {
4248
// try to set system limits
4349
match rlimit::getrlimit(rlimit::Resource::NOFILE) {
4450
Ok((_, hard)) => {
@@ -195,7 +201,11 @@ impl Server {
195201
}));
196202
}
197203

198-
futures::future::join_all(services).await;
204+
for res in futures::future::join_all(services).await.iter() {
205+
if let Err(err) = res {
206+
warn!(error = ?err, "One of the services had failed")
207+
}
208+
}
199209
}
200210

201211
Ok(())
@@ -237,7 +247,7 @@ impl Server {
237247
}
238248

239249
fn wait_for_reset_signal(canceller: CancellationToken) -> broadcast::Sender<()> {
240-
let (resetter, _) = broadcast::channel::<()>(2);
250+
let (resetter, _) = broadcast::channel::<()>(1);
241251

242252
{
243253
let resetter = resetter.clone();
@@ -252,7 +262,8 @@ impl Server {
252262
info!("Hangup signal received, resetting...");
253263

254264
if let Err(err) = resetter.send(()) {
255-
error!(from = "sighup", error = ?err, "Failed to broadcast reset signal");
265+
error!(from = "sighup", error = ?err, "Failed to broadcast reset signal, shutting down whole proxy...");
266+
canceller.cancel();
256267
}
257268
}
258269

@@ -267,3 +278,205 @@ impl Server {
267278
resetter
268279
}
269280
}
281+
282+
// tests ===============================================================
283+
284+
#[cfg(test)]
285+
mod tests {
286+
use std::{net::SocketAddr, time::Duration};
287+
288+
use awc::{Client, http::header};
289+
use clap::Parser;
290+
use jsonrpsee::{
291+
RpcModule,
292+
server::{ServerBuilder, ServerHandle},
293+
};
294+
use tracing::{debug, info};
295+
296+
use super::*;
297+
use crate::config::Config;
298+
299+
async fn spawn_rpc_backend() -> (SocketAddr, ServerHandle) {
300+
let server = ServerBuilder::default().build("127.0.0.1:0").await.unwrap();
301+
302+
let addr: SocketAddr = server.local_addr().unwrap();
303+
304+
let mut module = RpcModule::new(());
305+
306+
module
307+
.register_async_method("eth_chainId", |_params, _ctx, _ext| async move {
308+
Ok::<_, jsonrpsee::types::ErrorObjectOwned>("0x1")
309+
})
310+
.unwrap();
311+
312+
let handle = server.start(module);
313+
314+
(addr, handle)
315+
}
316+
317+
#[actix_web::test]
318+
async fn test_circuit_breaker() {
319+
let (backend, _handle) = spawn_rpc_backend().await;
320+
321+
let cfg = {
322+
let mut cfg = Config::parse_from(["rproxy"]);
323+
324+
cfg.authrpc.enabled = true;
325+
cfg.authrpc.backend_url = format!("http://{backend}");
326+
cfg.authrpc.listen_address = "127.0.0.1:18645".into();
327+
cfg.authrpc.shutdown_timeout_sec = 1;
328+
329+
cfg.rpc.enabled = true;
330+
cfg.rpc.backend_url = format!("http://{backend}");
331+
cfg.rpc.listen_address = "127.0.0.1:18651".into();
332+
cfg.rpc.shutdown_timeout_sec = 1;
333+
334+
cfg.logging.level = "warn,rproxy::server::tests=info".into();
335+
cfg.logging.setup_logging();
336+
337+
cfg
338+
};
339+
340+
let proxy_addr_authrpc = cfg.clone().authrpc.listen_address;
341+
let proxy_addr_rpc = cfg.clone().rpc.listen_address;
342+
343+
let canceller = tokio_util::sync::CancellationToken::new();
344+
let resetter = Server::wait_for_reset_signal(canceller.clone());
345+
346+
let server = {
347+
let canceller = canceller.clone();
348+
let resetter = resetter.clone();
349+
350+
actix_rt::spawn(async move { Server::_run(cfg, canceller, resetter).await })
351+
};
352+
actix_rt::time::sleep(std::time::Duration::from_millis(100)).await;
353+
354+
{
355+
let canceller = canceller.clone();
356+
let client = Client::builder().timeout(Duration::from_millis(10)).finish();
357+
let proxy_addr_authrpc = proxy_addr_authrpc.clone();
358+
359+
actix_rt::spawn(async move {
360+
loop {
361+
actix_rt::time::sleep(std::time::Duration::from_millis(10)).await;
362+
363+
let req = client
364+
.post(format!("http://{proxy_addr_authrpc}"))
365+
.insert_header((header::CONTENT_TYPE, mime::APPLICATION_JSON))
366+
.send_body(
367+
r#"{"jsonrpc":"2.0","method":"eth_chainId","params":[],"id":1}"#,
368+
);
369+
370+
tokio::select! {
371+
res = req => {
372+
match res {
373+
Ok(mut res) => {
374+
let _ = res.body().await;
375+
}
376+
377+
Err(err) => {
378+
debug!(error = ?err, "Failed to send a request");
379+
}
380+
}
381+
}
382+
383+
_ = canceller.cancelled() => {
384+
break
385+
}
386+
}
387+
}
388+
});
389+
}
390+
391+
{
392+
let canceller = canceller.clone();
393+
let client = Client::builder().timeout(Duration::from_millis(10)).finish();
394+
395+
actix_rt::spawn(async move {
396+
loop {
397+
actix_rt::time::sleep(std::time::Duration::from_millis(10)).await;
398+
399+
let req = client
400+
.post(format!("http://{proxy_addr_rpc}"))
401+
.insert_header((header::CONTENT_TYPE, mime::APPLICATION_JSON))
402+
.send_body(
403+
r#"{"jsonrpc":"2.0","method":"eth_chainId","params":[],"id":1}"#,
404+
);
405+
406+
tokio::select! {
407+
res = req => {
408+
match res {
409+
Ok(mut res) => {
410+
let _ = res.body().await;
411+
}
412+
413+
Err(err) => {
414+
debug!(error = ?err, "Failed to send a request");
415+
}
416+
}
417+
}
418+
419+
_ = canceller.cancelled() => {
420+
break
421+
}
422+
}
423+
}
424+
});
425+
}
426+
427+
let client = Client::builder().timeout(Duration::from_millis(10)).finish();
428+
429+
for i in 0..10 {
430+
match resetter.send(()) {
431+
Err(err) => {
432+
debug!(iteration = i, error = ?err, "Failed to send a reset");
433+
}
434+
435+
Ok(proxies_count) => {
436+
info!(iteration = i, proxies_count = proxies_count, "Sent a reset");
437+
assert_eq!(
438+
proxies_count, 2,
439+
"sent reset wrong count of proxies: {proxies_count} != 2"
440+
);
441+
}
442+
}
443+
444+
actix_rt::time::sleep(std::time::Duration::from_millis(1200)).await;
445+
446+
let req = client
447+
.post(format!("http://{proxy_addr_authrpc}"))
448+
.insert_header((header::CONTENT_TYPE, mime::APPLICATION_JSON))
449+
.send_body(r#"{"jsonrpc":"2.0","method":"eth_chainId","params":[],"id":1}"#);
450+
451+
tokio::select! {
452+
res = req => {
453+
match res {
454+
Ok(mut res) => {
455+
match res.body().await {
456+
Err(err) => {
457+
panic!("Failed to send a request: {err}");
458+
}
459+
Ok(body) => {
460+
let body = String::from_utf8_lossy(&body).to_string();
461+
info!("Sent a request and got a response: {body}");
462+
}
463+
}
464+
}
465+
466+
Err(err) => {
467+
panic!("Failed to send a request: {err}");
468+
}
469+
}
470+
}
471+
472+
_ = canceller.cancelled() => {
473+
break
474+
}
475+
}
476+
}
477+
478+
canceller.cancel();
479+
480+
tokio::time::timeout(tokio::time::Duration::from_secs(5), server).await.ok();
481+
}
482+
}

crates/rproxy/src/server/proxy/config/authrpc.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,17 @@ pub(crate) struct ConfigAuthrpc {
239239
name("authrpc_remove_backend_from_mirroring_peers")
240240
)]
241241
pub(crate) remove_backend_from_mirroring_peers: bool,
242+
243+
/// timeout for graceful shutdown of authrpc workers
244+
#[arg(
245+
default_value = "5",
246+
env = "RPROXY_AUTHRPC_SHUTDOWN_TIMEOUT_SEC",
247+
help_heading = "authrpc",
248+
long("authrpc-shutdown-timeout-sec"),
249+
name("authrpc_shutdown_timeout_sec"),
250+
value_name = "seconds"
251+
)]
252+
pub(crate) shutdown_timeout_sec: u64,
242253
}
243254

244255
impl ConfigAuthrpc {
@@ -449,6 +460,11 @@ impl ConfigProxyHttp for ConfigAuthrpc {
449460
fn prealloacated_response_buffer_size(&self) -> usize {
450461
1024 * self.prealloacated_response_buffer_size_kb
451462
}
463+
464+
#[inline]
465+
fn shutdown_timeout_sec(&self) -> u64 {
466+
self.shutdown_timeout_sec
467+
}
452468
}
453469

454470
// ConfigAuthrpcError --------------------------------------------------

0 commit comments

Comments
 (0)