diff --git a/src/correlation.rs b/src/correlation.rs deleted file mode 100644 index 16fde8a48..000000000 --- a/src/correlation.rs +++ /dev/null @@ -1,418 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2025 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -use std::collections::{HashMap, HashSet}; - -use actix_web::http::StatusCode; -use actix_web::{Error, http::header::ContentType}; -use chrono::Utc; -use datafusion::error::DataFusionError; -use itertools::Itertools; -use once_cell::sync::Lazy; -use relative_path::RelativePathBuf; -use serde::{Deserialize, Serialize}; -use serde_json::Error as SerdeError; -use tokio::sync::RwLock; -use tracing::error; - -use crate::{ - handlers::http::{ - rbac::RBACError, - users::{CORRELATION_DIR, USERS_ROOT_DIR}, - }, - metastore::{MetastoreError, metastore_traits::MetastoreObject}, - parseable::{DEFAULT_TENANT, PARSEABLE}, - query::QUERY_SESSION, - rbac::{Users, map::SessionKey}, - storage::ObjectStorageError, - users::filters::FilterQuery, - utils::{get_hash, get_tenant_id_from_key, user_auth_for_datasets}, -}; - -pub static CORRELATIONS: Lazy = Lazy::new(Correlations::default); - -type CorrelationMap = HashMap; - -#[derive(Debug, Default, derive_more::Deref)] -pub struct Correlations(RwLock>); - -impl Correlations { - // Load correlations from storage - pub async fn load(&self) -> anyhow::Result<()> { - let all_correlations = PARSEABLE.metastore.get_correlations().await?; - - let mut guard = self.write().await; - - for (tenant_id, correlations_bytes) in all_correlations { - let mut corrs = HashMap::new(); - for corr in correlations_bytes { - let correlation = match serde_json::from_slice::(&corr) { - Ok(c) => c, - Err(e) => { - error!("Unable to load correlation file : {e}"); - continue; - } - }; - corrs.insert(correlation.id.clone(), correlation); - } - - guard.insert(tenant_id, corrs); - } - - Ok(()) - } - - pub async fn list_correlations( - &self, - session_key: &SessionKey, - ) -> Result, CorrelationError> { - let mut user_correlations = vec![]; - let permissions = Users.get_permissions(session_key); - let tenant_id = get_tenant_id_from_key(session_key); - let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - if let Some(corrs) = self.read().await.get(tenant) { - for correlation in corrs.values() { - let tables = &correlation - .table_configs - .iter() - .map(|t| t.table_name.clone()) - .collect_vec(); - if user_auth_for_datasets(&permissions, tables, &tenant_id) - .await - .is_ok() - { - user_correlations.push(correlation.clone()); - } - } - } - - Ok(user_correlations) - } - - pub async fn get_correlation( - &self, - correlation_id: &str, - tenant_id: &Option, - ) -> Result { - let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - if let Some(corrs) = self.read().await.get(tenant_id) { - corrs.get(correlation_id).cloned().ok_or_else(|| { - CorrelationError::AnyhowError(anyhow::Error::msg(format!( - "Unable to find correlation with ID- {correlation_id}" - ))) - }) - } else { - Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!( - "Unable to find correlation with ID- {correlation_id}" - )))) - } - } - - /// Create correlation associated with the user - pub async fn create( - &self, - mut correlation: CorrelationConfig, - session_key: &SessionKey, - ) -> Result { - correlation.id = get_hash(Utc::now().timestamp_micros().to_string().as_str()); - correlation.validate(session_key).await?; - let tenant_id = get_tenant_id_from_key(session_key); - // Update in metastore - PARSEABLE - .metastore - .put_correlation(&correlation, &tenant_id) - .await?; - let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - // Update in memory - self.write() - .await - .entry(tenant.to_string()) - .or_default() - .insert(correlation.id.to_owned(), correlation.clone()); - - Ok(correlation) - } - - /// Update existing correlation for the user and with the same ID - pub async fn update( - &self, - mut updated_correlation: CorrelationConfig, - session_key: &SessionKey, - ) -> Result { - let tenant_id = get_tenant_id_from_key(session_key); - // validate whether user has access to this correlation object or not - let correlation = self - .get_correlation(&updated_correlation.id, &tenant_id) - .await?; - if correlation.user_id != updated_correlation.user_id { - return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!( - r#"User "{}" isn't authorized to update correlation with ID - {}"#, - updated_correlation.user_id, correlation.id - )))); - } - - correlation.validate(session_key).await?; - updated_correlation.update(correlation); - - // Update in metastore - PARSEABLE - .metastore - .put_correlation(&updated_correlation, &tenant_id) - .await?; - - let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - // Update in memory - if let Some(corrs) = self.write().await.get_mut(tenant) { - corrs.insert( - updated_correlation.id.to_owned(), - updated_correlation.clone(), - ); - } - - Ok(updated_correlation) - } - - /// Delete correlation from memory and storage - pub async fn delete( - &self, - correlation_id: &str, - user_id: &str, - tenant_id: &Option, - ) -> Result<(), CorrelationError> { - let correlation = CORRELATIONS - .get_correlation(correlation_id, tenant_id) - .await?; - if correlation.user_id != user_id { - return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!( - r#"User "{user_id}" isn't authorized to delete correlation with ID - {correlation_id}"# - )))); - } - - // Delete from storage - PARSEABLE - .metastore - .delete_correlation(&correlation, tenant_id) - .await?; - - // Delete from memory - self.write() - .await - .entry(tenant_id.as_deref().unwrap_or(DEFAULT_TENANT).to_owned()) - .or_default() - .remove(&correlation.id); - - Ok(()) - } -} - -#[derive(Debug, Clone, Default, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum CorrelationVersion { - #[default] - V1, -} - -type CorrelationId = String; -type UserId = String; - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct CorrelationConfig { - #[serde(default)] - pub version: CorrelationVersion, - pub title: String, - #[serde(default)] - pub id: CorrelationId, - #[serde(default)] - pub user_id: UserId, - pub table_configs: Vec, - pub join_config: JoinConfig, - pub filter: Option, - pub start_time: Option, - pub end_time: Option, -} - -impl MetastoreObject for CorrelationConfig { - fn get_object_path(&self) -> String { - self.path().to_string() - } - - fn get_object_id(&self) -> String { - self.id.clone() - } -} - -impl CorrelationConfig { - pub fn path(&self) -> RelativePathBuf { - RelativePathBuf::from_iter([ - USERS_ROOT_DIR, - &self.user_id, - CORRELATION_DIR, - &format!("{}.json", self.id), - ]) - } - - pub fn update(&mut self, update: Self) { - self.title = update.title; - self.table_configs = update.table_configs; - self.join_config = update.join_config; - self.filter = update.filter; - self.start_time = update.start_time; - self.end_time = update.end_time; - } - - /// This function will validate the TableConfigs, JoinConfig, and user auth - pub async fn validate(&self, session_key: &SessionKey) -> Result<(), CorrelationError> { - let tenant_id = get_tenant_id_from_key(session_key); - let ctx = &QUERY_SESSION.get_ctx(); - let h1: HashSet<&String> = self.table_configs.iter().map(|t| &t.table_name).collect(); - let h2: HashSet<&String> = self - .join_config - .join_conditions - .iter() - .map(|j| &j.table_name) - .collect(); - - // check if table config tables are the same - if h1.len() != 2 { - return Err(CorrelationError::Metadata( - "Must provide config for two unique tables", - )); - } - - // check that the tables mentioned in join config are - // the same as those in table config - if h1 != h2 { - return Err(CorrelationError::Metadata( - "Must provide same tables for join config and table config", - )); - } - - // check if user has access to table - let permissions = Users.get_permissions(session_key); - let tables = &self - .table_configs - .iter() - .map(|t| t.table_name.clone()) - .collect_vec(); - - user_auth_for_datasets(&permissions, tables, &tenant_id).await?; - - // to validate table config, we need to check whether the mentioned fields - // are present in the table or not - for table_config in self.table_configs.iter() { - // table config check - let df = ctx.table(&table_config.table_name).await?; - - let mut selected_fields = table_config - .selected_fields - .iter() - .map(|c| c.as_str()) - .collect_vec(); - - // unwrap because we have determined that the tables in table config are the same as those in join config - let condition = self - .join_config - .join_conditions - .iter() - .find(|j| j.table_name == table_config.table_name) - .unwrap(); - let join_field = condition.field.as_str(); - - if !selected_fields.contains(&join_field) { - selected_fields.push(join_field); - } - - // if this errors out then the table config is incorrect or join config is incorrect - df.select_columns(selected_fields.as_slice())?; - } - - Ok(()) - } -} - -#[derive(Debug, thiserror::Error)] -pub enum CorrelationError { - #[error("Failed to connect to storage: {0}")] - ObjectStorage(#[from] ObjectStorageError), - #[error("Serde Error: {0}")] - Serde(#[from] SerdeError), - #[error("Cannot perform this operation: {0}")] - Metadata(&'static str), - #[error("User does not exist")] - UserDoesNotExist(#[from] RBACError), - #[error("Error: {0}")] - AnyhowError(#[from] anyhow::Error), - #[error("Unauthorized")] - Unauthorized, - #[error("DataFusion Error: {0}")] - DataFusion(#[from] DataFusionError), - #[error("{0}")] - ActixError(#[from] Error), - #[error(transparent)] - MetastoreError(#[from] MetastoreError), -} - -impl actix_web::ResponseError for CorrelationError { - fn status_code(&self) -> StatusCode { - match self { - Self::ObjectStorage(_) => StatusCode::INTERNAL_SERVER_ERROR, - Self::Serde(_) => StatusCode::BAD_REQUEST, - Self::Metadata(_) => StatusCode::BAD_REQUEST, - Self::UserDoesNotExist(_) => StatusCode::NOT_FOUND, - Self::AnyhowError(_) => StatusCode::INTERNAL_SERVER_ERROR, - Self::Unauthorized => StatusCode::BAD_REQUEST, - Self::DataFusion(_) => StatusCode::INTERNAL_SERVER_ERROR, - Self::ActixError(_) => StatusCode::BAD_REQUEST, - Self::MetastoreError(e) => e.status_code(), - } - } - - fn error_response(&self) -> actix_web::HttpResponse { - match self { - CorrelationError::MetastoreError(e) => { - actix_web::HttpResponse::build(self.status_code()) - .insert_header(ContentType::json()) - .json(e.to_detail()) - } - _ => actix_web::HttpResponse::build(self.status_code()) - .insert_header(ContentType::plaintext()) - .body(self.to_string()), - } - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct TableConfig { - pub selected_fields: Vec, - pub table_name: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct JoinCondition { - pub table_name: String, - pub field: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct JoinConfig { - pub join_conditions: Vec, -} diff --git a/src/handlers/http/correlation.rs b/src/handlers/http/correlation.rs deleted file mode 100644 index adcb4157c..000000000 --- a/src/handlers/http/correlation.rs +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2025 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -use actix_web::web::{Json, Path}; -use actix_web::{HttpRequest, HttpResponse, Responder, web}; -use anyhow::Error; -use itertools::Itertools; - -use crate::rbac::Users; -use crate::utils::actix::extract_session_key_from_req; -use crate::utils::{ - get_hash, get_tenant_id_from_request, get_user_and_tenant_from_request, user_auth_for_datasets, -}; - -use crate::correlation::{CORRELATIONS, CorrelationConfig, CorrelationError}; - -pub async fn list(req: HttpRequest) -> Result { - let session_key = extract_session_key_from_req(&req) - .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - - let correlations = CORRELATIONS.list_correlations(&session_key).await?; - - Ok(web::Json(correlations)) -} - -pub async fn get( - req: HttpRequest, - correlation_id: Path, -) -> Result { - let tenant_id = get_tenant_id_from_request(&req); - let correlation_id = correlation_id.into_inner(); - let session_key = extract_session_key_from_req(&req) - .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - - let correlation = CORRELATIONS - .get_correlation(&correlation_id, &tenant_id) - .await?; - - let permissions = Users.get_permissions(&session_key); - - let tables = &correlation - .table_configs - .iter() - .map(|t| t.table_name.clone()) - .collect_vec(); - - user_auth_for_datasets(&permissions, tables, &tenant_id).await?; - - Ok(web::Json(correlation)) -} - -pub async fn post( - req: HttpRequest, - Json(mut correlation): Json, -) -> Result { - let session_key = extract_session_key_from_req(&req) - .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; - let user_id = get_user_and_tenant_from_request(&req) - .map(|(s, _)| get_hash(&s.to_string())) - .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - correlation.user_id = user_id; - - let correlation = CORRELATIONS.create(correlation, &session_key).await?; - - Ok(web::Json(correlation)) -} - -pub async fn modify( - req: HttpRequest, - correlation_id: Path, - Json(mut correlation): Json, -) -> Result { - correlation.id = correlation_id.into_inner(); - correlation.user_id = get_user_and_tenant_from_request(&req) - .map(|(s, _)| get_hash(&s.to_string())) - .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - - let session_key = extract_session_key_from_req(&req) - .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; - - let correlation = CORRELATIONS.update(correlation, &session_key).await?; - - Ok(web::Json(correlation)) -} - -pub async fn delete( - req: HttpRequest, - correlation_id: Path, -) -> Result { - let correlation_id = correlation_id.into_inner(); - let (user_id, tenant_id) = get_user_and_tenant_from_request(&req) - .map(|(s, t)| (get_hash(&s.to_string()), t)) - .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - - CORRELATIONS - .delete(&correlation_id, &user_id, &tenant_id) - .await?; - - Ok(HttpResponse::Ok().finish()) -} diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 993a40b10..a2348cf73 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -31,7 +31,6 @@ use self::query::Query; pub mod about; pub mod alerts; pub mod cluster; -pub mod correlation; pub mod demo_data; pub mod health_check; pub mod ingest; diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 115f8b643..c464ef53f 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -36,7 +36,6 @@ use tracing::{error, info, warn}; use crate::{ alerts::{ALERTS, get_alert_manager, target::TARGETS}, cli::Options, - correlation::CORRELATIONS, hottier::{HotTierManager, StreamHotTier}, metastore::metastore_traits::MetastoreObject, oauth::{OAuthProvider, connect_oidc}, @@ -169,34 +168,22 @@ pub trait ParseableServer { pub async fn load_on_init() -> anyhow::Result<()> { // Run all loading operations concurrently - let (correlations_result, filters_result, dashboards_result, alerts_result, targets_result) = - future::join5( - async { - CORRELATIONS - .load() - .await - .context("Failed to load correlations") - }, - async { FILTERS.load().await.context("Failed to load filters") }, - async { DASHBOARDS.load().await.context("Failed to load dashboards") }, - async { - get_alert_manager().await; - let guard = ALERTS.write().await; - let alerts = if let Some(alerts) = guard.as_ref() { - alerts - } else { - return Err(anyhow::Error::msg("No AlertManager set")); - }; - alerts.load().await - }, - async { TARGETS.load().await.context("Failed to load targets") }, - ) - .await; - - // Handle errors from each operation - if let Err(e) = correlations_result { - error!("{e}"); - } + let (filters_result, dashboards_result, alerts_result, targets_result) = future::join4( + async { FILTERS.load().await.context("Failed to load filters") }, + async { DASHBOARDS.load().await.context("Failed to load dashboards") }, + async { + get_alert_manager().await; + let guard = ALERTS.write().await; + let alerts = if let Some(alerts) = guard.as_ref() { + alerts + } else { + return Err(anyhow::Error::msg("No AlertManager set")); + }; + alerts.load().await + }, + async { TARGETS.load().await.context("Failed to load targets") }, + ) + .await; if let Err(err) = filters_result { error!("{err}"); diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index e9c006914..3345246f4 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -54,7 +54,6 @@ impl ParseableServer for QueryServer { config .service( web::scope(&base_path()) - .service(Server::get_correlation_webscope()) .service(Server::get_query_factory().wrap(from_fn( resource_check::check_resource_utilization_middleware, ))) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 29a977c3d..f83261d9d 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -77,7 +77,6 @@ impl ParseableServer for Server { config .service( web::scope(&base_path()) - .service(Self::get_correlation_webscope()) .service(Self::get_query_factory().wrap(from_fn( resource_check::check_resource_utilization_middleware, ))) @@ -220,41 +219,6 @@ impl Server { ) } - pub fn get_correlation_webscope() -> Scope { - web::scope("/correlation") - .service( - web::resource("") - .route( - web::get() - .to(http::correlation::list) - .authorize(Action::GetCorrelation), - ) - .route( - web::post() - .to(http::correlation::post) - .authorize(Action::CreateCorrelation), - ), - ) - .service( - web::resource("/{correlation_id}") - .route( - web::get() - .to(http::correlation::get) - .authorize(Action::GetCorrelation), - ) - .route( - web::put() - .to(http::correlation::modify) - .authorize(Action::PutCorrelation), - ) - .route( - web::delete() - .to(http::correlation::delete) - .authorize(Action::DeleteCorrelation), - ), - ) - } - pub fn get_alerts_webscope() -> Scope { web::scope("/alerts") .service( diff --git a/src/handlers/http/users/mod.rs b/src/handlers/http/users/mod.rs index 8d05c8230..fe2c4a3a1 100644 --- a/src/handlers/http/users/mod.rs +++ b/src/handlers/http/users/mod.rs @@ -22,4 +22,3 @@ pub mod filters; pub const USERS_ROOT_DIR: &str = ".users"; pub const DASHBOARDS_DIR: &str = "dashboards"; pub const FILTER_DIR: &str = "filters"; -pub const CORRELATION_DIR: &str = "correlations"; diff --git a/src/lib.rs b/src/lib.rs index 5c3704d5c..b79c4f313 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,7 +24,6 @@ pub mod catalog; mod cli; #[cfg(feature = "kafka")] pub mod connectors; -pub mod correlation; pub mod enterprise; pub mod event; pub mod handlers; diff --git a/src/metastore/metastore_traits.rs b/src/metastore/metastore_traits.rs index 79745ecf8..b13b73637 100644 --- a/src/metastore/metastore_traits.rs +++ b/src/metastore/metastore_traits.rs @@ -206,19 +206,6 @@ pub trait Metastore: std::fmt::Debug + Send + Sync { tenant_id: &Option, ) -> Result<(), MetastoreError>; - /// correlations - async fn get_correlations(&self) -> Result>, MetastoreError>; - async fn put_correlation( - &self, - obj: &dyn MetastoreObject, - tenant_id: &Option, - ) -> Result<(), MetastoreError>; - async fn delete_correlation( - &self, - obj: &dyn MetastoreObject, - tenant_id: &Option, - ) -> Result<(), MetastoreError>; - /// stream metadata /// `get_base` when set to true, will fetch the stream.json present at the base of /// the stream (independent of Mode of server) diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs index 904bcca24..bdb2d05f9 100644 --- a/src/metastore/metastores/object_store_metastore.rs +++ b/src/metastore/metastores/object_store_metastore.rs @@ -789,66 +789,6 @@ impl Metastore for ObjectStoreMetastore { .await?) } - /// Get all correlations - async fn get_correlations(&self) -> Result>, MetastoreError> { - let mut correlations = HashMap::new(); - let base_paths = PARSEABLE.list_tenants().unwrap_or_else(|| vec!["".into()]); - for mut tenant in base_paths { - let tenant_id = &Some(tenant.clone()); - let mut corrs = Vec::new(); - let users_dir = RelativePathBuf::from_iter([&tenant, USERS_ROOT_DIR]); - for user in self - .storage - .list_dirs_relative(&users_dir, tenant_id) - .await? - { - let correlations_path = users_dir.join(&user).join("correlations"); - let correlation_bytes = self - .storage - .get_objects( - Some(&correlations_path), - Box::new(|file_name| file_name.ends_with(".json")), - tenant_id, - ) - .await?; - - corrs.extend(correlation_bytes); - } - if tenant.is_empty() { - tenant.clone_from(&DEFAULT_TENANT.to_string()); - } - correlations.insert(tenant, corrs); - } - Ok(correlations) - } - - /// Save a correlation - async fn put_correlation( - &self, - obj: &dyn MetastoreObject, - tenant_id: &Option, - ) -> Result<(), MetastoreError> { - let path = obj.get_object_path(); - Ok(self - .storage - .put_object(&RelativePathBuf::from(path), to_bytes(obj), tenant_id) - .await?) - } - - /// Delete a correlation - async fn delete_correlation( - &self, - obj: &dyn MetastoreObject, - tenant_id: &Option, - ) -> Result<(), MetastoreError> { - let path = obj.get_object_path(); - - Ok(self - .storage - .delete_object(&RelativePathBuf::from(path), tenant_id) - .await?) - } - /// Fetch an `ObjectStoreFormat` file /// /// If `get_base` is true, get the one at the base of the stream directory else depends on Mode diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs index f005f3d96..f6378b49f 100644 --- a/src/prism/home/mod.rs +++ b/src/prism/home/mod.rs @@ -23,7 +23,6 @@ use tracing::error; use crate::{ alerts::{ALERTS, AlertError, AlertState}, - correlation::{CORRELATIONS, CorrelationError}, event::format::{LogSource, LogSourceEntry}, handlers::{DatasetTag, TelemetryType, http::logstream::error::StreamError}, metastore::MetastoreError, @@ -86,7 +85,6 @@ pub struct HomeResponse { #[derive(Debug, Serialize)] pub enum ResourceType { Alert, - Correlation, Dashboard, Filter, DataSet, @@ -269,9 +267,8 @@ pub async fn generate_home_search_response( let (user_id, _) = Users .get_userid_from_session(key) .expect("Should be a valid user session"); - let (alert_titles, correlation_titles, dashboard_titles, filter_titles, stream_titles) = tokio::join!( + let (alert_titles, dashboard_titles, filter_titles, stream_titles) = tokio::join!( get_alert_titles(key, query_value), - get_correlation_titles(key, query_value), get_dashboard_titles(user_id, query_value, tenant_id), get_filter_titles(key, query_value), get_stream_titles(key, tenant_id) @@ -279,8 +276,6 @@ pub async fn generate_home_search_response( let alerts = alert_titles?; resources.extend(alerts); - let correlations = correlation_titles?; - resources.extend(correlations); let dashboards = dashboard_titles?; resources.extend(dashboards); let filters = filter_titles?; @@ -354,32 +349,6 @@ async fn get_alert_titles( Ok(alerts) } -async fn get_correlation_titles( - key: &SessionKey, - query_value: &str, -) -> Result, PrismHomeError> { - let correlations = CORRELATIONS - .list_correlations(key) - .await? - .iter() - .filter_map(|correlation| { - if correlation.title.to_lowercase().contains(query_value) - || correlation.id.to_lowercase().contains(query_value) - { - Some(Resource { - id: correlation.id.to_string(), - name: correlation.title.clone(), - resource_type: ResourceType::Correlation, - }) - } else { - None - } - }) - .collect_vec(); - - Ok(correlations) -} - async fn get_dashboard_titles( user_id: String, query_value: &str, @@ -443,8 +412,6 @@ pub enum PrismHomeError { Anyhow(#[from] anyhow::Error), #[error("AlertError: {0}")] AlertError(#[from] AlertError), - #[error("CorrelationError: {0}")] - CorrelationError(#[from] CorrelationError), #[error("StreamError: {0}")] StreamError(#[from] StreamError), #[error("ObjectStorageError: {0}")] @@ -460,7 +427,6 @@ impl actix_web::ResponseError for PrismHomeError { match self { PrismHomeError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, PrismHomeError::AlertError(e) => e.status_code(), - PrismHomeError::CorrelationError(e) => e.status_code(), PrismHomeError::StreamError(e) => e.status_code(), PrismHomeError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, PrismHomeError::InvalidQueryParameter(_) => StatusCode::BAD_REQUEST,