Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/net_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class NetServer {
int GetTimerInterval() const { return timer_interval_; }

// Get the actual worker thread count (resolved from auto mode).
int GetWorkerCount() { return sock_workers_.GetThreadWorkerNum(); }
int GetWorkerCount() const { return sock_workers_.GetThreadWorkerNum(); }

// Access the socket dispatchers (one per worker thread).
// Used by upstream connection pooling to pin outbound connections
Expand Down
8 changes: 6 additions & 2 deletions server/cli/daemonizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ void Daemonizer::Daemonize() {
void Daemonizer::NotifyReady() {
if (g_ready_pipe[1] >= 0) {
unsigned char status = 0; // success
(void)write(g_ready_pipe[1], &status, 1);
// Best-effort: if the write fails the parent detects it via short
// read / EOF on the pipe — no recovery possible here.
ssize_t n = write(g_ready_pipe[1], &status, 1);
(void)n;
close(g_ready_pipe[1]);
g_ready_pipe[1] = -1;
}
Expand All @@ -108,7 +111,8 @@ void Daemonizer::NotifyReady() {
void Daemonizer::NotifyFailed() {
if (g_ready_pipe[1] >= 0) {
unsigned char status = 1; // failure
(void)write(g_ready_pipe[1], &status, 1);
ssize_t n = write(g_ready_pipe[1], &status, 1);
(void)n;
close(g_ready_pipe[1]);
g_ready_pipe[1] = -1;
}
Expand Down
6 changes: 5 additions & 1 deletion server/connection_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,11 @@ void ConnectionHandler::DoSend(const char *data, size_t size){
output_bf_.AppendWithHead(data, size);

if (output_bf_.Size() > 0) {
ssize_t written;
// Default 0 keeps the function fail-safe under the HANDSHAKE branch
// (`tls_state_ != HANDSHAKE` gate below) and silences GCC's
// -Wmaybe-uninitialized: the compiler can't prove the gate-vs-branch
// correlation across the if/else-if chain.
ssize_t written = 0;
if (tls_state_ == TlsState::READY) {
size_t try_len = output_bf_.Size();
written = tls_->Write(output_bf_.Data(), try_len);
Expand Down
9 changes: 8 additions & 1 deletion server/dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,14 @@ void Dispatcher::TimerHandler(){
// explicitly re-armed via ResetTimer().
#if defined(__linux__)
uint64_t expirations;
::read(timer_fd_, &expirations, sizeof(expirations));
// Drain the timerfd to clear epoll readability. Value is unused — only
// the side effect (consuming the pending read) matters. EAGAIN is the
// expected race outcome when the fd was drained by a sibling event.
ssize_t n = ::read(timer_fd_, &expirations, sizeof(expirations));
if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
logging::Get()->warn("timerfd drain read failed: {} (errno={})",
logging::SafeStrerror(errno), errno);
}
TimeStamp::ResetTimerFd(timer_fd_, end_t_);
#elif defined(__APPLE__) || defined(__MACH__)
ep_->ResetTimer(end_t_);
Expand Down
9 changes: 7 additions & 2 deletions server/upstream/proxy_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3824,8 +3824,13 @@ bool ProxyTransaction::MaybeRetry(RetryPolicy::RetryCondition condition,
poison_connection_ = true;
}

// Retry not allowed -- map condition to appropriate error response
int result_code;
// Retry not allowed -- map condition to appropriate error response.
// Defensive default (matches the GRPC_UNAVAILABLE branch below) ensures
// any future RetryCondition enum value that forgets to assign falls back
// to a neutral 502, and silences -Wmaybe-uninitialized at -O1 where the
// compiler cannot see that RESPONSE_5XX returns before reaching the
// DeliverTerminalError call.
int result_code = RESULT_UPSTREAM_DISCONNECT;
switch (condition) {
case RetryPolicy::RetryCondition::CONNECT_FAILURE:
result_code = RESULT_CHECKOUT_FAILED;
Expand Down
2 changes: 1 addition & 1 deletion test/grpc/grpc_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ void TestClassifyRequest_GrpcWeb_Suppressed() {
void TestClassifyRequest_PlusVariants() {
std::cout << "\n[TEST] ClassifyRequest: application/grpc+proto / +json variants" << std::endl;
bool all_good = true;
for (const std::string& ct :
for (const std::string ct :
{"application/grpc+proto", "application/grpc+json", "application/grpc+xxx"}) {
HttpRequest req;
req.http_major = 2;
Expand Down
53 changes: 2 additions & 51 deletions test/http/streaming_request_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,55 +111,6 @@ static int ConnectRaw(int port) {
return fd;
}

// Receive until the string contains needle or timeout elapses.
static std::string RecvUntilContains(int fd, const std::string& needle, int timeout_ms) {
std::string out;
auto deadline =
std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
while (std::chrono::steady_clock::now() < deadline &&
out.find(needle) == std::string::npos) {
auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(
deadline - std::chrono::steady_clock::now());
struct pollfd pfd{fd, POLLIN, 0};
int rv;
do { rv = poll(&pfd, 1, static_cast<int>(remaining.count())); }
while (rv < 0 && errno == EINTR);
if (rv <= 0) break;
char buf[4096];
ssize_t n = recv(fd, buf, sizeof(buf), 0);
if (n <= 0) break;
out.append(buf, static_cast<size_t>(n));
}
return out;
}

// Build a minimal ServerConfig for integration tests.
static ServerConfig MakeStreamingGwConfig(const std::string& upstream_name = "",
const std::string& upstream_host = "",
int upstream_port = 0,
const std::string& route_prefix = "") {
ServerConfig cfg;
cfg.bind_host = "127.0.0.1";
cfg.bind_port = 0;
cfg.worker_threads = 2;
cfg.http2.enabled = false; // plain HTTP/1.1 for most streaming tests
if (!upstream_name.empty()) {
UpstreamConfig u;
u.name = upstream_name;
u.host = upstream_host;
u.port = upstream_port;
u.pool.max_connections = 4;
u.pool.max_idle_connections = 2;
u.pool.connect_timeout_ms = 3000;
u.pool.idle_timeout_sec = 30;
u.proxy.route_prefix = route_prefix;
u.proxy.response_timeout_ms = 5000;
u.request_mode = http::RouteRequestMode::Streaming;
cfg.upstreams.push_back(u);
}
return cfg;
}

// Poll until pred() returns true or timeout elapses.
static bool WaitFor(std::function<bool()> pred,
std::chrono::milliseconds timeout = std::chrono::milliseconds{3000}) {
Expand Down Expand Up @@ -726,13 +677,13 @@ void TestJ10_IsMethodRetryableForReplay() {
std::string err;

// RFC 7231 §4.2.2 idempotent methods → retryable.
for (const std::string& m : {"GET", "HEAD", "PUT", "DELETE", "OPTIONS", "TRACE"}) {
for (const std::string m : {"GET", "HEAD", "PUT", "DELETE", "OPTIONS", "TRACE"}) {
if (!policy.IsMethodRetryableForReplay(m)) {
pass = false; err += m + " should be retryable; ";
}
}
// Non-idempotent methods → not retryable.
for (const std::string& m : {"POST", "PATCH", "CONNECT"}) {
for (const std::string m : {"POST", "PATCH", "CONNECT"}) {
if (policy.IsMethodRetryableForReplay(m)) {
pass = false; err += m + " should NOT be retryable; ";
}
Expand Down
10 changes: 8 additions & 2 deletions thread_pool/include/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ class ThreadPool{
// double-end link list to store the tasks awaiting processing
std::deque<std::shared_ptr<ThreadTaskInterface>> tasks_;

int thread_nums = 0;
// Worker count is read concurrently from net_server dispatch hot paths
// (`sock_workers_.GetThreadWorkerNum()` in OnNewConnection / close / error
// handlers) while writes happen only from Init() under state_->mtx. The
// atomic prevents a data race (TSan-visible) and avoids needing a lock in
// the getter, which would deadlock when called from Start() while
// state_->mtx is held.
std::atomic<int> thread_nums{0};
const int DEFAULT_THREAD_NUMS = 6;
// Shared state: heap-allocated and ref-counted so worker threads can
// safely access it after a task destroys the pool (self-stop path).
Expand Down Expand Up @@ -65,7 +71,7 @@ class ThreadPool{
void Stop();

void SetThreadWorkerNum(int, bool);
int GetThreadWorkerNum();
int GetThreadWorkerNum() const;

// Set a custom error logger. Default: std::cerr. Set this to route
// errors through spdlog when running in daemon mode (stderr is /dev/null).
Expand Down
8 changes: 4 additions & 4 deletions thread_pool/src/threadpool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ void ThreadPool::LogError(const std::string& msg) {
}
}

inline void ThreadPool::SetThreadWorkerNum(int nums, bool /*set_by_init*/){
thread_nums = nums;
void ThreadPool::SetThreadWorkerNum(int nums, bool /*set_by_init*/) {
thread_nums.store(nums, std::memory_order_relaxed);
}

inline int ThreadPool::GetThreadWorkerNum(){
return thread_nums;
int ThreadPool::GetThreadWorkerNum() const {
return thread_nums.load(std::memory_order_relaxed);
}

void ThreadPool::Init(int worker_nums){
Expand Down
Loading