1use crate::prefabs::ClientServerRemote;
64use crate::prelude::results::{PeerConnectSuccess, PeerRegisterStatus};
65use crate::prelude::*;
66use crate::remote_ext::remote_specialization::PeerRemote;
67use crate::remote_ext::results::LocalGroupPeerFullInfo;
68use std::ops::{Deref, DerefMut};
69
70use futures::StreamExt;
71use std::path::PathBuf;
72use std::time::Duration;
73
74pub(crate) mod user_ids {
75 use crate::prelude::*;
76 use std::ops::Deref;
77
78 #[derive(Debug)]
79 pub struct SymmetricIdentifierHandleRef<'a, R: Ratchet> {
81 pub(crate) user: VirtualTargetType,
82 pub(crate) remote: &'a NodeRemote<R>,
83 pub(crate) target_username: Option<String>,
84 }
85
86 impl<R: Ratchet> SymmetricIdentifierHandleRef<'_, R> {
87 pub fn into_owned(self) -> SymmetricIdentifierHandle<R> {
88 SymmetricIdentifierHandle {
89 user: self.user,
90 remote: self.remote.clone(),
91 target_username: self.target_username,
92 }
93 }
94 }
95
96 #[derive(Clone, Debug)]
97 pub struct SymmetricIdentifierHandle<R: Ratchet> {
99 user: VirtualTargetType,
100 remote: NodeRemote<R>,
101 target_username: Option<String>,
102 }
103
104 pub trait TargetLockedRemote<R: Ratchet>: Send + Sync {
105 fn user(&self) -> &VirtualTargetType;
106 fn remote(&self) -> &NodeRemote<R>;
107 fn target_username(&self) -> Option<&str>;
108 fn user_mut(&mut self) -> &mut VirtualTargetType;
109 fn session_security_settings(&self) -> Option<&SessionSecuritySettings>;
110 }
111
112 impl<R: Ratchet> TargetLockedRemote<R> for SymmetricIdentifierHandleRef<'_, R> {
113 fn user(&self) -> &VirtualTargetType {
114 &self.user
115 }
116 fn remote(&self) -> &NodeRemote<R> {
117 self.remote
118 }
119 fn target_username(&self) -> Option<&str> {
120 self.target_username.as_deref()
121 }
122 fn user_mut(&mut self) -> &mut VirtualTargetType {
123 &mut self.user
124 }
125
126 fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
127 None
128 }
129 }
130
131 impl<R: Ratchet> TargetLockedRemote<R> for SymmetricIdentifierHandle<R> {
132 fn user(&self) -> &VirtualTargetType {
133 &self.user
134 }
135 fn remote(&self) -> &NodeRemote<R> {
136 &self.remote
137 }
138 fn target_username(&self) -> Option<&str> {
139 self.target_username.as_deref()
140 }
141 fn user_mut(&mut self) -> &mut VirtualTargetType {
142 &mut self.user
143 }
144
145 fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
146 None
147 }
148 }
149
150 impl<R: Ratchet> From<SymmetricIdentifierHandleRef<'_, R>> for SymmetricIdentifierHandle<R> {
151 fn from(this: SymmetricIdentifierHandleRef<'_, R>) -> Self {
152 this.into_owned()
153 }
154 }
155
156 impl<R: Ratchet> Deref for SymmetricIdentifierHandle<R> {
157 type Target = NodeRemote<R>;
158
159 fn deref(&self) -> &Self::Target {
160 &self.remote
161 }
162 }
163
164 impl<R: Ratchet> Deref for SymmetricIdentifierHandleRef<'_, R> {
165 type Target = NodeRemote<R>;
166
167 fn deref(&self) -> &Self::Target {
168 self.remote
169 }
170 }
171}
172
173pub struct CitadelClientServerConnection<R: Ratchet> {
175 pub(crate) channel: Option<PeerChannel<R>>,
177 pub remote: ClientServerRemote<R>,
178 pub udp_channel_rx: Option<citadel_io::tokio::sync::oneshot::Receiver<UdpChannel<R>>>,
180 pub services: ServicesObject,
182 pub cid: u64,
183 pub session_security_settings: SessionSecuritySettings,
184}
185
186impl<R: Ratchet> CitadelClientServerConnection<R> {
187 pub fn split(self) -> (PeerChannelSendHalf<R>, PeerChannelRecvHalf<R>) {
193 self.channel.expect("Channel already taken").split()
194 }
195
196 pub fn take_channel(&mut self) -> Option<PeerChannel<R>> {
197 self.channel.take()
198 }
199}
200
201impl<R: Ratchet> Deref for CitadelClientServerConnection<R> {
202 type Target = ClientServerRemote<R>;
203
204 fn deref(&self) -> &Self::Target {
205 &self.remote
206 }
207}
208
209impl<R: Ratchet> DerefMut for CitadelClientServerConnection<R> {
210 fn deref_mut(&mut self) -> &mut Self::Target {
211 &mut self.remote
212 }
213}
214
/// Outcome of a successful registration.
///
/// Derives `Debug` so that `Result<RegisterSuccess, _>` supports helpers such as
/// `unwrap_err`, and `Clone`/`PartialEq`/`Eq` so values can be copied and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RegisterSuccess {
    /// The CID reported by the node for the newly registered account.
    pub cid: u64,
}
219
220#[async_trait]
221pub trait ProtocolRemoteExt<R: Ratchet>: Remote<R> {
223 async fn register<
226 T: std::net::ToSocketAddrs + Send,
227 P: Into<String> + Send,
228 V: Into<String> + Send,
229 K: Into<SecBuffer> + Send,
230 >(
231 &self,
232 addr: T,
233 full_name: P,
234 username: V,
235 proposed_password: K,
236 default_security_settings: SessionSecuritySettings,
237 server_password: Option<PreSharedKey>,
238 ) -> Result<RegisterSuccess, NetworkError> {
239 let creds =
240 ProposedCredentials::new_register(full_name, username, proposed_password.into())
241 .await?;
242 let register_request = NodeRequest::RegisterToHypernode(RegisterToHypernode {
243 remote_addr: addr
244 .to_socket_addrs()?
245 .next()
246 .ok_or(NetworkError::InternalError("Invalid socket addr"))?,
247 proposed_credentials: creds,
248 static_security_settings: default_security_settings,
249 session_password: server_password.unwrap_or_default(),
250 });
251
252 let mut subscription = self.send_callback_subscription(register_request).await?;
253 while let Some(status) = subscription.next().await {
254 match status.into_result()? {
255 NodeResult::RegisterOkay(RegisterOkay { cid, .. }) => {
256 return Ok(RegisterSuccess { cid });
257 }
258 NodeResult::RegisterFailure(err) => {
259 return Err(NetworkError::Generic(err.error_message));
260 }
261 NodeResult::Disconnect(err) => {
262 return Err(NetworkError::Generic(err.message));
263 }
264 evt => {
265 log::warn!(target: "citadel", "Invalid NodeResult for Register request received: {evt:?}");
266 }
267 }
268 }
269
270 Err(NetworkError::InternalError(
271 "Internal kernel stream died (register)",
272 ))
273 }
274
275 async fn register_with_defaults<
278 T: std::net::ToSocketAddrs + Send,
279 P: Into<String> + Send,
280 V: Into<String> + Send,
281 K: Into<SecBuffer> + Send,
282 >(
283 &self,
284 addr: T,
285 full_name: P,
286 username: V,
287 proposed_password: K,
288 ) -> Result<RegisterSuccess, NetworkError> {
289 self.register(
290 addr,
291 full_name,
292 username,
293 proposed_password,
294 Default::default(),
295 Default::default(),
296 )
297 .await
298 }
299
300 async fn connect(
303 &self,
304 auth: AuthenticationRequest,
305 connect_mode: ConnectMode,
306 udp_mode: UdpMode,
307 keep_alive_timeout: Option<Duration>,
308 session_security_settings: SessionSecuritySettings,
309 server_password: Option<PreSharedKey>,
310 ) -> Result<CitadelClientServerConnection<R>, NetworkError> {
311 let connect_request = NodeRequest::ConnectToHypernode(ConnectToHypernode {
312 auth_request: auth,
313 connect_mode,
314 udp_mode,
315 keep_alive_timeout: keep_alive_timeout.map(|r| r.as_secs()),
316 session_security_settings,
317 session_password: server_password.unwrap_or_default(),
318 });
319
320 let mut subscription = self.send_callback_subscription(connect_request).await?;
321 let status = subscription
322 .next()
323 .await
324 .ok_or(NetworkError::InternalError(
325 "Internal kernel stream died (connect)",
326 ))?;
327
328 return match status.into_result()? {
329 NodeResult::ConnectSuccess(ConnectSuccess {
330 ticket: _,
331 session_cid: cid,
332 remote_addr: _,
333 is_personal: _,
334 v_conn_type,
335 services,
336 welcome_message: _,
337 channel,
338 udp_rx_opt: udp_channel_rx,
339 session_security_settings,
340 }) => Ok(CitadelClientServerConnection {
341 remote: ClientServerRemote::new(
342 v_conn_type,
343 self.remote_ref().clone(),
344 session_security_settings,
345 None,
346 None,
347 ),
348 channel: Some(*channel),
349 udp_channel_rx,
350 services,
351 cid,
352 session_security_settings,
353 }),
354 NodeResult::ConnectFail(ConnectFail {
355 ticket: _,
356 cid_opt: _,
357 error_message: err,
358 }) => Err(NetworkError::Generic(err)),
359 NodeResult::Disconnect(err) => {
360 return Err(NetworkError::Generic(err.message));
361 }
362 res => Err(NetworkError::msg(format!(
363 "[connect] An unexpected response occurred: {res:?}"
364 ))),
365 };
366 }
367
368 async fn connect_with_defaults(
371 &self,
372 auth: AuthenticationRequest,
373 ) -> Result<CitadelClientServerConnection<R>, NetworkError> {
374 self.connect(
375 auth,
376 Default::default(),
377 Default::default(),
378 None,
379 Default::default(),
380 Default::default(),
381 )
382 .await
383 }
384
385 async fn find_target<T: Into<UserIdentifier> + Send, P: Into<UserIdentifier> + Send>(
398 &self,
399 local_user: T,
400 peer: P,
401 ) -> Result<SymmetricIdentifierHandleRef<'_, R>, NetworkError> {
402 let account_manager = self.account_manager();
403 account_manager
404 .find_target_information(local_user, peer)
405 .await?
406 .map(move |(cid, peer)| {
407 if peer.parent_icid != 0 {
408 SymmetricIdentifierHandleRef {
409 user: VirtualTargetType::ExternalGroupPeer {
410 session_cid: cid,
411 interserver_cid: peer.parent_icid,
412 peer_cid: peer.cid,
413 },
414 remote: self.remote_ref(),
415 target_username: None,
416 }
417 } else {
418 SymmetricIdentifierHandleRef {
419 user: VirtualTargetType::LocalGroupPeer {
420 session_cid: cid,
421 peer_cid: peer.cid,
422 },
423 remote: self.remote_ref(),
424 target_username: None,
425 }
426 }
427 })
428 .ok_or_else(|| NetworkError::msg("Target pair not found"))
429 }
430
431 async fn propose_target<T: Into<UserIdentifier> + Send, P: Into<UserIdentifier> + Send>(
434 &self,
435 local_user: T,
436 peer: P,
437 ) -> Result<SymmetricIdentifierHandleRef<'_, R>, NetworkError> {
438 let local_cid = self.get_session_cid(local_user).await?;
439 match peer.into() {
440 UserIdentifier::ID(peer_cid) => Ok(SymmetricIdentifierHandleRef {
441 user: VirtualTargetType::LocalGroupPeer {
442 session_cid: local_cid,
443 peer_cid,
444 },
445 remote: self.remote_ref(),
446 target_username: None,
447 }),
448 UserIdentifier::Username(uname) => {
449 let peer_cid = self
450 .remote_ref()
451 .account_manager()
452 .find_target_information(local_cid, uname.clone())
453 .await?
454 .map(|r| r.1.cid)
455 .unwrap_or(0);
456 Ok(SymmetricIdentifierHandleRef {
457 user: VirtualTargetType::LocalGroupPeer {
458 session_cid: local_cid,
459 peer_cid,
460 },
461 remote: self.remote_ref(),
462 target_username: Some(uname),
463 })
464 }
465 }
466 }
467
468 async fn get_local_group_peers<T: Into<UserIdentifier> + Send>(
471 &self,
472 local_user: T,
473 limit: Option<usize>,
474 ) -> Result<Vec<LocalGroupPeerFullInfo>, NetworkError> {
475 let local_cid = self.get_session_cid(local_user).await?;
476 let command = NodeRequest::PeerCommand(PeerCommand {
477 session_cid: local_cid,
478 command: PeerSignal::GetRegisteredPeers {
479 peer_conn_type: ClientConnectionType::Server {
480 session_cid: local_cid,
481 },
482 response: None,
483 limit: limit.map(|r| r as i32),
484 },
485 });
486
487 let mut stream = self.send_callback_subscription(command).await?;
488
489 while let Some(status) = stream.next().await {
490 if let NodeResult::PeerEvent(PeerEvent {
491 event:
492 PeerSignal::GetRegisteredPeers {
493 peer_conn_type: _,
494 response: Some(PeerResponse::RegisteredCids(peer_info, is_onlines)),
495 limit: _,
496 },
497 ticket: _,
498 ..
499 }) = status.into_result()?
500 {
501 return Ok(peer_info
502 .into_iter()
503 .zip(is_onlines.into_iter())
504 .filter_map(|(peer_info, is_online)| {
505 peer_info.map(|info| LocalGroupPeerFullInfo {
506 cid: info.cid,
507 username: Some(info.username),
508 full_name: Some(info.full_name),
509 is_online,
510 })
511 })
512 .collect());
513 }
514 }
515
516 Err(NetworkError::InternalError(
517 "Internal kernel stream died (get_local_group_peers)",
518 ))
519 }
520
521 async fn get_local_group_mutual_peers<T: Into<UserIdentifier> + Send>(
523 &self,
524 local_user: T,
525 ) -> Result<Vec<LocalGroupPeerFullInfo>, NetworkError> {
526 let local_cid = self.get_session_cid(local_user).await?;
527 let command = NodeRequest::PeerCommand(PeerCommand {
528 session_cid: local_cid,
529 command: PeerSignal::GetMutuals {
530 v_conn_type: ClientConnectionType::Server {
531 session_cid: local_cid,
532 },
533 response: None,
534 },
535 });
536
537 let mut stream = self.send_callback_subscription(command).await?;
538
539 while let Some(status) = stream.next().await {
540 if let NodeResult::PeerEvent(PeerEvent {
541 event:
542 PeerSignal::GetMutuals {
543 v_conn_type: _,
544 response: Some(PeerResponse::RegisteredCids(peer_info, is_onlines)),
545 },
546 ticket: _,
547 ..
548 }) = status.into_result()?
549 {
550 return Ok(peer_info
551 .into_iter()
552 .zip(is_onlines.into_iter())
553 .filter_map(|(peer_info, is_online)| {
554 peer_info.map(|info| LocalGroupPeerFullInfo {
555 cid: info.cid,
556 username: Some(info.username),
557 full_name: Some(info.full_name),
558 is_online,
559 })
560 })
561 .collect());
562 }
563 }
564
565 Err(NetworkError::msg("Stream died"))
566 }
567
568 async fn sessions(&self) -> Result<ActiveSessions, NetworkError> {
571 match self
572 .send_callback_subscription(NodeRequest::GetActiveSessions)
573 .await
574 {
575 Ok(mut stream) => {
576 while let Some(result) = stream.next().await {
577 match result.into_result()? {
578 NodeResult::SessionList(res) => return Ok(res.sessions),
579 res => {
580 citadel_logging::warn!("Received unexpected result: {res:?}");
581 }
582 }
583 }
584
585 citadel_logging::warn!("Failed to receive response from SDK (stream died)");
586 return Err(NetworkError::InternalError(
587 "Internal kernel stream died (get_active_sessions)",
588 ));
589 }
590 Err(e) => {
591 citadel_logging::warn!(target: "citadel", "Failed to query SDK sessions: {e}");
592 }
593 }
594
595 Err(NetworkError::msg("Failed to query SDK sessions"))
596 }
597
    /// Returns the [`NodeRemote`] that backs this remote.
    ///
    /// The only required method of this trait; the provided methods use it to
    /// clone or borrow the node remote when building connections and handles.
    #[doc(hidden)]
    fn remote_ref(&self) -> &NodeRemote<R>;
