diff --git a/Cargo.lock b/Cargo.lock index 1671e7da6..a329b4113 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2239,6 +2239,7 @@ name = "uart-service" version = "0.1.0" dependencies = [ "defmt 0.3.100", + "embassy-futures", "embassy-sync", "embedded-io-async 0.7.0", "embedded-services", diff --git a/battery-service-relay/src/lib.rs b/battery-service-relay/src/lib.rs index 92988ae2f..406903d90 100644 --- a/battery-service-relay/src/lib.rs +++ b/battery-service-relay/src/lib.rs @@ -23,6 +23,9 @@ impl embedded_services::relay::mct { type RequestType = serialization::AcpiBatteryRequest; type ResultType = serialization::AcpiBatteryResult; + + // Temporary until figure out what events want to send + type EventType = (); } impl embedded_services::relay::mctp::RelayServiceHandler diff --git a/debug-service/src/debug_service.rs b/debug-service/src/debug_service.rs index ef9762307..85ade54be 100644 --- a/debug-service/src/debug_service.rs +++ b/debug-service/src/debug_service.rs @@ -37,6 +37,9 @@ impl Service { impl embedded_services::relay::mctp::RelayServiceHandlerTypes for Service { type RequestType = DebugRequest; type ResultType = DebugResult; + + // Temporary until figure out what events want to send + type EventType = (); } impl embedded_services::relay::mctp::RelayServiceHandler for Service { diff --git a/embedded-service/src/event.rs b/embedded-service/src/event.rs index a24736761..ef010465e 100644 --- a/embedded-service/src/event.rs +++ b/embedded-service/src/event.rs @@ -143,3 +143,157 @@ impl, F: FnMut(I) -> O> Sender for MapSender { self.sender.send((self.map_fn)(event)) } } + +/// Applies a function on events received from the wrapped receiver. +pub struct MapReceiver, F: FnMut(I) -> O> { + receiver: R, + map_fn: F, + _phantom: PhantomData<(I, O)>, +} + +impl, F: FnMut(I) -> O> MapReceiver { + /// Create a new MapReceiver. + pub fn new(receiver: R, map_fn: F) -> Self { + Self { + receiver, + map_fn, + _phantom: PhantomData, + } + } +} + +impl, F: FnMut(I) -> O> Receiver for MapReceiver { + fn try_next(&mut self) -> Option { + self.receiver.try_next().map(&mut self.map_fn) + } + + async fn wait_next(&mut self) -> O { + (self.map_fn)(self.receiver.wait_next().await) + } +} + +/// Filters events from the wrapped receiver, only yielding events that pass the predicate. +/// +/// Events that do not pass the filter are consumed and discarded. +pub struct FilterReceiver, F: FnMut(&E) -> bool> { + receiver: R, + filter_fn: F, + _phantom: PhantomData, +} + +impl, F: FnMut(&E) -> bool> FilterReceiver { + /// Create a new FilterReceiver. + pub fn new(receiver: R, filter_fn: F) -> Self { + Self { + receiver, + filter_fn, + _phantom: PhantomData, + } + } +} + +impl, F: FnMut(&E) -> bool> Receiver for FilterReceiver { + fn try_next(&mut self) -> Option { + loop { + match self.receiver.try_next() { + Some(e) if (self.filter_fn)(&e) => return Some(e), + Some(_) => continue, + None => return None, + } + } + } + + async fn wait_next(&mut self) -> E { + loop { + let e = self.receiver.wait_next().await; + if (self.filter_fn)(&e) { + return e; + } + } + } +} + +/// A receiver that never produces events. +/// +/// This is mainly used to make it easier to construct a `MuxReceiver` +/// via macro since we don't need to handle the special start case +/// when chaining `with` calls. +pub struct NeverReceiver(PhantomData); + +impl NeverReceiver { + /// Create a new NeverReceiver. + pub fn new() -> Self { + Self(PhantomData) + } +} + +impl Default for NeverReceiver { + fn default() -> Self { + Self::new() + } +} + +impl Receiver for NeverReceiver { + fn try_next(&mut self) -> Option { + None + } + + async fn wait_next(&mut self) -> E { + core::future::pending().await + } +} + +/// Combines multiple receivers into one by racing them (with left-bias) and returning +/// the first event that becomes available mapped to a common event type. +pub struct MuxReceiver, R: Receiver> { + left: L, + right: R, + _phantom: PhantomData, +} + +impl MuxReceiver, NeverReceiver> { + /// Create an empty MuxReceiver. + /// + /// Use `.with()` to add receivers. + pub fn new() -> Self { + Self { + left: NeverReceiver::new(), + right: NeverReceiver::new(), + _phantom: PhantomData, + } + } +} + +impl Default for MuxReceiver, NeverReceiver> { + fn default() -> Self { + Self::new() + } +} + +impl, R1: Receiver> MuxReceiver { + /// Add another receiver to multiplex with this one. + pub fn with, F: FnMut(I) -> E>( + self, + receiver: R2, + map_fn: F, + ) -> MuxReceiver> { + MuxReceiver { + left: self, + right: MapReceiver::new(receiver, map_fn), + _phantom: PhantomData, + } + } +} + +impl, R: Receiver> Receiver for MuxReceiver { + fn try_next(&mut self) -> Option { + self.left.try_next().or_else(|| self.right.try_next()) + } + + async fn wait_next(&mut self) -> E { + match embassy_futures::select::select(self.left.wait_next(), self.right.wait_next()).await { + embassy_futures::select::Either::First(e) => e, + embassy_futures::select::Either::Second(e) => e, + } + } +} diff --git a/embedded-service/src/relay/mod.rs b/embedded-service/src/relay/mod.rs index d92cfb97f..274a351df 100644 --- a/embedded-service/src/relay/mod.rs +++ b/embedded-service/src/relay/mod.rs @@ -116,6 +116,9 @@ pub mod mctp { /// The result type that this service handler processes type ResultType: super::SerializableResult; + + /// The event type that this service emits. + type EventType; } /// Trait for a service that can be relayed over an external bus (e.g. battery service, thermal service, time-alarm service) @@ -448,6 +451,14 @@ pub mod mctp { } + /// A common event type wrapper for all relayable service events. + #[derive(Debug)] + pub enum ServiceEvent { + $( + $service_name(<$service_handler_type as $crate::relay::mctp::RelayServiceHandlerTypes>::EventType), + )+ + } + pub struct $relay_type_name { $( [<$service_name:snake _handler>]: $service_handler_type, @@ -466,6 +477,33 @@ pub mod mctp { )+ } } + + /// Build an event multiplexer from the provided relayable services. + /// + /// This is generic over the receiver type used for each service, so callers + /// can decide at the call site which concrete `Receiver` impl to + /// supply for each relayed service (e.g. `NeverReceiver`, an embassy channel + /// `Receiver`, a `DynamicReceiver`, or a custom impl). + /// + /// The caller will then need to further filter this event mux to whatever + /// events they consider worth notifying the host about. + /// + /// Lastly, the caller will then need to map the events into a format the + /// relay service understands (e.g. a single u8 for uart-service). + pub fn event_mux< + $( + [<$service_name Rx>]: $crate::event::Receiver< + <$service_handler_type as $crate::relay::mctp::RelayServiceHandlerTypes>::EventType + >, + )+ + >( + $( + [<$service_name:snake _event_rx>]: [<$service_name Rx>], + )+ + ) -> impl $crate::event::Receiver { + $crate::event::MuxReceiver::new() + $(.with([<$service_name:snake _event_rx>], ServiceEvent::$service_name))+ + } } impl $crate::relay::mctp::RelayHandler for $relay_type_name { @@ -494,6 +532,7 @@ pub mod mctp { // Allows this generated relay type to be publicly re-exported pub use [< _odp_impl_ $relay_type_name:snake >]::$relay_type_name; + pub use [< _odp_impl_ $relay_type_name:snake >]::ServiceEvent; } // end paste! }; // end macro arm diff --git a/thermal-service-interface/src/lib.rs b/thermal-service-interface/src/lib.rs index b1e56a100..c901fb201 100644 --- a/thermal-service-interface/src/lib.rs +++ b/thermal-service-interface/src/lib.rs @@ -3,6 +3,17 @@ pub mod fan; pub mod sensor; +/// Thermal service event. +#[derive(Debug, PartialEq, Clone, Copy)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +#[non_exhaustive] +pub enum Event { + /// A sensor event occurred. + Sensor(u8, sensor::Event), + /// A fan event occurred. + Fan(u8, fan::Event), +} + /// Thermal service interface trait. pub trait ThermalService { /// Associated type for registered sensor services. diff --git a/thermal-service-relay/src/lib.rs b/thermal-service-relay/src/lib.rs index fceae864d..d896606b5 100644 --- a/thermal-service-relay/src/lib.rs +++ b/thermal-service-relay/src/lib.rs @@ -3,6 +3,7 @@ mod serialization; pub use serialization::{ThermalError, ThermalRequest, ThermalResponse, ThermalResult}; +use thermal_service_interface::Event as ThermalEvent; use thermal_service_interface::ThermalService; use thermal_service_interface::fan::{self, FanService}; use thermal_service_interface::sensor::{self, SensorService}; @@ -194,6 +195,7 @@ impl ThermalServiceRelayHandler { impl embedded_services::relay::mctp::RelayServiceHandlerTypes for ThermalServiceRelayHandler { type RequestType = ThermalRequest; type ResultType = ThermalResult; + type EventType = ThermalEvent; } impl embedded_services::relay::mctp::RelayServiceHandler for ThermalServiceRelayHandler { diff --git a/time-alarm-service-relay/src/lib.rs b/time-alarm-service-relay/src/lib.rs index ed444a394..b428b3cb6 100644 --- a/time-alarm-service-relay/src/lib.rs +++ b/time-alarm-service-relay/src/lib.rs @@ -20,6 +20,9 @@ impl TimeAlarmServiceRelayHandler { impl embedded_services::relay::mctp::RelayServiceHandlerTypes for TimeAlarmServiceRelayHandler { type RequestType = AcpiTimeAlarmRequest; type ResultType = AcpiTimeAlarmResult; + + // Temporary until figure out what events want to send + type EventType = (); } impl embedded_services::relay::mctp::RelayServiceHandler for TimeAlarmServiceRelayHandler { diff --git a/uart-service/Cargo.toml b/uart-service/Cargo.toml index beb373559..8aed7b44c 100644 --- a/uart-service/Cargo.toml +++ b/uart-service/Cargo.toml @@ -17,6 +17,7 @@ workspace = true embedded-services.workspace = true defmt = { workspace = true, optional = true } log = { workspace = true, optional = true } +embassy-futures.workspace = true embassy-sync.workspace = true mctp-rs = { workspace = true } embedded-io-async.workspace = true diff --git a/uart-service/src/lib.rs b/uart-service/src/lib.rs index e3ecffe6a..a0e8a621b 100644 --- a/uart-service/src/lib.rs +++ b/uart-service/src/lib.rs @@ -4,9 +4,6 @@ //! Use [`DefaultService`] for the SmbusEspi-medium baseline; use //! [`Service::new`] directly with another medium (e.g. DSP0253 serial) //! for non-SmbusEspi callers. -//! -//! Revisit: Will also need to consider how to handle notifications (likely need to have user -//! provide GPIO pin we can use). #![no_std] pub mod task; @@ -24,6 +21,27 @@ use mctp_rs::smbus_espi::{SmbusEspiMedium, SmbusEspiReplyContext}; const BUF_SIZE: usize = 256; const HOST_TX_QUEUE_SIZE: usize = 5; +// Persistent state for UART request reading +// +// Necessary to make sure the request reading process is cancel-safe +struct ReadState { + buf: [u8; BUF_SIZE], + filled: usize, +} + +impl ReadState { + fn new() -> Self { + Self { + buf: [0u8; BUF_SIZE], + filled: 0, + } + } + + fn reset(&mut self) { + self.filled = 0; + } +} + #[derive(Clone)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub(crate) struct HostResultMessage { @@ -107,13 +125,18 @@ impl Service { Ok(()) } - async fn wait_for_request(&self, uart: &mut T) -> Result<(), Error> { - // Incremental read loop: read bytes, ask the medium whether the - // assembled prefix is a complete frame, repeat until it is. - let mut buf = [0u8; BUF_SIZE]; - let mut filled = 0usize; - let packet_len = loop { - let dst = buf.get_mut(filled..).ok_or(Error::Serialize("buffer overrun"))?; + // Read bytes from UART until a complete request is assembled. + // + // # Cancel Safety + // + // This method is cancel-safe because partial read progress is stored in `state` + // and will be resumed on the next call. + async fn wait_for_request(&self, uart: &mut T, state: &mut ReadState) -> Result> { + loop { + let dst = state + .buf + .get_mut(state.filled..) + .ok_or(Error::Serialize("buffer overrun"))?; if dst.is_empty() { return Err(Error::Serialize("frame exceeds BUF_SIZE")); } @@ -121,23 +144,33 @@ impl Service { if n == 0 { return Err(Error::Comms); } - filled += n; + state.filled += n; match self .medium - .frame_complete(buf.get(..filled).ok_or(Error::Serialize("buffer overrun"))?) + .frame_complete( + state + .buf + .get(..state.filled) + .ok_or(Error::Serialize("buffer overrun"))?, + ) .map_err(Error::Mctp)? { - Some(len) => break len, + Some(len) => return Ok(len), None => continue, } - }; + } + } + // Deserialize the request and forward it to correct service for processing + async fn process_request(&self, state: &ReadState, packet_len: usize) -> Result<(), Error> { let mut assembly_buf = [0u8; BUF_SIZE]; let mut mctp_ctx = mctp_rs::MctpPacketContext::::new(self.medium, &mut assembly_buf); let message = mctp_ctx .deserialize_packet( - buf.get(..packet_len) + state + .buf + .get(..packet_len) .ok_or(Error::Serialize("frame exceeds BUF_SIZE"))?, ) .map_err(Error::Mctp)? diff --git a/uart-service/src/task.rs b/uart-service/src/task.rs index 06a2b8c05..80256615d 100644 --- a/uart-service/src/task.rs +++ b/uart-service/src/task.rs @@ -1,26 +1,59 @@ -use crate::{Error, Service}; +use crate::{Error, ReadState, Service}; +use embassy_futures::select::{Either, select}; use embedded_io_async::Read as UartRead; use embedded_io_async::Write as UartWrite; -use embedded_services::error; use embedded_services::relay::mctp::RelayHandler; +use embedded_services::{error, warn}; use mctp_rs::MctpMedium; pub async fn uart_service( uart_service: &Service, mut uart: T, + mut notifiable_events: impl embedded_services::event::Receiver, ) -> Result> { - // Note: eSPI service uses `select!` to seemingly allow asyncrhonous `responses` from services, - // but there are concerns around async cancellation here at least for UART service. - // - // Thus this assumes services will only send messages in response to requests from the host, - // so we handle this in order. + let mut read_state = ReadState::new(); + loop { - if let Err(e) = uart_service.wait_for_request(&mut uart).await { - log_error("request", &e); - } else { - let host_msg = uart_service.wait_for_response().await; - if let Err(e) = uart_service.process_response(&mut uart, host_msg).await { - log_error("response", &e); + match select( + uart_service.wait_for_request(&mut uart, &mut read_state), + notifiable_events.wait_next(), + ) + .await + { + Either::First(Ok(packet_len)) => { + if let Err(e) = uart_service.process_request(&read_state, packet_len).await { + log_error("request", &e); + } else { + let host_msg = uart_service.wait_for_response().await; + if let Err(e) = uart_service.process_response(&mut uart, host_msg).await { + log_error("response", &e); + } + } + read_state.reset(); + } + Either::First(Err(e)) => { + log_error("request", &e); + read_state.reset(); + } + Either::Second(event) => { + warn!( + "uart-service received notifiable event ({}) from relayable service", + event + ); + + // TODO: Here we would do something like: + // + // if let Err(_e) = uart.write_all(&[0x42, event]).await { + // error!("uart-service failed to send notification"); + // } else { + // warn!("uart-service sent notification for event {}", event); + // } + // + // Where we TX some starter byte(s) to tell host it's about to receive a notification, + // then the notification ID itself. + // + // This is TODO until the whole stack is ready to receive notifications + // otherwise TXing here could break things. } } }