Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ All notable changes to this project will be documented in this file.
- Support `--type outbound-icmp` in geolocation `user add-target`, `remove-target`, and `get` commands
- Add sentinel admin commands to find and create multicast publishers for IBRL validators
- handle non-user owned disconnects gracefully
- Add user's multicast pub/sub groups if applicable to `status`
- Sentinel
- Add multicast publisher worker with Solana RPC-based validator discovery
- SDK
Expand Down
152 changes: 150 additions & 2 deletions client/doublezero/src/command/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
command::util,
requirements::check_doublezero,
servicecontroller::{
DoubleZeroStatus, ServiceController, ServiceControllerImpl, StatusResponse,
DoubleZeroStatus, MulticastGroups, ServiceController, ServiceControllerImpl, StatusResponse,
},
};
use clap::Args;
Expand Down Expand Up @@ -33,6 +33,19 @@ struct AppendedStatusResponse {
metro: String,
#[tabled(rename = "Network")]
network: String,
#[tabled(rename = "Multicast Groups")]
multicast_groups: String,
}

fn format_multicast_groups(groups: &MulticastGroups) -> String {
let mut parts = Vec::new();
for code in &groups.publisher {
parts.push(format!("P:{code}"));
}
for code in &groups.subscriber {
parts.push(format!("S:{code}"));
}
parts.join(",")
}

impl StatusCliCommand {
Expand Down Expand Up @@ -81,6 +94,7 @@ impl StatusCliCommand {
} else {
v2_status.network.clone()
},
multicast_groups: String::new(),
}]);
}

Expand Down Expand Up @@ -124,6 +138,7 @@ impl StatusCliCommand {
metro,
network: network.clone(),
tenant: svc.tenant.clone(),
multicast_groups: format_multicast_groups(&svc.multicast_groups),
});
}

Expand All @@ -136,7 +151,7 @@ impl StatusCliCommand {
mod tests {
use super::*;
use crate::servicecontroller::{
DoubleZeroStatus, MockServiceController, V2ServiceStatus, V2StatusResponse,
DoubleZeroStatus, MockServiceController, MulticastGroups, V2ServiceStatus, V2StatusResponse,
};
use doublezero_cli::doublezerocommand::MockCliCommand;

Expand Down Expand Up @@ -169,6 +184,7 @@ mod tests {
lowest_latency_device: lowest_latency_device.to_string(),
metro: metro.to_string(),
tenant: tenant.to_string(),
multicast_groups: MulticastGroups::default(),
}
}

Expand Down Expand Up @@ -247,6 +263,7 @@ mod tests {
lowest_latency_device: "device2".to_string(),
metro: String::new(),
tenant: String::new(),
multicast_groups: MulticastGroups::default(),
}],
})
});
Expand Down Expand Up @@ -337,6 +354,7 @@ mod tests {
lowest_latency_device: "device1".to_string(),
metro: "metro".to_string(),
tenant: String::new(),
multicast_groups: MulticastGroups::default(),
}],
})
});
Expand Down Expand Up @@ -382,6 +400,7 @@ mod tests {
metro: "amsterdam".to_string(),
network: "Testnet".to_string(),
tenant: "".to_string(),
multicast_groups: String::new(),
};

// JSON output is an array of status responses
Expand Down Expand Up @@ -412,6 +431,11 @@ mod tests {
assert!(status.get("metro").is_some(), "Missing 'metro' field");
assert!(status.get("network").is_some(), "Missing 'network' field");
assert!(status.get("tenant").is_some(), "Missing 'tenant' field");
assert!(
status.get("multicast_groups").is_some(),
"Missing 'multicast_groups' field"
);
assert_eq!(status.get("multicast_groups").unwrap(), "");

// Validate response nested fields
let response = status.get("response").unwrap();
Expand Down Expand Up @@ -491,6 +515,7 @@ mod tests {
metro: "amsterdam".to_string(),
network: "Testnet".to_string(),
tenant: "".to_string(),
multicast_groups: String::new(),
};

// JSON output is an array of status responses
Expand Down Expand Up @@ -521,6 +546,9 @@ mod tests {

// user_type should still be present
assert_eq!(response.get("user_type").unwrap(), "Multicast");

// multicast_groups should be present and empty
assert_eq!(status.get("multicast_groups").unwrap(), "");
}

#[tokio::test]
Expand Down Expand Up @@ -653,4 +681,124 @@ mod tests {
.unwrap();
assert_eq!(result[0].lowest_latency_device, "⚠️ device2");
}

#[tokio::test]
async fn test_status_command_multicast_groups_display() {
let mock_command = MockCliCommand::new();
let mut mock_controller = MockServiceController::new();

mock_controller.expect_v2_status().returning(|| {
Ok(V2StatusResponse {
reconciler_enabled: true,
client_ip: String::new(),
network: "testnet".to_string(),
services: vec![V2ServiceStatus {
status: StatusResponse {
doublezero_status: DoubleZeroStatus {
session_status: "BGP Session Up".to_string(),
last_session_update: Some(1625247600),
},
tunnel_name: Some("doublezero1".to_string()),
tunnel_src: Some("10.10.10.10".to_string()),
tunnel_dst: Some("5.6.7.8".to_string()),
doublezero_ip: None,
user_type: Some("Multicast".to_string()),
},
current_device: "device1".to_string(),
lowest_latency_device: "device1".to_string(),
metro: "metro".to_string(),
tenant: String::new(),
multicast_groups: MulticastGroups {
publisher: vec!["solana-lv".to_string()],
subscriber: vec!["solana-ams".to_string()],
},
}],
})
});

let result = StatusCliCommand { json: true }
.command_impl(&mock_command, &mock_controller)
.await;

assert!(result.is_ok());
let result = result.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0].multicast_groups, "P:solana-lv,S:solana-ams");
}

