citadel_sdk/
remote_ext.rs

1//! Remote Protocol Extensions
2//!
3//! This module extends the core NodeRemote functionality with high-level operations
4//! for managing connections, file transfers, and peer interactions in the Citadel
5//! Protocol network.
6//!
7//! # Features
8//! - User registration and authentication
9//! - Connection management
10//! - File transfer operations
11//! - Virtual filesystem support
12//! - Peer discovery and management
13//! - Group communication
14//! - Security settings configuration
15//!
16//! # Example
17//! ```rust
18//! use citadel_sdk::prelude::*;
19//!
20//! async fn example<R: Ratchet>(remote: NodeRemote<R>) -> Result<(), NetworkError> {
21//!     // Register a new user
22//!     let reg = remote.register_with_defaults(
23//!         "127.0.0.1:25021",
24//!         "John Doe",
25//!         "john.doe",
26//!         "password123"
27//!     ).await?;
28//!
//!     // Connect to the node as the registered user
30//!     let auth = AuthenticationRequest::credentialed("john.doe", "password123");
31//!     let conn = remote.connect_with_defaults(auth).await?;
32//!
33//!     // Send a file to a peer
34//!     remote.find_target("john.doe", "peer.name")
35//!         .await?
36//!         .send_file("/path/to/file.txt")
37//!         .await?;
38//!
39//!     Ok(())
40//! }
41//! ```
42//!
43//! # Important Notes
44//! - All operations are asynchronous
45//! - Connections are automatically managed
46//! - File transfers support chunking
47//! - Virtual filesystem is encrypted
48//! - Peer connections require mutual registration
49//!
50//! # Related Components
51//! - [`NodeRemote`]: Core remote interface
52//! - [`ClientServerRemote`]: Client-server communication
53//! - [`PeerRemote`]: Peer-to-peer communication
54//! - [`CitadelClientServerConnection`]: Connection management
55//! - [`RegisterSuccess`]: Registration handling
56//!
57//! [`NodeRemote`]: crate::prelude::NodeRemote
58//! [`ClientServerRemote`]: crate::prelude::ClientServerRemote
59//! [`PeerRemote`]: crate::prelude::PeerRemote
60//! [`CitadelClientServerConnection`]: crate::prelude::CitadelClientServerConnection
61//! [`RegisterSuccess`]: crate::prelude::RegisterSuccess
62
63use crate::prefabs::ClientServerRemote;
64use crate::prelude::results::{PeerConnectSuccess, PeerRegisterStatus};
65use crate::prelude::*;
66use crate::remote_ext::remote_specialization::PeerRemote;
67use crate::remote_ext::results::LocalGroupPeerFullInfo;
68use std::ops::{Deref, DerefMut};
69
70use futures::StreamExt;
71use std::path::PathBuf;
72use std::time::Duration;
73
74pub(crate) mod user_ids {
75    use crate::prelude::*;
76    use std::ops::Deref;
77
78    #[derive(Debug)]
79    /// A reference to a user identifier
80    pub struct SymmetricIdentifierHandleRef<'a, R: Ratchet> {
81        pub(crate) user: VirtualTargetType,
82        pub(crate) remote: &'a NodeRemote<R>,
83        pub(crate) target_username: Option<String>,
84    }
85
86    impl<R: Ratchet> SymmetricIdentifierHandleRef<'_, R> {
87        pub fn into_owned(self) -> SymmetricIdentifierHandle<R> {
88            SymmetricIdentifierHandle {
89                user: self.user,
90                remote: self.remote.clone(),
91                target_username: self.target_username,
92            }
93        }
94    }
95
96    #[derive(Clone, Debug)]
97    /// A convenience structure for executing commands that depend on a specific registered user
98    pub struct SymmetricIdentifierHandle<R: Ratchet> {
99        user: VirtualTargetType,
100        remote: NodeRemote<R>,
101        target_username: Option<String>,
102    }
103
104    pub trait TargetLockedRemote<R: Ratchet>: Send + Sync {
105        fn user(&self) -> &VirtualTargetType;
106        fn remote(&self) -> &NodeRemote<R>;
107        fn target_username(&self) -> Option<&str>;
108        fn user_mut(&mut self) -> &mut VirtualTargetType;
109        fn session_security_settings(&self) -> Option<&SessionSecuritySettings>;
110    }
111
112    impl<R: Ratchet> TargetLockedRemote<R> for SymmetricIdentifierHandleRef<'_, R> {
113        fn user(&self) -> &VirtualTargetType {
114            &self.user
115        }
116        fn remote(&self) -> &NodeRemote<R> {
117            self.remote
118        }
119        fn target_username(&self) -> Option<&str> {
120            self.target_username.as_deref()
121        }
122        fn user_mut(&mut self) -> &mut VirtualTargetType {
123            &mut self.user
124        }
125
126        fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
127            None
128        }
129    }
130
131    impl<R: Ratchet> TargetLockedRemote<R> for SymmetricIdentifierHandle<R> {
132        fn user(&self) -> &VirtualTargetType {
133            &self.user
134        }
135        fn remote(&self) -> &NodeRemote<R> {
136            &self.remote
137        }
138        fn target_username(&self) -> Option<&str> {
139            self.target_username.as_deref()
140        }
141        fn user_mut(&mut self) -> &mut VirtualTargetType {
142            &mut self.user
143        }
144
145        fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
146            None
147        }
148    }
149
150    impl<R: Ratchet> From<SymmetricIdentifierHandleRef<'_, R>> for SymmetricIdentifierHandle<R> {
151        fn from(this: SymmetricIdentifierHandleRef<'_, R>) -> Self {
152            this.into_owned()
153        }
154    }
155
156    impl<R: Ratchet> Deref for SymmetricIdentifierHandle<R> {
157        type Target = NodeRemote<R>;
158
159        fn deref(&self) -> &Self::Target {
160            &self.remote
161        }
162    }
163
164    impl<R: Ratchet> Deref for SymmetricIdentifierHandleRef<'_, R> {
165        type Target = NodeRemote<R>;
166
167        fn deref(&self) -> &Self::Target {
168            self.remote
169        }
170    }
171}
172
173/// Contains the elements required to communicate with the adjacent node
174pub struct CitadelClientServerConnection<R: Ratchet> {
175    /// An interface to send ordered, reliable, and encrypted messages
176    pub(crate) channel: Option<PeerChannel<R>>,
177    pub remote: ClientServerRemote<R>,
178    /// Only available if UdpMode was enabled at the beginning of a session
179    pub udp_channel_rx: Option<citadel_io::tokio::sync::oneshot::Receiver<UdpChannel<R>>>,
180    /// Contains the Google auth minted at the central server (if the central server enabled it), as well as any other services enabled by the central server
181    pub services: ServicesObject,
182    pub cid: u64,
183    pub session_security_settings: SessionSecuritySettings,
184}
185
186impl<R: Ratchet> CitadelClientServerConnection<R> {
187    /// Splits the channel into a send and receive half. This will render
188    /// the other fields of this connection object innaccessible
189    ///
190    /// # Panics
191    ///  - If the channel has already been taken
192    pub fn split(self) -> (PeerChannelSendHalf<R>, PeerChannelRecvHalf<R>) {
193        self.channel.expect("Channel already taken").split()
194    }
195
196    pub fn take_channel(&mut self) -> Option<PeerChannel<R>> {
197        self.channel.take()
198    }
199}
200
201impl<R: Ratchet> Deref for CitadelClientServerConnection<R> {
202    type Target = ClientServerRemote<R>;
203
204    fn deref(&self) -> &Self::Target {
205        &self.remote
206    }
207}
208
209impl<R: Ratchet> DerefMut for CitadelClientServerConnection<R> {
210    fn deref_mut(&mut self) -> &mut Self::Target {
211        &mut self.remote
212    }
213}
214
/// Contains the elements entailed by a successful registration
pub struct RegisterSuccess {
    /// The CID reported for the newly registered account
    pub cid: u64,
}
219
220#[async_trait]
221/// Endows the [NodeRemote](NodeRemote) with additional functions
222pub trait ProtocolRemoteExt<R: Ratchet>: Remote<R> {
223    /// Registers with custom settings
224    /// Returns a ticket which is used to uniquely identify the request in the protocol
225    async fn register<
226        T: std::net::ToSocketAddrs + Send,
227        P: Into<String> + Send,
228        V: Into<String> + Send,
229        K: Into<SecBuffer> + Send,
230    >(
231        &self,
232        addr: T,
233        full_name: P,
234        username: V,
235        proposed_password: K,
236        default_security_settings: SessionSecuritySettings,
237        server_password: Option<PreSharedKey>,
238    ) -> Result<RegisterSuccess, NetworkError> {
239        let creds =
240            ProposedCredentials::new_register(full_name, username, proposed_password.into())
241                .await?;
242        let register_request = NodeRequest::RegisterToHypernode(RegisterToHypernode {
243            remote_addr: addr
244                .to_socket_addrs()?
245                .next()
246                .ok_or(NetworkError::InternalError("Invalid socket addr"))?,
247            proposed_credentials: creds,
248            static_security_settings: default_security_settings,
249            session_password: server_password.unwrap_or_default(),
250        });
251
252        let mut subscription = self.send_callback_subscription(register_request).await?;
253        while let Some(status) = subscription.next().await {
254            match map_errors(status)? {
255                NodeResult::RegisterOkay(RegisterOkay { cid, .. }) => {
256                    return Ok(RegisterSuccess { cid });
257                }
258                NodeResult::RegisterFailure(err) => {
259                    return Err(NetworkError::Generic(err.error_message));
260                }
261                NodeResult::Disconnect(err) => {
262                    return Err(NetworkError::Generic(err.message));
263                }
264                evt => {
265                    log::warn!(target: "citadel", "Invalid NodeResult for Register request received: {evt:?}");
266                }
267            }
268        }
269
270        Err(NetworkError::InternalError(
271            "Internal kernel stream died (register)",
272        ))
273    }
274
275    /// Registers using the default settings. The default uses No Google FCM keys and the default session security settings
276    /// Returns a ticket which is used to uniquely identify the request in the protocol
277    async fn register_with_defaults<
278        T: std::net::ToSocketAddrs + Send,
279        P: Into<String> + Send,
280        V: Into<String> + Send,
281        K: Into<SecBuffer> + Send,
282    >(
283        &self,
284        addr: T,
285        full_name: P,
286        username: V,
287        proposed_password: K,
288    ) -> Result<RegisterSuccess, NetworkError> {
289        self.register(
290            addr,
291            full_name,
292            username,
293            proposed_password,
294            Default::default(),
295            Default::default(),
296        )
297        .await
298    }
299
300    /// Connects with custom settings
301    /// Returns a ticket which is used to uniquely identify the request in the protocol
302    async fn connect(
303        &self,
304        auth: AuthenticationRequest,
305        connect_mode: ConnectMode,
306        udp_mode: UdpMode,
307        keep_alive_timeout: Option<Duration>,
308        session_security_settings: SessionSecuritySettings,
309        server_password: Option<PreSharedKey>,
310    ) -> Result<CitadelClientServerConnection<R>, NetworkError> {
311        let connect_request = NodeRequest::ConnectToHypernode(ConnectToHypernode {
312            auth_request: auth,
313            connect_mode,
314            udp_mode,
315            keep_alive_timeout: keep_alive_timeout.map(|r| r.as_secs()),
316            session_security_settings,
317            session_password: server_password.unwrap_or_default(),
318        });
319
320        let mut subscription = self.send_callback_subscription(connect_request).await?;
321        let status = subscription
322            .next()
323            .await
324            .ok_or(NetworkError::InternalError(
325                "Internal kernel stream died (connect)",
326            ))?;
327
328        return match map_errors(status)? {
329            NodeResult::ConnectSuccess(ConnectSuccess {
330                ticket: _,
331                session_cid: cid,
332                remote_addr: _,
333                is_personal: _,
334                v_conn_type,
335                services,
336                welcome_message: _,
337                channel,
338                udp_rx_opt: udp_channel_rx,
339                session_security_settings,
340            }) => Ok(CitadelClientServerConnection {
341                remote: ClientServerRemote::new(
342                    v_conn_type,
343                    self.remote_ref().clone(),
344                    session_security_settings,
345                    None,
346                    None,
347                ),
348                channel: Some(*channel),
349                udp_channel_rx,
350                services,
351                cid,
352                session_security_settings,
353            }),
354            NodeResult::ConnectFail(ConnectFail {
355                ticket: _,
356                cid_opt: _,
357                error_message: err,
358            }) => Err(NetworkError::Generic(err)),
359            NodeResult::Disconnect(err) => {
360                return Err(NetworkError::Generic(err.message));
361            }
362            res => Err(NetworkError::msg(format!(
363                "[connect] An unexpected response occurred: {res:?}"
364            ))),
365        };
366    }
367
368    /// Connects with the default settings
369    /// If FCM keys were created during the registration phase, then those keys will be used for the session. If new FCM keys need to be used, consider using [`Self::connect`]
370    async fn connect_with_defaults(
371        &self,
372        auth: AuthenticationRequest,
373    ) -> Result<CitadelClientServerConnection<R>, NetworkError> {
374        self.connect(
375            auth,
376            Default::default(),
377            Default::default(),
378            None,
379            Default::default(),
380            Default::default(),
381        )
382        .await
383    }
384
385    /// Creates a valid target identifier used to make protocol requests. Raw user IDs or usernames can be used
386    /// ```
387    /// use citadel_sdk::prelude::*;
388    /// # use citadel_sdk::prefabs::client::single_connection::SingleClientServerConnectionKernel;
389    ///
390    /// let server_connection_settings = DefaultServerConnectionSettingsBuilder::credentialed_login("127.0.0.1:25021", "john.doe", "password").build().unwrap();
391    ///
392    /// # SingleClientServerConnectionKernel::new(server_connection_settings, |conn| async move {
393    /// conn.find_target("my_account", "my_peer").await?.send_file("/path/to/file.pdf").await
394    /// // or: conn.find_target(1234, "my_peer").await? [...]
395    /// # });
396    /// ```
397    async fn find_target<T: Into<UserIdentifier> + Send, P: Into<UserIdentifier> + Send>(
398        &self,
399        local_user: T,
400        peer: P,
401    ) -> Result<SymmetricIdentifierHandleRef<'_, R>, NetworkError> {
402        let account_manager = self.account_manager();
403        account_manager
404            .find_target_information(local_user, peer)
405            .await?
406            .map(move |(cid, peer)| {
407                if peer.parent_icid != 0 {
408                    SymmetricIdentifierHandleRef {
409                        user: VirtualTargetType::ExternalGroupPeer {
410                            session_cid: cid,
411                            interserver_cid: peer.parent_icid,
412                            peer_cid: peer.cid,
413                        },
414                        remote: self.remote_ref(),
415                        target_username: None,
416                    }
417                } else {
418                    SymmetricIdentifierHandleRef {
419                        user: VirtualTargetType::LocalGroupPeer {
420                            session_cid: cid,
421                            peer_cid: peer.cid,
422                        },
423                        remote: self.remote_ref(),
424                        target_username: None,
425                    }
426                }
427            })
428            .ok_or_else(|| NetworkError::msg("Target pair not found"))
429    }
430
431    /// Creates a proposed target from the valid local user to an unregistered peer in the network. Used when creating registration requests for peers.
432    /// Currently only supports LocalGroup <-> LocalGroup peer connections
433    async fn propose_target<T: Into<UserIdentifier> + Send, P: Into<UserIdentifier> + Send>(
434        &self,
435        local_user: T,
436        peer: P,
437    ) -> Result<SymmetricIdentifierHandleRef<'_, R>, NetworkError> {
438        let local_cid = self.get_session_cid(local_user).await?;
439        match peer.into() {
440            UserIdentifier::ID(peer_cid) => Ok(SymmetricIdentifierHandleRef {
441                user: VirtualTargetType::LocalGroupPeer {
442                    session_cid: local_cid,
443                    peer_cid,
444                },
445                remote: self.remote_ref(),
446                target_username: None,
447            }),
448            UserIdentifier::Username(uname) => {
449                let peer_cid = self
450                    .remote_ref()
451                    .account_manager()
452                    .find_target_information(local_cid, uname.clone())
453                    .await?
454                    .map(|r| r.1.cid)
455                    .unwrap_or(0);
456                Ok(SymmetricIdentifierHandleRef {
457                    user: VirtualTargetType::LocalGroupPeer {
458                        session_cid: local_cid,
459                        peer_cid,
460                    },
461                    remote: self.remote_ref(),
462                    target_username: Some(uname),
463                })
464            }
465        }
466    }
467
468    /// Returns a list of local group peers on the network for local_user. May or may not be registered to the user. To get a list of registered users to local_user, run [`Self::get_local_group_mutual_peers`]
469    /// - limit: if None, all peers are obtained. If Some, at most the specified number of peers will be obtained
470    async fn get_local_group_peers<T: Into<UserIdentifier> + Send>(
471        &self,
472        local_user: T,
473        limit: Option<usize>,
474    ) -> Result<Vec<LocalGroupPeerFullInfo>, NetworkError> {
475        let local_cid = self.get_session_cid(local_user).await?;
476        let command = NodeRequest::PeerCommand(PeerCommand {
477            session_cid: local_cid,
478            command: PeerSignal::GetRegisteredPeers {
479                peer_conn_type: NodeConnectionType::LocalGroupPeerToLocalGroupServer(local_cid),
480                response: None,
481                limit: limit.map(|r| r as i32),
482            },
483        });
484
485        let mut stream = self.send_callback_subscription(command).await?;
486
487        while let Some(status) = stream.next().await {
488            if let NodeResult::PeerEvent(PeerEvent {
489                event:
490                    PeerSignal::GetRegisteredPeers {
491                        peer_conn_type: _,
492                        response: Some(PeerResponse::RegisteredCids(peer_info, is_onlines)),
493                        limit: _,
494                    },
495                ticket: _,
496                ..
497            }) = map_errors(status)?
498            {
499                return Ok(peer_info
500                    .into_iter()
501                    .zip(is_onlines.into_iter())
502                    .filter_map(|(peer_info, is_online)| {
503                        peer_info.map(|info| LocalGroupPeerFullInfo {
504                            cid: info.cid,
505                            username: Some(info.username),
506                            full_name: Some(info.full_name),
507                            is_online,
508                        })
509                    })
510                    .collect());
511            }
512        }
513
514        Err(NetworkError::InternalError(
515            "Internal kernel stream died (get_local_group_peers)",
516        ))
517    }
518
519    /// Returns a list of mutually-registered peers with the local_user
520    async fn get_local_group_mutual_peers<T: Into<UserIdentifier> + Send>(
521        &self,
522        local_user: T,
523    ) -> Result<Vec<LocalGroupPeerFullInfo>, NetworkError> {
524        let local_cid = self.get_session_cid(local_user).await?;
525        let command = NodeRequest::PeerCommand(PeerCommand {
526            session_cid: local_cid,
527            command: PeerSignal::GetMutuals {
528                v_conn_type: NodeConnectionType::LocalGroupPeerToLocalGroupServer(local_cid),
529                response: None,
530            },
531        });
532
533        let mut stream = self.send_callback_subscription(command).await?;
534
535        while let Some(status) = stream.next().await {
536            if let NodeResult::PeerEvent(PeerEvent {
537                event:
538                    PeerSignal::GetMutuals {
539                        v_conn_type: _,
540                        response: Some(PeerResponse::RegisteredCids(peer_info, is_onlines)),
541                    },
542                ticket: _,
543                ..
544            }) = map_errors(status)?
545            {
546                return Ok(peer_info
547                    .into_iter()
548                    .zip(is_onlines.into_iter())
549                    .filter_map(|(peer_info, is_online)| {
550                        peer_info.map(|info| LocalGroupPeerFullInfo {
551                            cid: info.cid,
552                            username: Some(info.username),
553                            full_name: Some(info.full_name),
554                            is_online,
555                        })
556                    })
557                    .collect());
558            }
559        }
560
561        Err(NetworkError::InternalError(
562            "Internal kernel stream died (get_local_group_mutual_peers)",
563        ))
564    }
565
566    #[doc(hidden)]
567    fn remote_ref(&self) -> &NodeRemote<R>;
568
569    #[doc(hidden)]
570    async fn get_session_cid<T: Into<UserIdentifier> + Send>(
571        &self,
572        local_user: T,
573    ) -> Result<u64, NetworkError> {
574        let account_manager = self.account_manager();
575        Ok(account_manager
576            .find_local_user_information(local_user)
577            .await?
578            .ok_or(NetworkError::InvalidRequest("User does not exist"))?)
579    }
580}
581
582pub fn map_errors<R: Ratchet>(result: NodeResult<R>) -> Result<NodeResult<R>, NetworkError> {
583    match result {
584        NodeResult::ConnectFail(ConnectFail {
585            ticket: _,
586            cid_opt: _,
587            error_message: err,
588        }) => Err(NetworkError::Generic(err)),
589        NodeResult::RegisterFailure(RegisterFailure {
590            ticket: _,
591            error_message: err,
592        }) => Err(NetworkError::Generic(err)),
593        NodeResult::InternalServerError(InternalServerError {
594            ticket_opt: _,
595            cid_opt: _,
596            message: err,
597        }) => Err(NetworkError::Generic(err)),
598        NodeResult::PeerEvent(PeerEvent {
599            event:
600                PeerSignal::SignalError {
601                    ticket: _,
602                    error: err,
603                    peer_connection_type: _,
604                },
605            ticket: _,
606            ..
607        }) => Err(NetworkError::Generic(err)),
608        res => Ok(res),
609    }
610}
611
612impl<R: Ratchet> ProtocolRemoteExt<R> for NodeRemote<R> {
613    fn remote_ref(&self) -> &NodeRemote<R> {
614        self
615    }
616}
617
618impl<R: Ratchet> ProtocolRemoteExt<R> for ClientServerRemote<R> {
619    fn remote_ref(&self) -> &NodeRemote<R> {
620        &self.inner
621    }
622}
623
624#[async_trait]
625/// Some functions require that a target exists
626pub trait ProtocolRemoteTargetExt<R: Ratchet>: TargetLockedRemote<R> {
627    /// Sends a file with a custom size. The smaller the chunks, the higher the degree of scrambling, but the higher the performance cost. A chunk size of zero will use the default
628    async fn send_file_with_custom_opts<T: ObjectSource>(
629        &self,
630        source: T,
631        chunk_size: usize,
632        transfer_type: TransferType,
633    ) -> Result<(), NetworkError> {
634        let chunk_size = if chunk_size == 0 {
635            None
636        } else {
637            Some(chunk_size)
638        };
639        let session_cid = self.user().get_session_cid();
640        let user = *self.user();
641        let remote = self.remote();
642
643        let mut stream = remote
644            .send_callback_subscription(NodeRequest::SendObject(SendObject {
645                source: Box::new(source),
646                chunk_size,
647                session_cid,
648                v_conn_type: user,
649                transfer_type,
650            }))
651            .await?;
652
653        while let Some(event) = stream.next().await {
654            match map_errors(event)? {
655                NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) => {
656                    return handle
657                        .transfer_file()
658                        .await
659                        .map_err(|err| NetworkError::Generic(err.into_string()));
660                }
661
662                NodeResult::PeerEvent(PeerEvent {
663                    event: PeerSignal::SignalReceived { .. },
664                    ..
665                }) => {}
666
667                res => {
668                    log::warn!(target: "citadel", "Invalid NodeResult for FileTransfer request received: {res:?}")
669                }
670            }
671        }
672
673        Err(NetworkError::InternalError("File transfer stream died"))
674    }
675
676    /// Sends a file to the provided target using the default chunking size
677    async fn send_file<T: ObjectSource>(&self, source: T) -> Result<(), NetworkError> {
678        self.send_file_with_custom_opts(source, 0, TransferType::FileTransfer)
679            .await
680    }
681
682    /// Sends a file to the provided target using custom chunking size with local encryption.
683    /// Only this local node may decrypt the information send to the adjacent node.
684    async fn remote_encrypted_virtual_filesystem_push_custom_chunking<
685        T: ObjectSource,
686        P: Into<PathBuf> + Send,
687    >(
688        &self,
689        source: T,
690        virtual_directory: P,
691        chunk_size: usize,
692        security_level: SecurityLevel,
693    ) -> Result<(), NetworkError> {
694        self.can_use_revfs()?;
695        let mut virtual_path = virtual_directory.into();
696        virtual_path = prepare_virtual_path(virtual_path);
697        validate_virtual_path(&virtual_path)
698            .map_err(|err| NetworkError::Generic(err.into_string()))?;
699        let tx_type = TransferType::RemoteEncryptedVirtualFilesystem {
700            virtual_path,
701            security_level,
702        };
703        self.send_file_with_custom_opts(source, chunk_size, tx_type)
704            .await
705    }
706
707    /// Sends a file to the provided target using the default chunking size with local encryption.
708    /// Only this local node may decrypt the information send to the adjacent node.
709    async fn remote_encrypted_virtual_filesystem_push<T: ObjectSource, P: Into<PathBuf> + Send>(
710        &self,
711        source: T,
712        virtual_directory: P,
713        security_level: SecurityLevel,
714    ) -> Result<(), NetworkError> {
715        self.remote_encrypted_virtual_filesystem_push_custom_chunking(
716            source,
717            virtual_directory,
718            0,
719            security_level,
720        )
721        .await
722    }
723
724    /// Pulls a virtual file from the RE-VFS. If `delete_on_pull` is true, then, the virtual file
725    /// will be taken from the RE-VFS
726    async fn remote_encrypted_virtual_filesystem_pull<P: Into<PathBuf> + Send>(
727        &self,
728        virtual_directory: P,
729        transfer_security_level: SecurityLevel,
730        delete_on_pull: bool,
731    ) -> Result<PathBuf, NetworkError> {
732        self.can_use_revfs()?;
733        let request = NodeRequest::PullObject(PullObject {
734            v_conn: *self.user(),
735            virtual_dir: virtual_directory.into(),
736            delete_on_pull,
737            transfer_security_level,
738        });
739
740        let mut stream = self.remote().send_callback_subscription(request).await?;
741
742        while let Some(event) = stream.next().await {
743            match map_errors(event)? {
744                NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) => {
745                    return handle
746                        .receive_file()
747                        .await
748                        .map_err(|err| NetworkError::Generic(err.into_string()));
749                }
750
751                NodeResult::PeerEvent(PeerEvent {
752                    event: PeerSignal::SignalReceived { .. },
753                    ..
754                }) => {}
755
756                res => {
757                    log::error!(target: "citadel", "Invalid NodeResult for REVFS FileTransfer request received: {res:?}");
758                    return Err(NetworkError::InternalError(
759                        "Received invalid response from protocol",
760                    ));
761                }
762            }
763        }
764
765        Err(NetworkError::InternalError(
766            "REVFS File transfer stream died",
767        ))
768    }
769
770    /// Deletes the file from the RE-VFS. If the contents are desired on delete,
771    /// consider calling `Self::remote_encrypted_virtual_filesystem_pull` with the delete
772    /// parameter set to true
773    async fn remote_encrypted_virtual_filesystem_delete<P: Into<PathBuf> + Send>(
774        &self,
775        virtual_directory: P,
776    ) -> Result<(), NetworkError> {
777        self.can_use_revfs()?;
778        let request = NodeRequest::DeleteObject(DeleteObject {
779            v_conn: *self.user(),
780            virtual_dir: virtual_directory.into(),
781            security_level: Default::default(),
782        });
783
784        let mut stream = self.remote().send_callback_subscription(request).await?;
785        while let Some(event) = stream.next().await {
786            match map_errors(event)? {
787                NodeResult::ReVFS(result) => {
788                    return if let Some(error) = result.error_message {
789                        Err(NetworkError::Generic(error))
790                    } else {
791                        Ok(())
792                    }
793                }
794
795                evt => {
796                    log::error!(target: "citadel", "Invalid NodeResult for REVFS Delete request received: {evt:?}");
797                }
798            }
799        }
800
801        Err(NetworkError::InternalError("REVFS Delete stream died"))
802    }
803
804    /// Connects to the peer with custom settings
805    async fn connect_to_peer_custom(
806        &self,
807        session_security_settings: SessionSecuritySettings,
808        udp_mode: UdpMode,
809        peer_session_password: Option<PreSharedKey>,
810    ) -> Result<PeerConnectSuccess<R>, NetworkError> {
811        let session_cid = self.user().get_session_cid();
812        let peer_target = self.try_as_peer_connection().await?;
813
814        let mut stream = self
815            .remote()
816            .send_callback_subscription(NodeRequest::PeerCommand(PeerCommand {
817                session_cid,
818                command: PeerSignal::PostConnect {
819                    peer_conn_type: peer_target,
820                    ticket_opt: None,
821                    invitee_response: None,
822                    session_security_settings,
823                    udp_mode,
824                    session_password: peer_session_password,
825                },
826            }))
827            .await?;
828
829        while let Some(status) = stream.next().await {
830            match map_errors(status)? {
831                NodeResult::PeerChannelCreated(PeerChannelCreated {
832                    ticket: _,
833                    channel,
834                    udp_rx_opt,
835                }) => {
836                    let username = self.target_username().map(ToString::to_string);
837                    let remote = PeerRemote {
838                        inner: self.remote().clone(),
839                        peer: peer_target.as_virtual_connection(),
840                        username,
841                        session_security_settings,
842                    };
843
844                    return Ok(PeerConnectSuccess {
845                        remote,
846                        channel: *channel,
847                        udp_channel_rx: udp_rx_opt,
848                        incoming_object_transfer_handles: None,
849                    });
850                }
851
852                NodeResult::PeerEvent(PeerEvent {
853                    event:
854                        PeerSignal::PostConnect {
855                            invitee_response, ..
856                        },
857                    ..
858                }) => match invitee_response {
859                    Some(PeerResponse::Timeout) => {
860                        return Err(NetworkError::msg("Peer did not respond in time"))
861                    }
862                    Some(PeerResponse::Decline) => {
863                        return Err(NetworkError::msg("Peer declined to connect"))
864                    }
865                    _ => {}
866                },
867
868                _ => {}
869            }
870        }
871
872        Err(NetworkError::InternalError(
873            "Internal kernel stream died (connect_to_peer_custom)",
874        ))
875    }
876
877    /// Connects to the target peer with default settings
878    async fn connect_to_peer(&self) -> Result<PeerConnectSuccess<R>, NetworkError> {
879        self.connect_to_peer_custom(Default::default(), Default::default(), Default::default())
880            .await
881    }
882
883    /// Posts a registration request to a peer
884    async fn register_to_peer(&self) -> Result<PeerRegisterStatus, NetworkError> {
885        let session_cid = self.user().get_session_cid();
886        let peer_target = self.try_as_peer_connection().await?;
887        // TODO: Get rid of this step. Should be handled by the protocol
888        let local_username = self
889            .remote()
890            .account_manager()
891            .get_username_by_cid(session_cid)
892            .await?
893            .ok_or_else(|| NetworkError::msg("Unable to find username for local user"))?;
894        let peer_username_opt = self.target_username().map(ToString::to_string);
895
896        let mut stream = self
897            .remote()
898            .send_callback_subscription(NodeRequest::PeerCommand(PeerCommand {
899                session_cid,
900                command: PeerSignal::PostRegister {
901                    peer_conn_type: peer_target,
902                    inviter_username: local_username,
903                    invitee_username: peer_username_opt,
904                    ticket_opt: None,
905                    invitee_response: None,
906                },
907            }))
908            .await?;
909
910        while let Some(status) = stream.next().await {
911            if let NodeResult::PeerEvent(PeerEvent {
912                event:
913                    PeerSignal::PostRegister {
914                        peer_conn_type: _,
915                        inviter_username: _,
916                        invitee_username: _,
917                        ticket_opt: _,
918                        invitee_response: Some(resp),
919                    },
920                ticket: _,
921                ..
922            }) = map_errors(status)?
923            {
924                match resp {
925                    PeerResponse::Accept(..) => return Ok(PeerRegisterStatus::Accepted),
926                    PeerResponse::Decline => return Ok(PeerRegisterStatus::Declined),
927                    PeerResponse::Timeout => return Ok(PeerRegisterStatus::Failed { reason: Some("Timeout on register request. Peer did not accept in time. Try again later".to_string()) }),
928                    _ => {}
929                }
930            }
931        }
932
933        Err(NetworkError::Generic(format!(
934            "Internal kernel stream died (register_to_peer): {:?}",
935            stream.callback_key()
936        )))
937    }
938
939    /// Deregisters the currently locked target. If the target is a client to server
940    /// connection, deregisters from the server. If the target is a p2p connection,
941    /// deregisters the p2p
942    async fn deregister(&self) -> Result<(), NetworkError> {
943        if let Ok(peer_conn) = self.try_as_peer_connection().await {
944            let peer_request = PeerSignal::Deregister {
945                peer_conn_type: peer_conn,
946            };
947            let session_cid = self.user().get_session_cid();
948            let request = NodeRequest::PeerCommand(PeerCommand {
949                session_cid,
950                command: peer_request,
951            });
952
953            let mut subscription = self.remote().send_callback_subscription(request).await?;
954            while let Some(result) = subscription.next().await {
955                if let NodeResult::PeerEvent(PeerEvent {
956                    event: PeerSignal::DeregistrationSuccess { .. },
957                    ticket: _,
958                    ..
959                }) = map_errors(result)?
960                {
961                    return Ok(());
962                }
963            }
964        } else {
965            // c2s conn
966            let cid = self.user().get_session_cid();
967            let request = NodeRequest::DeregisterFromHypernode(DeregisterFromHypernode {
968                session_cid: cid,
969                v_conn_type: *self.user(),
970            });
971            let mut subscription = self.remote().send_callback_subscription(request).await?;
972            while let Some(result) = subscription.next().await {
973                match map_errors(result)? {
974                    NodeResult::DeRegistration(DeRegistration {
975                        session_cid: _,
976                        ticket_opt: _,
977                        success: true,
978                    }) => return Ok(()),
979                    NodeResult::DeRegistration(DeRegistration {
980                        session_cid: _,
981                        ticket_opt: _,
982                        success: false,
983                    }) => {
984                        return Err(NetworkError::msg(
985                            "Unable to deregister: status=false".to_string(),
986                        ))
987                    }
988
989                    _ => {}
990                }
991            }
992        }
993
994        Err(NetworkError::InternalError("Deregister ended unexpectedly"))
995    }
996
997    async fn disconnect(&self) -> Result<(), NetworkError> {
998        if let Ok(peer_conn) = self.try_as_peer_connection().await {
999            if let PeerConnectionType::LocalGroupPeer {
1000                session_cid,
1001                peer_cid: _,
1002            } = peer_conn
1003            {
1004                let request = NodeRequest::PeerCommand(PeerCommand {
1005                    session_cid,
1006                    command: PeerSignal::Disconnect {
1007                        peer_conn_type: peer_conn,
1008                        disconnect_response: None,
1009                    },
1010                });
1011
1012                let mut subscription = self.remote().send_callback_subscription(request).await?;
1013
1014                while let Some(event) = subscription.next().await {
1015                    if let NodeResult::PeerEvent(PeerEvent {
1016                        event:
1017                            PeerSignal::Disconnect {
1018                                peer_conn_type: _,
1019                                disconnect_response: Some(_),
1020                            },
1021                        ticket: _,
1022                        ..
1023                    }) = map_errors(event)?
1024                    {
1025                        return Ok(());
1026                    }
1027                }
1028
1029                Err(NetworkError::InternalError(
1030                    "Unable to receive valid disconnect event",
1031                ))
1032            } else {
1033                Err(NetworkError::msg(
1034                    "External group peer functionality not enabled",
1035                ))
1036            }
1037        } else {
1038            //c2s conn
1039            let cid = self.user().get_session_cid();
1040            let request =
1041                NodeRequest::DisconnectFromHypernode(DisconnectFromHypernode { session_cid: cid });
1042
1043            let mut subscription = self.remote().send_callback_subscription(request).await?;
1044            while let Some(event) = subscription.next().await {
1045                if let NodeResult::Disconnect(Disconnect {
1046                    success, message, ..
1047                }) = map_errors(event)?
1048                {
1049                    return if success {
1050                        Ok(())
1051                    } else {
1052                        Err(NetworkError::msg(message))
1053                    };
1054                }
1055            }
1056
1057            Err(NetworkError::InternalError(
1058                "Unable to receive valid disconnect event",
1059            ))
1060        }
1061    }
1062
1063    async fn create_group(
1064        &self,
1065        initial_users_to_invite: Option<Vec<UserIdentifier>>,
1066    ) -> Result<GroupChannel, NetworkError> {
1067        let session_cid = self.user().get_session_cid();
1068
1069        let mut initial_users = vec![];
1070        // TODO: allow for custom message group options. For now, don't
1071        let options = MessageGroupOptions::default();
1072        // TODO/NOTE: default is PRIVATE mode, meaning all users in group must be registered to the owner
1073        // in the future, allow for private/public modes by adjusting the below. Initial users should be
1074        // a UserIdentifier
1075        if let Some(initial_users_to_invite) = initial_users_to_invite {
1076            for user in initial_users_to_invite {
1077                initial_users.push(
1078                    self.remote()
1079                        .account_manager()
1080                        .find_target_information(session_cid, user.clone())
1081                        .await
1082                        .map_err(|err| NetworkError::msg(err.into_string()))?
1083                        .ok_or_else(|| {
1084                            NetworkError::msg(format!(
1085                                "Account {user:?} not found for local user {session_cid:?}"
1086                            ))
1087                        })
1088                        .map(|r| r.1.cid)?,
1089                )
1090            }
1091        }
1092
1093        let group_request = GroupBroadcast::Create {
1094            initial_invitees: initial_users,
1095            options,
1096        };
1097        let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand {
1098            session_cid,
1099            command: group_request,
1100        });
1101        let mut subscription = self.remote().send_callback_subscription(request).await?;
1102        while let Some(evt) = subscription.next().await {
1103            if let NodeResult::GroupChannelCreated(GroupChannelCreated {
1104                ticket: _,
1105                channel,
1106                session_cid: _,
1107            }) = evt
1108            {
1109                return Ok(channel);
1110            }
1111        }
1112
1113        Err(NetworkError::InternalError(
1114            "Create_group ended unexpectedly",
1115        ))
1116    }
1117
1118    /// Lists all groups that which the current peer owns
1119    async fn list_owned_groups(&self) -> Result<Vec<MessageGroupKey>, NetworkError> {
1120        let session_cid = self.user().get_session_cid();
1121        let cid_to_check_for = match self.try_as_peer_connection().await {
1122            Ok(res) => res.get_original_target_cid(),
1123            _ => session_cid,
1124        };
1125        let group_request = GroupBroadcast::ListGroupsFor {
1126            cid: cid_to_check_for,
1127        };
1128        let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand {
1129            session_cid,
1130            command: group_request,
1131        });
1132
1133        let mut subscription = self.remote().send_callback_subscription(request).await?;
1134
1135        while let Some(evt) = subscription.next().await {
1136            if let NodeResult::GroupEvent(GroupEvent {
1137                session_cid: _,
1138                ticket: _,
1139                event: GroupBroadcast::ListResponse { groups },
1140            }) = map_errors(evt)?
1141            {
1142                return Ok(groups);
1143            }
1144        }
1145
1146        Err(NetworkError::InternalError(
1147            "List_members ended unexpectedly",
1148        ))
1149    }
1150
1151    /// Begins a re-key, updating the container in the process.
1152    /// Returns the new key matrix version. Does not return the new key version
1153    /// if the rekey fails, or, if a current rekey is already executing
1154    async fn rekey(&self) -> Result<Option<u32>, NetworkError> {
1155        let request = NodeRequest::ReKey(ReKey {
1156            v_conn_type: *self.user(),
1157        });
1158        let mut subscription = self.remote().send_callback_subscription(request).await?;
1159
1160        while let Some(evt) = subscription.next().await {
1161            if let NodeResult::ReKeyResult(result) = evt {
1162                return match result.status {
1163                    ReKeyReturnType::Success { version } => Ok(Some(version)),
1164                    ReKeyReturnType::AlreadyInProgress => Ok(None),
1165                    ReKeyReturnType::Failure { err } => {
1166                        Err(NetworkError::Generic(format!("Rekey failed: {err}")))
1167                    }
1168                };
1169            }
1170        }
1171
1172        Err(NetworkError::InternalError("Rekey ended unexpectedly"))
1173    }
1174
1175    /// Checks if the locked target is registered
1176    async fn is_peer_registered(&self) -> Result<bool, NetworkError> {
1177        let target = self.try_as_peer_connection().await?;
1178        if let PeerConnectionType::LocalGroupPeer {
1179            session_cid: local_cid,
1180            peer_cid,
1181        } = target
1182        {
1183            let peers = self.remote().get_local_group_peers(local_cid, None).await?;
1184            citadel_logging::info!(target: "citadel", "Checking to see if {target} is registered in {peers:?}");
1185            Ok(peers.iter().any(|p| p.cid == peer_cid))
1186        } else {
1187            Err(NetworkError::Generic(
1188                "External group peers are not supported yet".to_string(),
1189            ))
1190        }
1191    }
1192
1193    #[doc(hidden)]
1194    async fn try_as_peer_connection(&self) -> Result<PeerConnectionType, NetworkError> {
1195        let verified_return = |user: &VirtualTargetType| {
1196            user.try_as_peer_connection()
1197                .ok_or(NetworkError::InvalidRequest("Target is not a peer"))
1198        };
1199
1200        if self.user().get_target_cid() == 0 {
1201            // in this case, the user re-used a remote locked to a registration target
1202            // where the username was provided, but the cid was 0 (unknown).
1203            let peer_username = self
1204                .target_username()
1205                .ok_or_else(|| NetworkError::msg("target_cid=0, yet, no username was provided"))?;
1206            let session_cid = self.user().get_session_cid();
1207            let expected_peer_cid = self
1208                .remote()
1209                .account_manager()
1210                .get_persistence_handler()
1211                .get_cid_by_username(peer_username);
1212            // get the peer cid from the account manager (implying the peers are already registered).
1213            // fallback to the mapped cid if the peer is not registered
1214            let peer_cid = self
1215                .remote()
1216                .account_manager()
1217                .find_target_information(session_cid, peer_username)
1218                .await
1219                .map_err(|err| NetworkError::Generic(err.into_string()))?
1220                .map(|r| r.1.cid)
1221                .unwrap_or(expected_peer_cid);
1222
1223            let mut user = *self.user();
1224            user.set_target_cid(peer_cid);
1225            verified_return(&user)
1226        } else {
1227            verified_return(self.user())
1228        }
1229    }
1230
1231    #[doc(hidden)]
1232    fn can_use_revfs(&self) -> Result<(), NetworkError> {
1233        if let Some(sess) = self.session_security_settings() {
1234            if sess.crypto_params.kem_algorithm == KemAlgorithm::Kyber {
1235                Ok(())
1236            } else {
1237                Err(NetworkError::InvalidRequest(
1238                    "RE-VFS can only be used with Kyber KEM",
1239                ))
1240            }
1241        } else {
1242            Err(NetworkError::InvalidRequest(
1243                "RE-VFS cannot be used with this remote type",
1244            ))
1245        }
1246    }
1247}
1248
1249impl<T: TargetLockedRemote<R>, R: Ratchet> ProtocolRemoteTargetExt<R> for T {}
1250
1251pub mod results {
1252    use crate::prefabs::client::peer_connection::FileTransferHandleRx;
1253    use crate::prelude::{PeerChannel, UdpChannel};
1254    use crate::remote_ext::remote_specialization::PeerRemote;
1255    use citadel_io::tokio::sync::oneshot::Receiver;
1256    use citadel_proto::prelude::*;
1257    use std::fmt::Debug;
1258
1259    pub struct PeerConnectSuccess<R: Ratchet> {
1260        pub channel: PeerChannel<R>,
1261        pub udp_channel_rx: Option<Receiver<UdpChannel<R>>>,
1262        pub remote: PeerRemote<R>,
1263        /// Receives incoming file/object transfer requests. The handles must be
1264        /// .accepted() before the file/object transfer is allowed to proceed
1265        pub(crate) incoming_object_transfer_handles: Option<FileTransferHandleRx>,
1266    }
1267
1268    impl<R: Ratchet> Debug for PeerConnectSuccess<R> {
1269        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1270            f.debug_struct("PeerConnectSuccess")
1271                .field("channel", &self.channel)
1272                .field("udp_channel_rx", &self.udp_channel_rx)
1273                .finish()
1274        }
1275    }
1276
1277    impl<R: Ratchet> PeerConnectSuccess<R> {
1278        /// Obtains a receiver which yields incoming file/object transfer handles
1279        pub fn get_incoming_file_transfer_handle(
1280            &mut self,
1281        ) -> Result<FileTransferHandleRx, NetworkError> {
1282            self.incoming_object_transfer_handles
1283                .take()
1284                .ok_or(NetworkError::InternalError(
1285                    "This function has already been called",
1286                ))
1287        }
1288    }
1289
1290    pub enum PeerRegisterStatus {
1291        Accepted,
1292        Declined,
1293        Failed { reason: Option<String> },
1294    }
1295
1296    #[derive(Clone, Debug)]
1297    pub struct LocalGroupPeer {
1298        pub cid: u64,
1299        pub is_online: bool,
1300    }
1301
1302    #[derive(Clone, Debug)]
1303    pub struct LocalGroupPeerFullInfo {
1304        pub cid: u64,
1305        pub username: Option<String>,
1306        pub full_name: Option<String>,
1307        pub is_online: bool,
1308    }
1309}
1310
1311pub mod remote_specialization {
1312    use crate::prelude::*;
1313    use std::ops::{Deref, DerefMut};
1314
1315    #[derive(Debug, Clone)]
1316    pub struct PeerRemote<R: Ratchet> {
1317        pub(crate) inner: NodeRemote<R>,
1318        pub(crate) peer: VirtualTargetType,
1319        pub(crate) username: Option<String>,
1320        pub(crate) session_security_settings: SessionSecuritySettings,
1321    }
1322
1323    impl<R: Ratchet> Deref for PeerRemote<R> {
1324        type Target = NodeRemote<R>;
1325        fn deref(&self) -> &Self::Target {
1326            &self.inner
1327        }
1328    }
1329
1330    impl<R: Ratchet> DerefMut for PeerRemote<R> {
1331        fn deref_mut(&mut self) -> &mut Self::Target {
1332            &mut self.inner
1333        }
1334    }
1335
1336    impl<R: Ratchet> TargetLockedRemote<R> for PeerRemote<R> {
1337        fn user(&self) -> &VirtualTargetType {
1338            &self.peer
1339        }
1340        fn remote(&self) -> &NodeRemote<R> {
1341            &self.inner
1342        }
1343        fn target_username(&self) -> Option<&str> {
1344            self.username.as_deref()
1345        }
1346        fn user_mut(&mut self) -> &mut VirtualTargetType {
1347            &mut self.peer
1348        }
1349
1350        fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
1351            Some(&self.session_security_settings)
1352        }
1353    }
1354}
1355
#[cfg(test)]
mod tests {
    use crate::prefabs::client::single_connection::SingleClientServerConnectionKernel;
    use crate::prefabs::client::DefaultServerConnectionSettingsBuilder;
    use crate::prelude::*;
    use citadel_io::tokio;
    use rstest::rstest;
    use std::net::SocketAddr;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use uuid::Uuid;

