citadel_sdk/
remote_ext.rs

1//! Remote Protocol Extensions
2//!
3//! This module extends the core NodeRemote functionality with high-level operations
4//! for managing connections, file transfers, and peer interactions in the Citadel
5//! Protocol network.
6//!
7//! # Features
8//! - User registration and authentication
9//! - Connection management
10//! - File transfer operations
11//! - Virtual filesystem support
12//! - Peer discovery and management
13//! - Group communication
14//! - Security settings configuration
15//!
16//! # Example
17//! ```rust
18//! use citadel_sdk::prelude::*;
19//!
20//! async fn example<R: Ratchet>(remote: NodeRemote<R>) -> Result<(), NetworkError> {
21//!     // Register a new user
22//!     let reg = remote.register_with_defaults(
23//!         "127.0.0.1:25021",
24//!         "John Doe",
25//!         "john.doe",
26//!         "password123"
27//!     ).await?;
28//!
29//!     // Connect to the server
30//!     let auth = AuthenticationRequest::credentialed("john.doe", "password123");
31//!     let conn = remote.connect_with_defaults(auth).await?;
32//!
33//!     // Send a file to a peer
34//!     remote.find_target("john.doe", "peer.name")
35//!         .await?
36//!         .send_file("/path/to/file.txt")
37//!         .await?;
38//!
39//!     Ok(())
40//! }
41//! ```
42//!
43//! # Important Notes
44//! - All operations are asynchronous
45//! - Connections are automatically managed
46//! - File transfers support chunking
47//! - Virtual filesystem is encrypted
48//! - Peer connections require mutual registration
49//!
50//! # Related Components
51//! - [`NodeRemote`]: Core remote interface
52//! - [`ClientServerRemote`]: Client-server communication
53//! - [`PeerRemote`]: Peer-to-peer communication
54//! - [`CitadelClientServerConnection`]: Connection management
55//! - [`RegisterSuccess`]: Registration handling
56//!
57//! [`NodeRemote`]: crate::prelude::NodeRemote
58//! [`ClientServerRemote`]: crate::prelude::ClientServerRemote
59//! [`PeerRemote`]: crate::prelude::PeerRemote
60//! [`CitadelClientServerConnection`]: crate::prelude::CitadelClientServerConnection
61//! [`RegisterSuccess`]: crate::prelude::RegisterSuccess
62
63use crate::prefabs::ClientServerRemote;
64use crate::prelude::results::{PeerConnectSuccess, PeerRegisterStatus};
65use crate::prelude::*;
66use crate::remote_ext::remote_specialization::PeerRemote;
67use crate::remote_ext::results::LocalGroupPeerFullInfo;
68use std::ops::{Deref, DerefMut};
69
70use futures::StreamExt;
71use std::path::PathBuf;
72use std::time::Duration;
73
74pub(crate) mod user_ids {
75    use crate::prelude::*;
76    use std::ops::Deref;
77
78    #[derive(Debug)]
79    /// A reference to a user identifier
80    pub struct SymmetricIdentifierHandleRef<'a, R: Ratchet> {
81        pub(crate) user: VirtualTargetType,
82        pub(crate) remote: &'a NodeRemote<R>,
83        pub(crate) target_username: Option<String>,
84    }
85
86    impl<R: Ratchet> SymmetricIdentifierHandleRef<'_, R> {
87        pub fn into_owned(self) -> SymmetricIdentifierHandle<R> {
88            SymmetricIdentifierHandle {
89                user: self.user,
90                remote: self.remote.clone(),
91                target_username: self.target_username,
92            }
93        }
94    }
95
96    #[derive(Clone, Debug)]
97    /// A convenience structure for executing commands that depend on a specific registered user
98    pub struct SymmetricIdentifierHandle<R: Ratchet> {
99        user: VirtualTargetType,
100        remote: NodeRemote<R>,
101        target_username: Option<String>,
102    }
103
104    pub trait TargetLockedRemote<R: Ratchet>: Send + Sync {
105        fn user(&self) -> &VirtualTargetType;
106        fn remote(&self) -> &NodeRemote<R>;
107        fn target_username(&self) -> Option<&str>;
108        fn user_mut(&mut self) -> &mut VirtualTargetType;
109        fn session_security_settings(&self) -> Option<&SessionSecuritySettings>;
110    }
111
112    impl<R: Ratchet> TargetLockedRemote<R> for SymmetricIdentifierHandleRef<'_, R> {
113        fn user(&self) -> &VirtualTargetType {
114            &self.user
115        }
116        fn remote(&self) -> &NodeRemote<R> {
117            self.remote
118        }
119        fn target_username(&self) -> Option<&str> {
120            self.target_username.as_deref()
121        }
122        fn user_mut(&mut self) -> &mut VirtualTargetType {
123            &mut self.user
124        }
125
126        fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
127            None
128        }
129    }
130
131    impl<R: Ratchet> TargetLockedRemote<R> for SymmetricIdentifierHandle<R> {
132        fn user(&self) -> &VirtualTargetType {
133            &self.user
134        }
135        fn remote(&self) -> &NodeRemote<R> {
136            &self.remote
137        }
138        fn target_username(&self) -> Option<&str> {
139            self.target_username.as_deref()
140        }
141        fn user_mut(&mut self) -> &mut VirtualTargetType {
142            &mut self.user
143        }
144
145        fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
146            None
147        }
148    }
149
150    impl<R: Ratchet> From<SymmetricIdentifierHandleRef<'_, R>> for SymmetricIdentifierHandle<R> {
151        fn from(this: SymmetricIdentifierHandleRef<'_, R>) -> Self {
152            this.into_owned()
153        }
154    }
155
156    impl<R: Ratchet> Deref for SymmetricIdentifierHandle<R> {
157        type Target = NodeRemote<R>;
158
159        fn deref(&self) -> &Self::Target {
160            &self.remote
161        }
162    }
163
164    impl<R: Ratchet> Deref for SymmetricIdentifierHandleRef<'_, R> {
165        type Target = NodeRemote<R>;
166
167        fn deref(&self) -> &Self::Target {
168            self.remote
169        }
170    }
171}
172
173/// Contains the elements required to communicate with the adjacent node
174pub struct CitadelClientServerConnection<R: Ratchet> {
175    /// An interface to send ordered, reliable, and encrypted messages
176    pub(crate) channel: Option<PeerChannel<R>>,
177    pub remote: ClientServerRemote<R>,
178    /// Only available if UdpMode was enabled at the beginning of a session
179    pub udp_channel_rx: Option<citadel_io::tokio::sync::oneshot::Receiver<UdpChannel<R>>>,
180    /// Contains the Google auth minted at the central server (if the central server enabled it), as well as any other services enabled by the central server
181    pub services: ServicesObject,
182    pub cid: u64,
183    pub session_security_settings: SessionSecuritySettings,
184}
185
186impl<R: Ratchet> CitadelClientServerConnection<R> {
187    /// Splits the channel into a send and receive half. This will render
188    /// the other fields of this connection object innaccessible
189    ///
190    /// # Panics
191    ///  - If the channel has already been taken
192    pub fn split(self) -> (PeerChannelSendHalf<R>, PeerChannelRecvHalf<R>) {
193        self.channel.expect("Channel already taken").split()
194    }
195
196    pub fn take_channel(&mut self) -> Option<PeerChannel<R>> {
197        self.channel.take()
198    }
199}
200
201impl<R: Ratchet> Deref for CitadelClientServerConnection<R> {
202    type Target = ClientServerRemote<R>;
203
204    fn deref(&self) -> &Self::Target {
205        &self.remote
206    }
207}
208
209impl<R: Ratchet> DerefMut for CitadelClientServerConnection<R> {
210    fn deref_mut(&mut self) -> &mut Self::Target {
211        &mut self.remote
212    }
213}
214
/// Contains the elements entailed by a successful registration.
///
/// Derives the common value-type traits so that results can be logged,
/// copied, and compared by callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RegisterSuccess {
    /// The CID reported by the registration response
    pub cid: u64,
}
219
220#[async_trait]
221/// Endows the [NodeRemote](NodeRemote) with additional functions
222pub trait ProtocolRemoteExt<R: Ratchet>: Remote<R> {
223    /// Registers with custom settings
224    /// Returns a ticket which is used to uniquely identify the request in the protocol
225    async fn register<
226        T: std::net::ToSocketAddrs + Send,
227        P: Into<String> + Send,
228        V: Into<String> + Send,
229        K: Into<SecBuffer> + Send,
230    >(
231        &self,
232        addr: T,
233        full_name: P,
234        username: V,
235        proposed_password: K,
236        default_security_settings: SessionSecuritySettings,
237        server_password: Option<PreSharedKey>,
238    ) -> Result<RegisterSuccess, NetworkError> {
239        let creds =
240            ProposedCredentials::new_register(full_name, username, proposed_password.into())
241                .await?;
242        let register_request = NodeRequest::RegisterToHypernode(RegisterToHypernode {
243            remote_addr: addr
244                .to_socket_addrs()?
245                .next()
246                .ok_or(NetworkError::InternalError("Invalid socket addr"))?,
247            proposed_credentials: creds,
248            static_security_settings: default_security_settings,
249            session_password: server_password.unwrap_or_default(),
250        });
251
252        let mut subscription = self.send_callback_subscription(register_request).await?;
253        while let Some(status) = subscription.next().await {
254            match status.into_result()? {
255                NodeResult::RegisterOkay(RegisterOkay { cid, .. }) => {
256                    return Ok(RegisterSuccess { cid });
257                }
258                NodeResult::RegisterFailure(err) => {
259                    return Err(NetworkError::Generic(err.error_message));
260                }
261                NodeResult::Disconnect(err) => {
262                    return Err(NetworkError::Generic(err.message));
263                }
264                evt => {
265                    log::warn!(target: "citadel", "Invalid NodeResult for Register request received: {evt:?}");
266                }
267            }
268        }
269
270        Err(NetworkError::InternalError(
271            "Internal kernel stream died (register)",
272        ))
273    }
274
275    /// Registers using the default settings. The default uses No Google FCM keys and the default session security settings
276    /// Returns a ticket which is used to uniquely identify the request in the protocol
277    async fn register_with_defaults<
278        T: std::net::ToSocketAddrs + Send,
279        P: Into<String> + Send,
280        V: Into<String> + Send,
281        K: Into<SecBuffer> + Send,
282    >(
283        &self,
284        addr: T,
285        full_name: P,
286        username: V,
287        proposed_password: K,
288    ) -> Result<RegisterSuccess, NetworkError> {
289        self.register(
290            addr,
291            full_name,
292            username,
293            proposed_password,
294            Default::default(),
295            Default::default(),
296        )
297        .await
298    }
299
300    /// Connects with custom settings
301    /// Returns a ticket which is used to uniquely identify the request in the protocol
302    async fn connect(
303        &self,
304        auth: AuthenticationRequest,
305        connect_mode: ConnectMode,
306        udp_mode: UdpMode,
307        keep_alive_timeout: Option<Duration>,
308        session_security_settings: SessionSecuritySettings,
309        server_password: Option<PreSharedKey>,
310    ) -> Result<CitadelClientServerConnection<R>, NetworkError> {
311        let connect_request = NodeRequest::ConnectToHypernode(ConnectToHypernode {
312            auth_request: auth,
313            connect_mode,
314            udp_mode,
315            keep_alive_timeout: keep_alive_timeout.map(|r| r.as_secs()),
316            session_security_settings,
317            session_password: server_password.unwrap_or_default(),
318        });
319
320        let mut subscription = self.send_callback_subscription(connect_request).await?;
321        let status = subscription
322            .next()
323            .await
324            .ok_or(NetworkError::InternalError(
325                "Internal kernel stream died (connect)",
326            ))?;
327
328        return match status.into_result()? {
329            NodeResult::ConnectSuccess(ConnectSuccess {
330                ticket: _,
331                session_cid: cid,
332                remote_addr: _,
333                is_personal: _,
334                v_conn_type,
335                services,
336                welcome_message: _,
337                channel,
338                udp_rx_opt: udp_channel_rx,
339                session_security_settings,
340            }) => Ok(CitadelClientServerConnection {
341                remote: ClientServerRemote::new(
342                    v_conn_type,
343                    self.remote_ref().clone(),
344                    session_security_settings,
345                    None,
346                    None,
347                ),
348                channel: Some(*channel),
349                udp_channel_rx,
350                services,
351                cid,
352                session_security_settings,
353            }),
354            NodeResult::ConnectFail(ConnectFail {
355                ticket: _,
356                cid_opt: _,
357                error_message: err,
358            }) => Err(NetworkError::Generic(err)),
359            NodeResult::Disconnect(err) => {
360                return Err(NetworkError::Generic(err.message));
361            }
362            res => Err(NetworkError::msg(format!(
363                "[connect] An unexpected response occurred: {res:?}"
364            ))),
365        };
366    }
367
368    /// Connects with the default settings
369    /// If FCM keys were created during the registration phase, then those keys will be used for the session. If new FCM keys need to be used, consider using [`Self::connect`]
370    async fn connect_with_defaults(
371        &self,
372        auth: AuthenticationRequest,
373    ) -> Result<CitadelClientServerConnection<R>, NetworkError> {
374        self.connect(
375            auth,
376            Default::default(),
377            Default::default(),
378            None,
379            Default::default(),
380            Default::default(),
381        )
382        .await
383    }
384
385    /// Creates a valid target identifier used to make protocol requests. Raw user IDs or usernames can be used
386    /// ```
387    /// use citadel_sdk::prelude::*;
388    /// # use citadel_sdk::prefabs::client::single_connection::SingleClientServerConnectionKernel;
389    ///
390    /// let server_connection_settings = DefaultServerConnectionSettingsBuilder::credentialed_login("127.0.0.1:25021", "john.doe", "password").build().unwrap();
391    ///
392    /// # SingleClientServerConnectionKernel::new(server_connection_settings, |conn| async move {
393    /// conn.find_target("my_account", "my_peer").await?.send_file("/path/to/file.pdf").await
394    /// // or: conn.find_target(1234, "my_peer").await? [...]
395    /// # });
396    /// ```
397    async fn find_target<T: Into<UserIdentifier> + Send, P: Into<UserIdentifier> + Send>(
398        &self,
399        local_user: T,
400        peer: P,
401    ) -> Result<SymmetricIdentifierHandleRef<'_, R>, NetworkError> {
402        let account_manager = self.account_manager();
403        account_manager
404            .find_target_information(local_user, peer)
405            .await?
406            .map(move |(cid, peer)| {
407                if peer.parent_icid != 0 {
408                    SymmetricIdentifierHandleRef {
409                        user: VirtualTargetType::ExternalGroupPeer {
410                            session_cid: cid,
411                            interserver_cid: peer.parent_icid,
412                            peer_cid: peer.cid,
413                        },
414                        remote: self.remote_ref(),
415                        target_username: None,
416                    }
417                } else {
418                    SymmetricIdentifierHandleRef {
419                        user: VirtualTargetType::LocalGroupPeer {
420                            session_cid: cid,
421                            peer_cid: peer.cid,
422                        },
423                        remote: self.remote_ref(),
424                        target_username: None,
425                    }
426                }
427            })
428            .ok_or_else(|| NetworkError::msg("Target pair not found"))
429    }
430
431    /// Creates a proposed target from the valid local user to an unregistered peer in the network. Used when creating registration requests for peers.
432    /// Currently only supports LocalGroup <-> LocalGroup peer connections
433    async fn propose_target<T: Into<UserIdentifier> + Send, P: Into<UserIdentifier> + Send>(
434        &self,
435        local_user: T,
436        peer: P,
437    ) -> Result<SymmetricIdentifierHandleRef<'_, R>, NetworkError> {
438        let local_cid = self.get_session_cid(local_user).await?;
439        match peer.into() {
440            UserIdentifier::ID(peer_cid) => Ok(SymmetricIdentifierHandleRef {
441                user: VirtualTargetType::LocalGroupPeer {
442                    session_cid: local_cid,
443                    peer_cid,
444                },
445                remote: self.remote_ref(),
446                target_username: None,
447            }),
448            UserIdentifier::Username(uname) => {
449                let peer_cid = self
450                    .remote_ref()
451                    .account_manager()
452                    .find_target_information(local_cid, uname.clone())
453                    .await?
454                    .map(|r| r.1.cid)
455                    .unwrap_or(0);
456                Ok(SymmetricIdentifierHandleRef {
457                    user: VirtualTargetType::LocalGroupPeer {
458                        session_cid: local_cid,
459                        peer_cid,
460                    },
461                    remote: self.remote_ref(),
462                    target_username: Some(uname),
463                })
464            }
465        }
466    }
467
468    /// Returns a list of local group peers on the network for local_user. May or may not be registered to the user. To get a list of registered users to local_user, run [`Self::get_local_group_mutual_peers`]
469    /// - limit: if None, all peers are obtained. If Some, at most the specified number of peers will be obtained
470    async fn get_local_group_peers<T: Into<UserIdentifier> + Send>(
471        &self,
472        local_user: T,
473        limit: Option<usize>,
474    ) -> Result<Vec<LocalGroupPeerFullInfo>, NetworkError> {
475        let local_cid = self.get_session_cid(local_user).await?;
476        let command = NodeRequest::PeerCommand(PeerCommand {
477            session_cid: local_cid,
478            command: PeerSignal::GetRegisteredPeers {
479                peer_conn_type: ClientConnectionType::Server {
480                    session_cid: local_cid,
481                },
482                response: None,
483                limit: limit.map(|r| r as i32),
484            },
485        });
486
487        let mut stream = self.send_callback_subscription(command).await?;
488
489        while let Some(status) = stream.next().await {
490            if let NodeResult::PeerEvent(PeerEvent {
491                event:
492                    PeerSignal::GetRegisteredPeers {
493                        peer_conn_type: _,
494                        response: Some(PeerResponse::RegisteredCids(peer_info, is_onlines)),
495                        limit: _,
496                    },
497                ticket: _,
498                ..
499            }) = status.into_result()?
500            {
501                return Ok(peer_info
502                    .into_iter()
503                    .zip(is_onlines.into_iter())
504                    .filter_map(|(peer_info, is_online)| {
505                        peer_info.map(|info| LocalGroupPeerFullInfo {
506                            cid: info.cid,
507                            username: Some(info.username),
508                            full_name: Some(info.full_name),
509                            is_online,
510                        })
511                    })
512                    .collect());
513            }
514        }
515
516        Err(NetworkError::InternalError(
517            "Internal kernel stream died (get_local_group_peers)",
518        ))
519    }
520
521    /// Returns a list of mutually-registered peers with the local_user
522    async fn get_local_group_mutual_peers<T: Into<UserIdentifier> + Send>(
523        &self,
524        local_user: T,
525    ) -> Result<Vec<LocalGroupPeerFullInfo>, NetworkError> {
526        let local_cid = self.get_session_cid(local_user).await?;
527        let command = NodeRequest::PeerCommand(PeerCommand {
528            session_cid: local_cid,
529            command: PeerSignal::GetMutuals {
530                v_conn_type: ClientConnectionType::Server {
531                    session_cid: local_cid,
532                },
533                response: None,
534            },
535        });
536
537        let mut stream = self.send_callback_subscription(command).await?;
538
539        while let Some(status) = stream.next().await {
540            if let NodeResult::PeerEvent(PeerEvent {
541                event:
542                    PeerSignal::GetMutuals {
543                        v_conn_type: _,
544                        response: Some(PeerResponse::RegisteredCids(peer_info, is_onlines)),
545                    },
546                ticket: _,
547                ..
548            }) = status.into_result()?
549            {
550                return Ok(peer_info
551                    .into_iter()
552                    .zip(is_onlines.into_iter())
553                    .filter_map(|(peer_info, is_online)| {
554                        peer_info.map(|info| LocalGroupPeerFullInfo {
555                            cid: info.cid,
556                            username: Some(info.username),
557                            full_name: Some(info.full_name),
558                            is_online,
559                        })
560                    })
561                    .collect());
562            }
563        }
564
565        Err(NetworkError::msg("Stream died"))
566    }
567
568    /// Returns all the active sessions in the protocol, including all P2P connections hierarchically placed as children to C2S
569    /// connections
570    async fn sessions(&self) -> Result<ActiveSessions, NetworkError> {
571        match self
572            .send_callback_subscription(NodeRequest::GetActiveSessions)
573            .await
574        {
575            Ok(mut stream) => {
576                while let Some(result) = stream.next().await {
577                    match result.into_result()? {
578                        NodeResult::SessionList(res) => return Ok(res.sessions),
579                        res => {
580                            citadel_logging::warn!("Received unexpected result: {res:?}");
581                        }
582                    }
583                }
584
585                citadel_logging::warn!("Failed to receive response from SDK (stream died)");
586                return Err(NetworkError::InternalError(
587                    "Internal kernel stream died (get_active_sessions)",
588                ));
589            }
590            Err(e) => {
591                citadel_logging::warn!(target: "citadel", "Failed to query SDK sessions: {e}");
592            }
593        }
594
595        Err(NetworkError::msg("Failed to query SDK sessions"))
596    }
597
    /// Returns the underlying [`NodeRemote`]. This is the only method of this
    /// trait without a default body; the provided methods use it to build
    /// target handles and connection remotes.
    #[doc(hidden)]
    fn remote_ref(&self) -> &NodeRemote<R>;
