Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
418 changes: 0 additions & 418 deletions src/correlation.rs

This file was deleted.

115 changes: 0 additions & 115 deletions src/handlers/http/correlation.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use self::query::Query;
pub mod about;
pub mod alerts;
pub mod cluster;
pub mod correlation;
pub mod demo_data;
pub mod health_check;
pub mod ingest;
Expand Down
45 changes: 16 additions & 29 deletions src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use tracing::{error, info, warn};
use crate::{
alerts::{ALERTS, get_alert_manager, target::TARGETS},
cli::Options,
correlation::CORRELATIONS,
hottier::{HotTierManager, StreamHotTier},
metastore::metastore_traits::MetastoreObject,
oauth::{OAuthProvider, connect_oidc},
Expand Down Expand Up @@ -169,34 +168,22 @@ pub trait ParseableServer {

pub async fn load_on_init() -> anyhow::Result<()> {
// Run all loading operations concurrently
let (correlations_result, filters_result, dashboards_result, alerts_result, targets_result) =
future::join5(
async {
CORRELATIONS
.load()
.await
.context("Failed to load correlations")
},
async { FILTERS.load().await.context("Failed to load filters") },
async { DASHBOARDS.load().await.context("Failed to load dashboards") },
async {
get_alert_manager().await;
let guard = ALERTS.write().await;
let alerts = if let Some(alerts) = guard.as_ref() {
alerts
} else {
return Err(anyhow::Error::msg("No AlertManager set"));
};
alerts.load().await
},
async { TARGETS.load().await.context("Failed to load targets") },
)
.await;

// Handle errors from each operation
if let Err(e) = correlations_result {
error!("{e}");
}
let (filters_result, dashboards_result, alerts_result, targets_result) = future::join4(
async { FILTERS.load().await.context("Failed to load filters") },
async { DASHBOARDS.load().await.context("Failed to load dashboards") },
async {
get_alert_manager().await;
let guard = ALERTS.write().await;
let alerts = if let Some(alerts) = guard.as_ref() {
alerts
} else {
return Err(anyhow::Error::msg("No AlertManager set"));
};
alerts.load().await
},
async { TARGETS.load().await.context("Failed to load targets") },
)
.await;

if let Err(err) = filters_result {
error!("{err}");
Expand Down
1 change: 0 additions & 1 deletion src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ impl ParseableServer for QueryServer {
config
.service(
web::scope(&base_path())
.service(Server::get_correlation_webscope())
.service(Server::get_query_factory().wrap(from_fn(
resource_check::check_resource_utilization_middleware,
)))
Expand Down
36 changes: 0 additions & 36 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ impl ParseableServer for Server {
config
.service(
web::scope(&base_path())
.service(Self::get_correlation_webscope())
.service(Self::get_query_factory().wrap(from_fn(
resource_check::check_resource_utilization_middleware,
)))
Expand Down Expand Up @@ -220,41 +219,6 @@ impl Server {
)
}

pub fn get_correlation_webscope() -> Scope {
web::scope("/correlation")
.service(
web::resource("")
.route(
web::get()
.to(http::correlation::list)
.authorize(Action::GetCorrelation),
)
.route(
web::post()
.to(http::correlation::post)
.authorize(Action::CreateCorrelation),
),
)
.service(
web::resource("/{correlation_id}")
.route(
web::get()
.to(http::correlation::get)
.authorize(Action::GetCorrelation),
)
.route(
web::put()
.to(http::correlation::modify)
.authorize(Action::PutCorrelation),
)
.route(
web::delete()
.to(http::correlation::delete)
.authorize(Action::DeleteCorrelation),
),
)
}

pub fn get_alerts_webscope() -> Scope {
web::scope("/alerts")
.service(
Expand Down
1 change: 0 additions & 1 deletion src/handlers/http/users/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,3 @@ pub mod filters;
pub const USERS_ROOT_DIR: &str = ".users";
pub const DASHBOARDS_DIR: &str = "dashboards";
pub const FILTER_DIR: &str = "filters";
pub const CORRELATION_DIR: &str = "correlations";
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ pub mod catalog;
mod cli;
#[cfg(feature = "kafka")]
pub mod connectors;
pub mod correlation;
pub mod enterprise;
pub mod event;
pub mod handlers;
Expand Down
13 changes: 0 additions & 13 deletions src/metastore/metastore_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,19 +206,6 @@ pub trait Metastore: std::fmt::Debug + Send + Sync {
tenant_id: &Option<String>,
) -> Result<(), MetastoreError>;

/// correlations
async fn get_correlations(&self) -> Result<HashMap<String, Vec<Bytes>>, MetastoreError>;
async fn put_correlation(
&self,
obj: &dyn MetastoreObject,
tenant_id: &Option<String>,
) -> Result<(), MetastoreError>;
async fn delete_correlation(
&self,
obj: &dyn MetastoreObject,
tenant_id: &Option<String>,
) -> Result<(), MetastoreError>;

/// stream metadata
/// `get_base` when set to true, will fetch the stream.json present at the base of
/// the stream (independent of Mode of server)
Expand Down
60 changes: 0 additions & 60 deletions src/metastore/metastores/object_store_metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,66 +789,6 @@ impl Metastore for ObjectStoreMetastore {
.await?)
}

/// Get all correlations
async fn get_correlations(&self) -> Result<HashMap<String, Vec<Bytes>>, MetastoreError> {
let mut correlations = HashMap::new();
let base_paths = PARSEABLE.list_tenants().unwrap_or_else(|| vec!["".into()]);
for mut tenant in base_paths {
let tenant_id = &Some(tenant.clone());
let mut corrs = Vec::new();
let users_dir = RelativePathBuf::from_iter([&tenant, USERS_ROOT_DIR]);
for user in self
.storage
.list_dirs_relative(&users_dir, tenant_id)
.await?
{
let correlations_path = users_dir.join(&user).join("correlations");
let correlation_bytes = self
.storage
.get_objects(
Some(&correlations_path),
Box::new(|file_name| file_name.ends_with(".json")),
tenant_id,
)
.await?;

corrs.extend(correlation_bytes);
}
if tenant.is_empty() {
tenant.clone_from(&DEFAULT_TENANT.to_string());
}
correlations.insert(tenant, corrs);
}
Ok(correlations)
}

/// Save a correlation
async fn put_correlation(
&self,
obj: &dyn MetastoreObject,
tenant_id: &Option<String>,
) -> Result<(), MetastoreError> {
let path = obj.get_object_path();
Ok(self
.storage
.put_object(&RelativePathBuf::from(path), to_bytes(obj), tenant_id)
.await?)
}

/// Delete a correlation
async fn delete_correlation(
&self,
obj: &dyn MetastoreObject,
tenant_id: &Option<String>,
) -> Result<(), MetastoreError> {
let path = obj.get_object_path();

Ok(self
.storage
.delete_object(&RelativePathBuf::from(path), tenant_id)
.await?)
}

/// Fetch an `ObjectStoreFormat` file
///
/// If `get_base` is true, get the one at the base of the stream directory else depends on Mode
Expand Down
Loading
Loading