citadel_sdk/prefabs/shared/internal_service.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
//! Internal Service Communication Layer
//!
//! This module provides the core functionality for integrating internal services
//! within the Citadel Protocol network. It enables bidirectional communication
//! between network services and the protocol layer.
//!
//! # Features
//! - Asynchronous communication channels
//! - Bidirectional message passing
//! - Automatic protocol conversion
//! - Error propagation
//! - Resource cleanup on shutdown
//! - Stream-based I/O interface
//!
//! # Example
//! ```rust,no_run
//! use citadel_sdk::prelude::*;
//! use citadel_sdk::prefabs::shared::internal_service::InternalServerCommunicator;
//! use futures::Future;
//!
//! async fn my_service(comm: InternalServerCommunicator) -> Result<(), NetworkError> {
//! // Service implementation
//! Ok(())
//! }
//! ```
//!
//! # Important Notes
//! - Services run in isolated contexts
//! - Communication is fully asynchronous
//! - Implements AsyncRead and AsyncWrite
//! - Automatic cleanup on drop
//! - Thread-safe message passing
//!
//! # Related Components
//! - [`CitadelClientServerConnection`]: Connection event data
//! - [`TargetLockedRemote`]: Remote target interface
//! - [`NetworkError`]: Error handling
//! - [`SecBuffer`]: Secure data handling
//!
//! [`CitadelClientServerConnection`]: crate::prelude::CitadelClientServerConnection
//! [`TargetLockedRemote`]: crate::prelude::TargetLockedRemote
//! [`NetworkError`]: crate::prelude::NetworkError
//! [`SecBuffer`]: crate::prelude::SecBuffer
use crate::prelude::{CitadelClientServerConnection, TargetLockedRemote};
use bytes::Bytes;
use citadel_io::tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use citadel_proto::prelude::NetworkError;
use citadel_proto::prelude::*;
use citadel_proto::re_imports::{StreamReader, UnboundedReceiverStream};
use citadel_types::crypto::SecBuffer;
use futures::StreamExt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
pub async fn internal_service<F, Fut, R: Ratchet>(
connection: CitadelClientServerConnection<R>,
service: F,
) -> Result<(), NetworkError>
where
F: Send + Copy + Sync + FnOnce(InternalServerCommunicator) -> Fut,
Fut: Send + Sync + Future<Output = Result<(), NetworkError>>,
{
let remote = connection.remote.clone();
let (tx_to_service, rx_from_kernel) = citadel_io::tokio::sync::mpsc::unbounded_channel();
let (tx_to_kernel, mut rx_from_service) = citadel_io::tokio::sync::mpsc::unbounded_channel();
let internal_server_communicator = InternalServerCommunicator {
tx_to_kernel,
rx_from_kernel: StreamReader::new(rx_from_kernel.into()),
};
let internal_server = service(internal_server_communicator);
// each time a client connects, we will begin listening for messages.
let (mut sink, mut stream) = connection.split();
// from_proto forwards packets from the proto to the http server
let from_proto = async move {
while let Some(packet) = stream.next().await {
// we receive a Citadel Protocol Packet. Now, we need to forward it to the webserver
// the response below is the response of the internal server
tx_to_service.send(Ok(packet.into_buffer().freeze()))?;
}
Ok(())
};
// from_webserver forwards packets from the internal server to the proto
let from_webserver = async move {
while let Some(packet) = rx_from_service.recv().await {
sink.send(packet).await?;
}
Ok(())
};
let res = citadel_io::tokio::select! {
res0 = from_proto => {
res0
},
res1 = from_webserver => {
res1
},
res2 = internal_server => {
res2
}
};
citadel_logging::warn!(target: "citadel", "Internal Server Stopped: {res:?}");
remote.remote().shutdown().await?;
res
}
pub struct InternalServerCommunicator {
pub(crate) tx_to_kernel: citadel_io::tokio::sync::mpsc::UnboundedSender<SecBuffer>,
pub(crate) rx_from_kernel:
StreamReader<UnboundedReceiverStream<Result<Bytes, std::io::Error>>, Bytes>,
}
impl AsyncWrite for InternalServerCommunicator {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
let len = buf.len();
match self.tx_to_kernel.send(buf.into()) {
Ok(_) => Poll::Ready(Ok(len)),
Err(err) => Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
err.to_string(),
))),
}
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
}
impl AsyncRead for InternalServerCommunicator {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.rx_from_kernel).poll_read(cx, buf)
}
}
impl Unpin for InternalServerCommunicator {}