600
601    #[doc(hidden)]
602    async fn get_session_cid<T: Into<UserIdentifier> + Send>(
603        &self,
604        local_user: T,
605    ) -> Result<u64, NetworkError> {
606        let account_manager = self.account_manager();
607        Ok(account_manager
608            .find_local_user_information(local_user)
609            .await?
610            .ok_or(NetworkError::InvalidRequest("User does not exist"))?)
611    }
612}
613
// A `NodeRemote` is its own remote reference. Only `remote_ref` is supplied
// here; every other `ProtocolRemoteExt` method uses the trait's default body.
impl<R: Ratchet> ProtocolRemoteExt<R> for NodeRemote<R> {
    fn remote_ref(&self) -> &NodeRemote<R> {
        self
    }
}
619
// A `ClientServerRemote` exposes the `NodeRemote` stored in its `inner` field.
// Only `remote_ref` is supplied here; every other `ProtocolRemoteExt` method
// uses the trait's default body.
impl<R: Ratchet> ProtocolRemoteExt<R> for ClientServerRemote<R> {
    fn remote_ref(&self) -> &NodeRemote<R> {
        &self.inner
    }
}
625
626#[async_trait]
627/// Some functions require that a target exists
628pub trait ProtocolRemoteTargetExt<R: Ratchet>: TargetLockedRemote<R> {
629    /// Sends a file with a custom size. The smaller the chunks, the higher the degree of scrambling, but the higher the performance cost. A chunk size of zero will use the default
630    async fn send_file_with_custom_opts<T: ObjectSource>(
631        &self,
632        source: T,
633        chunk_size: usize,
634        transfer_type: TransferType,
635    ) -> Result<(), NetworkError> {
636        let chunk_size = if chunk_size == 0 {
637            None
638        } else {
639            Some(chunk_size)
640        };
641        let session_cid = self.user().get_session_cid();
642        let user = *self.user();
643        let remote = self.remote();
644
645        let mut stream = remote
646            .send_callback_subscription(NodeRequest::SendObject(SendObject {
647                source: Box::new(source),
648                chunk_size,
649                session_cid,
650                v_conn_type: user,
651                transfer_type,
652            }))
653            .await?;
654
655        while let Some(event) = stream.next().await {
656            match event.into_result()? {
657                NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) => {
658                    return handle
659                        .transfer_file()
660                        .await
661                        .map_err(|err| NetworkError::Generic(err.into_string()));
662                }
663
664                NodeResult::PeerEvent(PeerEvent {
665                    event: PeerSignal::SignalReceived { .. },
666                    ..
667                }) => {}
668
669                res => {
670                    log::warn!(target: "citadel", "Invalid NodeResult for FileTransfer request received: {res:?}")
671                }
672            }
673        }
674
675        Err(NetworkError::InternalError("File transfer stream died"))
676    }
677
678    /// Sends a file to the provided target using the default chunking size
679    async fn send_file<T: ObjectSource>(&self, source: T) -> Result<(), NetworkError> {
680        self.send_file_with_custom_opts(source, 0, TransferType::FileTransfer)
681            .await
682    }
683
684    /// Sends a file to the provided target using custom chunking size with local encryption.
685    /// Only this local node may decrypt the information send to the adjacent node.
686    async fn remote_encrypted_virtual_filesystem_push_custom_chunking<
687        T: ObjectSource,
688        P: Into<PathBuf> + Send,
689    >(
690        &self,
691        source: T,
692        virtual_directory: P,
693        chunk_size: usize,
694        security_level: SecurityLevel,
695    ) -> Result<(), NetworkError> {
696        self.can_use_revfs()?;
697        let mut virtual_path = virtual_directory.into();
698        virtual_path = prepare_virtual_path(virtual_path);
699        validate_virtual_path(&virtual_path)
700            .map_err(|err| NetworkError::Generic(err.into_string()))?;
701        let tx_type = TransferType::RemoteEncryptedVirtualFilesystem {
702            virtual_path,
703            security_level,
704        };
705        self.send_file_with_custom_opts(source, chunk_size, tx_type)
706            .await
707    }
708
709    /// Sends a file to the provided target using the default chunking size with local encryption.
710    /// Only this local node may decrypt the information send to the adjacent node.
711    async fn remote_encrypted_virtual_filesystem_push<T: ObjectSource, P: Into<PathBuf> + Send>(
712        &self,
713        source: T,
714        virtual_directory: P,
715        security_level: SecurityLevel,
716    ) -> Result<(), NetworkError> {
717        self.remote_encrypted_virtual_filesystem_push_custom_chunking(
718            source,
719            virtual_directory,
720            0,
721            security_level,
722        )
723        .await
724    }
725
726    /// Pulls a virtual file from the RE-VFS. If `delete_on_pull` is true, then, the virtual file
727    /// will be taken from the RE-VFS
728    async fn remote_encrypted_virtual_filesystem_pull<P: Into<PathBuf> + Send>(
729        &self,
730        virtual_directory: P,
731        transfer_security_level: SecurityLevel,
732        delete_on_pull: bool,
733    ) -> Result<PathBuf, NetworkError> {
734        self.can_use_revfs()?;
735        let request = NodeRequest::PullObject(PullObject {
736            v_conn: *self.user(),
737            virtual_dir: virtual_directory.into(),
738            delete_on_pull,
739            transfer_security_level,
740        });
741
742        let mut stream = self.remote().send_callback_subscription(request).await?;
743
744        while let Some(event) = stream.next().await {
745            match event.into_result()? {
746                NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) => {
747                    return handle
748                        .receive_file()
749                        .await
750                        .map_err(|err| NetworkError::Generic(err.into_string()));
751                }
752
753                NodeResult::PeerEvent(PeerEvent {
754                    event: PeerSignal::SignalReceived { .. },
755                    ..
756                }) => {}
757
758                res => {
759                    log::error!(target: "citadel", "Invalid NodeResult for REVFS FileTransfer request received: {res:?}");
760                    return Err(NetworkError::InternalError(
761                        "Received invalid response from protocol",
762                    ));
763                }
764            }
765        }
766
767        Err(NetworkError::InternalError(
768            "REVFS File transfer stream died",
769        ))
770    }
771
772    /// Deletes the file from the RE-VFS. If the contents are desired on delete,
773    /// consider calling `Self::remote_encrypted_virtual_filesystem_pull` with the delete
774    /// parameter set to true
775    async fn remote_encrypted_virtual_filesystem_delete<P: Into<PathBuf> + Send>(
776        &self,
777        virtual_directory: P,
778    ) -> Result<(), NetworkError> {
779        self.can_use_revfs()?;
780        let request = NodeRequest::DeleteObject(DeleteObject {
781            v_conn: *self.user(),
782            virtual_dir: virtual_directory.into(),
783            security_level: Default::default(),
784        });
785
786        let mut stream = self.remote().send_callback_subscription(request).await?;
787        while let Some(event) = stream.next().await {
788            match event.into_result()? {
789                NodeResult::ReVFS(result) => {
790                    return if let Some(error) = result.error_message {
791                        Err(NetworkError::Generic(error))
792                    } else {
793                        Ok(())
794                    }
795                }
796
797                evt => {
798                    log::error!(target: "citadel", "Invalid NodeResult for REVFS Delete request received: {evt:?}");
799                }
800            }
801        }
802
803        Err(NetworkError::InternalError("REVFS Delete stream died"))
804    }
805
806    /// Connects to the peer with custom settings
807    async fn connect_to_peer_custom(
808        &self,
809        session_security_settings: SessionSecuritySettings,
810        udp_mode: UdpMode,
811        peer_session_password: Option<PreSharedKey>,
812    ) -> Result<PeerConnectSuccess<R>, NetworkError> {
813        use std::time::Duration;
814
815        // Timeout for the entire P2P connection process.
816        // This prevents indefinite hangs when the server never responds with PeerChannelCreated.
817        // The timeout should be long enough for normal hole punching (which has its own 30s timeout)
818        // plus key exchange, but short enough to fail fast when something is stuck.
819        const P2P_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
820
821        let session_cid = self.user().get_session_cid();
822        let peer_target = self.try_as_peer_connection().await?;
823
824        let mut stream = self
825            .remote()
826            .send_callback_subscription(NodeRequest::PeerCommand(PeerCommand {
827                session_cid,
828                command: PeerSignal::PostConnect {
829                    peer_conn_type: peer_target,
830                    ticket_opt: None,
831                    invitee_response: None,
832                    session_security_settings,
833                    udp_mode,
834                    session_password: peer_session_password,
835                },
836            }))
837            .await?;
838
839        let connect_task = async {
840            while let Some(status) = stream.next().await {
841                match status.into_result()? {
842                    NodeResult::PeerChannelCreated(PeerChannelCreated {
843                        ticket: _,
844                        channel,
845                        udp_rx_opt,
846                    }) => {
847                        let username = self.target_username().map(ToString::to_string);
848                        let remote = PeerRemote {
849                            inner: self.remote().clone(),
850                            peer: peer_target.as_virtual_connection(),
851                            username,
852                            session_security_settings,
853                        };
854
855                        return Ok(PeerConnectSuccess {
856                            remote,
857                            channel: *channel,
858                            udp_channel_rx: udp_rx_opt,
859                            incoming_object_transfer_handles: None,
860                        });
861                    }
862
863                    NodeResult::PeerEvent(PeerEvent {
864                        event:
865                            PeerSignal::PostConnect {
866                                invitee_response, ..
867                            },
868                        ..
869                    }) => match invitee_response {
870                        Some(PeerResponse::Timeout) => {
871                            return Err(NetworkError::msg("Peer did not respond in time"))
872                        }
873                        Some(PeerResponse::Decline) => {
874                            return Err(NetworkError::msg("Peer declined to connect"))
875                        }
876                        _ => {}
877                    },
878
879                    _ => {}
880                }
881            }
882
883            Err(NetworkError::InternalError(
884                "Internal kernel stream died (connect_to_peer_custom)",
885            ))
886        };
887
888        match citadel_io::tokio::time::timeout(P2P_CONNECT_TIMEOUT, connect_task).await {
889            Ok(result) => result,
890            Err(_elapsed) => Err(NetworkError::msg(format!(
891                "P2P connection timed out after {}s waiting for PeerChannelCreated",
892                P2P_CONNECT_TIMEOUT.as_secs()
893            ))),
894        }
895    }
896
897    /// Connects to the target peer with default settings
898    async fn connect_to_peer(&self) -> Result<PeerConnectSuccess<R>, NetworkError> {
899        self.connect_to_peer_custom(Default::default(), Default::default(), Default::default())
900            .await
901    }
902
903    /// Posts a registration request to a peer
904    async fn register_to_peer(&self) -> Result<PeerRegisterStatus, NetworkError> {
905        let session_cid = self.user().get_session_cid();
906        let peer_target = self.try_as_peer_connection().await?;
907        // TODO: Get rid of this step. Should be handled by the protocol
908        let local_username = self
909            .remote()
910            .account_manager()
911            .get_username_by_cid(session_cid)
912            .await?
913            .ok_or_else(|| NetworkError::msg("Unable to find username for local user"))?;
914        let peer_username_opt = self.target_username().map(ToString::to_string);
915
916        let mut stream = self
917            .remote()
918            .send_callback_subscription(NodeRequest::PeerCommand(PeerCommand {
919                session_cid,
920                command: PeerSignal::PostRegister {
921                    peer_conn_type: peer_target,
922                    inviter_username: local_username,
923                    invitee_username: peer_username_opt,
924                    ticket_opt: None,
925                    invitee_response: None,
926                },
927            }))
928            .await?;
929
930        while let Some(status) = stream.next().await {
931            if let NodeResult::PeerEvent(PeerEvent {
932                event:
933                    PeerSignal::PostRegister {
934                        peer_conn_type: _,
935                        inviter_username: _,
936                        invitee_username: _,
937                        ticket_opt: _,
938                        invitee_response: Some(resp),
939                    },
940                ticket: _,
941                ..
942            }) = status.into_result()?
943            {
944                match resp {
945                    PeerResponse::Accept(..) => return Ok(PeerRegisterStatus::Accepted),
946                    PeerResponse::Decline => return Ok(PeerRegisterStatus::Declined),
947                    PeerResponse::Timeout => return Ok(PeerRegisterStatus::Failed { reason: Some("Timeout on register request. Peer did not accept in time. Try again later".to_string()) }),
948                    _ => {}
949                }
950            }
951        }
952
953        Err(NetworkError::Generic(format!(
954            "Internal kernel stream died (register_to_peer): {:?}",
955            stream.callback_key()
956        )))
957    }
958
959    /// Deregisters the currently locked target. If the target is a client to server
960    /// connection, deregisters from the server. If the target is a p2p connection,
961    /// deregisters the p2p
962    async fn deregister(&self) -> Result<(), NetworkError> {
963        if let Ok(peer_conn) = self.try_as_peer_connection().await {
964            let peer_request = PeerSignal::Deregister {
965                peer_conn_type: peer_conn,
966            };
967            let session_cid = self.user().get_session_cid();
968            let request = NodeRequest::PeerCommand(PeerCommand {
969                session_cid,
970                command: peer_request,
971            });
972
973            let mut subscription = self.remote().send_callback_subscription(request).await?;
974            while let Some(result) = subscription.next().await {
975                if let NodeResult::PeerEvent(PeerEvent {
976                    event: PeerSignal::DeregistrationSuccess { .. },
977                    ticket: _,
978                    ..
979                }) = result.into_result()?
980                {
981                    return Ok(());
982                }
983            }
984        } else {
985            // c2s conn
986            let cid = self.user().get_session_cid();
987            let request = NodeRequest::DeregisterFromHypernode(DeregisterFromHypernode {
988                session_cid: cid,
989                v_conn_type: *self.user(),
990            });
991            let mut subscription = self.remote().send_callback_subscription(request).await?;
992            while let Some(result) = subscription.next().await {
993                match result.into_result()? {
994                    NodeResult::DeRegistration(DeRegistration {
995                        session_cid: _,
996                        ticket_opt: _,
997                        success: true,
998                    }) => return Ok(()),
999                    NodeResult::DeRegistration(DeRegistration {
1000                        session_cid: _,
1001                        ticket_opt: _,
1002                        success: false,
1003                    }) => {
1004                        return Err(NetworkError::msg(
1005                            "Unable to deregister: status=false".to_string(),
1006                        ))
1007                    }
1008
1009                    _ => {}
1010                }
1011            }
1012        }
1013
1014        Err(NetworkError::InternalError("Deregister ended unexpectedly"))
1015    }
1016
1017    async fn disconnect(&self) -> Result<(), NetworkError> {
1018        if let Ok(peer_conn) = self.try_as_peer_connection().await {
1019            if let PeerConnectionType::LocalGroupPeer {
1020                session_cid,
1021                peer_cid: _,
1022            } = peer_conn
1023            {
1024                let request = NodeRequest::PeerCommand(PeerCommand {
1025                    session_cid,
1026                    command: PeerSignal::Disconnect {
1027                        peer_conn_type: peer_conn,
1028                        disconnect_response: None,
1029                    },
1030                });
1031
1032                let mut subscription = self.remote().send_callback_subscription(request).await?;
1033
1034                while let Some(event) = subscription.next().await {
1035                    if let NodeResult::PeerEvent(PeerEvent {
1036                        event:
1037                            PeerSignal::Disconnect {
1038                                peer_conn_type: _,
1039                                disconnect_response: Some(_),
1040                            },
1041                        ticket: _,
1042                        ..
1043                    }) = event.into_result()?
1044                    {
1045                        return Ok(());
1046                    }
1047                }
1048
1049                Err(NetworkError::InternalError(
1050                    "Unable to receive valid disconnect event",
1051                ))
1052            } else {
1053                Err(NetworkError::msg(
1054                    "External group peer functionality not enabled",
1055                ))
1056            }
1057        } else {
1058            //c2s conn
1059            let cid = self.user().get_session_cid();
1060            let request =
1061                NodeRequest::DisconnectFromHypernode(DisconnectFromHypernode { session_cid: cid });
1062
1063            let mut subscription = self.remote().send_callback_subscription(request).await?;
1064            while let Some(event) = subscription.next().await {
1065                if let NodeResult::Disconnect(Disconnect {
1066                    success, message, ..
1067                }) = event.into_result()?
1068                {
1069                    return if success {
1070                        Ok(())
1071                    } else {
1072                        Err(NetworkError::msg(message))
1073                    };
1074                }
1075            }
1076
1077            Err(NetworkError::InternalError(
1078                "Unable to receive valid disconnect event",
1079            ))
1080        }
1081    }
1082
1083    async fn create_group(
1084        &self,
1085        initial_users_to_invite: Option<Vec<UserIdentifier>>,
1086    ) -> Result<GroupChannel, NetworkError> {
1087        let session_cid = self.user().get_session_cid();
1088
1089        let mut initial_users = vec![];
1090        // TODO: allow for custom message group options. For now, don't
1091        let options = MessageGroupOptions::default();
1092        // TODO/NOTE: default is PRIVATE mode, meaning all users in group must be registered to the owner
1093        // in the future, allow for private/public modes by adjusting the below. Initial users should be
1094        // a UserIdentifier
1095        if let Some(initial_users_to_invite) = initial_users_to_invite {
1096            for user in initial_users_to_invite {
1097                initial_users.push(
1098                    self.remote()
1099                        .account_manager()
1100                        .find_target_information(session_cid, user.clone())
1101                        .await
1102                        .map_err(|err| NetworkError::msg(err.into_string()))?
1103                        .ok_or_else(|| {
1104                            NetworkError::msg(format!(
1105                                "Account {user:?} not found for local user {session_cid:?}"
1106                            ))
1107                        })
1108                        .map(|r| r.1.cid)?,
1109                )
1110            }
1111        }
1112
1113        let group_request = GroupBroadcast::Create {
1114            initial_invitees: initial_users,
1115            options,
1116        };
1117        let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand {
1118            session_cid,
1119            command: group_request,
1120        });
1121        let mut subscription = self.remote().send_callback_subscription(request).await?;
1122        while let Some(evt) = subscription.next().await {
1123            if let NodeResult::GroupChannelCreated(GroupChannelCreated {
1124                ticket: _,
1125                channel,
1126                session_cid: _,
1127            }) = evt
1128            {
1129                return Ok(channel);
1130            }
1131        }
1132
1133        Err(NetworkError::InternalError(
1134            "Create_group ended unexpectedly",
1135        ))
1136    }
1137
1138    /// Lists all groups that which the current peer owns
1139    async fn list_owned_groups(&self) -> Result<Vec<MessageGroupKey>, NetworkError> {
1140        let session_cid = self.user().get_session_cid();
1141        let cid_to_check_for = match self.try_as_peer_connection().await {
1142            Ok(res) => res.get_original_target_cid(),
1143            _ => session_cid,
1144        };
1145        let group_request = GroupBroadcast::ListGroupsFor {
1146            cid: cid_to_check_for,
1147        };
1148        let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand {
1149            session_cid,
1150            command: group_request,
1151        });
1152
1153        let mut subscription = self.remote().send_callback_subscription(request).await?;
1154
1155        while let Some(evt) = subscription.next().await {
1156            if let NodeResult::GroupEvent(GroupEvent {
1157                session_cid: _,
1158                ticket: _,
1159                event: GroupBroadcast::ListResponse { groups },
1160            }) = evt.into_result()?
1161            {
1162                return Ok(groups);
1163            }
1164        }
1165
1166        Err(NetworkError::InternalError(
1167            "List_members ended unexpectedly",
1168        ))
1169    }
1170
1171    /// Lists all active sessions, including the local nat type. For each active session,
1172    /// lists each connection info which includes the remote nat type, connection status,
1173    /// peer id, and latest ratchet version.
1174    async fn list_sessions(&self) -> Result<ActiveSessions, NetworkError> {
1175        let request = NodeRequest::GetActiveSessions;
1176        let mut subscription = self.remote().send_callback_subscription(request).await?;
1177
1178        if let Some(NodeResult::SessionList(result)) = subscription.next().await {
1179            return Ok(result.sessions);
1180        }
1181
1182        Err(NetworkError::InternalError(
1183            "List_sessions ended unexpectedly",
1184        ))
1185    }
1186
1187    /// Begins a re-key, updating the container in the process.
1188    /// Returns the new key matrix version. Does not return the new key version
1189    /// if the rekey fails, or, if a current rekey is already executing
1190    async fn rekey(&self) -> Result<Option<u32>, NetworkError> {
1191        let request = NodeRequest::ReKey(ReKey {
1192            v_conn_type: *self.user(),
1193        });
1194        let mut subscription = self.remote().send_callback_subscription(request).await?;
1195
1196        while let Some(evt) = subscription.next().await {
1197            if let NodeResult::ReKeyResult(result) = evt {
1198                return match result.status {
1199                    ReKeyReturnType::Success { version } => Ok(Some(version)),
1200                    ReKeyReturnType::AlreadyInProgress => Ok(None),
1201                    ReKeyReturnType::Failure { err } => {
1202                        Err(NetworkError::Generic(format!("Rekey failed: {err}")))
1203                    }
1204                };
1205            }
1206        }
1207
1208        Err(NetworkError::InternalError("Rekey ended unexpectedly"))
1209    }
1210
1211    /// Checks if the locked target is registered
1212    async fn is_peer_registered(&self) -> Result<bool, NetworkError> {
1213        let target = self.try_as_peer_connection().await?;
1214        if let PeerConnectionType::LocalGroupPeer {
1215            session_cid: local_cid,
1216            peer_cid,
1217        } = target
1218        {
1219            let peers = self.remote().get_local_group_peers(local_cid, None).await?;
1220            citadel_logging::info!(target: "citadel", "Checking to see if {target} is registered in {peers:?}");
1221            Ok(peers.iter().any(|p| p.cid == peer_cid))
1222        } else {
1223            Err(NetworkError::Generic(
1224                "External group peers are not supported yet".to_string(),
1225            ))
1226        }
1227    }
1228
1229    #[doc(hidden)]
1230    async fn try_as_peer_connection(&self) -> Result<PeerConnectionType, NetworkError> {
1231        let verified_return = |user: &VirtualTargetType| {
1232            user.try_as_peer_connection()
1233                .ok_or(NetworkError::InvalidRequest("Target is not a peer"))
1234        };
1235
1236        if self.user().get_target_cid() == 0 {
1237            // in this case, the user re-used a remote locked to a registration target
1238            // where the username was provided, but the cid was 0 (unknown).
1239            let peer_username = self
1240                .target_username()
1241                .ok_or_else(|| NetworkError::msg("target_cid=0, yet, no username was provided"))?;
1242            let session_cid = self.user().get_session_cid();
1243            let expected_peer_cid = self
1244                .remote()
1245                .account_manager()
1246                .get_persistence_handler()
1247                .get_cid_by_username(peer_username);
1248            // get the peer cid from the account manager (implying the peers are already registered).
1249            // fallback to the mapped cid if the peer is not registered
1250            let peer_cid = self
1251                .remote()
1252                .account_manager()
1253                .find_target_information(session_cid, peer_username)
1254                .await
1255                .map_err(|err| NetworkError::Generic(err.into_string()))?
1256                .map(|r| r.1.cid)
1257                .unwrap_or(expected_peer_cid);
1258
1259            let mut user = *self.user();
1260            user.set_target_cid(peer_cid);
1261            verified_return(&user)
1262        } else {
1263            verified_return(self.user())
1264        }
1265    }
1266
1267    #[doc(hidden)]
1268    fn can_use_revfs(&self) -> Result<(), NetworkError> {
1269        if let Some(sess) = self.session_security_settings() {
1270            if sess.crypto_params.kem_algorithm == KemAlgorithm::Kyber {
1271                Ok(())
1272            } else {
1273                Err(NetworkError::InvalidRequest(
1274                    "RE-VFS can only be used with Kyber KEM",
1275                ))
1276            }
1277        } else {
1278            Err(NetworkError::InvalidRequest(
1279                "RE-VFS cannot be used with this remote type",
1280            ))
1281        }
1282    }
1283}
1284
1285impl<T: TargetLockedRemote<R>, R: Ratchet> ProtocolRemoteTargetExt<R> for T {}
1286
1287pub mod results {
1288    use crate::prefabs::client::peer_connection::FileTransferHandleRx;
1289    use crate::prelude::{PeerChannel, UdpChannel};
1290    use crate::remote_ext::remote_specialization::PeerRemote;
1291    use citadel_io::tokio::sync::oneshot::Receiver;
1292    use citadel_proto::prelude::*;
1293    use std::fmt::Debug;
1294
1295    pub struct PeerConnectSuccess<R: Ratchet> {
1296        pub channel: PeerChannel<R>,
1297        pub udp_channel_rx: Option<Receiver<UdpChannel<R>>>,
1298        pub remote: PeerRemote<R>,
1299        /// Receives incoming file/object transfer requests. The handles must be
1300        /// .accepted() before the file/object transfer is allowed to proceed
1301        pub(crate) incoming_object_transfer_handles: Option<FileTransferHandleRx>,
1302    }
1303
1304    impl<R: Ratchet> Debug for PeerConnectSuccess<R> {
1305        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1306            f.debug_struct("PeerConnectSuccess")
1307                .field("channel", &self.channel)
1308                .field("udp_channel_rx", &self.udp_channel_rx)
1309                .finish()
1310        }
1311    }
1312
1313    impl<R: Ratchet> PeerConnectSuccess<R> {
1314        /// Obtains a receiver which yields incoming file/object transfer handles
1315        pub fn get_incoming_file_transfer_handle(
1316            &mut self,
1317        ) -> Result<FileTransferHandleRx, NetworkError> {
1318            self.incoming_object_transfer_handles
1319                .take()
1320                .ok_or(NetworkError::InternalError(
1321                    "This function has already been called",
1322                ))
1323        }
1324    }
1325
1326    pub enum PeerRegisterStatus {
1327        Accepted,
1328        Declined,
1329        Failed { reason: Option<String> },
1330    }
1331
1332    #[derive(Clone, Debug)]
1333    pub struct LocalGroupPeer {
1334        pub cid: u64,
1335        pub is_online: bool,
1336    }
1337
1338    #[derive(Clone, Debug)]
1339    pub struct LocalGroupPeerFullInfo {
1340        pub cid: u64,
1341        pub username: Option<String>,
1342        pub full_name: Option<String>,
1343        pub is_online: bool,
1344    }
1345}
1346
1347pub mod remote_specialization {
1348    use crate::prelude::*;
1349    use std::ops::{Deref, DerefMut};
1350
1351    #[derive(Debug, Clone)]
1352    pub struct PeerRemote<R: Ratchet> {
1353        pub(crate) inner: NodeRemote<R>,
1354        pub(crate) peer: VirtualTargetType,
1355        pub(crate) username: Option<String>,
1356        pub(crate) session_security_settings: SessionSecuritySettings,
1357    }
1358
1359    impl<R: Ratchet> Deref for PeerRemote<R> {
1360        type Target = NodeRemote<R>;
1361        fn deref(&self) -> &Self::Target {
1362            &self.inner
1363        }
1364    }
1365
1366    impl<R: Ratchet> DerefMut for PeerRemote<R> {
1367        fn deref_mut(&mut self) -> &mut Self::Target {
1368            &mut self.inner
1369        }
1370    }
1371
1372    impl<R: Ratchet> TargetLockedRemote<R> for PeerRemote<R> {
1373        fn user(&self) -> &VirtualTargetType {
1374            &self.peer
1375        }
1376        fn remote(&self) -> &NodeRemote<R> {
1377            &self.inner
1378        }
1379        fn target_username(&self) -> Option<&str> {
1380            self.username.as_deref()
1381        }
1382        fn user_mut(&mut self) -> &mut VirtualTargetType {
1383            &mut self.peer
1384        }
1385
1386        fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
1387            Some(&self.session_security_settings)
1388        }
1389    }
1390}
1391
#[cfg(test)]
mod tests {
    use crate::prefabs::client::single_connection::SingleClientServerConnectionKernel;
    use crate::prefabs::client::DefaultServerConnectionSettingsBuilder;
    use crate::prelude::*;
    use citadel_io::tokio;
    use rstest::rstest;
    use std::net::SocketAddr;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use uuid::Uuid;