600
601 #[doc(hidden)]
602 async fn get_session_cid<T: Into<UserIdentifier> + Send>(
603 &self,
604 local_user: T,
605 ) -> Result<u64, NetworkError> {
606 let account_manager = self.account_manager();
607 Ok(account_manager
608 .find_local_user_information(local_user)
609 .await?
610 .ok_or(NetworkError::InvalidRequest("User does not exist"))?)
611 }
612}
613
/// A [`NodeRemote`] is its own backing remote, so every provided
/// `ProtocolRemoteExt` method is available on it directly.
impl<R: Ratchet> ProtocolRemoteExt<R> for NodeRemote<R> {
    fn remote_ref(&self) -> &NodeRemote<R> {
        self
    }
}
619
/// A [`ClientServerRemote`] forwards to the [`NodeRemote`] stored in its `inner`
/// field.
impl<R: Ratchet> ProtocolRemoteExt<R> for ClientServerRemote<R> {
    fn remote_ref(&self) -> &NodeRemote<R> {
        &self.inner
    }
}
625
626#[async_trait]
627pub trait ProtocolRemoteTargetExt<R: Ratchet>: TargetLockedRemote<R> {
629 async fn send_file_with_custom_opts<T: ObjectSource>(
631 &self,
632 source: T,
633 chunk_size: usize,
634 transfer_type: TransferType,
635 ) -> Result<(), NetworkError> {
636 let chunk_size = if chunk_size == 0 {
637 None
638 } else {
639 Some(chunk_size)
640 };
641 let session_cid = self.user().get_session_cid();
642 let user = *self.user();
643 let remote = self.remote();
644
645 let mut stream = remote
646 .send_callback_subscription(NodeRequest::SendObject(SendObject {
647 source: Box::new(source),
648 chunk_size,
649 session_cid,
650 v_conn_type: user,
651 transfer_type,
652 }))
653 .await?;
654
655 while let Some(event) = stream.next().await {
656 match event.into_result()? {
657 NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) => {
658 return handle
659 .transfer_file()
660 .await
661 .map_err(|err| NetworkError::Generic(err.into_string()));
662 }
663
664 NodeResult::PeerEvent(PeerEvent {
665 event: PeerSignal::SignalReceived { .. },
666 ..
667 }) => {}
668
669 res => {
670 log::warn!(target: "citadel", "Invalid NodeResult for FileTransfer request received: {res:?}")
671 }
672 }
673 }
674
675 Err(NetworkError::InternalError("File transfer stream died"))
676 }
677
678 async fn send_file<T: ObjectSource>(&self, source: T) -> Result<(), NetworkError> {
680 self.send_file_with_custom_opts(source, 0, TransferType::FileTransfer)
681 .await
682 }
683
684 async fn remote_encrypted_virtual_filesystem_push_custom_chunking<
687 T: ObjectSource,
688 P: Into<PathBuf> + Send,
689 >(
690 &self,
691 source: T,
692 virtual_directory: P,
693 chunk_size: usize,
694 security_level: SecurityLevel,
695 ) -> Result<(), NetworkError> {
696 self.can_use_revfs()?;
697 let mut virtual_path = virtual_directory.into();
698 virtual_path = prepare_virtual_path(virtual_path);
699 validate_virtual_path(&virtual_path)
700 .map_err(|err| NetworkError::Generic(err.into_string()))?;
701 let tx_type = TransferType::RemoteEncryptedVirtualFilesystem {
702 virtual_path,
703 security_level,
704 };
705 self.send_file_with_custom_opts(source, chunk_size, tx_type)
706 .await
707 }
708
709 async fn remote_encrypted_virtual_filesystem_push<T: ObjectSource, P: Into<PathBuf> + Send>(
712 &self,
713 source: T,
714 virtual_directory: P,
715 security_level: SecurityLevel,
716 ) -> Result<(), NetworkError> {
717 self.remote_encrypted_virtual_filesystem_push_custom_chunking(
718 source,
719 virtual_directory,
720 0,
721 security_level,
722 )
723 .await
724 }
725
726 async fn remote_encrypted_virtual_filesystem_pull<P: Into<PathBuf> + Send>(
729 &self,
730 virtual_directory: P,
731 transfer_security_level: SecurityLevel,
732 delete_on_pull: bool,
733 ) -> Result<PathBuf, NetworkError> {
734 self.can_use_revfs()?;
735 let request = NodeRequest::PullObject(PullObject {
736 v_conn: *self.user(),
737 virtual_dir: virtual_directory.into(),
738 delete_on_pull,
739 transfer_security_level,
740 });
741
742 let mut stream = self.remote().send_callback_subscription(request).await?;
743
744 while let Some(event) = stream.next().await {
745 match event.into_result()? {
746 NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) => {
747 return handle
748 .receive_file()
749 .await
750 .map_err(|err| NetworkError::Generic(err.into_string()));
751 }
752
753 NodeResult::PeerEvent(PeerEvent {
754 event: PeerSignal::SignalReceived { .. },
755 ..
756 }) => {}
757
758 res => {
759 log::error!(target: "citadel", "Invalid NodeResult for REVFS FileTransfer request received: {res:?}");
760 return Err(NetworkError::InternalError(
761 "Received invalid response from protocol",
762 ));
763 }
764 }
765 }
766
767 Err(NetworkError::InternalError(
768 "REVFS File transfer stream died",
769 ))
770 }
771
772 async fn remote_encrypted_virtual_filesystem_delete<P: Into<PathBuf> + Send>(
776 &self,
777 virtual_directory: P,
778 ) -> Result<(), NetworkError> {
779 self.can_use_revfs()?;
780 let request = NodeRequest::DeleteObject(DeleteObject {
781 v_conn: *self.user(),
782 virtual_dir: virtual_directory.into(),
783 security_level: Default::default(),
784 });
785
786 let mut stream = self.remote().send_callback_subscription(request).await?;
787 while let Some(event) = stream.next().await {
788 match event.into_result()? {
789 NodeResult::ReVFS(result) => {
790 return if let Some(error) = result.error_message {
791 Err(NetworkError::Generic(error))
792 } else {
793 Ok(())
794 }
795 }
796
797 evt => {
798 log::error!(target: "citadel", "Invalid NodeResult for REVFS Delete request received: {evt:?}");
799 }
800 }
801 }
802
803 Err(NetworkError::InternalError("REVFS Delete stream died"))
804 }
805
806 async fn connect_to_peer_custom(
808 &self,
809 session_security_settings: SessionSecuritySettings,
810 udp_mode: UdpMode,
811 peer_session_password: Option<PreSharedKey>,
812 ) -> Result<PeerConnectSuccess<R>, NetworkError> {
813 use std::time::Duration;
814
815 const P2P_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
820
821 let session_cid = self.user().get_session_cid();
822 let peer_target = self.try_as_peer_connection().await?;
823
824 let mut stream = self
825 .remote()
826 .send_callback_subscription(NodeRequest::PeerCommand(PeerCommand {
827 session_cid,
828 command: PeerSignal::PostConnect {
829 peer_conn_type: peer_target,
830 ticket_opt: None,
831 invitee_response: None,
832 session_security_settings,
833 udp_mode,
834 session_password: peer_session_password,
835 },
836 }))
837 .await?;
838
839 let connect_task = async {
840 while let Some(status) = stream.next().await {
841 match status.into_result()? {
842 NodeResult::PeerChannelCreated(PeerChannelCreated {
843 ticket: _,
844 channel,
845 udp_rx_opt,
846 }) => {
847 let username = self.target_username().map(ToString::to_string);
848 let remote = PeerRemote {
849 inner: self.remote().clone(),
850 peer: peer_target.as_virtual_connection(),
851 username,
852 session_security_settings,
853 };
854
855 return Ok(PeerConnectSuccess {
856 remote,
857 channel: *channel,
858 udp_channel_rx: udp_rx_opt,
859 incoming_object_transfer_handles: None,
860 });
861 }
862
863 NodeResult::PeerEvent(PeerEvent {
864 event:
865 PeerSignal::PostConnect {
866 invitee_response, ..
867 },
868 ..
869 }) => match invitee_response {
870 Some(PeerResponse::Timeout) => {
871 return Err(NetworkError::msg("Peer did not respond in time"))
872 }
873 Some(PeerResponse::Decline) => {
874 return Err(NetworkError::msg("Peer declined to connect"))
875 }
876 _ => {}
877 },
878
879 _ => {}
880 }
881 }
882
883 Err(NetworkError::InternalError(
884 "Internal kernel stream died (connect_to_peer_custom)",
885 ))
886 };
887
888 match citadel_io::tokio::time::timeout(P2P_CONNECT_TIMEOUT, connect_task).await {
889 Ok(result) => result,
890 Err(_elapsed) => Err(NetworkError::msg(format!(
891 "P2P connection timed out after {}s waiting for PeerChannelCreated",
892 P2P_CONNECT_TIMEOUT.as_secs()
893 ))),
894 }
895 }
896
897 async fn connect_to_peer(&self) -> Result<PeerConnectSuccess<R>, NetworkError> {
899 self.connect_to_peer_custom(Default::default(), Default::default(), Default::default())
900 .await
901 }
902
903 async fn register_to_peer(&self) -> Result<PeerRegisterStatus, NetworkError> {
905 let session_cid = self.user().get_session_cid();
906 let peer_target = self.try_as_peer_connection().await?;
907 let local_username = self
909 .remote()
910 .account_manager()
911 .get_username_by_cid(session_cid)
912 .await?
913 .ok_or_else(|| NetworkError::msg("Unable to find username for local user"))?;
914 let peer_username_opt = self.target_username().map(ToString::to_string);
915
916 let mut stream = self
917 .remote()
918 .send_callback_subscription(NodeRequest::PeerCommand(PeerCommand {
919 session_cid,
920 command: PeerSignal::PostRegister {
921 peer_conn_type: peer_target,
922 inviter_username: local_username,
923 invitee_username: peer_username_opt,
924 ticket_opt: None,
925 invitee_response: None,
926 },
927 }))
928 .await?;
929
930 while let Some(status) = stream.next().await {
931 if let NodeResult::PeerEvent(PeerEvent {
932 event:
933 PeerSignal::PostRegister {
934 peer_conn_type: _,
935 inviter_username: _,
936 invitee_username: _,
937 ticket_opt: _,
938 invitee_response: Some(resp),
939 },
940 ticket: _,
941 ..
942 }) = status.into_result()?
943 {
944 match resp {
945 PeerResponse::Accept(..) => return Ok(PeerRegisterStatus::Accepted),
946 PeerResponse::Decline => return Ok(PeerRegisterStatus::Declined),
947 PeerResponse::Timeout => return Ok(PeerRegisterStatus::Failed { reason: Some("Timeout on register request. Peer did not accept in time. Try again later".to_string()) }),
948 _ => {}
949 }
950 }
951 }
952
953 Err(NetworkError::Generic(format!(
954 "Internal kernel stream died (register_to_peer): {:?}",
955 stream.callback_key()
956 )))
957 }
958
959 async fn deregister(&self) -> Result<(), NetworkError> {
963 if let Ok(peer_conn) = self.try_as_peer_connection().await {
964 let peer_request = PeerSignal::Deregister {
965 peer_conn_type: peer_conn,
966 };
967 let session_cid = self.user().get_session_cid();
968 let request = NodeRequest::PeerCommand(PeerCommand {
969 session_cid,
970 command: peer_request,
971 });
972
973 let mut subscription = self.remote().send_callback_subscription(request).await?;
974 while let Some(result) = subscription.next().await {
975 if let NodeResult::PeerEvent(PeerEvent {
976 event: PeerSignal::DeregistrationSuccess { .. },
977 ticket: _,
978 ..
979 }) = result.into_result()?
980 {
981 return Ok(());
982 }
983 }
984 } else {
985 let cid = self.user().get_session_cid();
987 let request = NodeRequest::DeregisterFromHypernode(DeregisterFromHypernode {
988 session_cid: cid,
989 v_conn_type: *self.user(),
990 });
991 let mut subscription = self.remote().send_callback_subscription(request).await?;
992 while let Some(result) = subscription.next().await {
993 match result.into_result()? {
994 NodeResult::DeRegistration(DeRegistration {
995 session_cid: _,
996 ticket_opt: _,
997 success: true,
998 }) => return Ok(()),
999 NodeResult::DeRegistration(DeRegistration {
1000 session_cid: _,
1001 ticket_opt: _,
1002 success: false,
1003 }) => {
1004 return Err(NetworkError::msg(
1005 "Unable to deregister: status=false".to_string(),
1006 ))
1007 }
1008
1009 _ => {}
1010 }
1011 }
1012 }
1013
1014 Err(NetworkError::InternalError("Deregister ended unexpectedly"))
1015 }
1016
1017 async fn disconnect(&self) -> Result<(), NetworkError> {
1018 if let Ok(peer_conn) = self.try_as_peer_connection().await {
1019 if let PeerConnectionType::LocalGroupPeer {
1020 session_cid,
1021 peer_cid: _,
1022 } = peer_conn
1023 {
1024 let request = NodeRequest::PeerCommand(PeerCommand {
1025 session_cid,
1026 command: PeerSignal::Disconnect {
1027 peer_conn_type: peer_conn,
1028 disconnect_response: None,
1029 disconnect_token: None,
1030 },
1031 });
1032
1033 let mut subscription = self.remote().send_callback_subscription(request).await?;
1034
1035 while let Some(event) = subscription.next().await {
1036 if let NodeResult::PeerEvent(PeerEvent {
1037 event:
1038 PeerSignal::Disconnect {
1039 peer_conn_type: _,
1040 disconnect_response: Some(_),
1041 ..
1042 },
1043 ticket: _,
1044 ..
1045 }) = event.into_result()?
1046 {
1047 return Ok(());
1048 }
1049 }
1050
1051 Err(NetworkError::InternalError(
1052 "Unable to receive valid disconnect event",
1053 ))
1054 } else {
1055 Err(NetworkError::msg(
1056 "External group peer functionality not enabled",
1057 ))
1058 }
1059 } else {
1060 let cid = self.user().get_session_cid();
1062 let request =
1063 NodeRequest::DisconnectFromHypernode(DisconnectFromHypernode { session_cid: cid });
1064
1065 let mut subscription = self.remote().send_callback_subscription(request).await?;
1066 while let Some(event) = subscription.next().await {
1067 if let NodeResult::Disconnect(Disconnect {
1068 success, message, ..
1069 }) = event.into_result()?
1070 {
1071 return if success {
1072 Ok(())
1073 } else {
1074 Err(NetworkError::msg(message))
1075 };
1076 }
1077 }
1078
1079 Err(NetworkError::InternalError(
1080 "Unable to receive valid disconnect event",
1081 ))
1082 }
1083 }
1084
1085 async fn create_group(
1086 &self,
1087 initial_users_to_invite: Option<Vec<UserIdentifier>>,
1088 ) -> Result<GroupChannel, NetworkError> {
1089 let session_cid = self.user().get_session_cid();
1090
1091 let mut initial_users = vec![];
1092 let options = MessageGroupOptions::default();
1094 if let Some(initial_users_to_invite) = initial_users_to_invite {
1098 for user in initial_users_to_invite {
1099 initial_users.push(
1100 self.remote()
1101 .account_manager()
1102 .find_target_information(session_cid, user.clone())
1103 .await
1104 .map_err(|err| NetworkError::msg(err.into_string()))?
1105 .ok_or_else(|| {
1106 NetworkError::msg(format!(
1107 "Account {user:?} not found for local user {session_cid:?}"
1108 ))
1109 })
1110 .map(|r| r.1.cid)?,
1111 )
1112 }
1113 }
1114
1115 let group_request = GroupBroadcast::Create {
1116 initial_invitees: initial_users,
1117 options,
1118 };
1119 let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand {
1120 session_cid,
1121 command: group_request,
1122 });
1123 let mut subscription = self.remote().send_callback_subscription(request).await?;
1124 while let Some(evt) = subscription.next().await {
1125 if let NodeResult::GroupChannelCreated(GroupChannelCreated {
1126 ticket: _,
1127 channel,
1128 session_cid: _,
1129 }) = evt
1130 {
1131 return Ok(channel);
1132 }
1133 }
1134
1135 Err(NetworkError::InternalError(
1136 "Create_group ended unexpectedly",
1137 ))
1138 }
1139
1140 async fn list_owned_groups(&self) -> Result<Vec<MessageGroupKey>, NetworkError> {
1142 let session_cid = self.user().get_session_cid();
1143 let cid_to_check_for = match self.try_as_peer_connection().await {
1144 Ok(res) => res.get_original_target_cid(),
1145 _ => session_cid,
1146 };
1147 let group_request = GroupBroadcast::ListGroupsFor {
1148 cid: cid_to_check_for,
1149 };
1150 let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand {
1151 session_cid,
1152 command: group_request,
1153 });
1154
1155 let mut subscription = self.remote().send_callback_subscription(request).await?;
1156
1157 while let Some(evt) = subscription.next().await {
1158 if let NodeResult::GroupEvent(GroupEvent {
1159 session_cid: _,
1160 ticket: _,
1161 event: GroupBroadcast::ListResponse { groups },
1162 }) = evt.into_result()?
1163 {
1164 return Ok(groups);
1165 }
1166 }
1167
1168 Err(NetworkError::InternalError(
1169 "List_members ended unexpectedly",
1170 ))
1171 }
1172
1173 async fn list_sessions(&self) -> Result<ActiveSessions, NetworkError> {
1177 let request = NodeRequest::GetActiveSessions;
1178 let mut subscription = self.remote().send_callback_subscription(request).await?;
1179
1180 if let Some(NodeResult::SessionList(result)) = subscription.next().await {
1181 return Ok(result.sessions);
1182 }
1183
1184 Err(NetworkError::InternalError(
1185 "List_sessions ended unexpectedly",
1186 ))
1187 }
1188
1189 async fn rekey(&self) -> Result<Option<u32>, NetworkError> {
1193 let request = NodeRequest::ReKey(ReKey {
1194 v_conn_type: *self.user(),
1195 });
1196 let mut subscription = self.remote().send_callback_subscription(request).await?;
1197
1198 while let Some(evt) = subscription.next().await {
1199 if let NodeResult::ReKeyResult(result) = evt {
1200 return match result.status {
1201 ReKeyReturnType::Success { version } => Ok(Some(version)),
1202 ReKeyReturnType::AlreadyInProgress => Ok(None),
1203 ReKeyReturnType::Failure { err } => {
1204 Err(NetworkError::Generic(format!("Rekey failed: {err}")))
1205 }
1206 };
1207 }
1208 }
1209
1210 Err(NetworkError::InternalError("Rekey ended unexpectedly"))
1211 }
1212
1213 async fn is_peer_registered(&self) -> Result<bool, NetworkError> {
1215 let target = self.try_as_peer_connection().await?;
1216 if let PeerConnectionType::LocalGroupPeer {
1217 session_cid: local_cid,
1218 peer_cid,
1219 } = target
1220 {
1221 let peers = self.remote().get_local_group_peers(local_cid, None).await?;
1222 citadel_logging::info!(target: "citadel", "Checking to see if {target} is registered in {peers:?}");
1223 Ok(peers.iter().any(|p| p.cid == peer_cid))
1224 } else {
1225 Err(NetworkError::Generic(
1226 "External group peers are not supported yet".to_string(),
1227 ))
1228 }
1229 }
1230
1231 #[doc(hidden)]
1232 async fn try_as_peer_connection(&self) -> Result<PeerConnectionType, NetworkError> {
1233 let verified_return = |user: &VirtualTargetType| {
1234 user.try_as_peer_connection()
1235 .ok_or(NetworkError::InvalidRequest("Target is not a peer"))
1236 };
1237
1238 if self.user().get_target_cid() == 0 {
1239 let peer_username = self
1242 .target_username()
1243 .ok_or_else(|| NetworkError::msg("target_cid=0, yet, no username was provided"))?;
1244 let session_cid = self.user().get_session_cid();
1245 let expected_peer_cid = self
1246 .remote()
1247 .account_manager()
1248 .get_persistence_handler()
1249 .get_cid_by_username(peer_username);
1250 let peer_cid = self
1253 .remote()
1254 .account_manager()
1255 .find_target_information(session_cid, peer_username)
1256 .await
1257 .map_err(|err| NetworkError::Generic(err.into_string()))?
1258 .map(|r| r.1.cid)
1259 .unwrap_or(expected_peer_cid);
1260
1261 let mut user = *self.user();
1262 user.set_target_cid(peer_cid);
1263 verified_return(&user)
1264 } else {
1265 verified_return(self.user())
1266 }
1267 }
1268
1269 #[doc(hidden)]
1270 fn can_use_revfs(&self) -> Result<(), NetworkError> {
1271 if let Some(sess) = self.session_security_settings() {
1272 if sess.crypto_params.kem_algorithm == KemAlgorithm::MlKem {
1273 Ok(())
1274 } else {
1275 Err(NetworkError::InvalidRequest(
1276 "RE-VFS can only be used with Kyber KEM",
1277 ))
1278 }
1279 } else {
1280 Err(NetworkError::InvalidRequest(
1281 "RE-VFS cannot be used with this remote type",
1282 ))
1283 }
1284 }
1285}
1286
1287impl<T: TargetLockedRemote<R>, R: Ratchet> ProtocolRemoteTargetExt<R> for T {}
1288
1289pub mod results {
1290 use crate::prefabs::client::peer_connection::FileTransferHandleRx;
1291 use crate::prelude::{PeerChannel, UdpChannel};
1292 use crate::remote_ext::remote_specialization::PeerRemote;
1293 use citadel_io::tokio::sync::oneshot::Receiver;
1294 use citadel_proto::prelude::*;
1295 use std::fmt::Debug;
1296
1297 pub struct PeerConnectSuccess<R: Ratchet> {
1298 pub channel: PeerChannel<R>,
1299 pub udp_channel_rx: Option<Receiver<UdpChannel<R>>>,
1300 pub remote: PeerRemote<R>,
1301 pub(crate) incoming_object_transfer_handles: Option<FileTransferHandleRx>,
1304 }
1305
1306 impl<R: Ratchet> Debug for PeerConnectSuccess<R> {
1307 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1308 f.debug_struct("PeerConnectSuccess")
1309 .field("channel", &self.channel)
1310 .field("udp_channel_rx", &self.udp_channel_rx)
1311 .finish()
1312 }
1313 }
1314
1315 impl<R: Ratchet> PeerConnectSuccess<R> {
1316 pub fn get_incoming_file_transfer_handle(
1318 &mut self,
1319 ) -> Result<FileTransferHandleRx, NetworkError> {
1320 self.incoming_object_transfer_handles
1321 .take()
1322 .ok_or(NetworkError::InternalError(
1323 "This function has already been called",
1324 ))
1325 }
1326 }
1327
1328 pub enum PeerRegisterStatus {
1329 Accepted,
1330 Declined,
1331 Failed { reason: Option<String> },
1332 }
1333
1334 #[derive(Clone, Debug)]
1335 pub struct LocalGroupPeer {
1336 pub cid: u64,
1337 pub is_online: bool,
1338 }
1339
1340 #[derive(Clone, Debug)]
1341 pub struct LocalGroupPeerFullInfo {
1342 pub cid: u64,
1343 pub username: Option<String>,
1344 pub full_name: Option<String>,
1345 pub is_online: bool,
1346 }
1347}
1348
1349pub mod remote_specialization {
1350 use crate::prelude::*;
1351 use std::ops::{Deref, DerefMut};
1352
1353 #[derive(Debug, Clone)]
1354 pub struct PeerRemote<R: Ratchet> {
1355 pub(crate) inner: NodeRemote<R>,
1356 pub(crate) peer: VirtualTargetType,
1357 pub(crate) username: Option<String>,
1358 pub(crate) session_security_settings: SessionSecuritySettings,
1359 }
1360
1361 impl<R: Ratchet> Deref for PeerRemote<R> {
1362 type Target = NodeRemote<R>;
1363 fn deref(&self) -> &Self::Target {
1364 &self.inner
1365 }
1366 }
1367
1368 impl<R: Ratchet> DerefMut for PeerRemote<R> {
1369 fn deref_mut(&mut self) -> &mut Self::Target {
1370 &mut self.inner
1371 }
1372 }
1373
1374 impl<R: Ratchet> TargetLockedRemote<R> for PeerRemote<R> {
1375 fn user(&self) -> &VirtualTargetType {
1376 &self.peer
1377 }
1378 fn remote(&self) -> &NodeRemote<R> {
1379 &self.inner
1380 }
1381 fn target_username(&self) -> Option<&str> {
1382 self.username.as_deref()
1383 }
1384 fn user_mut(&mut self) -> &mut VirtualTargetType {
1385 &mut self.peer
1386 }
1387
1388 fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
1389 Some(&self.session_security_settings)
1390 }
1391 }
1392}
1393
#[cfg(test)]
mod tests {
    use crate::prefabs::client::single_connection::SingleClientServerConnectionKernel;
    use crate::prefabs::client::DefaultServerConnectionSettingsBuilder;
    use crate::prelude::*;
    use citadel_io::tokio;
    use rstest::rstest;
    use std::net::SocketAddr;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use uuid::Uuid;

