Skip to content

Commit 02f9fdd

Browse files
committed
fix: deepsource
1 parent d06732c commit 02f9fdd

3 files changed

Lines changed: 31 additions & 9 deletions

File tree

src/handlers/http/query.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ pub async fn get_records_and_fields(
117117
Ok((Some(records), Some(fields)))
118118
}
119119

120-
#[tracing::instrument(name = "query", skip(req, query_request), fields(otel.kind = "server", query.sql = %query_request.query, query.streaming = query_request.streaming))]
120+
#[tracing::instrument(name = "query", skip(req, query_request), fields(otel.kind = "server", query.sql = %query_request.query, query.streaming = query_request.streaming, query.start_time = query_request.start_time, query.end_time = query_request.end_time))]
121121
pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
122122
let mut session_state = QUERY_SESSION.get_ctx().state();
123123
let time_range =
@@ -497,8 +497,18 @@ pub async fn create_streams_for_distributed(
497497
}
498498

499499
while let Some(result) = join_set.join_next().await {
500-
if let Err(join_error) = result {
501-
warn!("Task join error: {}", join_error);
500+
match result {
501+
Err(join_error) => {
502+
warn!("Task join error: {}", join_error);
503+
}
504+
Ok((stream_name, Err(e))) => {
505+
return Err(QueryError::Anyhow(anyhow::anyhow!(
506+
"Failed to create stream '{}': {}",
507+
stream_name,
508+
e
509+
)));
510+
}
511+
Ok((_, Ok(_))) => {}
502512
}
503513
}
504514

src/query/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ use crate::utils::time::TimeRange;
7878
// pub static QUERY_SESSION: Lazy<SessionContext> =
7979
// Lazy::new(|| Query::create_session_context(PARSEABLE.storage()));
8080

81-
pub type RB = Either<
81+
/// Takes care of both streaming and non-streaming query flows
82+
pub type QueryResult = Either<
8283
Vec<RecordBatch>,
8384
Pin<
8485
Box<
@@ -159,7 +160,7 @@ pub async fn execute(
159160
query: Query,
160161
is_streaming: bool,
161162
tenant_id: &Option<String>,
162-
) -> Result<(RB, Vec<String>), ExecuteError> {
163+
) -> Result<(QueryResult, Vec<String>), ExecuteError> {
163164
let id = tenant_id.clone();
164165

165166
// W3C TraceContext propagation across QUERY_RUNTIME (separate OS-thread runtime).
@@ -293,7 +294,7 @@ impl Query {
293294
&self,
294295
is_streaming: bool,
295296
tenant_id: &Option<String>,
296-
) -> Result<(RB, Vec<String>), ExecuteError> {
297+
) -> Result<(QueryResult, Vec<String>), ExecuteError> {
297298
let df = QUERY_SESSION
298299
.get_ctx()
299300
.execute_logical_plan(self.final_logical_plan(tenant_id))

src/telemetry.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ use opentelemetry_sdk::{
2525
trace::{BatchSpanProcessor, SdkTracerProvider},
2626
};
2727

28+
const EXPORTER_OTLP_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT";
29+
const EXPORTER_OTLP_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_PROTOCOL";
30+
2831
/// Initialise an OTLP tracer provider.
2932
///
3033
/// **Required env var:**
@@ -48,10 +51,10 @@ use opentelemetry_sdk::{
4851
pub fn init_otel_tracer() -> Option<SdkTracerProvider> {
4952
// Only used to decide whether OTEL is enabled; the SDK reads it again
5053
// from env to build the exporter (which also appends /v1/traces for HTTP).
51-
std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok()?;
54+
std::env::var(EXPORTER_OTLP_ENDPOINT).ok()?;
5255

5356
let protocol =
54-
std::env::var("OTEL_EXPORTER_OTLP_PROTOCOL").unwrap_or_else(|_| "http/json".to_string());
57+
std::env::var(EXPORTER_OTLP_PROTOCOL).unwrap_or_else(|_| "http/json".to_string());
5558

5659
// Build the exporter using the SDK's env-var-aware builders.
5760
// We intentionally do NOT call .with_endpoint() / .with_headers() /
@@ -69,10 +72,18 @@ pub fn init_otel_tracer() -> Option<SdkTracerProvider> {
6972
// ── HTTP/JSON (default) ──────────────────────────────────────────────
7073
// Default when OTEL_EXPORTER_OTLP_PROTOCOL is unset.
7174
// Required for Parseable OSS — it only accepts application/json.
72-
_ => SpanExporter::builder()
75+
"http/json" => SpanExporter::builder()
7376
.with_http()
7477
.with_protocol(Protocol::HttpJson)
7578
.build(),
79+
other => {
80+
tracing::warn!(
81+
"Unknown OTEL_EXPORTER_OTLP_PROTOCOL value '{}'; disabling OTEL tracing. \
82+
Supported values: grpc, http/protobuf, http/json",
83+
other
84+
);
85+
return None;
86+
}
7687
};
7788

7889
let exporter = exporter

0 commit comments

Comments
 (0)