1use crate::prefabs::ClientServerRemote;
64use crate::prelude::results::{PeerConnectSuccess, PeerRegisterStatus};
65use crate::prelude::*;
66use crate::remote_ext::remote_specialization::PeerRemote;
67use crate::remote_ext::results::LocalGroupPeerFullInfo;
68use std::ops::{Deref, DerefMut};
69
70use futures::StreamExt;
71use std::path::PathBuf;
72use std::time::Duration;
73
74pub(crate) mod user_ids {
75 use crate::prelude::*;
76 use std::ops::Deref;
77
78 #[derive(Debug)]
79 pub struct SymmetricIdentifierHandleRef<'a, R: Ratchet> {
81 pub(crate) user: VirtualTargetType,
82 pub(crate) remote: &'a NodeRemote<R>,
83 pub(crate) target_username: Option<String>,
84 }
85
86 impl<R: Ratchet> SymmetricIdentifierHandleRef<'_, R> {
87 pub fn into_owned(self) -> SymmetricIdentifierHandle<R> {
88 SymmetricIdentifierHandle {
89 user: self.user,
90 remote: self.remote.clone(),
91 target_username: self.target_username,
92 }
93 }
94 }
95
96 #[derive(Clone, Debug)]
97 pub struct SymmetricIdentifierHandle<R: Ratchet> {
99 user: VirtualTargetType,
100 remote: NodeRemote<R>,
101 target_username: Option<String>,
102 }
103
104 pub trait TargetLockedRemote<R: Ratchet>: Send + Sync {
105 fn user(&self) -> &VirtualTargetType;
106 fn remote(&self) -> &NodeRemote<R>;
107 fn target_username(&self) -> Option<&str>;
108 fn user_mut(&mut self) -> &mut VirtualTargetType;
109 fn session_security_settings(&self) -> Option<&SessionSecuritySettings>;
110 }
111
112 impl<R: Ratchet> TargetLockedRemote<R> for SymmetricIdentifierHandleRef<'_, R> {
113 fn user(&self) -> &VirtualTargetType {
114 &self.user
115 }
116 fn remote(&self) -> &NodeRemote<R> {
117 self.remote
118 }
119 fn target_username(&self) -> Option<&str> {
120 self.target_username.as_deref()
121 }
122 fn user_mut(&mut self) -> &mut VirtualTargetType {
123 &mut self.user
124 }
125
126 fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
127 None
128 }
129 }
130
131 impl<R: Ratchet> TargetLockedRemote<R> for SymmetricIdentifierHandle<R> {
132 fn user(&self) -> &VirtualTargetType {
133 &self.user
134 }
135 fn remote(&self) -> &NodeRemote<R> {
136 &self.remote
137 }
138 fn target_username(&self) -> Option<&str> {
139 self.target_username.as_deref()
140 }
141 fn user_mut(&mut self) -> &mut VirtualTargetType {
142 &mut self.user
143 }
144
145 fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
146 None
147 }
148 }
149
150 impl<R: Ratchet> From<SymmetricIdentifierHandleRef<'_, R>> for SymmetricIdentifierHandle<R> {
151 fn from(this: SymmetricIdentifierHandleRef<'_, R>) -> Self {
152 this.into_owned()
153 }
154 }
155
156 impl<R: Ratchet> Deref for SymmetricIdentifierHandle<R> {
157 type Target = NodeRemote<R>;
158
159 fn deref(&self) -> &Self::Target {
160 &self.remote
161 }
162 }
163
164 impl<R: Ratchet> Deref for SymmetricIdentifierHandleRef<'_, R> {
165 type Target = NodeRemote<R>;
166
167 fn deref(&self) -> &Self::Target {
168 self.remote
169 }
170 }
171}
172
173pub struct CitadelClientServerConnection<R: Ratchet> {
175 pub(crate) channel: Option<PeerChannel<R>>,
177 pub remote: ClientServerRemote<R>,
178 pub udp_channel_rx: Option<citadel_io::tokio::sync::oneshot::Receiver<UdpChannel<R>>>,
180 pub services: ServicesObject,
182 pub cid: u64,
183 pub session_security_settings: SessionSecuritySettings,
184}
185
186impl<R: Ratchet> CitadelClientServerConnection<R> {
187 pub fn split(self) -> (PeerChannelSendHalf<R>, PeerChannelRecvHalf<R>) {
193 self.channel.expect("Channel already taken").split()
194 }
195
196 pub fn take_channel(&mut self) -> Option<PeerChannel<R>> {
197 self.channel.take()
198 }
199}
200
201impl<R: Ratchet> Deref for CitadelClientServerConnection<R> {
202 type Target = ClientServerRemote<R>;
203
204 fn deref(&self) -> &Self::Target {
205 &self.remote
206 }
207}
208
209impl<R: Ratchet> DerefMut for CitadelClientServerConnection<R> {
210 fn deref_mut(&mut self) -> &mut Self::Target {
211 &mut self.remote
212 }
213}
214
215pub struct RegisterSuccess {
217 pub cid: u64,
218}
219
220#[async_trait]
221pub trait ProtocolRemoteExt<R: Ratchet>: Remote<R> {
223 async fn register<
226 T: std::net::ToSocketAddrs + Send,
227 P: Into<String> + Send,
228 V: Into<String> + Send,
229 K: Into<SecBuffer> + Send,
230 >(
231 &self,
232 addr: T,
233 full_name: P,
234 username: V,
235 proposed_password: K,
236 default_security_settings: SessionSecuritySettings,
237 server_password: Option<PreSharedKey>,
238 ) -> Result<RegisterSuccess, NetworkError> {
239 let creds =
240 ProposedCredentials::new_register(full_name, username, proposed_password.into())
241 .await?;
242 let register_request = NodeRequest::RegisterToHypernode(RegisterToHypernode {
243 remote_addr: addr.to_socket_addrs()?.next().ok_or(citadel_io::error!(
244 citadel_io::ErrorCode::RemoteInvalidSocketAddr
245 ))?,
246 proposed_credentials: creds,
247 static_security_settings: default_security_settings,
248 session_password: server_password.unwrap_or_default(),
249 });
250
251 let mut subscription = self.send_callback_subscription(register_request).await?;
252 while let Some(status) = subscription.next().await {
253 match status.into_result()? {
254 NodeResult::RegisterOkay(RegisterOkay { cid, .. }) => {
255 return Ok(RegisterSuccess { cid });
256 }
257 NodeResult::RegisterFailure(err) => {
258 return Err(citadel_io::error!(
259 citadel_io::ErrorCode::RemoteRegisterFailure,
260 err.error_message
261 ));
262 }
263 NodeResult::Disconnect(err) => {
264 return Err(citadel_io::error!(
265 citadel_io::ErrorCode::RemoteDisconnected,
266 err.message
267 ));
268 }
269 evt => {
270 log::warn!(target: "citadel", "Invalid NodeResult for Register request received: {evt:?}");
271 }
272 }
273 }
274
275 Err(citadel_io::error!(
276 citadel_io::ErrorCode::RemoteKernelStreamDied,
277 "register"
278 ))
279 }
280
281 async fn register_with_defaults<
284 T: std::net::ToSocketAddrs + Send,
285 P: Into<String> + Send,
286 V: Into<String> + Send,
287 K: Into<SecBuffer> + Send,
288 >(
289 &self,
290 addr: T,
291 full_name: P,
292 username: V,
293 proposed_password: K,
294 ) -> Result<RegisterSuccess, NetworkError> {
295 self.register(
296 addr,
297 full_name,
298 username,
299 proposed_password,
300 Default::default(),
301 Default::default(),
302 )
303 .await
304 }
305
306 async fn connect(
309 &self,
310 auth: AuthenticationRequest,
311 connect_mode: ConnectMode,
312 udp_mode: UdpMode,
313 keep_alive_timeout: Option<Duration>,
314 session_security_settings: SessionSecuritySettings,
315 server_password: Option<PreSharedKey>,
316 ) -> Result<CitadelClientServerConnection<R>, NetworkError> {
317 let connect_request = NodeRequest::ConnectToHypernode(ConnectToHypernode {
318 auth_request: auth,
319 connect_mode,
320 udp_mode,
321 keep_alive_timeout: keep_alive_timeout.map(|r| r.as_secs()),
322 session_security_settings,
323 session_password: server_password.unwrap_or_default(),
324 });
325
326 let mut subscription = self.send_callback_subscription(connect_request).await?;
327 let status = subscription.next().await.ok_or(citadel_io::error!(
328 citadel_io::ErrorCode::RemoteKernelStreamDied,
329 "connect"
330 ))?;
331
332 return match status.into_result()? {
333 NodeResult::ConnectSuccess(ConnectSuccess {
334 ticket: _,
335 session_cid: cid,
336 remote_addr: _,
337 is_personal: _,
338 v_conn_type,
339 services,
340 welcome_message: _,
341 channel,
342 udp_rx_opt: udp_channel_rx,
343 session_security_settings,
344 }) => Ok(CitadelClientServerConnection {
345 remote: ClientServerRemote::new(
346 v_conn_type,
347 self.remote_ref().clone(),
348 session_security_settings,
349 None,
350 None,
351 ),
352 channel: Some(*channel),
353 udp_channel_rx,
354 services,
355 cid,
356 session_security_settings,
357 }),
358 NodeResult::ConnectFail(ConnectFail {
359 ticket: _,
360 cid_opt: _,
361 error_message: err,
362 }) => Err(citadel_io::error!(
363 citadel_io::ErrorCode::RemoteConnectFailed,
364 err
365 )),
366 NodeResult::Disconnect(err) => {
367 return Err(citadel_io::error!(
368 citadel_io::ErrorCode::RemoteDisconnected,
369 err.message
370 ));
371 }
372 res => Err(citadel_io::error!(
373 citadel_io::ErrorCode::RemoteConnectUnexpectedResponse,
374 citadel_io::Dbg(res)
375 )),
376 };
377 }
378
379 async fn connect_with_defaults(
382 &self,
383 auth: AuthenticationRequest,
384 ) -> Result<CitadelClientServerConnection<R>, NetworkError> {
385 self.connect(
386 auth,
387 Default::default(),
388 Default::default(),
389 None,
390 Default::default(),
391 Default::default(),
392 )
393 .await
394 }
395
396 async fn find_target<T: Into<UserIdentifier> + Send, P: Into<UserIdentifier> + Send>(
409 &self,
410 local_user: T,
411 peer: P,
412 ) -> Result<SymmetricIdentifierHandleRef<'_, R>, NetworkError> {
413 let account_manager = self.account_manager();
414 account_manager
415 .find_target_information(local_user, peer)
416 .await?
417 .map(move |(cid, peer)| {
418 if peer.parent_icid != 0 {
419 SymmetricIdentifierHandleRef {
420 user: VirtualTargetType::ExternalGroupPeer {
421 session_cid: cid,
422 interserver_cid: peer.parent_icid,
423 peer_cid: peer.cid,
424 },
425 remote: self.remote_ref(),
426 target_username: None,
427 }
428 } else {
429 SymmetricIdentifierHandleRef {
430 user: VirtualTargetType::LocalGroupPeer {
431 session_cid: cid,
432 peer_cid: peer.cid,
433 },
434 remote: self.remote_ref(),
435 target_username: None,
436 }
437 }
438 })
439 .ok_or_else(|| citadel_io::error!(citadel_io::ErrorCode::RemoteTargetPairNotFound))
440 }
441
442 async fn propose_target<T: Into<UserIdentifier> + Send, P: Into<UserIdentifier> + Send>(
445 &self,
446 local_user: T,
447 peer: P,
448 ) -> Result<SymmetricIdentifierHandleRef<'_, R>, NetworkError> {
449 let local_cid = self.get_session_cid(local_user).await?;
450 match peer.into() {
451 UserIdentifier::ID(peer_cid) => Ok(SymmetricIdentifierHandleRef {
452 user: VirtualTargetType::LocalGroupPeer {
453 session_cid: local_cid,
454 peer_cid,
455 },
456 remote: self.remote_ref(),
457 target_username: None,
458 }),
459 UserIdentifier::Username(uname) => {
460 let peer_cid = self
461 .remote_ref()
462 .account_manager()
463 .find_target_information(local_cid, uname.clone())
464 .await?
465 .map(|r| r.1.cid)
466 .unwrap_or(0);
467 Ok(SymmetricIdentifierHandleRef {
468 user: VirtualTargetType::LocalGroupPeer {
469 session_cid: local_cid,
470 peer_cid,
471 },
472 remote: self.remote_ref(),
473 target_username: Some(uname),
474 })
475 }
476 }
477 }
478
479 async fn get_local_group_peers<T: Into<UserIdentifier> + Send>(
482 &self,
483 local_user: T,
484 limit: Option<usize>,
485 ) -> Result<Vec<LocalGroupPeerFullInfo>, NetworkError> {
486 let local_cid = self.get_session_cid(local_user).await?;
487 let command = NodeRequest::PeerCommand(PeerCommand {
488 session_cid: local_cid,
489 command: PeerSignal::GetRegisteredPeers {
490 peer_conn_type: ClientConnectionType::Server {
491 session_cid: local_cid,
492 },
493 response: None,
494 limit: limit.map(|r| r as i32),
495 },
496 });
497
498 let mut stream = self.send_callback_subscription(command).await?;
499
500 while let Some(status) = stream.next().await {
501 if let NodeResult::PeerEvent(PeerEvent {
502 event:
503 PeerSignal::GetRegisteredPeers {
504 peer_conn_type: _,
505 response: Some(PeerResponse::RegisteredCids(peer_info, is_onlines)),
506 limit: _,
507 },
508 ticket: _,
509 ..
510 }) = status.into_result()?
511 {
512 return Ok(peer_info
513 .into_iter()
514 .zip(is_onlines)
515 .filter_map(|(peer_info, is_online)| {
516 peer_info.map(|info| LocalGroupPeerFullInfo {
517 cid: info.cid,
518 username: Some(info.username),
519 full_name: Some(info.full_name),
520 is_online,
521 })
522 })
523 .collect());
524 }
525 }
526
527 Err(citadel_io::error!(
528 citadel_io::ErrorCode::RemoteKernelStreamDied,
529 "get_local_group_peers"
530 ))
531 }
532
533 async fn get_local_group_mutual_peers<T: Into<UserIdentifier> + Send>(
535 &self,
536 local_user: T,
537 ) -> Result<Vec<LocalGroupPeerFullInfo>, NetworkError> {
538 let local_cid = self.get_session_cid(local_user).await?;
539 let command = NodeRequest::PeerCommand(PeerCommand {
540 session_cid: local_cid,
541 command: PeerSignal::GetMutuals {
542 v_conn_type: ClientConnectionType::Server {
543 session_cid: local_cid,
544 },
545 response: None,
546 },
547 });
548
549 let mut stream = self.send_callback_subscription(command).await?;
550
551 while let Some(status) = stream.next().await {
552 if let NodeResult::PeerEvent(PeerEvent {
553 event:
554 PeerSignal::GetMutuals {
555 v_conn_type: _,
556 response: Some(PeerResponse::RegisteredCids(peer_info, is_onlines)),
557 },
558 ticket: _,
559 ..
560 }) = status.into_result()?
561 {
562 return Ok(peer_info
563 .into_iter()
564 .zip(is_onlines)
565 .filter_map(|(peer_info, is_online)| {
566 peer_info.map(|info| LocalGroupPeerFullInfo {
567 cid: info.cid,
568 username: Some(info.username),
569 full_name: Some(info.full_name),
570 is_online,
571 })
572 })
573 .collect());
574 }
575 }
576
577 Err(citadel_io::error!(
578 citadel_io::ErrorCode::RemoteSessionStreamDied
579 ))
580 }
581
582 async fn sessions(&self) -> Result<ActiveSessions, NetworkError> {
585 match self
586 .send_callback_subscription(NodeRequest::GetActiveSessions)
587 .await
588 {
589 Ok(mut stream) => {
590 while let Some(result) = stream.next().await {
591 match result.into_result()? {
592 NodeResult::SessionList(res) => return Ok(res.sessions),
593 res => {
594 citadel_logging::warn!("Received unexpected result: {res:?}");
595 }
596 }
597 }
598
599 citadel_logging::warn!("Failed to receive response from SDK (stream died)");
600 return Err(citadel_io::error!(
601 citadel_io::ErrorCode::RemoteKernelStreamDied,
602 "get_active_sessions"
603 ));
604 }
605 Err(e) => {
606 citadel_logging::warn!(target: "citadel", "Failed to query SDK sessions: {e}");
607 }
608 }
609
610 Err(citadel_io::error!(
611 citadel_io::ErrorCode::RemoteQuerySessionsFailed
612 ))
613 }
614
615 #[doc(hidden)]
616 fn remote_ref(&self) -> &NodeRemote<R>;
617
618 #[doc(hidden)]
619 async fn get_session_cid<T: Into<UserIdentifier> + Send>(
620 &self,
621 local_user: T,
622 ) -> Result<u64, NetworkError> {
623 let account_manager = self.account_manager();
624 Ok(account_manager
625 .find_local_user_information(local_user)
626 .await?
627 .ok_or(citadel_io::error!(
628 citadel_io::ErrorCode::RemoteUserDoesNotExist
629 ))?)
630 }
631}
632
633impl<R: Ratchet> ProtocolRemoteExt<R> for NodeRemote<R> {
634 fn remote_ref(&self) -> &NodeRemote<R> {
635 self
636 }
637}
638
639impl<R: Ratchet> ProtocolRemoteExt<R> for ClientServerRemote<R> {
640 fn remote_ref(&self) -> &NodeRemote<R> {
641 &self.inner
642 }
643}
644
645#[async_trait]
646pub trait ProtocolRemoteTargetExt<R: Ratchet>: TargetLockedRemote<R> {
648 async fn send_file_with_custom_opts<T: ObjectSource>(
650 &self,
651 source: T,
652 chunk_size: usize,
653 transfer_type: TransferType,
654 ) -> Result<(), NetworkError> {
655 let chunk_size = if chunk_size == 0 {
656 None
657 } else {
658 Some(chunk_size)
659 };
660 let session_cid = self.user().get_session_cid();
661 let user = *self.user();
662 let remote = self.remote();
663
664 let mut stream = remote
665 .send_callback_subscription(NodeRequest::SendObject(SendObject {
666 source: Box::new(source),
667 chunk_size,
668 session_cid,
669 v_conn_type: user,
670 transfer_type,
671 }))
672 .await?;
673
674 while let Some(event) = stream.next().await {
675 match event.into_result()? {
676 NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) => {
677 return handle.transfer_file().await.map_err(|err| {
678 citadel_io::error!(
679 citadel_io::ErrorCode::RemoteFileTransferFailed,
680 err.into_string()
681 )
682 });
683 }
684
685 NodeResult::PeerEvent(PeerEvent {
686 event: PeerSignal::SignalReceived { .. },
687 ..
688 }) => {}
689
690 res => {
691 log::warn!(target: "citadel", "Invalid NodeResult for FileTransfer request received: {res:?}")
692 }
693 }
694 }
695
696 Err(citadel_io::error!(
697 citadel_io::ErrorCode::RemoteFileTransferStreamDied
698 ))
699 }
700
701 async fn send_file<T: ObjectSource>(&self, source: T) -> Result<(), NetworkError> {
703 self.send_file_with_custom_opts(source, 0, TransferType::FileTransfer)
704 .await
705 }
706
707 async fn remote_encrypted_virtual_filesystem_push_custom_chunking<
710 T: ObjectSource,
711 P: Into<PathBuf> + Send,
712 >(
713 &self,
714 source: T,
715 virtual_directory: P,
716 chunk_size: usize,
717 security_level: SecurityLevel,
718 ) -> Result<(), NetworkError> {
719 self.can_use_revfs()?;
720 let mut virtual_path = virtual_directory.into();
721 virtual_path = prepare_virtual_path(virtual_path);
722 validate_virtual_path(&virtual_path).map_err(|err| {
723 citadel_io::error!(
724 citadel_io::ErrorCode::RemoteRevfsInvalidVirtualPath,
725 err.into_string()
726 )
727 })?;
728 let tx_type = TransferType::RemoteEncryptedVirtualFilesystem {
729 virtual_path,
730 security_level,
731 };
732 self.send_file_with_custom_opts(source, chunk_size, tx_type)
733 .await
734 }
735
736 async fn remote_encrypted_virtual_filesystem_push<T: ObjectSource, P: Into<PathBuf> + Send>(
739 &self,
740 source: T,
741 virtual_directory: P,
742 security_level: SecurityLevel,
743 ) -> Result<(), NetworkError> {
744 self.remote_encrypted_virtual_filesystem_push_custom_chunking(
745 source,
746 virtual_directory,
747 0,
748 security_level,
749 )
750 .await
751 }
752
753 async fn remote_encrypted_virtual_filesystem_pull<P: Into<PathBuf> + Send>(
756 &self,
757 virtual_directory: P,
758 transfer_security_level: SecurityLevel,
759 delete_on_pull: bool,
760 ) -> Result<PathBuf, NetworkError> {
761 self.can_use_revfs()?;
762 let request = NodeRequest::PullObject(PullObject {
763 v_conn: *self.user(),
764 virtual_dir: virtual_directory.into(),
765 delete_on_pull,
766 transfer_security_level,
767 });
768
769 let mut stream = self.remote().send_callback_subscription(request).await?;
770
771 while let Some(event) = stream.next().await {
772 match event.into_result()? {
773 NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) => {
774 return handle.receive_file().await.map_err(|err| {
775 citadel_io::error!(
776 citadel_io::ErrorCode::RemoteFileTransferFailed,
777 err.into_string()
778 )
779 });
780 }
781
782 NodeResult::PeerEvent(PeerEvent {
783 event: PeerSignal::SignalReceived { .. },
784 ..
785 }) => {}
786
787 res => {
788 log::error!(target: "citadel", "Invalid NodeResult for REVFS FileTransfer request received: {res:?}");
789 return Err(citadel_io::error!(
790 citadel_io::ErrorCode::RemoteRevfsInvalidResponse
791 ));
792 }
793 }
794 }
795
796 Err(citadel_io::error!(
797 citadel_io::ErrorCode::RemoteRevfsFileTransferStreamDied
798 ))
799 }
800
801 async fn remote_encrypted_virtual_filesystem_delete<P: Into<PathBuf> + Send>(
805 &self,
806 virtual_directory: P,
807 ) -> Result<(), NetworkError> {
808 self.can_use_revfs()?;
809 let request = NodeRequest::DeleteObject(DeleteObject {
810 v_conn: *self.user(),
811 virtual_dir: virtual_directory.into(),
812 security_level: Default::default(),
813 });
814
815 let mut stream = self.remote().send_callback_subscription(request).await?;
816 while let Some(event) = stream.next().await {
817 match event.into_result()? {
818 NodeResult::ReVFS(result) => {
819 return if let Some(error) = result.error_message {
820 Err(citadel_io::error!(
821 citadel_io::ErrorCode::RemoteFileTransferFailed,
822 error
823 ))
824 } else {
825 Ok(())
826 }
827 }
828
829 evt => {
830 log::error!(target: "citadel", "Invalid NodeResult for REVFS Delete request received: {evt:?}");
831 }
832 }
833 }
834
835 Err(citadel_io::error!(
836 citadel_io::ErrorCode::RemoteRevfsDeleteStreamDied
837 ))
838 }
839
840 async fn connect_to_peer_custom(
842 &self,
843 session_security_settings: SessionSecuritySettings,
844 udp_mode: UdpMode,
845 peer_session_password: Option<PreSharedKey>,
846 ) -> Result<PeerConnectSuccess<R>, NetworkError> {
847 use std::time::Duration;
848
849 const P2P_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
854
855 let session_cid = self.user().get_session_cid();
856 let peer_target = self.try_as_peer_connection().await?;
857
858 let mut stream = self
859 .remote()
860 .send_callback_subscription(NodeRequest::PeerCommand(PeerCommand {
861 session_cid,
862 command: PeerSignal::PostConnect {
863 peer_conn_type: peer_target,
864 ticket_opt: None,
865 invitee_response: None,
866 session_security_settings,
867 udp_mode,
868 session_password: peer_session_password,
869 },
870 }))
871 .await?;
872
873 let connect_task = async {
874 while let Some(status) = stream.next().await {
875 match status.into_result()? {
876 NodeResult::PeerChannelCreated(PeerChannelCreated {
877 ticket: _,
878 channel,
879 udp_rx_opt,
880 }) => {
881 let username = self.target_username().map(ToString::to_string);
882 let remote = PeerRemote {
883 inner: self.remote().clone(),
884 peer: peer_target.as_virtual_connection(),
885 username,
886 session_security_settings,
887 };
888
889 return Ok(PeerConnectSuccess {
890 remote,
891 channel: *channel,
892 udp_channel_rx: udp_rx_opt,
893 incoming_object_transfer_handles: None,
894 });
895 }
896
897 NodeResult::PeerEvent(PeerEvent {
898 event:
899 PeerSignal::PostConnect {
900 invitee_response, ..
901 },
902 ..
903 }) => match invitee_response {
904 Some(PeerResponse::Timeout) => {
905 return Err(citadel_io::error!(
906 citadel_io::ErrorCode::RemotePeerNoResponse
907 ))
908 }
909 Some(PeerResponse::Decline) => {
910 return Err(citadel_io::error!(
911 citadel_io::ErrorCode::RemotePeerDeclined
912 ))
913 }
914 _ => {}
915 },
916
917 _ => {}
918 }
919 }
920
921 Err(citadel_io::error!(
922 citadel_io::ErrorCode::RemoteKernelStreamDied,
923 "connect_to_peer_custom"
924 ))
925 };
926
927 match citadel_io::time::timeout(P2P_CONNECT_TIMEOUT, connect_task).await {
928 Ok(result) => result,
929 Err(_elapsed) => Err(citadel_io::error!(
930 citadel_io::ErrorCode::RemoteP2pConnectTimeout,
931 P2P_CONNECT_TIMEOUT.as_secs()
932 )),
933 }
934 }
935
936 async fn connect_to_peer(&self) -> Result<PeerConnectSuccess<R>, NetworkError> {
938 self.connect_to_peer_custom(Default::default(), Default::default(), Default::default())
939 .await
940 }
941
942 async fn register_to_peer(&self) -> Result<PeerRegisterStatus, NetworkError> {
944 let session_cid = self.user().get_session_cid();
945 let peer_target = self.try_as_peer_connection().await?;
946 let local_username = self
948 .remote()
949 .account_manager()
950 .get_username_by_cid(session_cid)
951 .await?
952 .ok_or_else(|| citadel_io::error!(citadel_io::ErrorCode::RemoteLocalUsernameMissing))?;
953 let peer_username_opt = self.target_username().map(ToString::to_string);
954
955 let mut stream = self
956 .remote()
957 .send_callback_subscription(NodeRequest::PeerCommand(PeerCommand {
958 session_cid,
959 command: PeerSignal::PostRegister {
960 peer_conn_type: peer_target,
961 inviter_username: local_username,
962 invitee_username: peer_username_opt,
963 ticket_opt: None,
964 invitee_response: None,
965 },
966 }))
967 .await?;
968
969 while let Some(status) = stream.next().await {
970 if let NodeResult::PeerEvent(PeerEvent {
971 event:
972 PeerSignal::PostRegister {
973 peer_conn_type: _,
974 inviter_username: _,
975 invitee_username: _,
976 ticket_opt: _,
977 invitee_response: Some(resp),
978 },
979 ticket: _,
980 ..
981 }) = status.into_result()?
982 {
983 match resp {
984 PeerResponse::Accept(..) => return Ok(PeerRegisterStatus::Accepted),
985 PeerResponse::Decline => return Ok(PeerRegisterStatus::Declined),
986 PeerResponse::Timeout => return Ok(PeerRegisterStatus::Failed { reason: Some("Timeout on register request. Peer did not accept in time. Try again later".to_string()) }),
987 _ => {}
988 }
989 }
990 }
991
992 Err(citadel_io::error!(
993 citadel_io::ErrorCode::RemoteKernelStreamDied,
994 format!("register_to_peer: {:?}", stream.callback_key())
995 ))
996 }
997
998 async fn deregister(&self) -> Result<(), NetworkError> {
1002 if let Ok(peer_conn) = self.try_as_peer_connection().await {
1003 let peer_request = PeerSignal::Deregister {
1004 peer_conn_type: peer_conn,
1005 };
1006 let session_cid = self.user().get_session_cid();
1007 let request = NodeRequest::PeerCommand(PeerCommand {
1008 session_cid,
1009 command: peer_request,
1010 });
1011
1012 let mut subscription = self.remote().send_callback_subscription(request).await?;
1013 while let Some(result) = subscription.next().await {
1014 if let NodeResult::PeerEvent(PeerEvent {
1015 event: PeerSignal::DeregistrationSuccess { .. },
1016 ticket: _,
1017 ..
1018 }) = result.into_result()?
1019 {
1020 return Ok(());
1021 }
1022 }
1023 } else {
1024 let cid = self.user().get_session_cid();
1026 let request = NodeRequest::DeregisterFromHypernode(DeregisterFromHypernode {
1027 session_cid: cid,
1028 v_conn_type: *self.user(),
1029 });
1030 let mut subscription = self.remote().send_callback_subscription(request).await?;
1031 while let Some(result) = subscription.next().await {
1032 match result.into_result()? {
1033 NodeResult::DeRegistration(DeRegistration {
1034 session_cid: _,
1035 ticket_opt: _,
1036 success: true,
1037 }) => return Ok(()),
1038 NodeResult::DeRegistration(DeRegistration {
1039 session_cid: _,
1040 ticket_opt: _,
1041 success: false,
1042 }) => {
1043 return Err(citadel_io::error!(
1044 citadel_io::ErrorCode::RemoteDeregisterFailed
1045 ))
1046 }
1047
1048 _ => {}
1049 }
1050 }
1051 }
1052
1053 Err(citadel_io::error!(
1054 citadel_io::ErrorCode::RemoteDeregisterEndedUnexpectedly
1055 ))
1056 }
1057
1058 async fn disconnect(&self) -> Result<(), NetworkError> {
1059 if let Ok(peer_conn) = self.try_as_peer_connection().await {
1060 if let PeerConnectionType::LocalGroupPeer {
1061 session_cid,
1062 peer_cid: _,
1063 } = peer_conn
1064 {
1065 let request = NodeRequest::PeerCommand(PeerCommand {
1066 session_cid,
1067 command: PeerSignal::Disconnect {
1068 peer_conn_type: peer_conn,
1069 disconnect_response: None,
1070 disconnect_token: None,
1071 },
1072 });
1073
1074 let mut subscription = self.remote().send_callback_subscription(request).await?;
1075
1076 while let Some(event) = subscription.next().await {
1077 if let NodeResult::PeerEvent(PeerEvent {
1078 event:
1079 PeerSignal::Disconnect {
1080 peer_conn_type: _,
1081 disconnect_response: Some(_),
1082 ..
1083 },
1084 ticket: _,
1085 ..
1086 }) = event.into_result()?
1087 {
1088 return Ok(());
1089 }
1090 }
1091
1092 Err(citadel_io::error!(
1093 citadel_io::ErrorCode::RemoteDisconnectEventMissing
1094 ))
1095 } else {
1096 Err(citadel_io::error!(
1097 citadel_io::ErrorCode::RemoteExternalGroupPeerUnsupported
1098 ))
1099 }
1100 } else {
1101 let cid = self.user().get_session_cid();
1103 let request =
1104 NodeRequest::DisconnectFromHypernode(DisconnectFromHypernode { session_cid: cid });
1105
1106 let mut subscription = self.remote().send_callback_subscription(request).await?;
1107 while let Some(event) = subscription.next().await {
1108 if let NodeResult::Disconnect(Disconnect {
1109 success, message, ..
1110 }) = event.into_result()?
1111 {
1112 return if success {
1113 Ok(())
1114 } else {
1115 Err(citadel_io::error!(
1116 citadel_io::ErrorCode::RemoteDisconnected,
1117 message
1118 ))
1119 };
1120 }
1121 }
1122
1123 Err(citadel_io::error!(
1124 citadel_io::ErrorCode::RemoteDisconnectEventMissing
1125 ))
1126 }
1127 }
1128
1129 async fn create_group(
1130 &self,
1131 initial_users_to_invite: Option<Vec<UserIdentifier>>,
1132 ) -> Result<GroupChannel, NetworkError> {
1133 self.create_group_with_options(initial_users_to_invite, MessageGroupOptions::default())
1134 .await
1135 }
1136
1137 async fn create_group_with_options(
1142 &self,
1143 initial_users_to_invite: Option<Vec<UserIdentifier>>,
1144 options: MessageGroupOptions,
1145 ) -> Result<GroupChannel, NetworkError> {
1146 let session_cid = self.user().get_session_cid();
1147
1148 let mut initial_users = vec![];
1149 if let Some(initial_users_to_invite) = initial_users_to_invite {
1152 for user in initial_users_to_invite {
1153 initial_users.push(
1154 self.remote()
1155 .account_manager()
1156 .find_target_information(session_cid, user.clone())
1157 .await?
1158 .ok_or_else(|| {
1159 citadel_io::error!(
1160 citadel_io::ErrorCode::RemoteGroupAccountNotFound,
1161 citadel_io::Dbg(user),
1162 citadel_io::Dbg(session_cid)
1163 )
1164 })
1165 .map(|r| r.1.cid)?,
1166 )
1167 }
1168 }
1169
1170 let group_request = GroupBroadcast::Create {
1171 initial_invitees: initial_users,
1172 options,
1173 };
1174 let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand {
1175 session_cid,
1176 command: group_request,
1177 });
1178 let mut subscription = self.remote().send_callback_subscription(request).await?;
1179 while let Some(evt) = subscription.next().await {
1180 if let NodeResult::GroupChannelCreated(GroupChannelCreated {
1181 ticket: _,
1182 channel,
1183 session_cid: _,
1184 }) = evt
1185 {
1186 return Ok(channel);
1187 }
1188 }
1189
1190 Err(citadel_io::error!(
1191 citadel_io::ErrorCode::RemoteCreateGroupEndedUnexpectedly
1192 ))
1193 }
1194
1195 async fn list_owned_groups(&self) -> Result<Vec<MessageGroupKey>, NetworkError> {
1197 let session_cid = self.user().get_session_cid();
1198 let cid_to_check_for = match self.try_as_peer_connection().await {
1199 Ok(res) => res.get_original_target_cid(),
1200 _ => session_cid,
1201 };
1202 let group_request = GroupBroadcast::ListGroupsFor {
1203 cid: cid_to_check_for,
1204 };
1205 let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand {
1206 session_cid,
1207 command: group_request,
1208 });
1209
1210 let mut subscription = self.remote().send_callback_subscription(request).await?;
1211
1212 while let Some(evt) = subscription.next().await {
1213 if let NodeResult::GroupEvent(GroupEvent {
1214 session_cid: _,
1215 ticket: _,
1216 event: GroupBroadcast::ListResponse { groups },
1217 }) = evt.into_result()?
1218 {
1219 return Ok(groups);
1220 }
1221 }
1222
1223 Err(citadel_io::error!(
1224 citadel_io::ErrorCode::RemoteListGroupsEndedUnexpectedly
1225 ))
1226 }
1227
1228 async fn list_sessions(&self) -> Result<ActiveSessions, NetworkError> {
1232 let request = NodeRequest::GetActiveSessions;
1233 let mut subscription = self.remote().send_callback_subscription(request).await?;
1234
1235 if let Some(NodeResult::SessionList(result)) = subscription.next().await {
1236 return Ok(result.sessions);
1237 }
1238
1239 Err(citadel_io::error!(
1240 citadel_io::ErrorCode::RemoteListSessionsEndedUnexpectedly
1241 ))
1242 }
1243
1244 async fn rekey(&self) -> Result<Option<u32>, NetworkError> {
1248 let request = NodeRequest::ReKey(ReKey {
1249 v_conn_type: *self.user(),
1250 });
1251 let mut subscription = self.remote().send_callback_subscription(request).await?;
1252
1253 while let Some(evt) = subscription.next().await {
1254 if let NodeResult::ReKeyResult(result) = evt {
1255 return match result.status {
1256 ReKeyReturnType::Success { version } => Ok(Some(version)),
1257 ReKeyReturnType::AlreadyInProgress => Ok(None),
1258 ReKeyReturnType::Failure { err } => Err(citadel_io::error!(
1259 citadel_io::ErrorCode::RemoteRekeyFailed,
1260 err
1261 )),
1262 };
1263 }
1264 }
1265
1266 Err(citadel_io::error!(
1267 citadel_io::ErrorCode::RemoteRekeyEndedUnexpectedly
1268 ))
1269 }
1270
1271 async fn is_peer_registered(&self) -> Result<bool, NetworkError> {
1273 let target = self.try_as_peer_connection().await?;
1274 if let PeerConnectionType::LocalGroupPeer {
1275 session_cid: local_cid,
1276 peer_cid,
1277 } = target
1278 {
1279 let peers = self.remote().get_local_group_peers(local_cid, None).await?;
1280 citadel_logging::info!(target: "citadel", "Checking to see if {target} is registered in {peers:?}");
1281 Ok(peers.iter().any(|p| p.cid == peer_cid))
1282 } else {
1283 Err(citadel_io::error!(
1284 citadel_io::ErrorCode::RemoteExternalGroupPeerUnsupportedYet
1285 ))
1286 }
1287 }
1288
1289 #[doc(hidden)]
1290 async fn try_as_peer_connection(&self) -> Result<PeerConnectionType, NetworkError> {
1291 let verified_return = |user: &VirtualTargetType| {
1292 user.try_as_peer_connection().ok_or(citadel_io::error!(
1293 citadel_io::ErrorCode::RemoteTargetNotPeer
1294 ))
1295 };
1296
1297 if self.user().get_target_cid() == 0 {
1298 let peer_username = self.target_username().ok_or_else(|| {
1301 citadel_io::error!(citadel_io::ErrorCode::RemoteTargetCidZeroNoUsername)
1302 })?;
1303 let session_cid = self.user().get_session_cid();
1304 let expected_peer_cid = self
1305 .remote()
1306 .account_manager()
1307 .get_persistence_handler()
1308 .get_cid_by_username(peer_username);
1309 let peer_cid = self
1312 .remote()
1313 .account_manager()
1314 .find_target_information(session_cid, peer_username)
1315 .await?
1316 .map(|r| r.1.cid)
1317 .unwrap_or(expected_peer_cid);
1318
1319 let mut user = *self.user();
1320 user.set_target_cid(peer_cid);
1321 verified_return(&user)
1322 } else {
1323 verified_return(self.user())
1324 }
1325 }
1326
1327 #[doc(hidden)]
1328 fn can_use_revfs(&self) -> Result<(), NetworkError> {
1329 if let Some(sess) = self.session_security_settings() {
1330 if sess.crypto_params.kem_algorithm == KemAlgorithm::MlKem {
1331 Ok(())
1332 } else {
1333 Err(citadel_io::error!(
1334 citadel_io::ErrorCode::RemoteRevfsRequiresKyber
1335 ))
1336 }
1337 } else {
1338 Err(citadel_io::error!(
1339 citadel_io::ErrorCode::RemoteRevfsUnsupportedRemote
1340 ))
1341 }
1342 }
1343}
1344
1345impl<T: TargetLockedRemote<R>, R: Ratchet> ProtocolRemoteTargetExt<R> for T {}
1346
1347pub mod results {
1348 use crate::prefabs::client::peer_connection::FileTransferHandleRx;
1349 use crate::prelude::{PeerChannel, UdpChannel};
1350 use crate::remote_ext::remote_specialization::PeerRemote;
1351 use citadel_io::tokio::sync::oneshot::Receiver;
1352 use citadel_proto::prelude::*;
1353 use std::fmt::Debug;
1354
1355 pub struct PeerConnectSuccess<R: Ratchet> {
1356 pub channel: PeerChannel<R>,
1357 pub udp_channel_rx: Option<Receiver<UdpChannel<R>>>,
1358 pub remote: PeerRemote<R>,
1359 pub(crate) incoming_object_transfer_handles: Option<FileTransferHandleRx>,
1362 }
1363
1364 impl<R: Ratchet> Debug for PeerConnectSuccess<R> {
1365 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1366 f.debug_struct("PeerConnectSuccess")
1367 .field("channel", &self.channel)
1368 .field("udp_channel_rx", &self.udp_channel_rx)
1369 .finish()
1370 }
1371 }
1372
1373 impl<R: Ratchet> PeerConnectSuccess<R> {
1374 pub fn get_incoming_file_transfer_handle(
1376 &mut self,
1377 ) -> Result<FileTransferHandleRx, NetworkError> {
1378 self.incoming_object_transfer_handles
1379 .take()
1380 .ok_or(citadel_io::error!(
1381 citadel_io::ErrorCode::RemoteFunctionAlreadyCalled
1382 ))
1383 }
1384 }
1385
1386 pub enum PeerRegisterStatus {
1387 Accepted,
1388 Declined,
1389 Failed { reason: Option<String> },
1390 }
1391
1392 #[derive(Clone, Debug)]
1393 pub struct LocalGroupPeer {
1394 pub cid: u64,
1395 pub is_online: bool,
1396 }
1397
1398 #[derive(Clone, Debug)]
1399 pub struct LocalGroupPeerFullInfo {
1400 pub cid: u64,
1401 pub username: Option<String>,
1402 pub full_name: Option<String>,
1403 pub is_online: bool,
1404 }
1405}
1406
1407pub mod remote_specialization {
1408 use crate::prelude::*;
1409 use std::ops::{Deref, DerefMut};
1410
1411 #[derive(Debug, Clone)]
1412 pub struct PeerRemote<R: Ratchet> {
1413 pub(crate) inner: NodeRemote<R>,
1414 pub(crate) peer: VirtualTargetType,
1415 pub(crate) username: Option<String>,
1416 pub(crate) session_security_settings: SessionSecuritySettings,
1417 }
1418
1419 impl<R: Ratchet> Deref for PeerRemote<R> {
1420 type Target = NodeRemote<R>;
1421 fn deref(&self) -> &Self::Target {
1422 &self.inner
1423 }
1424 }
1425
1426 impl<R: Ratchet> DerefMut for PeerRemote<R> {
1427 fn deref_mut(&mut self) -> &mut Self::Target {
1428 &mut self.inner
1429 }
1430 }
1431
1432 impl<R: Ratchet> TargetLockedRemote<R> for PeerRemote<R> {
1433 fn user(&self) -> &VirtualTargetType {
1434 &self.peer
1435 }
1436 fn remote(&self) -> &NodeRemote<R> {
1437 &self.inner
1438 }
1439 fn target_username(&self) -> Option<&str> {
1440 self.username.as_deref()
1441 }
1442 fn user_mut(&mut self) -> &mut VirtualTargetType {
1443 &mut self.peer
1444 }
1445
1446 fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
1447 Some(&self.session_security_settings)
1448 }
1449 }
1450}
1451
1452#[cfg(all(test, not(target_family = "wasm")))]
1453mod tests {
1454 use crate::prefabs::client::single_connection::SingleClientServerConnectionKernel;
1455 use crate::prefabs::client::DefaultServerConnectionSettingsBuilder;
1456 use crate::prelude::*;
1457 use citadel_io::tokio;
1458 use rstest::rstest;
1459 use std::net::SocketAddr;
1460 use std::sync::atomic::{AtomicBool, Ordering};
1461 use std::sync::Arc;
1462 use uuid::Uuid;
1463
1464 pub struct ReceiverFileTransferKernel<R: Ratchet>(
1465 pub Option<NodeRemote<R>>,
1466 pub Arc<AtomicBool>,
1467 );
1468
1469 #[async_trait]
1470 impl<R: Ratchet> NetKernel<R> for ReceiverFileTransferKernel<R> {
1471 fn load_remote(&mut self, node_remote: NodeRemote<R>) -> Result<(), NetworkError> {
1472 self.0 = Some(node_remote);
1473 Ok(())
1474 }
1475
1476 async fn on_start(&self) -> Result<(), NetworkError> {
1477 Ok(())
1478 }
1479
1480 async fn on_node_event_received(&self, message: NodeResult<R>) -> Result<(), NetworkError> {
1481 log::trace!(target: "citadel", "SERVER received {:?}", message);
1482 if let NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) =
1483 message.into_result()?
1484 {
1485 let mut path = None;
1486 handle
1488 .accept()
1489 .map_err(|err| NetworkError::msg(err.into_string()))?;
1490
1491 use citadel_types::proto::ObjectTransferStatus;
1492 use futures::StreamExt;
1493 while let Some(status) = handle.next().await {
1494 match status {
1495 ObjectTransferStatus::ReceptionComplete => {
1496 log::trace!(target: "citadel", "Server has finished receiving the file!");
1497 let cmp = include_bytes!("../../resources/TheBridge.pdf");
1498 let streamed_data = citadel_io::tokio::fs::read(path.clone().unwrap())
1499 .await
1500 .unwrap();
1501 assert_eq!(
1502 cmp,
1503 streamed_data.as_slice(),
1504 "Original data and streamed data does not match"
1505 );
1506
1507 self.1.store(true, Ordering::Relaxed);
1508 self.0.clone().unwrap().shutdown().await?;
1509 }
1510
1511 ObjectTransferStatus::ReceptionBeginning(file_path, vfm) => {
1512 path = Some(file_path);
1513 assert_eq!(vfm.name, "TheBridge.pdf")
1514 }
1515
1516 _ => {}
1517 }
1518 }
1519 }
1520
1521 Ok(())
1522 }
1523
1524 async fn on_stop(&mut self) -> Result<(), NetworkError> {
1525 Ok(())
1526 }
1527 }
1528
1529 pub fn server_info<'a, R: Ratchet>(
1530 switch: Arc<AtomicBool>,
1531 ) -> (NodeFuture<'a, ReceiverFileTransferKernel<R>>, SocketAddr) {
1532 crate::test_common::server_test_node(ReceiverFileTransferKernel(None, switch), |_| {})
1533 }
1534
1535 #[rstest]
1536 #[case(
1537 EncryptionAlgorithm::AES_GCM_256,
1538 KemAlgorithm::MlKem,
1539 SigAlgorithm::None
1540 )]
1541 #[case(
1542 EncryptionAlgorithm::MlKemHybrid,
1543 KemAlgorithm::MlKem,
1544 SigAlgorithm::MlDsa65
1545 )]
1546 #[timeout(std::time::Duration::from_secs(90))]
1547 #[tokio::test]
1548 async fn test_c2s_file_transfer(
1549 #[case] enx: EncryptionAlgorithm,
1550 #[case] kem: KemAlgorithm,
1551 #[case] sig: SigAlgorithm,
1552 ) {
1553 citadel_logging::setup_log();
1554 let client_success = &AtomicBool::new(false);
1555 let server_success = &Arc::new(AtomicBool::new(false));
1556 let (server, server_addr) = server_info::<StackedRatchet>(server_success.clone());
1557 let uuid = Uuid::new_v4();
1558
1559 let session_security_settings = SessionSecuritySettingsBuilder::default()
1560 .with_crypto_params(enx + kem + sig)
1561 .build()
1562 .unwrap();
1563
1564 let server_connection_settings =
1565 DefaultServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid)
1566 .with_session_security_settings(session_security_settings)
1567 .disable_udp()
1568 .build()
1569 .unwrap();
1570
1571 let client_kernel = SingleClientServerConnectionKernel::new(
1572 server_connection_settings,
1573 |connection| async move {
1574 log::trace!(target: "citadel", "***CLIENT LOGIN SUCCESS :: File transfer next ***");
1575 connection
1576 .send_file_with_custom_opts(
1577 "../resources/TheBridge.pdf",
1578 32 * 1024,
1579 TransferType::FileTransfer,
1580 )
1581 .await
1582 .unwrap();
1583 log::trace!(target: "citadel", "***CLIENT FILE TRANSFER SUCCESS***");
1584 client_success.store(true, Ordering::Relaxed);
1585 connection.shutdown_kernel().await
1586 },
1587 );
1588
1589 let client = DefaultNodeBuilder::default().build(client_kernel).unwrap();
1590
1591 let joined = futures::future::try_join(server, client);
1592
1593 let _ = joined.await.unwrap();
1594
1595 assert!(client_success.load(Ordering::Relaxed));
1596 assert!(server_success.load(Ordering::Relaxed));
1597 }
1598}