1use crate::prefabs::ClientServerRemote;
64use crate::prelude::results::{PeerConnectSuccess, PeerRegisterStatus};
65use crate::prelude::*;
66use crate::remote_ext::remote_specialization::PeerRemote;
67use crate::remote_ext::results::LocalGroupPeerFullInfo;
68use std::ops::{Deref, DerefMut};
69
70use futures::StreamExt;
71use std::path::PathBuf;
72use std::time::Duration;
73
74pub(crate) mod user_ids {
75 use crate::prelude::*;
76 use std::ops::Deref;
77
78 #[derive(Debug)]
79 pub struct SymmetricIdentifierHandleRef<'a, R: Ratchet> {
81 pub(crate) user: VirtualTargetType,
82 pub(crate) remote: &'a NodeRemote<R>,
83 pub(crate) target_username: Option<String>,
84 }
85
86 impl<R: Ratchet> SymmetricIdentifierHandleRef<'_, R> {
87 pub fn into_owned(self) -> SymmetricIdentifierHandle<R> {
88 SymmetricIdentifierHandle {
89 user: self.user,
90 remote: self.remote.clone(),
91 target_username: self.target_username,
92 }
93 }
94 }
95
96 #[derive(Clone, Debug)]
97 pub struct SymmetricIdentifierHandle<R: Ratchet> {
99 user: VirtualTargetType,
100 remote: NodeRemote<R>,
101 target_username: Option<String>,
102 }
103
104 pub trait TargetLockedRemote<R: Ratchet>: Send + Sync {
105 fn user(&self) -> &VirtualTargetType;
106 fn remote(&self) -> &NodeRemote<R>;
107 fn target_username(&self) -> Option<&str>;
108 fn user_mut(&mut self) -> &mut VirtualTargetType;
109 fn session_security_settings(&self) -> Option<&SessionSecuritySettings>;
110 }
111
112 impl<R: Ratchet> TargetLockedRemote<R> for SymmetricIdentifierHandleRef<'_, R> {
113 fn user(&self) -> &VirtualTargetType {
114 &self.user
115 }
116 fn remote(&self) -> &NodeRemote<R> {
117 self.remote
118 }
119 fn target_username(&self) -> Option<&str> {
120 self.target_username.as_deref()
121 }
122 fn user_mut(&mut self) -> &mut VirtualTargetType {
123 &mut self.user
124 }
125
126 fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
127 None
128 }
129 }
130
131 impl<R: Ratchet> TargetLockedRemote<R> for SymmetricIdentifierHandle<R> {
132 fn user(&self) -> &VirtualTargetType {
133 &self.user
134 }
135 fn remote(&self) -> &NodeRemote<R> {
136 &self.remote
137 }
138 fn target_username(&self) -> Option<&str> {
139 self.target_username.as_deref()
140 }
141 fn user_mut(&mut self) -> &mut VirtualTargetType {
142 &mut self.user
143 }
144
145 fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
146 None
147 }
148 }
149
150 impl<R: Ratchet> From<SymmetricIdentifierHandleRef<'_, R>> for SymmetricIdentifierHandle<R> {
151 fn from(this: SymmetricIdentifierHandleRef<'_, R>) -> Self {
152 this.into_owned()
153 }
154 }
155
156 impl<R: Ratchet> Deref for SymmetricIdentifierHandle<R> {
157 type Target = NodeRemote<R>;
158
159 fn deref(&self) -> &Self::Target {
160 &self.remote
161 }
162 }
163
164 impl<R: Ratchet> Deref for SymmetricIdentifierHandleRef<'_, R> {
165 type Target = NodeRemote<R>;
166
167 fn deref(&self) -> &Self::Target {
168 self.remote
169 }
170 }
171}
172
173pub struct CitadelClientServerConnection<R: Ratchet> {
175 pub(crate) channel: Option<PeerChannel<R>>,
177 pub remote: ClientServerRemote<R>,
178 pub udp_channel_rx: Option<citadel_io::tokio::sync::oneshot::Receiver<UdpChannel<R>>>,
180 pub services: ServicesObject,
182 pub cid: u64,
183 pub session_security_settings: SessionSecuritySettings,
184}
185
186impl<R: Ratchet> CitadelClientServerConnection<R> {
187 pub fn split(self) -> (PeerChannelSendHalf<R>, PeerChannelRecvHalf<R>) {
193 self.channel.expect("Channel already taken").split()
194 }
195
196 pub fn take_channel(&mut self) -> Option<PeerChannel<R>> {
197 self.channel.take()
198 }
199}
200
201impl<R: Ratchet> Deref for CitadelClientServerConnection<R> {
202 type Target = ClientServerRemote<R>;
203
204 fn deref(&self) -> &Self::Target {
205 &self.remote
206 }
207}
208
209impl<R: Ratchet> DerefMut for CitadelClientServerConnection<R> {
210 fn deref_mut(&mut self) -> &mut Self::Target {
211 &mut self.remote
212 }
213}
214
215pub struct RegisterSuccess {
217 pub cid: u64,
218}
219
220#[async_trait]
221pub trait ProtocolRemoteExt<R: Ratchet>: Remote<R> {
223 async fn register<
226 T: std::net::ToSocketAddrs + Send,
227 P: Into<String> + Send,
228 V: Into<String> + Send,
229 K: Into<SecBuffer> + Send,
230 >(
231 &self,
232 addr: T,
233 full_name: P,
234 username: V,
235 proposed_password: K,
236 default_security_settings: SessionSecuritySettings,
237 server_password: Option<PreSharedKey>,
238 ) -> Result<RegisterSuccess, NetworkError> {
239 let creds =
240 ProposedCredentials::new_register(full_name, username, proposed_password.into())
241 .await?;
242 let register_request = NodeRequest::RegisterToHypernode(RegisterToHypernode {
243 remote_addr: addr
244 .to_socket_addrs()?
245 .next()
246 .ok_or(NetworkError::InternalError("Invalid socket addr"))?,
247 proposed_credentials: creds,
248 static_security_settings: default_security_settings,
249 session_password: server_password.unwrap_or_default(),
250 });
251
252 let mut subscription = self.send_callback_subscription(register_request).await?;
253 while let Some(status) = subscription.next().await {
254 match status.into_result()? {
255 NodeResult::RegisterOkay(RegisterOkay { cid, .. }) => {
256 return Ok(RegisterSuccess { cid });
257 }
258 NodeResult::RegisterFailure(err) => {
259 return Err(NetworkError::Generic(err.error_message));
260 }
261 NodeResult::Disconnect(err) => {
262 return Err(NetworkError::Generic(err.message));
263 }
264 evt => {
265 log::warn!(target: "citadel", "Invalid NodeResult for Register request received: {evt:?}");
266 }
267 }
268 }
269
270 Err(NetworkError::InternalError(
271 "Internal kernel stream died (register)",
272 ))
273 }
274
275 async fn register_with_defaults<
278 T: std::net::ToSocketAddrs + Send,
279 P: Into<String> + Send,
280 V: Into<String> + Send,
281 K: Into<SecBuffer> + Send,
282 >(
283 &self,
284 addr: T,
285 full_name: P,
286 username: V,
287 proposed_password: K,
288 ) -> Result<RegisterSuccess, NetworkError> {
289 self.register(
290 addr,
291 full_name,
292 username,
293 proposed_password,
294 Default::default(),
295 Default::default(),
296 )
297 .await
298 }
299
300 async fn connect(
303 &self,
304 auth: AuthenticationRequest,
305 connect_mode: ConnectMode,
306 udp_mode: UdpMode,
307 keep_alive_timeout: Option<Duration>,
308 session_security_settings: SessionSecuritySettings,
309 server_password: Option<PreSharedKey>,
310 ) -> Result<CitadelClientServerConnection<R>, NetworkError> {
311 let connect_request = NodeRequest::ConnectToHypernode(ConnectToHypernode {
312 auth_request: auth,
313 connect_mode,
314 udp_mode,
315 keep_alive_timeout: keep_alive_timeout.map(|r| r.as_secs()),
316 session_security_settings,
317 session_password: server_password.unwrap_or_default(),
318 });
319
320 let mut subscription = self.send_callback_subscription(connect_request).await?;
321 let status = subscription
322 .next()
323 .await
324 .ok_or(NetworkError::InternalError(
325 "Internal kernel stream died (connect)",
326 ))?;
327
328 return match status.into_result()? {
329 NodeResult::ConnectSuccess(ConnectSuccess {
330 ticket: _,
331 session_cid: cid,
332 remote_addr: _,
333 is_personal: _,
334 v_conn_type,
335 services,
336 welcome_message: _,
337 channel,
338 udp_rx_opt: udp_channel_rx,
339 session_security_settings,
340 }) => Ok(CitadelClientServerConnection {
341 remote: ClientServerRemote::new(
342 v_conn_type,
343 self.remote_ref().clone(),
344 session_security_settings,
345 None,
346 None,
347 ),
348 channel: Some(*channel),
349 udp_channel_rx,
350 services,
351 cid,
352 session_security_settings,
353 }),
354 NodeResult::ConnectFail(ConnectFail {
355 ticket: _,
356 cid_opt: _,
357 error_message: err,
358 }) => Err(NetworkError::Generic(err)),
359 NodeResult::Disconnect(err) => {
360 return Err(NetworkError::Generic(err.message));
361 }
362 res => Err(NetworkError::msg(format!(
363 "[connect] An unexpected response occurred: {res:?}"
364 ))),
365 };
366 }
367
368 async fn connect_with_defaults(
371 &self,
372 auth: AuthenticationRequest,
373 ) -> Result<CitadelClientServerConnection<R>, NetworkError> {
374 self.connect(
375 auth,
376 Default::default(),
377 Default::default(),
378 None,
379 Default::default(),
380 Default::default(),
381 )
382 .await
383 }
384
385 async fn find_target<T: Into<UserIdentifier> + Send, P: Into<UserIdentifier> + Send>(
398 &self,
399 local_user: T,
400 peer: P,
401 ) -> Result<SymmetricIdentifierHandleRef<'_, R>, NetworkError> {
402 let account_manager = self.account_manager();
403 account_manager
404 .find_target_information(local_user, peer)
405 .await?
406 .map(move |(cid, peer)| {
407 if peer.parent_icid != 0 {
408 SymmetricIdentifierHandleRef {
409 user: VirtualTargetType::ExternalGroupPeer {
410 session_cid: cid,
411 interserver_cid: peer.parent_icid,
412 peer_cid: peer.cid,
413 },
414 remote: self.remote_ref(),
415 target_username: None,
416 }
417 } else {
418 SymmetricIdentifierHandleRef {
419 user: VirtualTargetType::LocalGroupPeer {
420 session_cid: cid,
421 peer_cid: peer.cid,
422 },
423 remote: self.remote_ref(),
424 target_username: None,
425 }
426 }
427 })
428 .ok_or_else(|| NetworkError::msg("Target pair not found"))
429 }
430
431 async fn propose_target<T: Into<UserIdentifier> + Send, P: Into<UserIdentifier> + Send>(
434 &self,
435 local_user: T,
436 peer: P,
437 ) -> Result<SymmetricIdentifierHandleRef<'_, R>, NetworkError> {
438 let local_cid = self.get_session_cid(local_user).await?;
439 match peer.into() {
440 UserIdentifier::ID(peer_cid) => Ok(SymmetricIdentifierHandleRef {
441 user: VirtualTargetType::LocalGroupPeer {
442 session_cid: local_cid,
443 peer_cid,
444 },
445 remote: self.remote_ref(),
446 target_username: None,
447 }),
448 UserIdentifier::Username(uname) => {
449 let peer_cid = self
450 .remote_ref()
451 .account_manager()
452 .find_target_information(local_cid, uname.clone())
453 .await?
454 .map(|r| r.1.cid)
455 .unwrap_or(0);
456 Ok(SymmetricIdentifierHandleRef {
457 user: VirtualTargetType::LocalGroupPeer {
458 session_cid: local_cid,
459 peer_cid,
460 },
461 remote: self.remote_ref(),
462 target_username: Some(uname),
463 })
464 }
465 }
466 }
467
468 async fn get_local_group_peers<T: Into<UserIdentifier> + Send>(
471 &self,
472 local_user: T,
473 limit: Option<usize>,
474 ) -> Result<Vec<LocalGroupPeerFullInfo>, NetworkError> {
475 let local_cid = self.get_session_cid(local_user).await?;
476 let command = NodeRequest::PeerCommand(PeerCommand {
477 session_cid: local_cid,
478 command: PeerSignal::GetRegisteredPeers {
479 peer_conn_type: ClientConnectionType::Server {
480 session_cid: local_cid,
481 },
482 response: None,
483 limit: limit.map(|r| r as i32),
484 },
485 });
486
487 let mut stream = self.send_callback_subscription(command).await?;
488
489 while let Some(status) = stream.next().await {
490 if let NodeResult::PeerEvent(PeerEvent {
491 event:
492 PeerSignal::GetRegisteredPeers {
493 peer_conn_type: _,
494 response: Some(PeerResponse::RegisteredCids(peer_info, is_onlines)),
495 limit: _,
496 },
497 ticket: _,
498 ..
499 }) = status.into_result()?
500 {
501 return Ok(peer_info
502 .into_iter()
503 .zip(is_onlines.into_iter())
504 .filter_map(|(peer_info, is_online)| {
505 peer_info.map(|info| LocalGroupPeerFullInfo {
506 cid: info.cid,
507 username: Some(info.username),
508 full_name: Some(info.full_name),
509 is_online,
510 })
511 })
512 .collect());
513 }
514 }
515
516 Err(NetworkError::InternalError(
517 "Internal kernel stream died (get_local_group_peers)",
518 ))
519 }
520
521 async fn get_local_group_mutual_peers<T: Into<UserIdentifier> + Send>(
523 &self,
524 local_user: T,
525 ) -> Result<Vec<LocalGroupPeerFullInfo>, NetworkError> {
526 let local_cid = self.get_session_cid(local_user).await?;
527 let command = NodeRequest::PeerCommand(PeerCommand {
528 session_cid: local_cid,
529 command: PeerSignal::GetMutuals {
530 v_conn_type: ClientConnectionType::Server {
531 session_cid: local_cid,
532 },
533 response: None,
534 },
535 });
536
537 let mut stream = self.send_callback_subscription(command).await?;
538
539 while let Some(status) = stream.next().await {
540 if let NodeResult::PeerEvent(PeerEvent {
541 event:
542 PeerSignal::GetMutuals {
543 v_conn_type: _,
544 response: Some(PeerResponse::RegisteredCids(peer_info, is_onlines)),
545 },
546 ticket: _,
547 ..
548 }) = status.into_result()?
549 {
550 return Ok(peer_info
551 .into_iter()
552 .zip(is_onlines.into_iter())
553 .filter_map(|(peer_info, is_online)| {
554 peer_info.map(|info| LocalGroupPeerFullInfo {
555 cid: info.cid,
556 username: Some(info.username),
557 full_name: Some(info.full_name),
558 is_online,
559 })
560 })
561 .collect());
562 }
563 }
564
565 Err(NetworkError::msg("Stream died"))
566 }
567
568 async fn sessions(&self) -> Result<ActiveSessions, NetworkError> {
571 match self
572 .send_callback_subscription(NodeRequest::GetActiveSessions)
573 .await
574 {
575 Ok(mut stream) => {
576 while let Some(result) = stream.next().await {
577 match result.into_result()? {
578 NodeResult::SessionList(res) => return Ok(res.sessions),
579 res => {
580 citadel_logging::warn!("Received unexpected result: {res:?}");
581 }
582 }
583 }
584
585 citadel_logging::warn!("Failed to receive response from SDK (stream died)");
586 return Err(NetworkError::InternalError(
587 "Internal kernel stream died (get_active_sessions)",
588 ));
589 }
590 Err(e) => {
591 citadel_logging::warn!(target: "citadel", "Failed to query SDK sessions: {e}");
592 }
593 }
594
595 Err(NetworkError::msg("Failed to query SDK sessions"))
596 }
597
598 #[doc(hidden)]
599 fn remote_ref(&self) -> &NodeRemote<R>;
600
601 #[doc(hidden)]
602 async fn get_session_cid<T: Into<UserIdentifier> + Send>(
603 &self,
604 local_user: T,
605 ) -> Result<u64, NetworkError> {
606 let account_manager = self.account_manager();
607 Ok(account_manager
608 .find_local_user_information(local_user)
609 .await?
610 .ok_or(NetworkError::InvalidRequest("User does not exist"))?)
611 }
612}
613
614impl<R: Ratchet> ProtocolRemoteExt<R> for NodeRemote<R> {
615 fn remote_ref(&self) -> &NodeRemote<R> {
616 self
617 }
618}
619
620impl<R: Ratchet> ProtocolRemoteExt<R> for ClientServerRemote<R> {
621 fn remote_ref(&self) -> &NodeRemote<R> {
622 &self.inner
623 }
624}
625
626#[async_trait]
627pub trait ProtocolRemoteTargetExt<R: Ratchet>: TargetLockedRemote<R> {
629 async fn send_file_with_custom_opts<T: ObjectSource>(
631 &self,
632 source: T,
633 chunk_size: usize,
634 transfer_type: TransferType,
635 ) -> Result<(), NetworkError> {
636 let chunk_size = if chunk_size == 0 {
637 None
638 } else {
639 Some(chunk_size)
640 };
641 let session_cid = self.user().get_session_cid();
642 let user = *self.user();
643 let remote = self.remote();
644
645 let mut stream = remote
646 .send_callback_subscription(NodeRequest::SendObject(SendObject {
647 source: Box::new(source),
648 chunk_size,
649 session_cid,
650 v_conn_type: user,
651 transfer_type,
652 }))
653 .await?;
654
655 while let Some(event) = stream.next().await {
656 match event.into_result()? {
657 NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) => {
658 return handle
659 .transfer_file()
660 .await
661 .map_err(|err| NetworkError::Generic(err.into_string()));
662 }
663
664 NodeResult::PeerEvent(PeerEvent {
665 event: PeerSignal::SignalReceived { .. },
666 ..
667 }) => {}
668
669 res => {
670 log::warn!(target: "citadel", "Invalid NodeResult for FileTransfer request received: {res:?}")
671 }
672 }
673 }
674
675 Err(NetworkError::InternalError("File transfer stream died"))
676 }
677
678 async fn send_file<T: ObjectSource>(&self, source: T) -> Result<(), NetworkError> {
680 self.send_file_with_custom_opts(source, 0, TransferType::FileTransfer)
681 .await
682 }
683
684 async fn remote_encrypted_virtual_filesystem_push_custom_chunking<
687 T: ObjectSource,
688 P: Into<PathBuf> + Send,
689 >(
690 &self,
691 source: T,
692 virtual_directory: P,
693 chunk_size: usize,
694 security_level: SecurityLevel,
695 ) -> Result<(), NetworkError> {
696 self.can_use_revfs()?;
697 let mut virtual_path = virtual_directory.into();
698 virtual_path = prepare_virtual_path(virtual_path);
699 validate_virtual_path(&virtual_path)
700 .map_err(|err| NetworkError::Generic(err.into_string()))?;
701 let tx_type = TransferType::RemoteEncryptedVirtualFilesystem {
702 virtual_path,
703 security_level,
704 };
705 self.send_file_with_custom_opts(source, chunk_size, tx_type)
706 .await
707 }
708
709 async fn remote_encrypted_virtual_filesystem_push<T: ObjectSource, P: Into<PathBuf> + Send>(
712 &self,
713 source: T,
714 virtual_directory: P,
715 security_level: SecurityLevel,
716 ) -> Result<(), NetworkError> {
717 self.remote_encrypted_virtual_filesystem_push_custom_chunking(
718 source,
719 virtual_directory,
720 0,
721 security_level,
722 )
723 .await
724 }
725
726 async fn remote_encrypted_virtual_filesystem_pull<P: Into<PathBuf> + Send>(
729 &self,
730 virtual_directory: P,
731 transfer_security_level: SecurityLevel,
732 delete_on_pull: bool,
733 ) -> Result<PathBuf, NetworkError> {
734 self.can_use_revfs()?;
735 let request = NodeRequest::PullObject(PullObject {
736 v_conn: *self.user(),
737 virtual_dir: virtual_directory.into(),
738 delete_on_pull,
739 transfer_security_level,
740 });
741
742 let mut stream = self.remote().send_callback_subscription(request).await?;
743
744 while let Some(event) = stream.next().await {
745 match event.into_result()? {
746 NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) => {
747 return handle
748 .receive_file()
749 .await
750 .map_err(|err| NetworkError::Generic(err.into_string()));
751 }
752
753 NodeResult::PeerEvent(PeerEvent {
754 event: PeerSignal::SignalReceived { .. },
755 ..
756 }) => {}
757
758 res => {
759 log::error!(target: "citadel", "Invalid NodeResult for REVFS FileTransfer request received: {res:?}");
760 return Err(NetworkError::InternalError(
761 "Received invalid response from protocol",
762 ));
763 }
764 }
765 }
766
767 Err(NetworkError::InternalError(
768 "REVFS File transfer stream died",
769 ))
770 }
771
772 async fn remote_encrypted_virtual_filesystem_delete<P: Into<PathBuf> + Send>(
776 &self,
777 virtual_directory: P,
778 ) -> Result<(), NetworkError> {
779 self.can_use_revfs()?;
780 let request = NodeRequest::DeleteObject(DeleteObject {
781 v_conn: *self.user(),
782 virtual_dir: virtual_directory.into(),
783 security_level: Default::default(),
784 });
785
786 let mut stream = self.remote().send_callback_subscription(request).await?;
787 while let Some(event) = stream.next().await {
788 match event.into_result()? {
789 NodeResult::ReVFS(result) => {
790 return if let Some(error) = result.error_message {
791 Err(NetworkError::Generic(error))
792 } else {
793 Ok(())
794 }
795 }
796
797 evt => {
798 log::error!(target: "citadel", "Invalid NodeResult for REVFS Delete request received: {evt:?}");
799 }
800 }
801 }
802
803 Err(NetworkError::InternalError("REVFS Delete stream died"))
804 }
805
806 async fn connect_to_peer_custom(
808 &self,
809 session_security_settings: SessionSecuritySettings,
810 udp_mode: UdpMode,
811 peer_session_password: Option<PreSharedKey>,
812 ) -> Result<PeerConnectSuccess<R>, NetworkError> {
813 use std::time::Duration;
814
815 const P2P_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
820
821 let session_cid = self.user().get_session_cid();
822 let peer_target = self.try_as_peer_connection().await?;
823
824 let mut stream = self
825 .remote()
826 .send_callback_subscription(NodeRequest::PeerCommand(PeerCommand {
827 session_cid,
828 command: PeerSignal::PostConnect {
829 peer_conn_type: peer_target,
830 ticket_opt: None,
831 invitee_response: None,
832 session_security_settings,
833 udp_mode,
834 session_password: peer_session_password,
835 },
836 }))
837 .await?;
838
839 let connect_task = async {
840 while let Some(status) = stream.next().await {
841 match status.into_result()? {
842 NodeResult::PeerChannelCreated(PeerChannelCreated {
843 ticket: _,
844 channel,
845 udp_rx_opt,
846 }) => {
847 let username = self.target_username().map(ToString::to_string);
848 let remote = PeerRemote {
849 inner: self.remote().clone(),
850 peer: peer_target.as_virtual_connection(),
851 username,
852 session_security_settings,
853 };
854
855 return Ok(PeerConnectSuccess {
856 remote,
857 channel: *channel,
858 udp_channel_rx: udp_rx_opt,
859 incoming_object_transfer_handles: None,
860 });
861 }
862
863 NodeResult::PeerEvent(PeerEvent {
864 event:
865 PeerSignal::PostConnect {
866 invitee_response, ..
867 },
868 ..
869 }) => match invitee_response {
870 Some(PeerResponse::Timeout) => {
871 return Err(NetworkError::msg("Peer did not respond in time"))
872 }
873 Some(PeerResponse::Decline) => {
874 return Err(NetworkError::msg("Peer declined to connect"))
875 }
876 _ => {}
877 },
878
879 _ => {}
880 }
881 }
882
883 Err(NetworkError::InternalError(
884 "Internal kernel stream died (connect_to_peer_custom)",
885 ))
886 };
887
888 match citadel_io::tokio::time::timeout(P2P_CONNECT_TIMEOUT, connect_task).await {
889 Ok(result) => result,
890 Err(_elapsed) => Err(NetworkError::msg(format!(
891 "P2P connection timed out after {}s waiting for PeerChannelCreated",
892 P2P_CONNECT_TIMEOUT.as_secs()
893 ))),
894 }
895 }
896
897 async fn connect_to_peer(&self) -> Result<PeerConnectSuccess<R>, NetworkError> {
899 self.connect_to_peer_custom(Default::default(), Default::default(), Default::default())
900 .await
901 }
902
903 async fn register_to_peer(&self) -> Result<PeerRegisterStatus, NetworkError> {
905 let session_cid = self.user().get_session_cid();
906 let peer_target = self.try_as_peer_connection().await?;
907 let local_username = self
909 .remote()
910 .account_manager()
911 .get_username_by_cid(session_cid)
912 .await?
913 .ok_or_else(|| NetworkError::msg("Unable to find username for local user"))?;
914 let peer_username_opt = self.target_username().map(ToString::to_string);
915
916 let mut stream = self
917 .remote()
918 .send_callback_subscription(NodeRequest::PeerCommand(PeerCommand {
919 session_cid,
920 command: PeerSignal::PostRegister {
921 peer_conn_type: peer_target,
922 inviter_username: local_username,
923 invitee_username: peer_username_opt,
924 ticket_opt: None,
925 invitee_response: None,
926 },
927 }))
928 .await?;
929
930 while let Some(status) = stream.next().await {
931 if let NodeResult::PeerEvent(PeerEvent {
932 event:
933 PeerSignal::PostRegister {
934 peer_conn_type: _,
935 inviter_username: _,
936 invitee_username: _,
937 ticket_opt: _,
938 invitee_response: Some(resp),
939 },
940 ticket: _,
941 ..
942 }) = status.into_result()?
943 {
944 match resp {
945 PeerResponse::Accept(..) => return Ok(PeerRegisterStatus::Accepted),
946 PeerResponse::Decline => return Ok(PeerRegisterStatus::Declined),
947 PeerResponse::Timeout => return Ok(PeerRegisterStatus::Failed { reason: Some("Timeout on register request. Peer did not accept in time. Try again later".to_string()) }),
948 _ => {}
949 }
950 }
951 }
952
953 Err(NetworkError::Generic(format!(
954 "Internal kernel stream died (register_to_peer): {:?}",
955 stream.callback_key()
956 )))
957 }
958
959 async fn deregister(&self) -> Result<(), NetworkError> {
963 if let Ok(peer_conn) = self.try_as_peer_connection().await {
964 let peer_request = PeerSignal::Deregister {
965 peer_conn_type: peer_conn,
966 };
967 let session_cid = self.user().get_session_cid();
968 let request = NodeRequest::PeerCommand(PeerCommand {
969 session_cid,
970 command: peer_request,
971 });
972
973 let mut subscription = self.remote().send_callback_subscription(request).await?;
974 while let Some(result) = subscription.next().await {
975 if let NodeResult::PeerEvent(PeerEvent {
976 event: PeerSignal::DeregistrationSuccess { .. },
977 ticket: _,
978 ..
979 }) = result.into_result()?
980 {
981 return Ok(());
982 }
983 }
984 } else {
985 let cid = self.user().get_session_cid();
987 let request = NodeRequest::DeregisterFromHypernode(DeregisterFromHypernode {
988 session_cid: cid,
989 v_conn_type: *self.user(),
990 });
991 let mut subscription = self.remote().send_callback_subscription(request).await?;
992 while let Some(result) = subscription.next().await {
993 match result.into_result()? {
994 NodeResult::DeRegistration(DeRegistration {
995 session_cid: _,
996 ticket_opt: _,
997 success: true,
998 }) => return Ok(()),
999 NodeResult::DeRegistration(DeRegistration {
1000 session_cid: _,
1001 ticket_opt: _,
1002 success: false,
1003 }) => {
1004 return Err(NetworkError::msg(
1005 "Unable to deregister: status=false".to_string(),
1006 ))
1007 }
1008
1009 _ => {}
1010 }
1011 }
1012 }
1013
1014 Err(NetworkError::InternalError("Deregister ended unexpectedly"))
1015 }
1016
1017 async fn disconnect(&self) -> Result<(), NetworkError> {
1018 if let Ok(peer_conn) = self.try_as_peer_connection().await {
1019 if let PeerConnectionType::LocalGroupPeer {
1020 session_cid,
1021 peer_cid: _,
1022 } = peer_conn
1023 {
1024 let request = NodeRequest::PeerCommand(PeerCommand {
1025 session_cid,
1026 command: PeerSignal::Disconnect {
1027 peer_conn_type: peer_conn,
1028 disconnect_response: None,
1029 },
1030 });
1031
1032 let mut subscription = self.remote().send_callback_subscription(request).await?;
1033
1034 while let Some(event) = subscription.next().await {
1035 if let NodeResult::PeerEvent(PeerEvent {
1036 event:
1037 PeerSignal::Disconnect {
1038 peer_conn_type: _,
1039 disconnect_response: Some(_),
1040 },
1041 ticket: _,
1042 ..
1043 }) = event.into_result()?
1044 {
1045 return Ok(());
1046 }
1047 }
1048
1049 Err(NetworkError::InternalError(
1050 "Unable to receive valid disconnect event",
1051 ))
1052 } else {
1053 Err(NetworkError::msg(
1054 "External group peer functionality not enabled",
1055 ))
1056 }
1057 } else {
1058 let cid = self.user().get_session_cid();
1060 let request =
1061 NodeRequest::DisconnectFromHypernode(DisconnectFromHypernode { session_cid: cid });
1062
1063 let mut subscription = self.remote().send_callback_subscription(request).await?;
1064 while let Some(event) = subscription.next().await {
1065 if let NodeResult::Disconnect(Disconnect {
1066 success, message, ..
1067 }) = event.into_result()?
1068 {
1069 return if success {
1070 Ok(())
1071 } else {
1072 Err(NetworkError::msg(message))
1073 };
1074 }
1075 }
1076
1077 Err(NetworkError::InternalError(
1078 "Unable to receive valid disconnect event",
1079 ))
1080 }
1081 }
1082
1083 async fn create_group(
1084 &self,
1085 initial_users_to_invite: Option<Vec<UserIdentifier>>,
1086 ) -> Result<GroupChannel, NetworkError> {
1087 let session_cid = self.user().get_session_cid();
1088
1089 let mut initial_users = vec![];
1090 let options = MessageGroupOptions::default();
1092 if let Some(initial_users_to_invite) = initial_users_to_invite {
1096 for user in initial_users_to_invite {
1097 initial_users.push(
1098 self.remote()
1099 .account_manager()
1100 .find_target_information(session_cid, user.clone())
1101 .await
1102 .map_err(|err| NetworkError::msg(err.into_string()))?
1103 .ok_or_else(|| {
1104 NetworkError::msg(format!(
1105 "Account {user:?} not found for local user {session_cid:?}"
1106 ))
1107 })
1108 .map(|r| r.1.cid)?,
1109 )
1110 }
1111 }
1112
1113 let group_request = GroupBroadcast::Create {
1114 initial_invitees: initial_users,
1115 options,
1116 };
1117 let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand {
1118 session_cid,
1119 command: group_request,
1120 });
1121 let mut subscription = self.remote().send_callback_subscription(request).await?;
1122 while let Some(evt) = subscription.next().await {
1123 if let NodeResult::GroupChannelCreated(GroupChannelCreated {
1124 ticket: _,
1125 channel,
1126 session_cid: _,
1127 }) = evt
1128 {
1129 return Ok(channel);
1130 }
1131 }
1132
1133 Err(NetworkError::InternalError(
1134 "Create_group ended unexpectedly",
1135 ))
1136 }
1137
1138 async fn list_owned_groups(&self) -> Result<Vec<MessageGroupKey>, NetworkError> {
1140 let session_cid = self.user().get_session_cid();
1141 let cid_to_check_for = match self.try_as_peer_connection().await {
1142 Ok(res) => res.get_original_target_cid(),
1143 _ => session_cid,
1144 };
1145 let group_request = GroupBroadcast::ListGroupsFor {
1146 cid: cid_to_check_for,
1147 };
1148 let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand {
1149 session_cid,
1150 command: group_request,
1151 });
1152
1153 let mut subscription = self.remote().send_callback_subscription(request).await?;
1154
1155 while let Some(evt) = subscription.next().await {
1156 if let NodeResult::GroupEvent(GroupEvent {
1157 session_cid: _,
1158 ticket: _,
1159 event: GroupBroadcast::ListResponse { groups },
1160 }) = evt.into_result()?
1161 {
1162 return Ok(groups);
1163 }
1164 }
1165
1166 Err(NetworkError::InternalError(
1167 "List_members ended unexpectedly",
1168 ))
1169 }
1170
1171 async fn list_sessions(&self) -> Result<ActiveSessions, NetworkError> {
1175 let request = NodeRequest::GetActiveSessions;
1176 let mut subscription = self.remote().send_callback_subscription(request).await?;
1177
1178 if let Some(NodeResult::SessionList(result)) = subscription.next().await {
1179 return Ok(result.sessions);
1180 }
1181
1182 Err(NetworkError::InternalError(
1183 "List_sessions ended unexpectedly",
1184 ))
1185 }
1186
1187 async fn rekey(&self) -> Result<Option<u32>, NetworkError> {
1191 let request = NodeRequest::ReKey(ReKey {
1192 v_conn_type: *self.user(),
1193 });
1194 let mut subscription = self.remote().send_callback_subscription(request).await?;
1195
1196 while let Some(evt) = subscription.next().await {
1197 if let NodeResult::ReKeyResult(result) = evt {
1198 return match result.status {
1199 ReKeyReturnType::Success { version } => Ok(Some(version)),
1200 ReKeyReturnType::AlreadyInProgress => Ok(None),
1201 ReKeyReturnType::Failure { err } => {
1202 Err(NetworkError::Generic(format!("Rekey failed: {err}")))
1203 }
1204 };
1205 }
1206 }
1207
1208 Err(NetworkError::InternalError("Rekey ended unexpectedly"))
1209 }
1210
1211 async fn is_peer_registered(&self) -> Result<bool, NetworkError> {
1213 let target = self.try_as_peer_connection().await?;
1214 if let PeerConnectionType::LocalGroupPeer {
1215 session_cid: local_cid,
1216 peer_cid,
1217 } = target
1218 {
1219 let peers = self.remote().get_local_group_peers(local_cid, None).await?;
1220 citadel_logging::info!(target: "citadel", "Checking to see if {target} is registered in {peers:?}");
1221 Ok(peers.iter().any(|p| p.cid == peer_cid))
1222 } else {
1223 Err(NetworkError::Generic(
1224 "External group peers are not supported yet".to_string(),
1225 ))
1226 }
1227 }
1228
1229 #[doc(hidden)]
1230 async fn try_as_peer_connection(&self) -> Result<PeerConnectionType, NetworkError> {
1231 let verified_return = |user: &VirtualTargetType| {
1232 user.try_as_peer_connection()
1233 .ok_or(NetworkError::InvalidRequest("Target is not a peer"))
1234 };
1235
1236 if self.user().get_target_cid() == 0 {
1237 let peer_username = self
1240 .target_username()
1241 .ok_or_else(|| NetworkError::msg("target_cid=0, yet, no username was provided"))?;
1242 let session_cid = self.user().get_session_cid();
1243 let expected_peer_cid = self
1244 .remote()
1245 .account_manager()
1246 .get_persistence_handler()
1247 .get_cid_by_username(peer_username);
1248 let peer_cid = self
1251 .remote()
1252 .account_manager()
1253 .find_target_information(session_cid, peer_username)
1254 .await
1255 .map_err(|err| NetworkError::Generic(err.into_string()))?
1256 .map(|r| r.1.cid)
1257 .unwrap_or(expected_peer_cid);
1258
1259 let mut user = *self.user();
1260 user.set_target_cid(peer_cid);
1261 verified_return(&user)
1262 } else {
1263 verified_return(self.user())
1264 }
1265 }
1266
1267 #[doc(hidden)]
1268 fn can_use_revfs(&self) -> Result<(), NetworkError> {
1269 if let Some(sess) = self.session_security_settings() {
1270 if sess.crypto_params.kem_algorithm == KemAlgorithm::Kyber {
1271 Ok(())
1272 } else {
1273 Err(NetworkError::InvalidRequest(
1274 "RE-VFS can only be used with Kyber KEM",
1275 ))
1276 }
1277 } else {
1278 Err(NetworkError::InvalidRequest(
1279 "RE-VFS cannot be used with this remote type",
1280 ))
1281 }
1282 }
1283}
1284
1285impl<T: TargetLockedRemote<R>, R: Ratchet> ProtocolRemoteTargetExt<R> for T {}
1286
1287pub mod results {
1288 use crate::prefabs::client::peer_connection::FileTransferHandleRx;
1289 use crate::prelude::{PeerChannel, UdpChannel};
1290 use crate::remote_ext::remote_specialization::PeerRemote;
1291 use citadel_io::tokio::sync::oneshot::Receiver;
1292 use citadel_proto::prelude::*;
1293 use std::fmt::Debug;
1294
1295 pub struct PeerConnectSuccess<R: Ratchet> {
1296 pub channel: PeerChannel<R>,
1297 pub udp_channel_rx: Option<Receiver<UdpChannel<R>>>,
1298 pub remote: PeerRemote<R>,
1299 pub(crate) incoming_object_transfer_handles: Option<FileTransferHandleRx>,
1302 }
1303
1304 impl<R: Ratchet> Debug for PeerConnectSuccess<R> {
1305 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1306 f.debug_struct("PeerConnectSuccess")
1307 .field("channel", &self.channel)
1308 .field("udp_channel_rx", &self.udp_channel_rx)
1309 .finish()
1310 }
1311 }
1312
1313 impl<R: Ratchet> PeerConnectSuccess<R> {
1314 pub fn get_incoming_file_transfer_handle(
1316 &mut self,
1317 ) -> Result<FileTransferHandleRx, NetworkError> {
1318 self.incoming_object_transfer_handles
1319 .take()
1320 .ok_or(NetworkError::InternalError(
1321 "This function has already been called",
1322 ))
1323 }
1324 }
1325
1326 pub enum PeerRegisterStatus {
1327 Accepted,
1328 Declined,
1329 Failed { reason: Option<String> },
1330 }
1331
1332 #[derive(Clone, Debug)]
1333 pub struct LocalGroupPeer {
1334 pub cid: u64,
1335 pub is_online: bool,
1336 }
1337
1338 #[derive(Clone, Debug)]
1339 pub struct LocalGroupPeerFullInfo {
1340 pub cid: u64,
1341 pub username: Option<String>,
1342 pub full_name: Option<String>,
1343 pub is_online: bool,
1344 }
1345}
1346
1347pub mod remote_specialization {
1348 use crate::prelude::*;
1349 use std::ops::{Deref, DerefMut};
1350
1351 #[derive(Debug, Clone)]
1352 pub struct PeerRemote<R: Ratchet> {
1353 pub(crate) inner: NodeRemote<R>,
1354 pub(crate) peer: VirtualTargetType,
1355 pub(crate) username: Option<String>,
1356 pub(crate) session_security_settings: SessionSecuritySettings,
1357 }
1358
1359 impl<R: Ratchet> Deref for PeerRemote<R> {
1360 type Target = NodeRemote<R>;
1361 fn deref(&self) -> &Self::Target {
1362 &self.inner
1363 }
1364 }
1365
1366 impl<R: Ratchet> DerefMut for PeerRemote<R> {
1367 fn deref_mut(&mut self) -> &mut Self::Target {
1368 &mut self.inner
1369 }
1370 }
1371
1372 impl<R: Ratchet> TargetLockedRemote<R> for PeerRemote<R> {
1373 fn user(&self) -> &VirtualTargetType {
1374 &self.peer
1375 }
1376 fn remote(&self) -> &NodeRemote<R> {
1377 &self.inner
1378 }
1379 fn target_username(&self) -> Option<&str> {
1380 self.username.as_deref()
1381 }
1382 fn user_mut(&mut self) -> &mut VirtualTargetType {
1383 &mut self.peer
1384 }
1385
1386 fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
1387 Some(&self.session_security_settings)
1388 }
1389 }
1390}
1391
1392#[cfg(test)]
1393mod tests {
1394 use crate::prefabs::client::single_connection::SingleClientServerConnectionKernel;
1395 use crate::prefabs::client::DefaultServerConnectionSettingsBuilder;
1396 use crate::prelude::*;
1397 use citadel_io::tokio;
1398 use rstest::rstest;
1399 use std::net::SocketAddr;
1400 use std::sync::atomic::{AtomicBool, Ordering};
1401 use std::sync::Arc;
1402 use uuid::Uuid;
1403
1404 pub struct ReceiverFileTransferKernel<R: Ratchet>(
1405 pub Option<NodeRemote<R>>,
1406 pub Arc<AtomicBool>,
1407 );
1408
1409 #[async_trait]
1410 impl<R: Ratchet> NetKernel<R> for ReceiverFileTransferKernel<R> {
1411 fn load_remote(&mut self, node_remote: NodeRemote<R>) -> Result<(), NetworkError> {
1412 self.0 = Some(node_remote);
1413 Ok(())
1414 }
1415
1416 async fn on_start(&self) -> Result<(), NetworkError> {
1417 Ok(())
1418 }
1419
1420 async fn on_node_event_received(&self, message: NodeResult<R>) -> Result<(), NetworkError> {
1421 log::trace!(target: "citadel", "SERVER received {:?}", message);
1422 if let NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) =
1423 message.into_result()?
1424 {
1425 let mut path = None;
1426 handle
1428 .accept()
1429 .map_err(|err| NetworkError::msg(err.into_string()))?;
1430
1431 use citadel_types::proto::ObjectTransferStatus;
1432 use futures::StreamExt;
1433 while let Some(status) = handle.next().await {
1434 match status {
1435 ObjectTransferStatus::ReceptionComplete => {
1436 log::trace!(target: "citadel", "Server has finished receiving the file!");
1437 let cmp = include_bytes!("../../resources/TheBridge.pdf");
1438 let streamed_data = citadel_io::tokio::fs::read(path.clone().unwrap())
1439 .await
1440 .unwrap();
1441 assert_eq!(
1442 cmp,
1443 streamed_data.as_slice(),
1444 "Original data and streamed data does not match"
1445 );
1446
1447 self.1.store(true, Ordering::Relaxed);
1448 self.0.clone().unwrap().shutdown().await?;
1449 }
1450
1451 ObjectTransferStatus::ReceptionBeginning(file_path, vfm) => {
1452 path = Some(file_path);
1453 assert_eq!(vfm.name, "TheBridge.pdf")
1454 }
1455
1456 _ => {}
1457 }
1458 }
1459 }
1460
1461 Ok(())
1462 }
1463
1464 async fn on_stop(&mut self) -> Result<(), NetworkError> {
1465 Ok(())
1466 }
1467 }
1468
1469 pub fn server_info<'a, R: Ratchet>(
1470 switch: Arc<AtomicBool>,
1471 ) -> (NodeFuture<'a, ReceiverFileTransferKernel<R>>, SocketAddr) {
1472 crate::test_common::server_test_node(ReceiverFileTransferKernel(None, switch), |_| {})
1473 }
1474
1475 #[rstest]
1476 #[case(
1477 EncryptionAlgorithm::AES_GCM_256,
1478 KemAlgorithm::Kyber,
1479 SigAlgorithm::None
1480 )]
1481 #[case(
1482 EncryptionAlgorithm::KyberHybrid,
1483 KemAlgorithm::Kyber,
1484 SigAlgorithm::Dilithium65
1485 )]
1486 #[timeout(std::time::Duration::from_secs(90))]
1487 #[tokio::test]
1488 async fn test_c2s_file_transfer(
1489 #[case] enx: EncryptionAlgorithm,
1490 #[case] kem: KemAlgorithm,
1491 #[case] sig: SigAlgorithm,
1492 ) {
1493 citadel_logging::setup_log();
1494 let client_success = &AtomicBool::new(false);
1495 let server_success = &Arc::new(AtomicBool::new(false));
1496 let (server, server_addr) = server_info::<StackedRatchet>(server_success.clone());
1497 let uuid = Uuid::new_v4();
1498
1499 let session_security_settings = SessionSecuritySettingsBuilder::default()
1500 .with_crypto_params(enx + kem + sig)
1501 .build()
1502 .unwrap();
1503
1504 let server_connection_settings =
1505 DefaultServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid)
1506 .with_session_security_settings(session_security_settings)
1507 .disable_udp()
1508 .build()
1509 .unwrap();
1510
1511 let client_kernel = SingleClientServerConnectionKernel::new(
1512 server_connection_settings,
1513 |connection| async move {
1514 log::trace!(target: "citadel", "***CLIENT LOGIN SUCCESS :: File transfer next ***");
1515 connection
1516 .send_file_with_custom_opts(
1517 "../resources/TheBridge.pdf",
1518 32 * 1024,
1519 TransferType::FileTransfer,
1520 )
1521 .await
1522 .unwrap();
1523 log::trace!(target: "citadel", "***CLIENT FILE TRANSFER SUCCESS***");
1524 client_success.store(true, Ordering::Relaxed);
1525 connection.shutdown_kernel().await
1526 },
1527 );
1528
1529 let client = DefaultNodeBuilder::default().build(client_kernel).unwrap();
1530
1531 let joined = futures::future::try_join(server, client);
1532
1533 let _ = joined.await.unwrap();
1534
1535 assert!(client_success.load(Ordering::Relaxed));
1536 assert!(server_success.load(Ordering::Relaxed));
1537 }
1538}