Skip to main content

citadel_sdk/prefabs/shared/
internal_service.rs

1//! Internal Service Communication Layer
2//!
3//! This module provides the core functionality for integrating internal services
4//! within the Citadel Protocol network. It enables bidirectional communication
5//! between network services and the protocol layer.
6//!
7//! # Features
8//! - Asynchronous communication channels
9//! - Bidirectional message passing
10//! - Automatic protocol conversion
11//! - Error propagation
12//! - Resource cleanup on shutdown
13//! - Stream-based I/O interface
14//!
15//! # Example
16//! ```rust,no_run
17//! use citadel_sdk::prelude::*;
18//! use citadel_sdk::prefabs::shared::internal_service::InternalServerCommunicator;
19//! use futures::Future;
20//!
21//! async fn my_service(comm: InternalServerCommunicator) -> Result<(), NetworkError> {
22//!     // Service implementation
23//!     Ok(())
24//! }
25//! ```
26//!
27//! # Important Notes
28//! - Services run in isolated contexts
29//! - Communication is fully asynchronous
30//! - Implements AsyncRead and AsyncWrite
31//! - Automatic cleanup on drop
32//! - Thread-safe message passing
33//!
34//! # Related Components
35//! - [`CitadelClientServerConnection`]: Connection event data
36//! - [`TargetLockedRemote`]: Remote target interface
37//! - [`NetworkError`]: Error handling
38//! - [`SecBuffer`]: Secure data handling
39//!
40//! [`CitadelClientServerConnection`]: crate::prelude::CitadelClientServerConnection
41//! [`TargetLockedRemote`]: crate::prelude::TargetLockedRemote
42//! [`NetworkError`]: crate::prelude::NetworkError
43//! [`SecBuffer`]: crate::prelude::SecBuffer
44
45use crate::prelude::{CitadelClientServerConnection, TargetLockedRemote};
46use bytes::Bytes;
47use citadel_io::tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
48use citadel_io::tokio_stream::wrappers::UnboundedReceiverStream;
49use citadel_io::tokio_util::io::StreamReader;
50use citadel_proto::prelude::NetworkError;
51use citadel_proto::prelude::*;
52use citadel_types::crypto::SecBuffer;
53use futures::StreamExt;
54use std::future::Future;
55use std::pin::Pin;
56use std::task::{Context, Poll};
57
58pub async fn internal_service<F, Fut, R: Ratchet>(
59    connection: CitadelClientServerConnection<R>,
60    service: F,
61) -> Result<(), NetworkError>
62where
63    F: Send + Copy + Sync + FnOnce(InternalServerCommunicator) -> Fut,
64    Fut: Send + Sync + Future<Output = Result<(), NetworkError>>,
65{
66    let remote = connection.remote.clone();
67    let (tx_to_service, rx_from_kernel) = citadel_io::tokio::sync::mpsc::unbounded_channel();
68    let (tx_to_kernel, mut rx_from_service) = citadel_io::tokio::sync::mpsc::unbounded_channel();
69
70    let internal_server_communicator = InternalServerCommunicator {
71        tx_to_kernel,
72        rx_from_kernel: StreamReader::new(rx_from_kernel.into()),
73    };
74
75    let internal_server = service(internal_server_communicator);
76
77    // each time a client connects, we will begin listening for messages.
78    let (mut sink, mut stream) = connection.split();
79    // from_proto forwards packets from the proto to the http server
80    let from_proto = async move {
81        while let Some(packet) = stream.next().await {
82            // we receive a Citadel Protocol Packet. Now, we need to forward it to the webserver
83            // the response below is the response of the internal server
84            tx_to_service.send(Ok(packet.into_buffer().freeze()))?;
85        }
86
87        Ok(())
88    };
89
90    // from_webserver forwards packets from the internal server to the proto
91    let from_webserver = async move {
92        while let Some(packet) = rx_from_service.recv().await {
93            sink.send(packet).await?;
94        }
95
96        Ok(())
97    };
98
99    let res = citadel_io::tokio::select! {
100        res0 = from_proto => {
101            res0
102        },
103        res1 = from_webserver => {
104            res1
105        },
106        res2 = internal_server => {
107            res2
108        }
109    };
110
111    citadel_logging::warn!(target: "citadel", "Internal Server Stopped: {res:?}");
112
113    remote.remote().shutdown().await?;
114    res
115}
116
117pub struct InternalServerCommunicator {
118    pub(crate) tx_to_kernel: citadel_io::tokio::sync::mpsc::UnboundedSender<SecBuffer>,
119    pub(crate) rx_from_kernel:
120        StreamReader<UnboundedReceiverStream<Result<Bytes, std::io::Error>>, Bytes>,
121}
122
123impl AsyncWrite for InternalServerCommunicator {
124    fn poll_write(
125        self: Pin<&mut Self>,
126        _cx: &mut Context<'_>,
127        buf: &[u8],
128    ) -> Poll<std::io::Result<usize>> {
129        let len = buf.len();
130        match self.tx_to_kernel.send(buf.into()) {
131            Ok(_) => Poll::Ready(Ok(len)),
132            Err(err) => Poll::Ready(Err(std::io::Error::other(err.to_string()))),
133        }
134    }
135
136    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
137        Poll::Ready(Ok(()))
138    }
139
140    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
141        Poll::Ready(Ok(()))
142    }
143}
144
145impl AsyncRead for InternalServerCommunicator {
146    fn poll_read(
147        mut self: Pin<&mut Self>,
148        cx: &mut Context<'_>,
149        buf: &mut ReadBuf<'_>,
150    ) -> Poll<std::io::Result<()>> {
151        Pin::new(&mut self.rx_from_kernel).poll_read(cx, buf)
152    }
153}
154
155impl Unpin for InternalServerCommunicator {}