    /// Server-side kernel that accepts an incoming file transfer, verifies the received
    /// bytes against the bundled PDF, flips the shared flag, and shuts the node down.
    ///
    /// Field 0 holds the node remote once loaded; field 1 is the success flag.
    pub struct ReceiverFileTransferKernel<R: Ratchet>(
        pub Option<NodeRemote<R>>,
        pub Arc<AtomicBool>,
    );

    #[async_trait]
    impl<R: Ratchet> NetKernel<R> for ReceiverFileTransferKernel<R> {
        fn load_remote(&mut self, node_remote: NodeRemote<R>) -> Result<(), NetworkError> {
            self.0 = Some(node_remote);
            Ok(())
        }

        async fn on_start(&self) -> Result<(), NetworkError> {
            Ok(())
        }

        async fn on_node_event_received(&self, message: NodeResult<R>) -> Result<(), NetworkError> {
            use citadel_types::proto::ObjectTransferStatus;
            use futures::StreamExt;

            log::trace!(target: "citadel", "SERVER received {:?}", message);

            // Only object transfer handles are of interest; every other event is ignored
            let mut handle = match map_errors(message)? {
                NodeResult::ObjectTransferHandle(ObjectTransferHandle { handle, .. }) => handle,
                _ => return Ok(()),
            };

            // accept the transfer
            handle
                .accept()
                .map_err(|err| NetworkError::msg(err.into_string()))?;

            // Set when reception begins, read back once reception completes
            let mut received_path = None;

            while let Some(status) = handle.next().await {
                match status {
                    ObjectTransferStatus::ReceptionBeginning(file_path, vfm) => {
                        received_path = Some(file_path);
                        assert_eq!(vfm.name, "TheBridge.pdf")
                    }

                    ObjectTransferStatus::ReceptionComplete => {
                        log::trace!(target: "citadel", "Server has finished receiving the file!");
                        let cmp = include_bytes!("../../resources/TheBridge.pdf");
                        let streamed_data =
                            citadel_io::tokio::fs::read(received_path.clone().unwrap())
                                .await
                                .unwrap();
                        assert_eq!(
                            cmp,
                            streamed_data.as_slice(),
                            "Original data and streamed data does not match"
                        );

                        self.1.store(true, Ordering::Relaxed);
                        self.0.clone().unwrap().shutdown().await?;
                    }

                    _ => {}
                }
            }

            Ok(())
        }