#[test]
fn test_multicast_groups_serde_default() {
let json = r#"{
"doublezero_status": {"session_status": "BGP Session Up", "last_session_update": null},
"tunnel_name": null, "tunnel_src": null, "tunnel_dst": null,
"doublezero_ip": null, "user_type": "IBRL",
"current_device": "dz1", "lowest_latency_device": "dz1",
"metro": "ams", "tenant": ""
}"#;
let svc: V2ServiceStatus = serde_json::from_str(json).unwrap();
assert!(svc.multicast_groups.publisher.is_empty());
assert!(svc.multicast_groups.subscriber.is_empty());
}

#[test]
fn test_multicast_groups_serde_populated() {
let json = r#"{
"doublezero_status": {"session_status": "BGP Session Up", "last_session_update": null},
"tunnel_name": "doublezero1", "tunnel_src": "10.0.0.1", "tunnel_dst": "5.6.7.8",
"doublezero_ip": null, "user_type": "Multicast",
"current_device": "dz1", "lowest_latency_device": "dz1",
"metro": "ams", "tenant": "acme",
"multicast_groups": {
"publisher": ["solana-lv"],
"subscriber": ["solana-ams", "solana-fra"]
}
}"#;
let svc: V2ServiceStatus = serde_json::from_str(json).unwrap();
assert_eq!(svc.multicast_groups.publisher, vec!["solana-lv"]);
assert_eq!(
svc.multicast_groups.subscriber,
vec!["solana-ams", "solana-fra"]
);
}

#[test]
fn test_multicast_groups_serde_empty_arrays() {
let json = r#"{
"doublezero_status": {"session_status": "BGP Session Up", "last_session_update": null},
"tunnel_name": null, "tunnel_src": null, "tunnel_dst": null,
"doublezero_ip": "10.0.0.1", "user_type": "IBRL",
"current_device": "dz1", "lowest_latency_device": "dz1",
"metro": "ams", "tenant": "",
"multicast_groups": {"publisher": [], "subscriber": []}
}"#;
let svc: V2ServiceStatus = serde_json::from_str(json).unwrap();
assert!(svc.multicast_groups.publisher.is_empty());
assert!(svc.multicast_groups.subscriber.is_empty());
}

#[test]
fn test_format_multicast_groups() {
assert_eq!(format_multicast_groups(&MulticastGroups::default()), "");
assert_eq!(
format_multicast_groups(&MulticastGroups {
publisher: vec!["solana-lv".to_string()],
subscriber: vec![],
}),
"P:solana-lv"
);
assert_eq!(
format_multicast_groups(&MulticastGroups {
publisher: vec![],
subscriber: vec!["solana-ams".to_string()],
}),
"S:solana-ams"
);
assert_eq!(
format_multicast_groups(&MulticastGroups {
publisher: vec!["solana-lv".to_string()],
subscriber: vec!["solana-ams".to_string(), "solana-fra".to_string()],
}),
"P:solana-lv,S:solana-ams,S:solana-fra"
);
}
}
10 changes: 10 additions & 0 deletions client/doublezero/src/servicecontroller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ fn parse_daemon_response<T: serde::de::DeserializeOwned>(
}
}

