diff --git a/include/net_server.h b/include/net_server.h index 71ebe191..ecdb6dc9 100644 --- a/include/net_server.h +++ b/include/net_server.h @@ -160,7 +160,7 @@ class NetServer { int GetTimerInterval() const { return timer_interval_; } // Get the actual worker thread count (resolved from auto mode). - int GetWorkerCount() { return sock_workers_.GetThreadWorkerNum(); } + int GetWorkerCount() const { return sock_workers_.GetThreadWorkerNum(); } // Access the socket dispatchers (one per worker thread). // Used by upstream connection pooling to pin outbound connections diff --git a/server/cli/daemonizer.cc b/server/cli/daemonizer.cc index f11f5717..9998dfff 100644 --- a/server/cli/daemonizer.cc +++ b/server/cli/daemonizer.cc @@ -99,7 +99,10 @@ void Daemonizer::Daemonize() { void Daemonizer::NotifyReady() { if (g_ready_pipe[1] >= 0) { unsigned char status = 0; // success - (void)write(g_ready_pipe[1], &status, 1); + // Best-effort: if the write fails the parent detects it via short + // read / EOF on the pipe — no recovery possible here. + ssize_t n = write(g_ready_pipe[1], &status, 1); + (void)n; close(g_ready_pipe[1]); g_ready_pipe[1] = -1; } @@ -108,7 +111,8 @@ void Daemonizer::NotifyReady() { void Daemonizer::NotifyFailed() { if (g_ready_pipe[1] >= 0) { unsigned char status = 1; // failure - (void)write(g_ready_pipe[1], &status, 1); + ssize_t n = write(g_ready_pipe[1], &status, 1); + (void)n; close(g_ready_pipe[1]); g_ready_pipe[1] = -1; } diff --git a/server/connection_handler.cc b/server/connection_handler.cc index 6ccbc4dd..5b23114f 100644 --- a/server/connection_handler.cc +++ b/server/connection_handler.cc @@ -524,7 +524,11 @@ void ConnectionHandler::DoSend(const char *data, size_t size){ output_bf_.AppendWithHead(data, size); if (output_bf_.Size() > 0) { - ssize_t written; + // Default 0 keeps the function fail-safe under the HANDSHAKE branch + // (`tls_state_ != HANDSHAKE` gate below) and silences GCC's + // -Wmaybe-uninitialized: the compiler can't prove the gate-vs-branch + // correlation across the if/else-if chain. + ssize_t written = 0; if (tls_state_ == TlsState::READY) { size_t try_len = output_bf_.Size(); written = tls_->Write(output_bf_.Data(), try_len); diff --git a/server/dispatcher.cc b/server/dispatcher.cc index a01791ee..b9fec3d7 100644 --- a/server/dispatcher.cc +++ b/server/dispatcher.cc @@ -610,7 +610,14 @@ void Dispatcher::TimerHandler(){ // explicitly re-armed via ResetTimer(). #if defined(__linux__) uint64_t expirations; - ::read(timer_fd_, &expirations, sizeof(expirations)); + // Drain the timerfd to clear epoll readability. Value is unused — only + // the side effect (consuming the pending read) matters. EAGAIN is the + // expected race outcome when the fd was drained by a sibling event. + ssize_t n = ::read(timer_fd_, &expirations, sizeof(expirations)); + if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + logging::Get()->warn("timerfd drain read failed: {} (errno={})", + logging::SafeStrerror(errno), errno); + } TimeStamp::ResetTimerFd(timer_fd_, end_t_); #elif defined(__APPLE__) || defined(__MACH__) ep_->ResetTimer(end_t_); diff --git a/server/upstream/proxy_transaction.cc b/server/upstream/proxy_transaction.cc index caaa2995..9203122b 100644 --- a/server/upstream/proxy_transaction.cc +++ b/server/upstream/proxy_transaction.cc @@ -3824,8 +3824,13 @@ bool ProxyTransaction::MaybeRetry(RetryPolicy::RetryCondition condition, poison_connection_ = true; } - // Retry not allowed -- map condition to appropriate error response - int result_code; + // Retry not allowed -- map condition to appropriate error response. + // Defensive default (matches the GRPC_UNAVAILABLE branch below) ensures + // any future RetryCondition enum value that forgets to assign falls back + // to a neutral 502, and silences -Wmaybe-uninitialized at -O1 where the + // compiler cannot see that RESPONSE_5XX returns before reaching the + // DeliverTerminalError call. + int result_code = RESULT_UPSTREAM_DISCONNECT; switch (condition) { case RetryPolicy::RetryCondition::CONNECT_FAILURE: result_code = RESULT_CHECKOUT_FAILED; diff --git a/test/grpc/grpc_test.h b/test/grpc/grpc_test.h index dc10cafe..03ea37fe 100644 --- a/test/grpc/grpc_test.h +++ b/test/grpc/grpc_test.h @@ -444,7 +444,7 @@ void TestClassifyRequest_GrpcWeb_Suppressed() { void TestClassifyRequest_PlusVariants() { std::cout << "\n[TEST] ClassifyRequest: application/grpc+proto / +json variants" << std::endl; bool all_good = true; - for (const std::string& ct : + for (const std::string ct : {"application/grpc+proto", "application/grpc+json", "application/grpc+xxx"}) { HttpRequest req; req.http_major = 2; diff --git a/test/http/streaming_request_test.h b/test/http/streaming_request_test.h index bd6118cb..80788bc7 100644 --- a/test/http/streaming_request_test.h +++ b/test/http/streaming_request_test.h @@ -111,55 +111,6 @@ static int ConnectRaw(int port) { return fd; } -// Receive until the string contains needle or timeout elapses. -static std::string RecvUntilContains(int fd, const std::string& needle, int timeout_ms) { - std::string out; - auto deadline = - std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); - while (std::chrono::steady_clock::now() < deadline && - out.find(needle) == std::string::npos) { - auto remaining = std::chrono::duration_cast( - deadline - std::chrono::steady_clock::now()); - struct pollfd pfd{fd, POLLIN, 0}; - int rv; - do { rv = poll(&pfd, 1, static_cast(remaining.count())); } - while (rv < 0 && errno == EINTR); - if (rv <= 0) break; - char buf[4096]; - ssize_t n = recv(fd, buf, sizeof(buf), 0); - if (n <= 0) break; - out.append(buf, static_cast(n)); - } - return out; -} - -// Build a minimal ServerConfig for integration tests. -static ServerConfig MakeStreamingGwConfig(const std::string& upstream_name = "", - const std::string& upstream_host = "", - int upstream_port = 0, - const std::string& route_prefix = "") { - ServerConfig cfg; - cfg.bind_host = "127.0.0.1"; - cfg.bind_port = 0; - cfg.worker_threads = 2; - cfg.http2.enabled = false; // plain HTTP/1.1 for most streaming tests - if (!upstream_name.empty()) { - UpstreamConfig u; - u.name = upstream_name; - u.host = upstream_host; - u.port = upstream_port; - u.pool.max_connections = 4; - u.pool.max_idle_connections = 2; - u.pool.connect_timeout_ms = 3000; - u.pool.idle_timeout_sec = 30; - u.proxy.route_prefix = route_prefix; - u.proxy.response_timeout_ms = 5000; - u.request_mode = http::RouteRequestMode::Streaming; - cfg.upstreams.push_back(u); - } - return cfg; -} - // Poll until pred() returns true or timeout elapses. static bool WaitFor(std::function pred, std::chrono::milliseconds timeout = std::chrono::milliseconds{3000}) { @@ -726,13 +677,13 @@ void TestJ10_IsMethodRetryableForReplay() { std::string err; // RFC 7231 §4.2.2 idempotent methods → retryable. - for (const std::string& m : {"GET", "HEAD", "PUT", "DELETE", "OPTIONS", "TRACE"}) { + for (const std::string m : {"GET", "HEAD", "PUT", "DELETE", "OPTIONS", "TRACE"}) { if (!policy.IsMethodRetryableForReplay(m)) { pass = false; err += m + " should be retryable; "; } } // Non-idempotent methods → not retryable. - for (const std::string& m : {"POST", "PATCH", "CONNECT"}) { + for (const std::string m : {"POST", "PATCH", "CONNECT"}) { if (policy.IsMethodRetryableForReplay(m)) { pass = false; err += m + " should NOT be retryable; "; } diff --git a/thread_pool/include/threadpool.h b/thread_pool/include/threadpool.h index f17cf860..56571714 100644 --- a/thread_pool/include/threadpool.h +++ b/thread_pool/include/threadpool.h @@ -19,7 +19,13 @@ class ThreadPool{ // double-end link list to store the tasks awaiting processing std::deque> tasks_; - int thread_nums = 0; + // Worker count is read concurrently from net_server dispatch hot paths + // (`sock_workers_.GetThreadWorkerNum()` in OnNewConnection / close / error + // handlers) while writes happen only from Init() under state_->mtx. The + // atomic prevents a data race (TSan-visible) and avoids needing a lock in + // the getter, which would deadlock when called from Start() while + // state_->mtx is held. + std::atomic thread_nums{0}; const int DEFAULT_THREAD_NUMS = 6; // Shared state: heap-allocated and ref-counted so worker threads can // safely access it after a task destroys the pool (self-stop path). @@ -65,7 +71,7 @@ class ThreadPool{ void Stop(); void SetThreadWorkerNum(int, bool); - int GetThreadWorkerNum(); + int GetThreadWorkerNum() const; // Set a custom error logger. Default: std::cerr. Set this to route // errors through spdlog when running in daemon mode (stderr is /dev/null). diff --git a/thread_pool/src/threadpool.cc b/thread_pool/src/threadpool.cc index 6ac5cb9a..d32b25a0 100644 --- a/thread_pool/src/threadpool.cc +++ b/thread_pool/src/threadpool.cc @@ -36,12 +36,12 @@ void ThreadPool::LogError(const std::string& msg) { } } -inline void ThreadPool::SetThreadWorkerNum(int nums, bool /*set_by_init*/){ - thread_nums = nums; +void ThreadPool::SetThreadWorkerNum(int nums, bool /*set_by_init*/) { + thread_nums.store(nums, std::memory_order_relaxed); } -inline int ThreadPool::GetThreadWorkerNum(){ - return thread_nums; +int ThreadPool::GetThreadWorkerNum() const { + return thread_nums.load(std::memory_order_relaxed); } void ThreadPool::Init(int worker_nums){