    /// Kernel for the receiving side of the file transfer test.
    ///
    /// Field `0` holds the node remote once `load_remote` has run. Field `1` is a flag that
    /// is set to `true` after a fully received file has been compared with the reference
    /// file.
    pub struct ReceiverFileTransferKernel<R: Ratchet>(
        pub Option<NodeRemote<R>>,
        pub Arc<AtomicBool>,
    );

    #[async_trait]
    impl<R: Ratchet> NetKernel<R> for ReceiverFileTransferKernel<R> {
        fn load_remote(&mut self, node_remote: NodeRemote<R>) -> Result<(), NetworkError> {
            self.0 = Some(node_remote);
            Ok(())
        }

        async fn on_start(&self) -> Result<(), NetworkError> {
            Ok(())
        }

        /// Accepts an incoming object transfer, follows its status stream and, upon
        /// completion, checks the received bytes, raises the success flag and shuts the
        /// node down. All other events are ignored.
        async fn on_node_event_received(&self, message: NodeResult<R>) -> Result<(), NetworkError> {
            use citadel_types::proto::ObjectTransferStatus;
            use futures::StreamExt;

            log::trace!(target: "citadel", "SERVER received {:?}", message);

            // Only object transfer handles are of interest here
            let mut handle = match message.into_result()? {
                NodeResult::ObjectTransferHandle(ObjectTransferHandle { handle, .. }) => handle,
                _ => return Ok(()),
            };

            // Filled in once the transfer reports where the file is being written
            let mut path = None;
            handle
                .accept()
                .map_err(|err| NetworkError::msg(err.into_string()))?;

            while let Some(status) = handle.next().await {
                match status {
                    ObjectTransferStatus::ReceptionBeginning(file_path, vfm) => {
                        path = Some(file_path);
                        assert_eq!(vfm.name, "TheBridge.pdf")
                    }

                    ObjectTransferStatus::ReceptionComplete => {
                        log::trace!(target: "citadel", "Server has finished receiving the file!");
                        let cmp = include_bytes!("../../resources/TheBridge.pdf");
                        let received_location = path.clone().unwrap();
                        let streamed_data = tokio::fs::read(received_location).await.unwrap();
                        assert_eq!(
                            cmp,
                            streamed_data.as_slice(),
                            "Original data and streamed data does not match"
                        );

                        self.1.store(true, Ordering::Relaxed);
                        self.0.clone().unwrap().shutdown().await?;
                    }

                    _ => {}
                }
            }

            Ok(())
        }

