Skip to main content

citadel_sdk/
remote_ext.rs

1//! Remote Protocol Extensions
2//!
3//! This module extends the core NodeRemote functionality with high-level operations
4//! for managing connections, file transfers, and peer interactions in the Citadel
5//! Protocol network.
6//!
7//! # Features
8//! - User registration and authentication
9//! - Connection management
10//! - File transfer operations
11//! - Virtual filesystem support
12//! - Peer discovery and management
13//! - Group communication
14//! - Security settings configuration
15//!
16//! # Example
17//! ```rust
18//! use citadel_sdk::prelude::*;
19//!
20//! async fn example<R: Ratchet>(remote: NodeRemote<R>) -> Result<(), NetworkError> {
21//!     // Register a new user
22//!     let reg = remote.register_with_defaults(
23//!         "127.0.0.1:25021",
24//!         "John Doe",
25//!         "john.doe",
26//!         "password123"
27//!     ).await?;
28//!
//!     // Connect to the server using the registered credentials
30//!     let auth = AuthenticationRequest::credentialed("john.doe", "password123");
31//!     let conn = remote.connect_with_defaults(auth).await?;
32//!
33//!     // Send a file to a peer
34//!     remote.find_target("john.doe", "peer.name")
35//!         .await?
36//!         .send_file("/path/to/file.txt")
37//!         .await?;
38//!
39//!     Ok(())
40//! }
41//! ```
42//!
43//! # Important Notes
44//! - All operations are asynchronous
45//! - Connections are automatically managed
46//! - File transfers support chunking
47//! - Virtual filesystem is encrypted
48//! - Peer connections require mutual registration
49//!
50//! # Related Components
51//! - [`NodeRemote`]: Core remote interface
52//! - [`ClientServerRemote`]: Client-server communication
53//! - [`PeerRemote`]: Peer-to-peer communication
54//! - [`CitadelClientServerConnection`]: Connection management
55//! - [`RegisterSuccess`]: Registration handling
56//!
57//! [`NodeRemote`]: crate::prelude::NodeRemote
58//! [`ClientServerRemote`]: crate::prelude::ClientServerRemote
59//! [`PeerRemote`]: crate::prelude::PeerRemote
60//! [`CitadelClientServerConnection`]: crate::prelude::CitadelClientServerConnection
61//! [`RegisterSuccess`]: crate::prelude::RegisterSuccess
62
63use crate::prefabs::ClientServerRemote;
64use crate::prelude::results::{PeerConnectSuccess, PeerRegisterStatus};
65use crate::prelude::*;
66use crate::remote_ext::remote_specialization::PeerRemote;
67use crate::remote_ext::results::LocalGroupPeerFullInfo;
68use std::ops::{Deref, DerefMut};
69
70use futures::StreamExt;
71use std::path::PathBuf;
72use std::time::Duration;
73
74pub(crate) mod user_ids {
75    use crate::prelude::*;
76    use std::ops::Deref;
77
78    #[derive(Debug)]
79    /// A reference to a user identifier
80    pub struct SymmetricIdentifierHandleRef<'a, R: Ratchet> {
81        pub(crate) user: VirtualTargetType,
82        pub(crate) remote: &'a NodeRemote<R>,
83        pub(crate) target_username: Option<String>,
84    }
85
86    impl<R: Ratchet> SymmetricIdentifierHandleRef<'_, R> {
87        pub fn into_owned(self) -> SymmetricIdentifierHandle<R> {
88            SymmetricIdentifierHandle {
89                user: self.user,
90                remote: self.remote.clone(),
91                target_username: self.target_username,
92            }
93        }
94    }
95
96    #[derive(Clone, Debug)]
97    /// A convenience structure for executing commands that depend on a specific registered user
98    pub struct SymmetricIdentifierHandle<R: Ratchet> {
99        user: VirtualTargetType,
100        remote: NodeRemote<R>,
101        target_username: Option<String>,
102    }
103
104    pub trait TargetLockedRemote<R: Ratchet>: Send + Sync {
105        fn user(&self) -> &VirtualTargetType;
106        fn remote(&self) -> &NodeRemote<R>;
107        fn target_username(&self) -> Option<&str>;
108        fn user_mut(&mut self) -> &mut VirtualTargetType;
109        fn session_security_settings(&self) -> Option<&SessionSecuritySettings>;
110    }
111
112    impl<R: Ratchet> TargetLockedRemote<R> for SymmetricIdentifierHandleRef<'_, R> {
113        fn user(&self) -> &VirtualTargetType {
114            &self.user
115        }
116        fn remote(&self) -> &NodeRemote<R> {
117            self.remote
118        }
119        fn target_username(&self) -> Option<&str> {
120            self.target_username.as_deref()
121        }
122        fn user_mut(&mut self) -> &mut VirtualTargetType {
123            &mut self.user
124        }
125
126        fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
127            None
128        }
129    }
130
131    impl<R: Ratchet> TargetLockedRemote<R> for SymmetricIdentifierHandle<R> {
132        fn user(&self) -> &VirtualTargetType {
133            &self.user
134        }
135        fn remote(&self) -> &NodeRemote<R> {
136            &self.remote
137        }
138        fn target_username(&self) -> Option<&str> {
139            self.target_username.as_deref()
140        }
141        fn user_mut(&mut self) -> &mut VirtualTargetType {
142            &mut self.user
143        }
144
145        fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
146            None
147        }
148    }
149
150    impl<R: Ratchet> From<SymmetricIdentifierHandleRef<'_, R>> for SymmetricIdentifierHandle<R> {
151        fn from(this: SymmetricIdentifierHandleRef<'_, R>) -> Self {
152            this.into_owned()
153        }
154    }
155
156    impl<R: Ratchet> Deref for SymmetricIdentifierHandle<R> {
157        type Target = NodeRemote<R>;
158
159        fn deref(&self) -> &Self::Target {
160            &self.remote
161        }
162    }
163
164    impl<R: Ratchet> Deref for SymmetricIdentifierHandleRef<'_, R> {
165        type Target = NodeRemote<R>;
166
167        fn deref(&self) -> &Self::Target {
168            self.remote
169        }
170    }
171}
172
173/// Contains the elements required to communicate with the adjacent node
174pub struct CitadelClientServerConnection<R: Ratchet> {
175    /// An interface to send ordered, reliable, and encrypted messages
176    pub(crate) channel: Option<PeerChannel<R>>,
177    pub remote: ClientServerRemote<R>,
178    /// Only available if UdpMode was enabled at the beginning of a session
179    pub udp_channel_rx: Option<citadel_io::tokio::sync::oneshot::Receiver<UdpChannel<R>>>,
180    /// Contains the Google auth minted at the central server (if the central server enabled it), as well as any other services enabled by the central server
181    pub services: ServicesObject,
182    pub cid: u64,
183    pub session_security_settings: SessionSecuritySettings,
184}
185
186impl<R: Ratchet> CitadelClientServerConnection<R> {
187    /// Splits the channel into a send and receive half. This will render
188    /// the other fields of this connection object innaccessible
189    ///
190    /// # Panics
191    ///  - If the channel has already been taken
192    pub fn split(self) -> (PeerChannelSendHalf<R>, PeerChannelRecvHalf<R>) {
193        self.channel.expect("Channel already taken").split()
194    }
195
196    pub fn take_channel(&mut self) -> Option<PeerChannel<R>> {
197        self.channel.take()
198    }
199}
200
201impl<R: Ratchet> Deref for CitadelClientServerConnection<R> {
202    type Target = ClientServerRemote<R>;
203
204    fn deref(&self) -> &Self::Target {
205        &self.remote
206    }
207}
208
209impl<R: Ratchet> DerefMut for CitadelClientServerConnection<R> {
210    fn deref_mut(&mut self) -> &mut Self::Target {
211        &mut self.remote
212    }
213}
214
/// Contains the elements entailed by a successful registration.
///
/// Derives the usual value-type traits so that a result can be logged, copied,
/// compared and used as a map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegisterSuccess {
    /// The CID carried by the successful registration response
    pub cid: u64,
}
219
220#[async_trait]
221/// Endows the [NodeRemote](NodeRemote) with additional functions
222pub trait ProtocolRemoteExt<R: Ratchet>: Remote<R> {
223    /// Registers with custom settings
224    /// Returns a ticket which is used to uniquely identify the request in the protocol
225    async fn register<
226        T: std::net::ToSocketAddrs + Send,
227        P: Into<String> + Send,
228        V: Into<String> + Send,
229        K: Into<SecBuffer> + Send,
230    >(
231        &self,
232        addr: T,
233        full_name: P,
234        username: V,
235        proposed_password: K,
236        default_security_settings: SessionSecuritySettings,
237        server_password: Option<PreSharedKey>,
238    ) -> Result<RegisterSuccess, NetworkError> {
239        let creds =
240            ProposedCredentials::new_register(full_name, username, proposed_password.into())
241                .await?;
242        let register_request = NodeRequest::RegisterToHypernode(RegisterToHypernode {
243            remote_addr: addr.to_socket_addrs()?.next().ok_or(citadel_io::error!(
244                citadel_io::ErrorCode::RemoteInvalidSocketAddr
245            ))?,
246            proposed_credentials: creds,
247            static_security_settings: default_security_settings,
248            session_password: server_password.unwrap_or_default(),
249        });
250
251        let mut subscription = self.send_callback_subscription(register_request).await?;
252        while let Some(status) = subscription.next().await {
253            match status.into_result()? {
254                NodeResult::RegisterOkay(RegisterOkay { cid, .. }) => {
255                    return Ok(RegisterSuccess { cid });
256                }
257                NodeResult::RegisterFailure(err) => {
258                    return Err(citadel_io::error!(
259                        citadel_io::ErrorCode::RemoteRegisterFailure,
260                        err.error_message
261                    ));
262                }
263                NodeResult::Disconnect(err) => {
264                    return Err(citadel_io::error!(
265                        citadel_io::ErrorCode::RemoteDisconnected,
266                        err.message
267                    ));
268                }
269                evt => {
270                    log::warn!(target: "citadel", "Invalid NodeResult for Register request received: {evt:?}");
271                }
272            }
273        }
274
275        Err(citadel_io::error!(
276            citadel_io::ErrorCode::RemoteKernelStreamDied,
277            "register"
278        ))
279    }
280
281    /// Registers using the default settings. The default uses No Google FCM keys and the default session security settings
282    /// Returns a ticket which is used to uniquely identify the request in the protocol
283    async fn register_with_defaults<
284        T: std::net::ToSocketAddrs + Send,
285        P: Into<String> + Send,
286        V: Into<String> + Send,
287        K: Into<SecBuffer> + Send,
288    >(
289        &self,
290        addr: T,
291        full_name: P,
292        username: V,
293        proposed_password: K,
294    ) -> Result<RegisterSuccess, NetworkError> {
295        self.register(
296            addr,
297            full_name,
298            username,
299            proposed_password,
300            Default::default(),
301            Default::default(),
302        )
303        .await
304    }
305
306    /// Connects with custom settings
307    /// Returns a ticket which is used to uniquely identify the request in the protocol
308    async fn connect(
309        &self,
310        auth: AuthenticationRequest,
311        connect_mode: ConnectMode,
312        udp_mode: UdpMode,
313        keep_alive_timeout: Option<Duration>,
314        session_security_settings: SessionSecuritySettings,
315        server_password: Option<PreSharedKey>,
316    ) -> Result<CitadelClientServerConnection<R>, NetworkError> {
317        let connect_request = NodeRequest::ConnectToHypernode(ConnectToHypernode {
318            auth_request: auth,
319            connect_mode,
320            udp_mode,
321            keep_alive_timeout: keep_alive_timeout.map(|r| r.as_secs()),
322            session_security_settings,
323            session_password: server_password.unwrap_or_default(),
324        });
325
326        let mut subscription = self.send_callback_subscription(connect_request).await?;
327        let status = subscription.next().await.ok_or(citadel_io::error!(
328            citadel_io::ErrorCode::RemoteKernelStreamDied,
329            "connect"
330        ))?;
331
332        return match status.into_result()? {
333            NodeResult::ConnectSuccess(ConnectSuccess {
334                ticket: _,
335                session_cid: cid,
336                remote_addr: _,
337                is_personal: _,
338                v_conn_type,
339                services,
340                welcome_message: _,
341                channel,
342                udp_rx_opt: udp_channel_rx,
343                session_security_settings,
344            }) => Ok(CitadelClientServerConnection {
345                remote: ClientServerRemote::new(
346                    v_conn_type,
347                    self.remote_ref().clone(),
348                    session_security_settings,
349                    None,
350                    None,
351                ),
352                channel: Some(*channel),
353                udp_channel_rx,
354                services,
355                cid,
356                session_security_settings,
357            }),
358            NodeResult::ConnectFail(ConnectFail {
359                ticket: _,
360                cid_opt: _,
361                error_message: err,
362            }) => Err(citadel_io::error!(
363                citadel_io::ErrorCode::RemoteConnectFailed,
364                err
365            )),
366            NodeResult::Disconnect(err) => {
367                return Err(citadel_io::error!(
368                    citadel_io::ErrorCode::RemoteDisconnected,
369                    err.message
370                ));
371            }
372            res => Err(citadel_io::error!(
373                citadel_io::ErrorCode::RemoteConnectUnexpectedResponse,
374                citadel_io::Dbg(res)
375            )),
376        };
377    }
378
379    /// Connects with the default settings
380    /// If FCM keys were created during the registration phase, then those keys will be used for the session. If new FCM keys need to be used, consider using [`Self::connect`]
381    async fn connect_with_defaults(
382        &self,
383        auth: AuthenticationRequest,
384    ) -> Result<CitadelClientServerConnection<R>, NetworkError> {
385        self.connect(
386            auth,
387            Default::default(),
388            Default::default(),
389            None,
390            Default::default(),
391            Default::default(),
392        )
393        .await
394    }
395
396    /// Creates a valid target identifier used to make protocol requests. Raw user IDs or usernames can be used
397    /// ```
398    /// use citadel_sdk::prelude::*;
399    /// # use citadel_sdk::prefabs::client::single_connection::SingleClientServerConnectionKernel;
400    ///
401    /// let server_connection_settings = DefaultServerConnectionSettingsBuilder::credentialed_login("127.0.0.1:25021", "john.doe", "password").build().unwrap();
402    ///
403    /// # SingleClientServerConnectionKernel::new(server_connection_settings, |conn| async move {
404    /// conn.find_target("my_account", "my_peer").await?.send_file("/path/to/file.pdf").await
405    /// // or: conn.find_target(1234, "my_peer").await? [...]
406    /// # });
407    /// ```
408    async fn find_target<T: Into<UserIdentifier> + Send, P: Into<UserIdentifier> + Send>(
409        &self,
410        local_user: T,
411        peer: P,
412    ) -> Result<SymmetricIdentifierHandleRef<'_, R>, NetworkError> {
413        let account_manager = self.account_manager();
414        account_manager
415            .find_target_information(local_user, peer)
416            .await?
417            .map(move |(cid, peer)| {
418                if peer.parent_icid != 0 {
419                    SymmetricIdentifierHandleRef {
420                        user: VirtualTargetType::ExternalGroupPeer {
421                            session_cid: cid,
422                            interserver_cid: peer.parent_icid,
423                            peer_cid: peer.cid,
424                        },
425                        remote: self.remote_ref(),
426                        target_username: None,
427                    }
428                } else {
429                    SymmetricIdentifierHandleRef {
430                        user: VirtualTargetType::LocalGroupPeer {
431                            session_cid: cid,
432                            peer_cid: peer.cid,
433                        },
434                        remote: self.remote_ref(),
435                        target_username: None,
436                    }
437                }
438            })
439            .ok_or_else(|| citadel_io::error!(citadel_io::ErrorCode::RemoteTargetPairNotFound))
440    }
441
442    /// Creates a proposed target from the valid local user to an unregistered peer in the network. Used when creating registration requests for peers.
443    /// Currently only supports LocalGroup <-> LocalGroup peer connections
444    async fn propose_target<T: Into<UserIdentifier> + Send, P: Into<UserIdentifier> + Send>(
445        &self,
446        local_user: T,
447        peer: P,
448    ) -> Result<SymmetricIdentifierHandleRef<'_, R>, NetworkError> {
449        let local_cid = self.get_session_cid(local_user).await?;
450        match peer.into() {
451            UserIdentifier::ID(peer_cid) => Ok(SymmetricIdentifierHandleRef {
452                user: VirtualTargetType::LocalGroupPeer {
453                    session_cid: local_cid,
454                    peer_cid,
455                },
456                remote: self.remote_ref(),
457                target_username: None,
458            }),
459            UserIdentifier::Username(uname) => {
460                let peer_cid = self
461                    .remote_ref()
462                    .account_manager()
463                    .find_target_information(local_cid, uname.clone())
464                    .await?
465                    .map(|r| r.1.cid)
466                    .unwrap_or(0);
467                Ok(SymmetricIdentifierHandleRef {
468                    user: VirtualTargetType::LocalGroupPeer {
469                        session_cid: local_cid,
470                        peer_cid,
471                    },
472                    remote: self.remote_ref(),
473                    target_username: Some(uname),
474                })
475            }
476        }
477    }
478
479    /// Returns a list of local group peers on the network for local_user. May or may not be registered to the user. To get a list of registered users to local_user, run [`Self::get_local_group_mutual_peers`]
480    /// - limit: if None, all peers are obtained. If Some, at most the specified number of peers will be obtained
481    async fn get_local_group_peers<T: Into<UserIdentifier> + Send>(
482        &self,
483        local_user: T,
484        limit: Option<usize>,
485    ) -> Result<Vec<LocalGroupPeerFullInfo>, NetworkError> {
486        let local_cid = self.get_session_cid(local_user).await?;
487        let command = NodeRequest::PeerCommand(PeerCommand {
488            session_cid: local_cid,
489            command: PeerSignal::GetRegisteredPeers {
490                peer_conn_type: ClientConnectionType::Server {
491                    session_cid: local_cid,
492                },
493                response: None,
494                limit: limit.map(|r| r as i32),
495            },
496        });
497
498        let mut stream = self.send_callback_subscription(command).await?;
499
500        while let Some(status) = stream.next().await {
501            if let NodeResult::PeerEvent(PeerEvent {
502                event:
503                    PeerSignal::GetRegisteredPeers {
504                        peer_conn_type: _,
505                        response: Some(PeerResponse::RegisteredCids(peer_info, is_onlines)),
506                        limit: _,
507                    },
508                ticket: _,
509                ..
510            }) = status.into_result()?
511            {
512                return Ok(peer_info
513                    .into_iter()
514                    .zip(is_onlines)
515                    .filter_map(|(peer_info, is_online)| {
516                        peer_info.map(|info| LocalGroupPeerFullInfo {
517                            cid: info.cid,
518                            username: Some(info.username),
519                            full_name: Some(info.full_name),
520                            is_online,
521                        })
522                    })
523                    .collect());
524            }
525        }
526
527        Err(citadel_io::error!(
528            citadel_io::ErrorCode::RemoteKernelStreamDied,
529            "get_local_group_peers"
530        ))
531    }
532
533    /// Returns a list of mutually-registered peers with the local_user
534    async fn get_local_group_mutual_peers<T: Into<UserIdentifier> + Send>(
535        &self,
536        local_user: T,
537    ) -> Result<Vec<LocalGroupPeerFullInfo>, NetworkError> {
538        let local_cid = self.get_session_cid(local_user).await?;
539        let command = NodeRequest::PeerCommand(PeerCommand {
540            session_cid: local_cid,
541            command: PeerSignal::GetMutuals {
542                v_conn_type: ClientConnectionType::Server {
543                    session_cid: local_cid,
544                },
545                response: None,
546            },
547        });
548
549        let mut stream = self.send_callback_subscription(command).await?;
550
551        while let Some(status) = stream.next().await {
552            if let NodeResult::PeerEvent(PeerEvent {
553                event:
554                    PeerSignal::GetMutuals {
555                        v_conn_type: _,
556                        response: Some(PeerResponse::RegisteredCids(peer_info, is_onlines)),
557                    },
558                ticket: _,
559                ..
560            }) = status.into_result()?
561            {
562                return Ok(peer_info
563                    .into_iter()
564                    .zip(is_onlines)
565                    .filter_map(|(peer_info, is_online)| {
566                        peer_info.map(|info| LocalGroupPeerFullInfo {
567                            cid: info.cid,
568                            username: Some(info.username),
569                            full_name: Some(info.full_name),
570                            is_online,
571                        })
572                    })
573                    .collect());
574            }
575        }
576
577        Err(citadel_io::error!(
578            citadel_io::ErrorCode::RemoteSessionStreamDied
579        ))
580    }
581
582    /// Returns all the active sessions in the protocol, including all P2P connections hierarchically placed as children to C2S
583    /// connections
584    async fn sessions(&self) -> Result<ActiveSessions, NetworkError> {
585        match self
586            .send_callback_subscription(NodeRequest::GetActiveSessions)
587            .await
588        {
589            Ok(mut stream) => {
590                while let Some(result) = stream.next().await {
591                    match result.into_result()? {
592                        NodeResult::SessionList(res) => return Ok(res.sessions),
593                        res => {
594                            citadel_logging::warn!("Received unexpected result: {res:?}");
595                        }
596                    }
597                }
598
599                citadel_logging::warn!("Failed to receive response from SDK (stream died)");
600                return Err(citadel_io::error!(
601                    citadel_io::ErrorCode::RemoteKernelStreamDied,
602                    "get_active_sessions"
603                ));
604            }
605            Err(e) => {
606                citadel_logging::warn!(target: "citadel", "Failed to query SDK sessions: {e}");
607            }
608        }
609
610        Err(citadel_io::error!(
611            citadel_io::ErrorCode::RemoteQuerySessionsFailed
612        ))
613    }
614
    /// Returns the underlying [`NodeRemote`]. The provided methods of this trait borrow
    /// or clone it when building connections and target handles.
    #[doc(hidden)]
    fn remote_ref(&self) -> &NodeRemote<R>;