        async fn on_stop(&mut self) -> Result<(), NetworkError> {
            Ok(())
        }
    }

    /// Builds a test server node running a `ReceiverFileTransferKernel` that reports
    /// success through `switch`, returning the node future and its bind address.
    pub fn server_info<'a, R: Ratchet>(
        switch: Arc<AtomicBool>,
    ) -> (NodeFuture<'a, ReceiverFileTransferKernel<R>>, SocketAddr) {
        let kernel = ReceiverFileTransferKernel(None, switch);
        crate::test_common::server_test_node(kernel, |_| {})
    }

    /// Sends the bundled PDF from a client to the server and checks that both sides
    /// reported success, for each parameterized crypto configuration.
    #[rstest]
    #[case(
        EncryptionAlgorithm::AES_GCM_256,
        KemAlgorithm::Kyber,
        SigAlgorithm::None
    )]
    #[case(
        EncryptionAlgorithm::KyberHybrid,
        KemAlgorithm::Kyber,
        SigAlgorithm::Dilithium65
    )]
    #[timeout(std::time::Duration::from_secs(90))]
    #[tokio::test]
    async fn test_c2s_file_transfer(
        #[case] enx: EncryptionAlgorithm,
        #[case] kem: KemAlgorithm,
        #[case] sig: SigAlgorithm,
    ) {
        citadel_logging::setup_log();
        let client_success = &AtomicBool::new(false);
        let server_success = &Arc::new(AtomicBool::new(false));
        let (server, server_addr) = server_info::<StackedRatchet>(server_success.clone());
        let uuid = Uuid::new_v4();

        let crypto_params = enx + kem + sig;
        let session_security_settings = SessionSecuritySettingsBuilder::default()
            .with_crypto_params(crypto_params)
            .build()
            .unwrap();

        let server_connection_settings =
            DefaultServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid)
                .with_session_security_settings(session_security_settings)
                .disable_udp()
                .build()
                .unwrap();

        // The client sends the file as soon as it is connected, then stops its own kernel
        let client_kernel = SingleClientServerConnectionKernel::new(
            server_connection_settings,
            |connection| async move {
                log::trace!(target: "citadel", "***CLIENT LOGIN SUCCESS :: File transfer next ***");
                connection
                    .send_file_with_custom_opts(
                        "../resources/TheBridge.pdf",
                        32 * 1024,
                        TransferType::FileTransfer,
                    )
                    .await
                    .unwrap();
                log::trace!(target: "citadel", "***CLIENT FILE TRANSFER SUCCESS***");
                client_success.store(true, Ordering::Relaxed);
                connection.shutdown_kernel().await
            },
        );

        let client = DefaultNodeBuilder::default().build(client_kernel).unwrap();

        // Drive both nodes to completion; either one failing fails the test
        let _ = futures::future::try_join(server, client).await.unwrap();

        assert!(client_success.load(Ordering::Relaxed));
        assert!(server_success.load(Ordering::Relaxed));
    }
}