citadel_sdk/prefabs/shared/
internal_service.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
//! Internal Service Communication Layer
//!
//! This module provides the core functionality for integrating internal services
//! within the Citadel Protocol network. It enables bidirectional communication
//! between network services and the protocol layer.
//!
//! # Features
//! - Asynchronous communication channels
//! - Bidirectional message passing
//! - Automatic protocol conversion
//! - Error propagation
//! - Resource cleanup on shutdown
//! - Stream-based I/O interface
//!
//! # Example
//! ```rust,no_run
//! use citadel_sdk::prelude::*;
//! use citadel_sdk::prefabs::shared::internal_service::InternalServerCommunicator;
//! use futures::Future;
//!
//! async fn my_service(comm: InternalServerCommunicator) -> Result<(), NetworkError> {
//!     // Service implementation
//!     Ok(())
//! }
//! ```
//!
//! # Important Notes
//! - Services run in isolated contexts
//! - Communication is fully asynchronous
//! - Implements AsyncRead and AsyncWrite
//! - Automatic cleanup on drop
//! - Thread-safe message passing
//!
//! # Related Components
//! - [`CitadelClientServerConnection`]: Connection event data
//! - [`TargetLockedRemote`]: Remote target interface
//! - [`NetworkError`]: Error handling
//! - [`SecBuffer`]: Secure data handling
//!
//! [`CitadelClientServerConnection`]: crate::prelude::CitadelClientServerConnection
//! [`TargetLockedRemote`]: crate::prelude::TargetLockedRemote
//! [`NetworkError`]: crate::prelude::NetworkError
//! [`SecBuffer`]: crate::prelude::SecBuffer

use crate::prelude::{CitadelClientServerConnection, TargetLockedRemote};
use bytes::Bytes;
use citadel_io::tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use citadel_proto::prelude::NetworkError;
use citadel_proto::prelude::*;
use citadel_proto::re_imports::{StreamReader, UnboundedReceiverStream};
use citadel_types::crypto::SecBuffer;
use futures::StreamExt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pub async fn internal_service<F, Fut, R: Ratchet>(
    connection: CitadelClientServerConnection<R>,
    service: F,
) -> Result<(), NetworkError>
where
    F: Send + Copy + Sync + FnOnce(InternalServerCommunicator) -> Fut,
    Fut: Send + Sync + Future<Output = Result<(), NetworkError>>,
{
    let remote = connection.remote.clone();
    let (tx_to_service, rx_from_kernel) = citadel_io::tokio::sync::mpsc::unbounded_channel();
    let (tx_to_kernel, mut rx_from_service) = citadel_io::tokio::sync::mpsc::unbounded_channel();

    let internal_server_communicator = InternalServerCommunicator {
        tx_to_kernel,
        rx_from_kernel: StreamReader::new(rx_from_kernel.into()),
    };

    let internal_server = service(internal_server_communicator);

    // each time a client connects, we will begin listening for messages.
    let (mut sink, mut stream) = connection.split();
    // from_proto forwards packets from the proto to the http server
    let from_proto = async move {
        while let Some(packet) = stream.next().await {
            // we receive a Citadel Protocol Packet. Now, we need to forward it to the webserver
            // the response below is the response of the internal server
            tx_to_service.send(Ok(packet.into_buffer().freeze()))?;
        }

        Ok(())
    };

    // from_webserver forwards packets from the internal server to the proto
    let from_webserver = async move {
        while let Some(packet) = rx_from_service.recv().await {
            sink.send(packet).await?;
        }

        Ok(())
    };

    let res = citadel_io::tokio::select! {
        res0 = from_proto => {
            res0
        },
        res1 = from_webserver => {
            res1
        },
        res2 = internal_server => {
            res2
        }
    };

    citadel_logging::warn!(target: "citadel", "Internal Server Stopped: {res:?}");

    remote.remote().shutdown().await?;
    res
}

pub struct InternalServerCommunicator {
    pub(crate) tx_to_kernel: citadel_io::tokio::sync::mpsc::UnboundedSender<SecBuffer>,
    pub(crate) rx_from_kernel:
        StreamReader<UnboundedReceiverStream<Result<Bytes, std::io::Error>>, Bytes>,
}

impl AsyncWrite for InternalServerCommunicator {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let len = buf.len();
        match self.tx_to_kernel.send(buf.into()) {
            Ok(_) => Poll::Ready(Ok(len)),
            Err(err) => Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                err.to_string(),
            ))),
        }
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

impl AsyncRead for InternalServerCommunicator {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        Pin::new(&mut self.rx_from_kernel).poll_read(cx, buf)
    }
}

impl Unpin for InternalServerCommunicator {}