617
618    #[doc(hidden)]
619    async fn get_session_cid<T: Into<UserIdentifier> + Send>(
620        &self,
621        local_user: T,
622    ) -> Result<u64, NetworkError> {
623        let account_manager = self.account_manager();
624        Ok(account_manager
625            .find_local_user_information(local_user)
626            .await?
627            .ok_or(citadel_io::error!(
628                citadel_io::ErrorCode::RemoteUserDoesNotExist
629            ))?)
630    }
631}
632
/// A [`NodeRemote`] is its own underlying remote; every other method comes from the
/// trait's provided implementations.
impl<R: Ratchet> ProtocolRemoteExt<R> for NodeRemote<R> {
    fn remote_ref(&self) -> &NodeRemote<R> {
        self
    }
}
638
/// A [`ClientServerRemote`] exposes the [`NodeRemote`] stored in its `inner` field;
/// every other method comes from the trait's provided implementations.
impl<R: Ratchet> ProtocolRemoteExt<R> for ClientServerRemote<R> {
    fn remote_ref(&self) -> &NodeRemote<R> {
        &self.inner
    }
}
644
645#[async_trait]
646/// Some functions require that a target exists
647pub trait ProtocolRemoteTargetExt<R: Ratchet>: TargetLockedRemote<R> {
648    /// Sends a file with a custom size. The smaller the chunks, the higher the degree of scrambling, but the higher the performance cost. A chunk size of zero will use the default
649    async fn send_file_with_custom_opts<T: ObjectSource>(
650        &self,
651        source: T,
652        chunk_size: usize,
653        transfer_type: TransferType,
654    ) -> Result<(), NetworkError> {
655        let chunk_size = if chunk_size == 0 {
656            None
657        } else {
658            Some(chunk_size)
659        };
660        let session_cid = self.user().get_session_cid();
661        let user = *self.user();
662        let remote = self.remote();
663
664        let mut stream = remote
665            .send_callback_subscription(NodeRequest::SendObject(SendObject {
666                source: Box::new(source),
667                chunk_size,
668                session_cid,
669                v_conn_type: user,
670                transfer_type,
671            }))
672            .await?;
673
674        while let Some(event) = stream.next().await {
675            match event.into_result()? {
676                NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) => {
677                    return handle.transfer_file().await.map_err(|err| {
678                        citadel_io::error!(
679                            citadel_io::ErrorCode::RemoteFileTransferFailed,
680                            err.into_string()
681                        )
682                    });
683                }
684
685                NodeResult::PeerEvent(PeerEvent {
686                    event: PeerSignal::SignalReceived { .. },
687                    ..
688                }) => {}
689
690                res => {
691                    log::warn!(target: "citadel", "Invalid NodeResult for FileTransfer request received: {res:?}")
692                }
693            }
694        }
695
696        Err(citadel_io::error!(
697            citadel_io::ErrorCode::RemoteFileTransferStreamDied
698        ))
699    }
700
701    /// Sends a file to the provided target using the default chunking size
702    async fn send_file<T: ObjectSource>(&self, source: T) -> Result<(), NetworkError> {
703        self.send_file_with_custom_opts(source, 0, TransferType::FileTransfer)
704            .await
705    }
706
707    /// Sends a file to the provided target using custom chunking size with local encryption.
708    /// Only this local node may decrypt the information send to the adjacent node.
709    async fn remote_encrypted_virtual_filesystem_push_custom_chunking<
710        T: ObjectSource,
711        P: Into<PathBuf> + Send,
712    >(
713        &self,
714        source: T,
715        virtual_directory: P,
716        chunk_size: usize,
717        security_level: SecurityLevel,
718    ) -> Result<(), NetworkError> {
719        self.can_use_revfs()?;
720        let mut virtual_path = virtual_directory.into();
721        virtual_path = prepare_virtual_path(virtual_path);
722        validate_virtual_path(&virtual_path).map_err(|err| {
723            citadel_io::error!(
724                citadel_io::ErrorCode::RemoteRevfsInvalidVirtualPath,
725                err.into_string()
726            )
727        })?;
728        let tx_type = TransferType::RemoteEncryptedVirtualFilesystem {
729            virtual_path,
730            security_level,
731        };
732        self.send_file_with_custom_opts(source, chunk_size, tx_type)
733            .await
734    }
735
736    /// Sends a file to the provided target using the default chunking size with local encryption.
737    /// Only this local node may decrypt the information send to the adjacent node.
738    async fn remote_encrypted_virtual_filesystem_push<T: ObjectSource, P: Into<PathBuf> + Send>(
739        &self,
740        source: T,
741        virtual_directory: P,
742        security_level: SecurityLevel,
743    ) -> Result<(), NetworkError> {
744        self.remote_encrypted_virtual_filesystem_push_custom_chunking(
745            source,
746            virtual_directory,
747            0,
748            security_level,
749        )
750        .await
751    }
752
753    /// Pulls a virtual file from the RE-VFS. If `delete_on_pull` is true, then, the virtual file
754    /// will be taken from the RE-VFS
755    async fn remote_encrypted_virtual_filesystem_pull<P: Into<PathBuf> + Send>(
756        &self,
757        virtual_directory: P,
758        transfer_security_level: SecurityLevel,
759        delete_on_pull: bool,
760    ) -> Result<PathBuf, NetworkError> {
761        self.can_use_revfs()?;
762        let request = NodeRequest::PullObject(PullObject {
763            v_conn: *self.user(),
764            virtual_dir: virtual_directory.into(),
765            delete_on_pull,
766            transfer_security_level,
767        });
768
769        let mut stream = self.remote().send_callback_subscription(request).await?;
770
771        while let Some(event) = stream.next().await {
772            match event.into_result()? {
773                NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) => {
774                    return handle.receive_file().await.map_err(|err| {
775                        citadel_io::error!(
776                            citadel_io::ErrorCode::RemoteFileTransferFailed,
777                            err.into_string()
778                        )
779                    });
780                }
781
782                NodeResult::PeerEvent(PeerEvent {
783                    event: PeerSignal::SignalReceived { .. },
784                    ..
785                }) => {}
786
787                res => {
788                    log::error!(target: "citadel", "Invalid NodeResult for REVFS FileTransfer request received: {res:?}");
789                    return Err(citadel_io::error!(
790                        citadel_io::ErrorCode::RemoteRevfsInvalidResponse
791                    ));
792                }
793            }
794        }
795
796        Err(citadel_io::error!(
797            citadel_io::ErrorCode::RemoteRevfsFileTransferStreamDied
798        ))
799    }
800
801    /// Deletes the file from the RE-VFS. If the contents are desired on delete,
802    /// consider calling `Self::remote_encrypted_virtual_filesystem_pull` with the delete
803    /// parameter set to true
804    async fn remote_encrypted_virtual_filesystem_delete<P: Into<PathBuf> + Send>(
805        &self,
806        virtual_directory: P,
807    ) -> Result<(), NetworkError> {
808        self.can_use_revfs()?;
809        let request = NodeRequest::DeleteObject(DeleteObject {
810            v_conn: *self.user(),
811            virtual_dir: virtual_directory.into(),
812            security_level: Default::default(),
813        });
814
815        let mut stream = self.remote().send_callback_subscription(request).await?;
816        while let Some(event) = stream.next().await {
817            match event.into_result()? {
818                NodeResult::ReVFS(result) => {
819                    return if let Some(error) = result.error_message {
820                        Err(citadel_io::error!(
821                            citadel_io::ErrorCode::RemoteFileTransferFailed,
822                            error
823                        ))
824                    } else {
825                        Ok(())
826                    }
827                }
828
829                evt => {
830                    log::error!(target: "citadel", "Invalid NodeResult for REVFS Delete request received: {evt:?}");
831                }
832            }
833        }
834
835        Err(citadel_io::error!(
836            citadel_io::ErrorCode::RemoteRevfsDeleteStreamDied
837        ))
838    }
839
840    /// Connects to the peer with custom settings
841    async fn connect_to_peer_custom(
842        &self,
843        session_security_settings: SessionSecuritySettings,
844        udp_mode: UdpMode,
845        peer_session_password: Option<PreSharedKey>,
846    ) -> Result<PeerConnectSuccess<R>, NetworkError> {
847        use std::time::Duration;
848
849        // Timeout for the entire P2P connection process.
850        // This prevents indefinite hangs when the server never responds with PeerChannelCreated.
851        // The timeout should be long enough for normal hole punching (which has its own 30s timeout)
852        // plus key exchange, but short enough to fail fast when something is stuck.
853        const P2P_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
854
855        let session_cid = self.user().get_session_cid();
856        let peer_target = self.try_as_peer_connection().await?;
857
858        let mut stream = self
859            .remote()
860            .send_callback_subscription(NodeRequest::PeerCommand(PeerCommand {
861                session_cid,
862                command: PeerSignal::PostConnect {
863                    peer_conn_type: peer_target,
864                    ticket_opt: None,
865                    invitee_response: None,
866                    session_security_settings,
867                    udp_mode,
868                    session_password: peer_session_password,
869                },
870            }))
871            .await?;
872
873        let connect_task = async {
874            while let Some(status) = stream.next().await {
875                match status.into_result()? {
876                    NodeResult::PeerChannelCreated(PeerChannelCreated {
877                        ticket: _,
878                        channel,
879                        udp_rx_opt,
880                    }) => {
881                        let username = self.target_username().map(ToString::to_string);
882                        let remote = PeerRemote {
883                            inner: self.remote().clone(),
884                            peer: peer_target.as_virtual_connection(),
885                            username,
886                            session_security_settings,
887                        };
888
889                        return Ok(PeerConnectSuccess {
890                            remote,
891                            channel: *channel,
892                            udp_channel_rx: udp_rx_opt,
893                            incoming_object_transfer_handles: None,
894                        });
895                    }
896
897                    NodeResult::PeerEvent(PeerEvent {
898                        event:
899                            PeerSignal::PostConnect {
900                                invitee_response, ..
901                            },
902                        ..
903                    }) => match invitee_response {
904                        Some(PeerResponse::Timeout) => {
905                            return Err(citadel_io::error!(
906                                citadel_io::ErrorCode::RemotePeerNoResponse
907                            ))
908                        }
909                        Some(PeerResponse::Decline) => {
910                            return Err(citadel_io::error!(
911                                citadel_io::ErrorCode::RemotePeerDeclined
912                            ))
913                        }
914                        _ => {}
915                    },
916
917                    _ => {}
918                }
919            }
920
921            Err(citadel_io::error!(
922                citadel_io::ErrorCode::RemoteKernelStreamDied,
923                "connect_to_peer_custom"
924            ))
925        };
926
927        match citadel_io::time::timeout(P2P_CONNECT_TIMEOUT, connect_task).await {
928            Ok(result) => result,
929            Err(_elapsed) => Err(citadel_io::error!(
930                citadel_io::ErrorCode::RemoteP2pConnectTimeout,
931                P2P_CONNECT_TIMEOUT.as_secs()
932            )),
933        }
934    }
935
936    /// Connects to the target peer with default settings
937    async fn connect_to_peer(&self) -> Result<PeerConnectSuccess<R>, NetworkError> {
938        self.connect_to_peer_custom(Default::default(), Default::default(), Default::default())
939            .await
940    }
941
942    /// Posts a registration request to a peer
943    async fn register_to_peer(&self) -> Result<PeerRegisterStatus, NetworkError> {
944        let session_cid = self.user().get_session_cid();
945        let peer_target = self.try_as_peer_connection().await?;
946        // TODO: Get rid of this step. Should be handled by the protocol
947        let local_username = self
948            .remote()
949            .account_manager()
950            .get_username_by_cid(session_cid)
951            .await?
952            .ok_or_else(|| citadel_io::error!(citadel_io::ErrorCode::RemoteLocalUsernameMissing))?;
953        let peer_username_opt = self.target_username().map(ToString::to_string);
954
955        let mut stream = self
956            .remote()
957            .send_callback_subscription(NodeRequest::PeerCommand(PeerCommand {
958                session_cid,
959                command: PeerSignal::PostRegister {
960                    peer_conn_type: peer_target,
961                    inviter_username: local_username,
962                    invitee_username: peer_username_opt,
963                    ticket_opt: None,
964                    invitee_response: None,
965                },
966            }))
967            .await?;
968
969        while let Some(status) = stream.next().await {
970            if let NodeResult::PeerEvent(PeerEvent {
971                event:
972                    PeerSignal::PostRegister {
973                        peer_conn_type: _,
974                        inviter_username: _,
975                        invitee_username: _,
976                        ticket_opt: _,
977                        invitee_response: Some(resp),
978                    },
979                ticket: _,
980                ..
981            }) = status.into_result()?
982            {
983                match resp {
984                    PeerResponse::Accept(..) => return Ok(PeerRegisterStatus::Accepted),
985                    PeerResponse::Decline => return Ok(PeerRegisterStatus::Declined),
986                    PeerResponse::Timeout => return Ok(PeerRegisterStatus::Failed { reason: Some("Timeout on register request. Peer did not accept in time. Try again later".to_string()) }),
987                    _ => {}
988                }
989            }
990        }
991
992        Err(citadel_io::error!(
993            citadel_io::ErrorCode::RemoteKernelStreamDied,
994            format!("register_to_peer: {:?}", stream.callback_key())
995        ))
996    }
997
998    /// Deregisters the currently locked target. If the target is a client to server
999    /// connection, deregisters from the server. If the target is a p2p connection,
1000    /// deregisters the p2p
1001    async fn deregister(&self) -> Result<(), NetworkError> {
1002        if let Ok(peer_conn) = self.try_as_peer_connection().await {
1003            let peer_request = PeerSignal::Deregister {
1004                peer_conn_type: peer_conn,
1005            };
1006            let session_cid = self.user().get_session_cid();
1007            let request = NodeRequest::PeerCommand(PeerCommand {
1008                session_cid,
1009                command: peer_request,
1010            });
1011
1012            let mut subscription = self.remote().send_callback_subscription(request).await?;
1013            while let Some(result) = subscription.next().await {
1014                if let NodeResult::PeerEvent(PeerEvent {
1015                    event: PeerSignal::DeregistrationSuccess { .. },
1016                    ticket: _,
1017                    ..
1018                }) = result.into_result()?
1019                {
1020                    return Ok(());
1021                }
1022            }
1023        } else {
1024            // c2s conn
1025            let cid = self.user().get_session_cid();
1026            let request = NodeRequest::DeregisterFromHypernode(DeregisterFromHypernode {
1027                session_cid: cid,
1028                v_conn_type: *self.user(),
1029            });
1030            let mut subscription = self.remote().send_callback_subscription(request).await?;
1031            while let Some(result) = subscription.next().await {
1032                match result.into_result()? {
1033                    NodeResult::DeRegistration(DeRegistration {
1034                        session_cid: _,
1035                        ticket_opt: _,
1036                        success: true,
1037                    }) => return Ok(()),
1038                    NodeResult::DeRegistration(DeRegistration {
1039                        session_cid: _,
1040                        ticket_opt: _,
1041                        success: false,
1042                    }) => {
1043                        return Err(citadel_io::error!(
1044                            citadel_io::ErrorCode::RemoteDeregisterFailed
1045                        ))
1046                    }
1047
1048                    _ => {}
1049                }
1050            }
1051        }
1052
1053        Err(citadel_io::error!(
1054            citadel_io::ErrorCode::RemoteDeregisterEndedUnexpectedly
1055        ))
1056    }
1057
1058    async fn disconnect(&self) -> Result<(), NetworkError> {
1059        if let Ok(peer_conn) = self.try_as_peer_connection().await {
1060            if let PeerConnectionType::LocalGroupPeer {
1061                session_cid,
1062                peer_cid: _,
1063            } = peer_conn
1064            {
1065                let request = NodeRequest::PeerCommand(PeerCommand {
1066                    session_cid,
1067                    command: PeerSignal::Disconnect {
1068                        peer_conn_type: peer_conn,
1069                        disconnect_response: None,
1070                        disconnect_token: None,
1071                    },
1072                });
1073
1074                let mut subscription = self.remote().send_callback_subscription(request).await?;
1075
1076                while let Some(event) = subscription.next().await {
1077                    if let NodeResult::PeerEvent(PeerEvent {
1078                        event:
1079                            PeerSignal::Disconnect {
1080                                peer_conn_type: _,
1081                                disconnect_response: Some(_),
1082                                ..
1083                            },
1084                        ticket: _,
1085                        ..
1086                    }) = event.into_result()?
1087                    {
1088                        return Ok(());
1089                    }
1090                }
1091
1092                Err(citadel_io::error!(
1093                    citadel_io::ErrorCode::RemoteDisconnectEventMissing
1094                ))
1095            } else {
1096                Err(citadel_io::error!(
1097                    citadel_io::ErrorCode::RemoteExternalGroupPeerUnsupported
1098                ))
1099            }
1100        } else {
1101            //c2s conn
1102            let cid = self.user().get_session_cid();
1103            let request =
1104                NodeRequest::DisconnectFromHypernode(DisconnectFromHypernode { session_cid: cid });
1105
1106            let mut subscription = self.remote().send_callback_subscription(request).await?;
1107            while let Some(event) = subscription.next().await {
1108                if let NodeResult::Disconnect(Disconnect {
1109                    success, message, ..
1110                }) = event.into_result()?
1111                {
1112                    return if success {
1113                        Ok(())
1114                    } else {
1115                        Err(citadel_io::error!(
1116                            citadel_io::ErrorCode::RemoteDisconnected,
1117                            message
1118                        ))
1119                    };
1120                }
1121            }
1122
1123            Err(citadel_io::error!(
1124                citadel_io::ErrorCode::RemoteDisconnectEventMissing
1125            ))
1126        }
1127    }
1128
1129    async fn create_group(
1130        &self,
1131        initial_users_to_invite: Option<Vec<UserIdentifier>>,
1132    ) -> Result<GroupChannel, NetworkError> {
1133        self.create_group_with_options(initial_users_to_invite, MessageGroupOptions::default())
1134            .await
1135    }
1136
1137    /// Create a group with explicit [`MessageGroupOptions`] — e.g. a zero-trust
1138    /// [`GroupHierarchyMode::CommandHierarchy`](citadel_types::proto::GroupHierarchyMode) where a
1139    /// superior can read its subordinates' messages. `options.hierarchy` carries the initial rank
1140    /// assignment (member cid → command path); it is consumed owner-locally and never reaches the relay.
1141    async fn create_group_with_options(
1142        &self,
1143        initial_users_to_invite: Option<Vec<UserIdentifier>>,
1144        options: MessageGroupOptions,
1145    ) -> Result<GroupChannel, NetworkError> {
1146        let session_cid = self.user().get_session_cid();
1147
1148        let mut initial_users = vec![];
1149        // NOTE: default is PRIVATE mode, meaning all users in group must be registered to the owner.
1150        // Initial users are UserIdentifiers resolved to cids below.
1151        if let Some(initial_users_to_invite) = initial_users_to_invite {
1152            for user in initial_users_to_invite {
1153                initial_users.push(
1154                    self.remote()
1155                        .account_manager()
1156                        .find_target_information(session_cid, user.clone())
1157                        .await?
1158                        .ok_or_else(|| {
1159                            citadel_io::error!(
1160                                citadel_io::ErrorCode::RemoteGroupAccountNotFound,
1161                                citadel_io::Dbg(user),
1162                                citadel_io::Dbg(session_cid)
1163                            )
1164                        })
1165                        .map(|r| r.1.cid)?,
1166                )
1167            }
1168        }
1169
1170        let group_request = GroupBroadcast::Create {
1171            initial_invitees: initial_users,
1172            options,
1173        };
1174        let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand {
1175            session_cid,
1176            command: group_request,
1177        });
1178        let mut subscription = self.remote().send_callback_subscription(request).await?;
1179        while let Some(evt) = subscription.next().await {
1180            if let NodeResult::GroupChannelCreated(GroupChannelCreated {
1181                ticket: _,
1182                channel,
1183                session_cid: _,
1184            }) = evt
1185            {
1186                return Ok(channel);
1187            }
1188        }
1189
1190        Err(citadel_io::error!(
1191            citadel_io::ErrorCode::RemoteCreateGroupEndedUnexpectedly
1192        ))
1193    }
1194
1195    /// Lists all groups that which the current peer owns
1196    async fn list_owned_groups(&self) -> Result<Vec<MessageGroupKey>, NetworkError> {
1197        let session_cid = self.user().get_session_cid();
1198        let cid_to_check_for = match self.try_as_peer_connection().await {
1199            Ok(res) => res.get_original_target_cid(),
1200            _ => session_cid,
1201        };
1202        let group_request = GroupBroadcast::ListGroupsFor {
1203            cid: cid_to_check_for,
1204        };
1205        let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand {
1206            session_cid,
1207            command: group_request,
1208        });
1209
1210        let mut subscription = self.remote().send_callback_subscription(request).await?;
1211
1212        while let Some(evt) = subscription.next().await {
1213            if let NodeResult::GroupEvent(GroupEvent {
1214                session_cid: _,
1215                ticket: _,
1216                event: GroupBroadcast::ListResponse { groups },
1217            }) = evt.into_result()?
1218            {
1219                return Ok(groups);
1220            }
1221        }
1222
1223        Err(citadel_io::error!(
1224            citadel_io::ErrorCode::RemoteListGroupsEndedUnexpectedly
1225        ))
1226    }
1227
1228    /// Lists all active sessions, including the local nat type. For each active session,
1229    /// lists each connection info which includes the remote nat type, connection status,
1230    /// peer id, and latest ratchet version.
1231    async fn list_sessions(&self) -> Result<ActiveSessions, NetworkError> {
1232        let request = NodeRequest::GetActiveSessions;
1233        let mut subscription = self.remote().send_callback_subscription(request).await?;
1234
1235        if let Some(NodeResult::SessionList(result)) = subscription.next().await {
1236            return Ok(result.sessions);
1237        }
1238
1239        Err(citadel_io::error!(
1240            citadel_io::ErrorCode::RemoteListSessionsEndedUnexpectedly
1241        ))
1242    }
1243
1244    /// Begins a re-key, updating the container in the process.
1245    /// Returns the new key matrix version. Does not return the new key version
1246    /// if the rekey fails, or, if a current rekey is already executing
1247    async fn rekey(&self) -> Result<Option<u32>, NetworkError> {
1248        let request = NodeRequest::ReKey(ReKey {
1249            v_conn_type: *self.user(),
1250        });
1251        let mut subscription = self.remote().send_callback_subscription(request).await?;
1252
1253        while let Some(evt) = subscription.next().await {
1254            if let NodeResult::ReKeyResult(result) = evt {
1255                return match result.status {
1256                    ReKeyReturnType::Success { version } => Ok(Some(version)),
1257                    ReKeyReturnType::AlreadyInProgress => Ok(None),
1258                    ReKeyReturnType::Failure { err } => Err(citadel_io::error!(
1259                        citadel_io::ErrorCode::RemoteRekeyFailed,
1260                        err
1261                    )),
1262                };
1263            }
1264        }
1265
1266        Err(citadel_io::error!(
1267            citadel_io::ErrorCode::RemoteRekeyEndedUnexpectedly
1268        ))
1269    }
1270
1271    /// Checks if the locked target is registered
1272    async fn is_peer_registered(&self) -> Result<bool, NetworkError> {
1273        let target = self.try_as_peer_connection().await?;
1274        if let PeerConnectionType::LocalGroupPeer {
1275            session_cid: local_cid,
1276            peer_cid,
1277        } = target
1278        {
1279            let peers = self.remote().get_local_group_peers(local_cid, None).await?;
1280            citadel_logging::info!(target: "citadel", "Checking to see if {target} is registered in {peers:?}");
1281            Ok(peers.iter().any(|p| p.cid == peer_cid))
1282        } else {
1283            Err(citadel_io::error!(
1284                citadel_io::ErrorCode::RemoteExternalGroupPeerUnsupportedYet
1285            ))
1286        }
1287    }
1288
1289    #[doc(hidden)]
1290    async fn try_as_peer_connection(&self) -> Result<PeerConnectionType, NetworkError> {
1291        let verified_return = |user: &VirtualTargetType| {
1292            user.try_as_peer_connection().ok_or(citadel_io::error!(
1293                citadel_io::ErrorCode::RemoteTargetNotPeer
1294            ))
1295        };
1296
1297        if self.user().get_target_cid() == 0 {
1298            // in this case, the user re-used a remote locked to a registration target
1299            // where the username was provided, but the cid was 0 (unknown).
1300            let peer_username = self.target_username().ok_or_else(|| {
1301                citadel_io::error!(citadel_io::ErrorCode::RemoteTargetCidZeroNoUsername)
1302            })?;
1303            let session_cid = self.user().get_session_cid();
1304            let expected_peer_cid = self
1305                .remote()
1306                .account_manager()
1307                .get_persistence_handler()
1308                .get_cid_by_username(peer_username);
1309            // get the peer cid from the account manager (implying the peers are already registered).
1310            // fallback to the mapped cid if the peer is not registered
1311            let peer_cid = self
1312                .remote()
1313                .account_manager()
1314                .find_target_information(session_cid, peer_username)
1315                .await?
1316                .map(|r| r.1.cid)
1317                .unwrap_or(expected_peer_cid);
1318
1319            let mut user = *self.user();
1320            user.set_target_cid(peer_cid);
1321            verified_return(&user)
1322        } else {
1323            verified_return(self.user())
1324        }
1325    }
1326
1327    #[doc(hidden)]
1328    fn can_use_revfs(&self) -> Result<(), NetworkError> {
1329        if let Some(sess) = self.session_security_settings() {
1330            if sess.crypto_params.kem_algorithm == KemAlgorithm::MlKem {
1331                Ok(())
1332            } else {
1333                Err(citadel_io::error!(
1334                    citadel_io::ErrorCode::RemoteRevfsRequiresKyber
1335                ))
1336            }
1337        } else {
1338            Err(citadel_io::error!(
1339                citadel_io::ErrorCode::RemoteRevfsUnsupportedRemote
1340            ))
1341        }
1342    }
1343}
1344
1345impl<T: TargetLockedRemote<R>, R: Ratchet> ProtocolRemoteTargetExt<R> for T {}
1346
1347pub mod results {
1348    use crate::prefabs::client::peer_connection::FileTransferHandleRx;
1349    use crate::prelude::{PeerChannel, UdpChannel};
1350    use crate::remote_ext::remote_specialization::PeerRemote;
1351    use citadel_io::tokio::sync::oneshot::Receiver;
1352    use citadel_proto::prelude::*;
1353    use std::fmt::Debug;
1354
1355    pub struct PeerConnectSuccess<R: Ratchet> {
1356        pub channel: PeerChannel<R>,
1357        pub udp_channel_rx: Option<Receiver<UdpChannel<R>>>,
1358        pub remote: PeerRemote<R>,
1359        /// Receives incoming file/object transfer requests. The handles must be
1360        /// .accepted() before the file/object transfer is allowed to proceed
1361        pub(crate) incoming_object_transfer_handles: Option<FileTransferHandleRx>,
1362    }
1363
1364    impl<R: Ratchet> Debug for PeerConnectSuccess<R> {
1365        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1366            f.debug_struct("PeerConnectSuccess")
1367                .field("channel", &self.channel)
1368                .field("udp_channel_rx", &self.udp_channel_rx)
1369                .finish()
1370        }
1371    }
1372
1373    impl<R: Ratchet> PeerConnectSuccess<R> {
1374        /// Obtains a receiver which yields incoming file/object transfer handles
1375        pub fn get_incoming_file_transfer_handle(
1376            &mut self,
1377        ) -> Result<FileTransferHandleRx, NetworkError> {
1378            self.incoming_object_transfer_handles
1379                .take()
1380                .ok_or(citadel_io::error!(
1381                    citadel_io::ErrorCode::RemoteFunctionAlreadyCalled
1382                ))
1383        }
1384    }
1385
1386    pub enum PeerRegisterStatus {
1387        Accepted,
1388        Declined,
1389        Failed { reason: Option<String> },
1390    }
1391
1392    #[derive(Clone, Debug)]
1393    pub struct LocalGroupPeer {
1394        pub cid: u64,
1395        pub is_online: bool,
1396    }
1397
1398    #[derive(Clone, Debug)]
1399    pub struct LocalGroupPeerFullInfo {
1400        pub cid: u64,
1401        pub username: Option<String>,
1402        pub full_name: Option<String>,
1403        pub is_online: bool,
1404    }
1405}
1406
1407pub mod remote_specialization {
1408    use crate::prelude::*;
1409    use std::ops::{Deref, DerefMut};
1410
1411    #[derive(Debug, Clone)]
1412    pub struct PeerRemote<R: Ratchet> {
1413        pub(crate) inner: NodeRemote<R>,
1414        pub(crate) peer: VirtualTargetType,
1415        pub(crate) username: Option<String>,
1416        pub(crate) session_security_settings: SessionSecuritySettings,
1417    }
1418
1419    impl<R: Ratchet> Deref for PeerRemote<R> {
1420        type Target = NodeRemote<R>;
1421        fn deref(&self) -> &Self::Target {
1422            &self.inner
1423        }
1424    }
1425
1426    impl<R: Ratchet> DerefMut for PeerRemote<R> {
1427        fn deref_mut(&mut self) -> &mut Self::Target {
1428            &mut self.inner
1429        }
1430    }
1431
1432    impl<R: Ratchet> TargetLockedRemote<R> for PeerRemote<R> {
1433        fn user(&self) -> &VirtualTargetType {
1434            &self.peer
1435        }
1436        fn remote(&self) -> &NodeRemote<R> {
1437            &self.inner
1438        }
1439        fn target_username(&self) -> Option<&str> {
1440            self.username.as_deref()
1441        }
1442        fn user_mut(&mut self) -> &mut VirtualTargetType {
1443            &mut self.peer
1444        }
1445
1446        fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
1447            Some(&self.session_security_settings)
1448        }
1449    }
1450}
1451
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use crate::prefabs::client::single_connection::SingleClientServerConnectionKernel;
    use crate::prefabs::client::DefaultServerConnectionSettingsBuilder;
    use crate::prelude::*;
    use citadel_io::tokio;
    use rstest::rstest;
    use std::net::SocketAddr;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use uuid::Uuid;

    /// Server-side test kernel. Field 0 holds the node remote once it has been
    /// loaded; field 1 is set to true after a received file matched the reference
    /// bytes.
    pub struct ReceiverFileTransferKernel<R: Ratchet>(
        pub Option<NodeRemote<R>>,
        pub Arc<AtomicBool>,
    );

    #[async_trait]
    impl<R: Ratchet> NetKernel<R> for ReceiverFileTransferKernel<R> {
        fn load_remote(&mut self, node_remote: NodeRemote<R>) -> Result<(), NetworkError> {
            self.0 = Some(node_remote);
            Ok(())
        }

        async fn on_start(&self) -> Result<(), NetworkError> {
            Ok(())
        }

        /// Accepts every incoming object transfer, compares the received file
        /// against the bundled reference file, then shuts the node down.
        async fn on_node_event_received(&self, message: NodeResult<R>) -> Result<(), NetworkError> {
            use citadel_types::proto::ObjectTransferStatus;
            use futures::StreamExt;

            log::trace!(target: "citadel", "SERVER received {:?}", message);

            // Every event other than an object transfer handle is ignored.
            let mut handle = match message.into_result()? {
                NodeResult::ObjectTransferHandle(ObjectTransferHandle { handle, .. }) => handle,
                _ => return Ok(()),
            };

            let mut path = None;
            // accept the transfer
            handle
                .accept()
                .map_err(|err| NetworkError::msg(err.into_string()))?;

            while let Some(status) = handle.next().await {
                match status {
                    ObjectTransferStatus::ReceptionBeginning(file_path, vfm) => {
                        path = Some(file_path);
                        assert_eq!(vfm.name, "TheBridge.pdf")
                    }

                    ObjectTransferStatus::ReceptionComplete => {
                        log::trace!(target: "citadel", "Server has finished receiving the file!");
                        let expected = include_bytes!("../../resources/TheBridge.pdf");
                        let received = citadel_io::tokio::fs::read(path.clone().unwrap())
                            .await
                            .unwrap();
                        assert_eq!(
                            expected,
                            received.as_slice(),
                            "Original data and streamed data does not match"
                        );

                        self.1.store(true, Ordering::Relaxed);
                        self.0.clone().unwrap().shutdown().await?;
                    }

                    _ => {}
                }
            }

            Ok(())
        }

        async fn on_stop(&mut self) -> Result<(), NetworkError> {
            Ok(())
        }
    }

    /// Builds the test server node around a fresh `ReceiverFileTransferKernel`
    /// that reports success through `switch`.
    pub fn server_info<'a, R: Ratchet>(
        switch: Arc<AtomicBool>,
    ) -> (NodeFuture<'a, ReceiverFileTransferKernel<R>>, SocketAddr) {
        let kernel = ReceiverFileTransferKernel(None, switch);
        crate::test_common::server_test_node(kernel, |_| {})
    }

    /// Sends the reference file from a client to the server and checks that both
    /// sides flagged success.
    #[rstest]
    #[case(
        EncryptionAlgorithm::AES_GCM_256,
        KemAlgorithm::MlKem,
        SigAlgorithm::None
    )]
    #[case(
        EncryptionAlgorithm::MlKemHybrid,
        KemAlgorithm::MlKem,
        SigAlgorithm::MlDsa65
    )]
    #[timeout(std::time::Duration::from_secs(90))]
    #[tokio::test]
    async fn test_c2s_file_transfer(
        #[case] enx: EncryptionAlgorithm,
        #[case] kem: KemAlgorithm,
        #[case] sig: SigAlgorithm,
    ) {
        citadel_logging::setup_log();
        let client_success = &AtomicBool::new(false);
        let server_success = &Arc::new(AtomicBool::new(false));
        let (server, server_addr) = server_info::<StackedRatchet>(server_success.clone());
        let uuid = Uuid::new_v4();

        let crypto_params = enx + kem + sig;
        let session_security_settings = SessionSecuritySettingsBuilder::default()
            .with_crypto_params(crypto_params)
            .build()
            .unwrap();

        let server_connection_settings =
            DefaultServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid)
                .with_session_security_settings(session_security_settings)
                .disable_udp()
                .build()
                .unwrap();

        let client_kernel = SingleClientServerConnectionKernel::new(
            server_connection_settings,
            |connection| async move {
                log::trace!(target: "citadel", "***CLIENT LOGIN SUCCESS :: File transfer next ***");
                connection
                    .send_file_with_custom_opts(
                        "../resources/TheBridge.pdf",
                        32 * 1024,
                        TransferType::FileTransfer,
                    )
                    .await
                    .unwrap();
                log::trace!(target: "citadel", "***CLIENT FILE TRANSFER SUCCESS***");
                client_success.store(true, Ordering::Relaxed);
                connection.shutdown_kernel().await
            },
        );

        let client = DefaultNodeBuilder::default().build(client_kernel).unwrap();

        // Drive both nodes to completion; either side failing fails the test.
        let _ = futures::future::try_join(server, client).await.unwrap();

        assert!(client_success.load(Ordering::Relaxed));
        assert!(server_success.load(Ordering::Relaxed));
    }
}