        async fn on_stop(&mut self) -> Result<(), NetworkError> {
            Ok(())
        }
    }

    /// Creates a test server node running a `ReceiverFileTransferKernel` that reports
    /// success through `switch`, and returns it together with its socket address.
    pub fn server_info<'a, R>(
        switch: Arc<AtomicBool>,
    ) -> (NodeFuture<'a, ReceiverFileTransferKernel<R>>, SocketAddr)
    where
        R: Ratchet,
    {
        let kernel = ReceiverFileTransferKernel(None, switch);
        crate::test_common::server_test_node(kernel, |_| {})
    }

    /// Sends `TheBridge.pdf` from a client to the server and expects both sides to flag
    /// success, for each listed combination of crypto parameters.
    #[rstest]
    #[case(EncryptionAlgorithm::AES_GCM_256, KemAlgorithm::MlKem, SigAlgorithm::None)]
    #[case(EncryptionAlgorithm::MlKemHybrid, KemAlgorithm::MlKem, SigAlgorithm::MlDsa65)]
    #[timeout(std::time::Duration::from_secs(90))]
    #[tokio::test]
    async fn test_c2s_file_transfer(
        #[case] enx: EncryptionAlgorithm,
        #[case] kem: KemAlgorithm,
        #[case] sig: SigAlgorithm,
    ) {
        citadel_logging::setup_log();

        let client_done = &AtomicBool::new(false);
        let server_done = &Arc::new(AtomicBool::new(false));
        let (server, server_addr) = server_info::<StackedRatchet>(server_done.clone());
        let session_id = Uuid::new_v4();

        let security_settings = SessionSecuritySettingsBuilder::default()
            .with_crypto_params(enx + kem + sig)
            .build()
            .unwrap();

        let connection_settings =
            DefaultServerConnectionSettingsBuilder::transient_with_id(server_addr, session_id)
                .with_session_security_settings(security_settings)
                .disable_udp()
                .build()
                .unwrap();

        let client_kernel = SingleClientServerConnectionKernel::new(
            connection_settings,
            |connection| async move {
                log::trace!(target: "citadel", "***CLIENT LOGIN SUCCESS :: File transfer next ***");
                connection
                    .send_file_with_custom_opts(
                        "../resources/TheBridge.pdf",
                        32 * 1024,
                        TransferType::FileTransfer,
                    )
                    .await
                    .unwrap();
                log::trace!(target: "citadel", "***CLIENT FILE TRANSFER SUCCESS***");
                client_done.store(true, Ordering::Relaxed);
                connection.shutdown_kernel().await
            },
        );

        let client = DefaultNodeBuilder::default().build(client_kernel).unwrap();

        // Run both nodes to completion; either side failing fails the test
        let _ = futures::future::try_join(server, client).await.unwrap();

        assert!(client_done.load(Ordering::Relaxed));
        assert!(server_done.load(Ordering::Relaxed));
    }
}