diff --git a/src/correlation.rs b/src/correlation.rs
deleted file mode 100644
index 16fde8a48..000000000
--- a/src/correlation.rs
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * Parseable Server (C) 2022 - 2025 Parseable, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- *
- */
-
-use std::collections::{HashMap, HashSet};
-
-use actix_web::http::StatusCode;
-use actix_web::{Error, http::header::ContentType};
-use chrono::Utc;
-use datafusion::error::DataFusionError;
-use itertools::Itertools;
-use once_cell::sync::Lazy;
-use relative_path::RelativePathBuf;
-use serde::{Deserialize, Serialize};
-use serde_json::Error as SerdeError;
-use tokio::sync::RwLock;
-use tracing::error;
-
-use crate::{
- handlers::http::{
- rbac::RBACError,
- users::{CORRELATION_DIR, USERS_ROOT_DIR},
- },
- metastore::{MetastoreError, metastore_traits::MetastoreObject},
- parseable::{DEFAULT_TENANT, PARSEABLE},
- query::QUERY_SESSION,
- rbac::{Users, map::SessionKey},
- storage::ObjectStorageError,
- users::filters::FilterQuery,
- utils::{get_hash, get_tenant_id_from_key, user_auth_for_datasets},
-};
-
-pub static CORRELATIONS: Lazy = Lazy::new(Correlations::default);
-
-type CorrelationMap = HashMap;
-
-#[derive(Debug, Default, derive_more::Deref)]
-pub struct Correlations(RwLock>);
-
-impl Correlations {
- // Load correlations from storage
- pub async fn load(&self) -> anyhow::Result<()> {
- let all_correlations = PARSEABLE.metastore.get_correlations().await?;
-
- let mut guard = self.write().await;
-
- for (tenant_id, correlations_bytes) in all_correlations {
- let mut corrs = HashMap::new();
- for corr in correlations_bytes {
- let correlation = match serde_json::from_slice::(&corr) {
- Ok(c) => c,
- Err(e) => {
- error!("Unable to load correlation file : {e}");
- continue;
- }
- };
- corrs.insert(correlation.id.clone(), correlation);
- }
-
- guard.insert(tenant_id, corrs);
- }
-
- Ok(())
- }
-
- pub async fn list_correlations(
- &self,
- session_key: &SessionKey,
- ) -> Result, CorrelationError> {
- let mut user_correlations = vec![];
- let permissions = Users.get_permissions(session_key);
- let tenant_id = get_tenant_id_from_key(session_key);
- let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
- if let Some(corrs) = self.read().await.get(tenant) {
- for correlation in corrs.values() {
- let tables = &correlation
- .table_configs
- .iter()
- .map(|t| t.table_name.clone())
- .collect_vec();
- if user_auth_for_datasets(&permissions, tables, &tenant_id)
- .await
- .is_ok()
- {
- user_correlations.push(correlation.clone());
- }
- }
- }
-
- Ok(user_correlations)
- }
-
- pub async fn get_correlation(
- &self,
- correlation_id: &str,
- tenant_id: &Option,
- ) -> Result {
- let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
- if let Some(corrs) = self.read().await.get(tenant_id) {
- corrs.get(correlation_id).cloned().ok_or_else(|| {
- CorrelationError::AnyhowError(anyhow::Error::msg(format!(
- "Unable to find correlation with ID- {correlation_id}"
- )))
- })
- } else {
- Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!(
- "Unable to find correlation with ID- {correlation_id}"
- ))))
- }
- }
-
- /// Create correlation associated with the user
- pub async fn create(
- &self,
- mut correlation: CorrelationConfig,
- session_key: &SessionKey,
- ) -> Result {
- correlation.id = get_hash(Utc::now().timestamp_micros().to_string().as_str());
- correlation.validate(session_key).await?;
- let tenant_id = get_tenant_id_from_key(session_key);
- // Update in metastore
- PARSEABLE
- .metastore
- .put_correlation(&correlation, &tenant_id)
- .await?;
- let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
- // Update in memory
- self.write()
- .await
- .entry(tenant.to_string())
- .or_default()
- .insert(correlation.id.to_owned(), correlation.clone());
-
- Ok(correlation)
- }
-
- /// Update existing correlation for the user and with the same ID
- pub async fn update(
- &self,
- mut updated_correlation: CorrelationConfig,
- session_key: &SessionKey,
- ) -> Result {
- let tenant_id = get_tenant_id_from_key(session_key);
- // validate whether user has access to this correlation object or not
- let correlation = self
- .get_correlation(&updated_correlation.id, &tenant_id)
- .await?;
- if correlation.user_id != updated_correlation.user_id {
- return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!(
- r#"User "{}" isn't authorized to update correlation with ID - {}"#,
- updated_correlation.user_id, correlation.id
- ))));
- }
-
- correlation.validate(session_key).await?;
- updated_correlation.update(correlation);
-
- // Update in metastore
- PARSEABLE
- .metastore
- .put_correlation(&updated_correlation, &tenant_id)
- .await?;
-
- let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
- // Update in memory
- if let Some(corrs) = self.write().await.get_mut(tenant) {
- corrs.insert(
- updated_correlation.id.to_owned(),
- updated_correlation.clone(),
- );
- }
-
- Ok(updated_correlation)
- }
-
- /// Delete correlation from memory and storage
- pub async fn delete(
- &self,
- correlation_id: &str,
- user_id: &str,
- tenant_id: &Option,
- ) -> Result<(), CorrelationError> {
- let correlation = CORRELATIONS
- .get_correlation(correlation_id, tenant_id)
- .await?;
- if correlation.user_id != user_id {
- return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!(
- r#"User "{user_id}" isn't authorized to delete correlation with ID - {correlation_id}"#
- ))));
- }
-
- // Delete from storage
- PARSEABLE
- .metastore
- .delete_correlation(&correlation, tenant_id)
- .await?;
-
- // Delete from memory
- self.write()
- .await
- .entry(tenant_id.as_deref().unwrap_or(DEFAULT_TENANT).to_owned())
- .or_default()
- .remove(&correlation.id);
-
- Ok(())
- }
-}
-
-#[derive(Debug, Clone, Default, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub enum CorrelationVersion {
- #[default]
- V1,
-}
-
-type CorrelationId = String;
-type UserId = String;
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct CorrelationConfig {
- #[serde(default)]
- pub version: CorrelationVersion,
- pub title: String,
- #[serde(default)]
- pub id: CorrelationId,
- #[serde(default)]
- pub user_id: UserId,
- pub table_configs: Vec,
- pub join_config: JoinConfig,
- pub filter: Option,
- pub start_time: Option,
- pub end_time: Option,
-}
-
-impl MetastoreObject for CorrelationConfig {
- fn get_object_path(&self) -> String {
- self.path().to_string()
- }
-
- fn get_object_id(&self) -> String {
- self.id.clone()
- }
-}
-
-impl CorrelationConfig {
- pub fn path(&self) -> RelativePathBuf {
- RelativePathBuf::from_iter([
- USERS_ROOT_DIR,
- &self.user_id,
- CORRELATION_DIR,
- &format!("{}.json", self.id),
- ])
- }
-
- pub fn update(&mut self, update: Self) {
- self.title = update.title;
- self.table_configs = update.table_configs;
- self.join_config = update.join_config;
- self.filter = update.filter;
- self.start_time = update.start_time;
- self.end_time = update.end_time;
- }
-
- /// This function will validate the TableConfigs, JoinConfig, and user auth
- pub async fn validate(&self, session_key: &SessionKey) -> Result<(), CorrelationError> {
- let tenant_id = get_tenant_id_from_key(session_key);
- let ctx = &QUERY_SESSION.get_ctx();
- let h1: HashSet<&String> = self.table_configs.iter().map(|t| &t.table_name).collect();
- let h2: HashSet<&String> = self
- .join_config
- .join_conditions
- .iter()
- .map(|j| &j.table_name)
- .collect();
-
- // check if table config tables are the same
- if h1.len() != 2 {
- return Err(CorrelationError::Metadata(
- "Must provide config for two unique tables",
- ));
- }
-
- // check that the tables mentioned in join config are
- // the same as those in table config
- if h1 != h2 {
- return Err(CorrelationError::Metadata(
- "Must provide same tables for join config and table config",
- ));
- }
-
- // check if user has access to table
- let permissions = Users.get_permissions(session_key);
- let tables = &self
- .table_configs
- .iter()
- .map(|t| t.table_name.clone())
- .collect_vec();
-
- user_auth_for_datasets(&permissions, tables, &tenant_id).await?;
-
- // to validate table config, we need to check whether the mentioned fields
- // are present in the table or not
- for table_config in self.table_configs.iter() {
- // table config check
- let df = ctx.table(&table_config.table_name).await?;
-
- let mut selected_fields = table_config
- .selected_fields
- .iter()
- .map(|c| c.as_str())
- .collect_vec();
-
- // unwrap because we have determined that the tables in table config are the same as those in join config
- let condition = self
- .join_config
- .join_conditions
- .iter()
- .find(|j| j.table_name == table_config.table_name)
- .unwrap();
- let join_field = condition.field.as_str();
-
- if !selected_fields.contains(&join_field) {
- selected_fields.push(join_field);
- }
-
- // if this errors out then the table config is incorrect or join config is incorrect
- df.select_columns(selected_fields.as_slice())?;
- }
-
- Ok(())
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum CorrelationError {
- #[error("Failed to connect to storage: {0}")]
- ObjectStorage(#[from] ObjectStorageError),
- #[error("Serde Error: {0}")]
- Serde(#[from] SerdeError),
- #[error("Cannot perform this operation: {0}")]
- Metadata(&'static str),
- #[error("User does not exist")]
- UserDoesNotExist(#[from] RBACError),
- #[error("Error: {0}")]
- AnyhowError(#[from] anyhow::Error),
- #[error("Unauthorized")]
- Unauthorized,
- #[error("DataFusion Error: {0}")]
- DataFusion(#[from] DataFusionError),
- #[error("{0}")]
- ActixError(#[from] Error),
- #[error(transparent)]
- MetastoreError(#[from] MetastoreError),
-}
-
-impl actix_web::ResponseError for CorrelationError {
- fn status_code(&self) -> StatusCode {
- match self {
- Self::ObjectStorage(_) => StatusCode::INTERNAL_SERVER_ERROR,
- Self::Serde(_) => StatusCode::BAD_REQUEST,
- Self::Metadata(_) => StatusCode::BAD_REQUEST,
- Self::UserDoesNotExist(_) => StatusCode::NOT_FOUND,
- Self::AnyhowError(_) => StatusCode::INTERNAL_SERVER_ERROR,
- Self::Unauthorized => StatusCode::BAD_REQUEST,
- Self::DataFusion(_) => StatusCode::INTERNAL_SERVER_ERROR,
- Self::ActixError(_) => StatusCode::BAD_REQUEST,
- Self::MetastoreError(e) => e.status_code(),
- }
- }
-
- fn error_response(&self) -> actix_web::HttpResponse {
- match self {
- CorrelationError::MetastoreError(e) => {
- actix_web::HttpResponse::build(self.status_code())
- .insert_header(ContentType::json())
- .json(e.to_detail())
- }
- _ => actix_web::HttpResponse::build(self.status_code())
- .insert_header(ContentType::plaintext())
- .body(self.to_string()),
- }
- }
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct TableConfig {
- pub selected_fields: Vec,
- pub table_name: String,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct JoinCondition {
- pub table_name: String,
- pub field: String,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct JoinConfig {
- pub join_conditions: Vec,
-}
diff --git a/src/handlers/http/correlation.rs b/src/handlers/http/correlation.rs
deleted file mode 100644
index adcb4157c..000000000
--- a/src/handlers/http/correlation.rs
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Parseable Server (C) 2022 - 2025 Parseable, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- *
- */
-
-use actix_web::web::{Json, Path};
-use actix_web::{HttpRequest, HttpResponse, Responder, web};
-use anyhow::Error;
-use itertools::Itertools;
-
-use crate::rbac::Users;
-use crate::utils::actix::extract_session_key_from_req;
-use crate::utils::{
- get_hash, get_tenant_id_from_request, get_user_and_tenant_from_request, user_auth_for_datasets,
-};
-
-use crate::correlation::{CORRELATIONS, CorrelationConfig, CorrelationError};
-
-pub async fn list(req: HttpRequest) -> Result {
- let session_key = extract_session_key_from_req(&req)
- .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;
-
- let correlations = CORRELATIONS.list_correlations(&session_key).await?;
-
- Ok(web::Json(correlations))
-}
-
-pub async fn get(
- req: HttpRequest,
- correlation_id: Path,
-) -> Result {
- let tenant_id = get_tenant_id_from_request(&req);
- let correlation_id = correlation_id.into_inner();
- let session_key = extract_session_key_from_req(&req)
- .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;
-
- let correlation = CORRELATIONS
- .get_correlation(&correlation_id, &tenant_id)
- .await?;
-
- let permissions = Users.get_permissions(&session_key);
-
- let tables = &correlation
- .table_configs
- .iter()
- .map(|t| t.table_name.clone())
- .collect_vec();
-
- user_auth_for_datasets(&permissions, tables, &tenant_id).await?;
-
- Ok(web::Json(correlation))
-}
-
-pub async fn post(
- req: HttpRequest,
- Json(mut correlation): Json,
-) -> Result {
- let session_key = extract_session_key_from_req(&req)
- .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
- let user_id = get_user_and_tenant_from_request(&req)
- .map(|(s, _)| get_hash(&s.to_string()))
- .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;
- correlation.user_id = user_id;
-
- let correlation = CORRELATIONS.create(correlation, &session_key).await?;
-
- Ok(web::Json(correlation))
-}
-
-pub async fn modify(
- req: HttpRequest,
- correlation_id: Path,
- Json(mut correlation): Json,
-) -> Result {
- correlation.id = correlation_id.into_inner();
- correlation.user_id = get_user_and_tenant_from_request(&req)
- .map(|(s, _)| get_hash(&s.to_string()))
- .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;
-
- let session_key = extract_session_key_from_req(&req)
- .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
-
- let correlation = CORRELATIONS.update(correlation, &session_key).await?;
-
- Ok(web::Json(correlation))
-}
-
-pub async fn delete(
- req: HttpRequest,
- correlation_id: Path,
-) -> Result {
- let correlation_id = correlation_id.into_inner();
- let (user_id, tenant_id) = get_user_and_tenant_from_request(&req)
- .map(|(s, t)| (get_hash(&s.to_string()), t))
- .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;
-
- CORRELATIONS
- .delete(&correlation_id, &user_id, &tenant_id)
- .await?;
-
- Ok(HttpResponse::Ok().finish())
-}
diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs
index 993a40b10..a2348cf73 100644
--- a/src/handlers/http/mod.rs
+++ b/src/handlers/http/mod.rs
@@ -31,7 +31,6 @@ use self::query::Query;
pub mod about;
pub mod alerts;
pub mod cluster;
-pub mod correlation;
pub mod demo_data;
pub mod health_check;
pub mod ingest;
diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs
index 115f8b643..c464ef53f 100644
--- a/src/handlers/http/modal/mod.rs
+++ b/src/handlers/http/modal/mod.rs
@@ -36,7 +36,6 @@ use tracing::{error, info, warn};
use crate::{
alerts::{ALERTS, get_alert_manager, target::TARGETS},
cli::Options,
- correlation::CORRELATIONS,
hottier::{HotTierManager, StreamHotTier},
metastore::metastore_traits::MetastoreObject,
oauth::{OAuthProvider, connect_oidc},
@@ -169,34 +168,22 @@ pub trait ParseableServer {
pub async fn load_on_init() -> anyhow::Result<()> {
// Run all loading operations concurrently
- let (correlations_result, filters_result, dashboards_result, alerts_result, targets_result) =
- future::join5(
- async {
- CORRELATIONS
- .load()
- .await
- .context("Failed to load correlations")
- },
- async { FILTERS.load().await.context("Failed to load filters") },
- async { DASHBOARDS.load().await.context("Failed to load dashboards") },
- async {
- get_alert_manager().await;
- let guard = ALERTS.write().await;
- let alerts = if let Some(alerts) = guard.as_ref() {
- alerts
- } else {
- return Err(anyhow::Error::msg("No AlertManager set"));
- };
- alerts.load().await
- },
- async { TARGETS.load().await.context("Failed to load targets") },
- )
- .await;
-
- // Handle errors from each operation
- if let Err(e) = correlations_result {
- error!("{e}");
- }
+ let (filters_result, dashboards_result, alerts_result, targets_result) = future::join4(
+ async { FILTERS.load().await.context("Failed to load filters") },
+ async { DASHBOARDS.load().await.context("Failed to load dashboards") },
+ async {
+ get_alert_manager().await;
+ let guard = ALERTS.write().await;
+ let alerts = if let Some(alerts) = guard.as_ref() {
+ alerts
+ } else {
+ return Err(anyhow::Error::msg("No AlertManager set"));
+ };
+ alerts.load().await
+ },
+ async { TARGETS.load().await.context("Failed to load targets") },
+ )
+ .await;
if let Err(err) = filters_result {
error!("{err}");
diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs
index e9c006914..3345246f4 100644
--- a/src/handlers/http/modal/query_server.rs
+++ b/src/handlers/http/modal/query_server.rs
@@ -54,7 +54,6 @@ impl ParseableServer for QueryServer {
config
.service(
web::scope(&base_path())
- .service(Server::get_correlation_webscope())
.service(Server::get_query_factory().wrap(from_fn(
resource_check::check_resource_utilization_middleware,
)))
diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs
index 29a977c3d..f83261d9d 100644
--- a/src/handlers/http/modal/server.rs
+++ b/src/handlers/http/modal/server.rs
@@ -77,7 +77,6 @@ impl ParseableServer for Server {
config
.service(
web::scope(&base_path())
- .service(Self::get_correlation_webscope())
.service(Self::get_query_factory().wrap(from_fn(
resource_check::check_resource_utilization_middleware,
)))
@@ -220,41 +219,6 @@ impl Server {
)
}
- pub fn get_correlation_webscope() -> Scope {
- web::scope("/correlation")
- .service(
- web::resource("")
- .route(
- web::get()
- .to(http::correlation::list)
- .authorize(Action::GetCorrelation),
- )
- .route(
- web::post()
- .to(http::correlation::post)
- .authorize(Action::CreateCorrelation),
- ),
- )
- .service(
- web::resource("/{correlation_id}")
- .route(
- web::get()
- .to(http::correlation::get)
- .authorize(Action::GetCorrelation),
- )
- .route(
- web::put()
- .to(http::correlation::modify)
- .authorize(Action::PutCorrelation),
- )
- .route(
- web::delete()
- .to(http::correlation::delete)
- .authorize(Action::DeleteCorrelation),
- ),
- )
- }
-
pub fn get_alerts_webscope() -> Scope {
web::scope("/alerts")
.service(
diff --git a/src/handlers/http/users/mod.rs b/src/handlers/http/users/mod.rs
index 8d05c8230..fe2c4a3a1 100644
--- a/src/handlers/http/users/mod.rs
+++ b/src/handlers/http/users/mod.rs
@@ -22,4 +22,3 @@ pub mod filters;
pub const USERS_ROOT_DIR: &str = ".users";
pub const DASHBOARDS_DIR: &str = "dashboards";
pub const FILTER_DIR: &str = "filters";
-pub const CORRELATION_DIR: &str = "correlations";
diff --git a/src/lib.rs b/src/lib.rs
index 5c3704d5c..b79c4f313 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -24,7 +24,6 @@ pub mod catalog;
mod cli;
#[cfg(feature = "kafka")]
pub mod connectors;
-pub mod correlation;
pub mod enterprise;
pub mod event;
pub mod handlers;
diff --git a/src/metastore/metastore_traits.rs b/src/metastore/metastore_traits.rs
index 79745ecf8..b13b73637 100644
--- a/src/metastore/metastore_traits.rs
+++ b/src/metastore/metastore_traits.rs
@@ -206,19 +206,6 @@ pub trait Metastore: std::fmt::Debug + Send + Sync {
tenant_id: &Option,
) -> Result<(), MetastoreError>;
- /// correlations
- async fn get_correlations(&self) -> Result>, MetastoreError>;
- async fn put_correlation(
- &self,
- obj: &dyn MetastoreObject,
- tenant_id: &Option,
- ) -> Result<(), MetastoreError>;
- async fn delete_correlation(
- &self,
- obj: &dyn MetastoreObject,
- tenant_id: &Option,
- ) -> Result<(), MetastoreError>;
-
/// stream metadata
/// `get_base` when set to true, will fetch the stream.json present at the base of
/// the stream (independent of Mode of server)
diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs
index 904bcca24..bdb2d05f9 100644
--- a/src/metastore/metastores/object_store_metastore.rs
+++ b/src/metastore/metastores/object_store_metastore.rs
@@ -789,66 +789,6 @@ impl Metastore for ObjectStoreMetastore {
.await?)
}
- /// Get all correlations
- async fn get_correlations(&self) -> Result>, MetastoreError> {
- let mut correlations = HashMap::new();
- let base_paths = PARSEABLE.list_tenants().unwrap_or_else(|| vec!["".into()]);
- for mut tenant in base_paths {
- let tenant_id = &Some(tenant.clone());
- let mut corrs = Vec::new();
- let users_dir = RelativePathBuf::from_iter([&tenant, USERS_ROOT_DIR]);
- for user in self
- .storage
- .list_dirs_relative(&users_dir, tenant_id)
- .await?
- {
- let correlations_path = users_dir.join(&user).join("correlations");
- let correlation_bytes = self
- .storage
- .get_objects(
- Some(&correlations_path),
- Box::new(|file_name| file_name.ends_with(".json")),
- tenant_id,
- )
- .await?;
-
- corrs.extend(correlation_bytes);
- }
- if tenant.is_empty() {
- tenant.clone_from(&DEFAULT_TENANT.to_string());
- }
- correlations.insert(tenant, corrs);
- }
- Ok(correlations)
- }
-
- /// Save a correlation
- async fn put_correlation(
- &self,
- obj: &dyn MetastoreObject,
- tenant_id: &Option,
- ) -> Result<(), MetastoreError> {
- let path = obj.get_object_path();
- Ok(self
- .storage
- .put_object(&RelativePathBuf::from(path), to_bytes(obj), tenant_id)
- .await?)
- }
-
- /// Delete a correlation
- async fn delete_correlation(
- &self,
- obj: &dyn MetastoreObject,
- tenant_id: &Option,
- ) -> Result<(), MetastoreError> {
- let path = obj.get_object_path();
-
- Ok(self
- .storage
- .delete_object(&RelativePathBuf::from(path), tenant_id)
- .await?)
- }
-
/// Fetch an `ObjectStoreFormat` file
///
/// If `get_base` is true, get the one at the base of the stream directory else depends on Mode
diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs
index f005f3d96..f6378b49f 100644
--- a/src/prism/home/mod.rs
+++ b/src/prism/home/mod.rs
@@ -23,7 +23,6 @@ use tracing::error;
use crate::{
alerts::{ALERTS, AlertError, AlertState},
- correlation::{CORRELATIONS, CorrelationError},
event::format::{LogSource, LogSourceEntry},
handlers::{DatasetTag, TelemetryType, http::logstream::error::StreamError},
metastore::MetastoreError,
@@ -86,7 +85,6 @@ pub struct HomeResponse {
#[derive(Debug, Serialize)]
pub enum ResourceType {
Alert,
- Correlation,
Dashboard,
Filter,
DataSet,
@@ -269,9 +267,8 @@ pub async fn generate_home_search_response(
let (user_id, _) = Users
.get_userid_from_session(key)
.expect("Should be a valid user session");
- let (alert_titles, correlation_titles, dashboard_titles, filter_titles, stream_titles) = tokio::join!(
+ let (alert_titles, dashboard_titles, filter_titles, stream_titles) = tokio::join!(
get_alert_titles(key, query_value),
- get_correlation_titles(key, query_value),
get_dashboard_titles(user_id, query_value, tenant_id),
get_filter_titles(key, query_value),
get_stream_titles(key, tenant_id)
@@ -279,8 +276,6 @@ pub async fn generate_home_search_response(
let alerts = alert_titles?;
resources.extend(alerts);
- let correlations = correlation_titles?;
- resources.extend(correlations);
let dashboards = dashboard_titles?;
resources.extend(dashboards);
let filters = filter_titles?;
@@ -354,32 +349,6 @@ async fn get_alert_titles(
Ok(alerts)
}
-async fn get_correlation_titles(
- key: &SessionKey,
- query_value: &str,
-) -> Result, PrismHomeError> {
- let correlations = CORRELATIONS
- .list_correlations(key)
- .await?
- .iter()
- .filter_map(|correlation| {
- if correlation.title.to_lowercase().contains(query_value)
- || correlation.id.to_lowercase().contains(query_value)
- {
- Some(Resource {
- id: correlation.id.to_string(),
- name: correlation.title.clone(),
- resource_type: ResourceType::Correlation,
- })
- } else {
- None
- }
- })
- .collect_vec();
-
- Ok(correlations)
-}
-
async fn get_dashboard_titles(
user_id: String,
query_value: &str,
@@ -443,8 +412,6 @@ pub enum PrismHomeError {
Anyhow(#[from] anyhow::Error),
#[error("AlertError: {0}")]
AlertError(#[from] AlertError),
- #[error("CorrelationError: {0}")]
- CorrelationError(#[from] CorrelationError),
#[error("StreamError: {0}")]
StreamError(#[from] StreamError),
#[error("ObjectStorageError: {0}")]
@@ -460,7 +427,6 @@ impl actix_web::ResponseError for PrismHomeError {
match self {
PrismHomeError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR,
PrismHomeError::AlertError(e) => e.status_code(),
- PrismHomeError::CorrelationError(e) => e.status_code(),
PrismHomeError::StreamError(e) => e.status_code(),
PrismHomeError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PrismHomeError::InvalidQueryParameter(_) => StatusCode::BAD_REQUEST,