    /// Test kernel for the receiving side of a file transfer.
    ///
    /// Field 0 holds the node remote once `load_remote` has been called; field 1 is a flag
    /// that is set to true after a received file has been verified.
    pub struct ReceiverFileTransferKernel<R: Ratchet>(
        pub Option<NodeRemote<R>>,
        pub Arc<AtomicBool>,
    );

    #[async_trait]
    impl<R: Ratchet> NetKernel<R> for ReceiverFileTransferKernel<R> {
        /// Stores the remote so that the node can be shut down later
        fn load_remote(&mut self, node_remote: NodeRemote<R>) -> Result<(), NetworkError> {
            self.0 = Some(node_remote);
            Ok(())
        }

        async fn on_start(&self) -> Result<(), NetworkError> {
            Ok(())
        }

        /// Accepts every incoming object transfer handle, checks the received file against
        /// the bundled PDF, then sets the success flag and shuts the node down.
        async fn on_node_event_received(&self, message: NodeResult<R>) -> Result<(), NetworkError> {
            log::trace!(target: "citadel", "SERVER received {:?}", message);
            if let NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) =
                message.into_result()?
            {
                // Set by the ReceptionBeginning status; read when the reception completes
                let mut path = None;
                // accept the transfer
                handle
                    .accept()
                    .map_err(|err| NetworkError::msg(err.into_string()))?;

                use citadel_types::proto::ObjectTransferStatus;
                use futures::StreamExt;
                while let Some(status) = handle.next().await {
                    match status {
                        ObjectTransferStatus::ReceptionComplete => {
                            log::trace!(target: "citadel", "Server has finished receiving the file!");
                            // Compare the bytes written to disk with the original file
                            let cmp = include_bytes!("../../resources/TheBridge.pdf");
                            let streamed_data = citadel_io::tokio::fs::read(path.clone().unwrap())
                                .await
                                .unwrap();
                            assert_eq!(
                                cmp,
                                streamed_data.as_slice(),
                                "Original data and streamed data does not match"
                            );

                            // Signal success to the test body, then stop the server node
                            self.1.store(true, Ordering::Relaxed);
                            self.0.clone().unwrap().shutdown().await?;
                        }

                        ObjectTransferStatus::ReceptionBeginning(file_path, vfm) => {
                            path = Some(file_path);
                            assert_eq!(vfm.name, "TheBridge.pdf")
                        }

                        _ => {}
                    }
                }
            }