#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)]
pub struct MulticastGroups {
#[serde(default)]
pub publisher: Vec<String>,
#[serde(default)]
pub subscriber: Vec<String>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct V2ServiceStatus {
#[serde(flatten)]
Expand All @@ -158,6 +166,8 @@ pub struct V2ServiceStatus {
pub metro: String,
#[serde(default)]
pub tenant: String,
#[serde(default)]
pub multicast_groups: MulticastGroups,
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down
52 changes: 40 additions & 12 deletions client/doublezerod/internal/manager/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,22 @@ import (
"github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability"
)

// MulticastGroups contains the group codes a user publishes to and subscribes to.
type MulticastGroups struct {
Publisher []string `json:"publisher"`
Subscriber []string `json:"subscriber"`
}

// V2ServiceStatus wraps a StatusResponse with enriched fields.
type V2ServiceStatus struct {
*api.StatusResponse
CurrentDevice string `json:"current_device"`
CurrentDeviceRttNanoseconds int64 `json:"current_device_rtt_nanoseconds,omitempty"`
CurrentDeviceLossPercentage float64 `json:"current_device_loss_percentage,omitempty"`
LowestLatencyDevice string `json:"lowest_latency_device"`
Metro string `json:"metro"`
Tenant string `json:"tenant"`
CurrentDevice string `json:"current_device"`
CurrentDeviceRttNanoseconds int64 `json:"current_device_rtt_nanoseconds,omitempty"`
CurrentDeviceLossPercentage float64 `json:"current_device_loss_percentage,omitempty"`
LowestLatencyDevice string `json:"lowest_latency_device"`
Metro string `json:"metro"`
Tenant string `json:"tenant"`
MulticastGroups MulticastGroups `json:"multicast_groups"`
}

// V2StatusResponse is the response for the /v2/status endpoint.
Expand Down Expand Up @@ -245,10 +252,11 @@ func (n *NetlinkManager) enrichStatuses(statuses []*api.StatusResponse) []V2Serv

// Build lookup maps from program data.
var (
devicesByPK map[[32]byte]serviceability.Device
exchangesByPK map[[32]byte]serviceability.Exchange
tenantsByPK map[[32]byte]serviceability.Tenant
users []serviceability.User
devicesByPK map[[32]byte]serviceability.Device
exchangesByPK map[[32]byte]serviceability.Exchange
tenantsByPK map[[32]byte]serviceability.Tenant
mcastGroupsByPK map[[32]byte]serviceability.MulticastGroup
users []serviceability.User
)
if data != nil {
devicesByPK = make(map[[32]byte]serviceability.Device, len(data.Devices))
Expand All @@ -263,6 +271,10 @@ func (n *NetlinkManager) enrichStatuses(statuses []*api.StatusResponse) []V2Serv
for _, t := range data.Tenants {
tenantsByPK[t.PubKey] = t
}
mcastGroupsByPK = make(map[[32]byte]serviceability.MulticastGroup, len(data.MulticastGroups))
for _, mg := range data.MulticastGroups {
mcastGroupsByPK[mg.PubKey] = mg
}
users = data.Users
}

Expand All @@ -277,7 +289,13 @@ func (n *NetlinkManager) enrichStatuses(statuses []*api.StatusResponse) []V2Serv

enriched := make([]V2ServiceStatus, 0, len(statuses))
for _, svc := range statuses {
es := V2ServiceStatus{StatusResponse: svc}
es := V2ServiceStatus{
StatusResponse: svc,
MulticastGroups: MulticastGroups{
Publisher: []string{},
Subscriber: []string{},
},
}

if data == nil {
enriched = append(enriched, es)
Expand Down Expand Up @@ -316,7 +334,7 @@ func (n *NetlinkManager) enrichStatuses(statuses []*api.StatusResponse) []V2Serv

// Fallback: match by client_ip + user_type (e.g. multicast subscribers
// whose tunnel endpoint differs from the device public IP).
if matchedDevice == nil {
if matchedUser == nil {
clientIP4 := n.clientIP.To4()
for i := range users {
u := &users[i]
Expand Down Expand Up @@ -353,6 +371,16 @@ func (n *NetlinkManager) enrichStatuses(statuses []*api.StatusResponse) []V2Serv
es.Tenant = t.Code
}
}
for _, pk := range matchedUser.Publishers {
if mg, ok := mcastGroupsByPK[pk]; ok {
es.MulticastGroups.Publisher = append(es.MulticastGroups.Publisher, mg.Code)
}
}
for _, pk := range matchedUser.Subscribers {
if mg, ok := mcastGroupsByPK[pk]; ok {
es.MulticastGroups.Subscriber = append(es.MulticastGroups.Subscriber, mg.Code)
}
}
}

// Compute lowest latency device.
Expand Down
Loading
Loading