citadel_sdk/prefabs/shared/
internal_service.rs1use crate::prelude::{CitadelClientServerConnection, TargetLockedRemote};
46use bytes::Bytes;
47use citadel_io::tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
48use citadel_io::tokio_stream::wrappers::UnboundedReceiverStream;
49use citadel_io::tokio_util::io::StreamReader;
50use citadel_proto::prelude::NetworkError;
51use citadel_proto::prelude::*;
52use citadel_types::crypto::SecBuffer;
53use futures::StreamExt;
54use std::future::Future;
55use std::pin::Pin;
56use std::task::{Context, Poll};
57
58pub async fn internal_service<F, Fut, R: Ratchet>(
59 connection: CitadelClientServerConnection<R>,
60 service: F,
61) -> Result<(), NetworkError>
62where
63 F: Send + Copy + Sync + FnOnce(InternalServerCommunicator) -> Fut,
64 Fut: Send + Sync + Future<Output = Result<(), NetworkError>>,
65{
66 let remote = connection.remote.clone();
67 let (tx_to_service, rx_from_kernel) = citadel_io::tokio::sync::mpsc::unbounded_channel();
68 let (tx_to_kernel, mut rx_from_service) = citadel_io::tokio::sync::mpsc::unbounded_channel();
69
70 let internal_server_communicator = InternalServerCommunicator {
71 tx_to_kernel,
72 rx_from_kernel: StreamReader::new(rx_from_kernel.into()),
73 };
74
75 let internal_server = service(internal_server_communicator);
76
77 let (mut sink, mut stream) = connection.split();
79 let from_proto = async move {
81 while let Some(packet) = stream.next().await {
82 tx_to_service.send(Ok(packet.into_buffer().freeze()))?;
85 }
86
87 Ok(())
88 };
89
90 let from_webserver = async move {
92 while let Some(packet) = rx_from_service.recv().await {
93 sink.send(packet).await?;
94 }
95
96 Ok(())
97 };
98
99 let res = citadel_io::tokio::select! {
100 res0 = from_proto => {
101 res0
102 },
103 res1 = from_webserver => {
104 res1
105 },
106 res2 = internal_server => {
107 res2
108 }
109 };
110
111 citadel_logging::warn!(target: "citadel", "Internal Server Stopped: {res:?}");
112
113 remote.remote().shutdown().await?;
114 res
115}
116
117pub struct InternalServerCommunicator {
118 pub(crate) tx_to_kernel: citadel_io::tokio::sync::mpsc::UnboundedSender<SecBuffer>,
119 pub(crate) rx_from_kernel:
120 StreamReader<UnboundedReceiverStream<Result<Bytes, std::io::Error>>, Bytes>,
121}
122
123impl AsyncWrite for InternalServerCommunicator {
124 fn poll_write(
125 self: Pin<&mut Self>,
126 _cx: &mut Context<'_>,
127 buf: &[u8],
128 ) -> Poll<std::io::Result<usize>> {
129 let len = buf.len();
130 match self.tx_to_kernel.send(buf.into()) {
131 Ok(_) => Poll::Ready(Ok(len)),
132 Err(err) => Poll::Ready(Err(std::io::Error::other(err.to_string()))),
133 }
134 }
135
136 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
137 Poll::Ready(Ok(()))
138 }
139
140 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
141 Poll::Ready(Ok(()))
142 }
143}
144
145impl AsyncRead for InternalServerCommunicator {
146 fn poll_read(
147 mut self: Pin<&mut Self>,
148 cx: &mut Context<'_>,
149 buf: &mut ReadBuf<'_>,
150 ) -> Poll<std::io::Result<()>> {
151 Pin::new(&mut self.rx_from_kernel).poll_read(cx, buf)
152 }
153}
154
155impl Unpin for InternalServerCommunicator {}