            Ok(())
        }

        async fn on_stop(&mut self) -> Result<(), NetworkError> {
            Ok(())
        }
    }

    /// Builds a test server node running `ReceiverFileTransferKernel` and returns it
    /// together with the address produced by `server_test_node`.
    pub fn server_info<'a, R: Ratchet>(
        switch: Arc<AtomicBool>,
    ) -> (NodeFuture<'a, ReceiverFileTransferKernel<R>>, SocketAddr) {
        crate::test_common::server_test_node(ReceiverFileTransferKernel(None, switch), |_| {})
    }

    /// Sends a file from a client to the server and asserts that both sides report
    /// success. Each case selects a different combination of crypto parameters.
    #[rstest]
    #[case(
        EncryptionAlgorithm::AES_GCM_256,
        KemAlgorithm::Kyber,
        SigAlgorithm::None
    )]
    #[case(
        EncryptionAlgorithm::KyberHybrid,
        KemAlgorithm::Kyber,
        SigAlgorithm::Dilithium65
    )]
    #[timeout(std::time::Duration::from_secs(90))]
    #[tokio::test]
    async fn test_c2s_file_transfer(
        #[case] enx: EncryptionAlgorithm,
        #[case] kem: KemAlgorithm,
        #[case] sig: SigAlgorithm,
    ) {
        citadel_logging::setup_log();
        // Flags flipped by the client closure and the server kernel respectively
        let client_success = &AtomicBool::new(false);
        let server_success = &Arc::new(AtomicBool::new(false));
        let (server, server_addr) = server_info::<StackedRatchet>(server_success.clone());
        let uuid = Uuid::new_v4();

        let session_security_settings = SessionSecuritySettingsBuilder::default()
            .with_crypto_params(enx + kem + sig)
            .build()
            .unwrap();

        let server_connection_settings =
            DefaultServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid)
                .with_session_security_settings(session_security_settings)
                .disable_udp()
                .build()
                .unwrap();

        let client_kernel = SingleClientServerConnectionKernel::new(
            server_connection_settings,
            |connection| async move {
                log::trace!(target: "citadel", "***CLIENT LOGIN SUCCESS :: File transfer next ***");
                // Send the file with a chunk size argument of 32 * 1024
                connection
                    .send_file_with_custom_opts(
                        "../resources/TheBridge.pdf",
                        32 * 1024,
                        TransferType::FileTransfer,
                    )
                    .await
                    .unwrap();
                log::trace!(target: "citadel", "***CLIENT FILE TRANSFER SUCCESS***");
                client_success.store(true, Ordering::Relaxed);
                connection.shutdown_kernel().await
            },
        );

        let client = DefaultNodeBuilder::default().build(client_kernel).unwrap();

        // Drive the server and the client together until both finish or one fails
        let joined = futures::future::try_join(server, client);

        let _ = joined.await.unwrap();

        assert!(client_success.load(Ordering::Relaxed));
        assert!(server_success.load(Ordering::Relaxed));
    }
}