1use crate::prefabs::ClientServerRemote;
64use crate::prelude::results::{PeerConnectSuccess, PeerRegisterStatus};
65use crate::prelude::*;
66use crate::remote_ext::remote_specialization::PeerRemote;
67use crate::remote_ext::results::LocalGroupPeerFullInfo;
68use std::ops::{Deref, DerefMut};
69
70use futures::StreamExt;
71use std::path::PathBuf;
72use std::time::Duration;
73
74pub(crate) mod user_ids {
75 use crate::prelude::*;
76 use std::ops::Deref;
77
78 #[derive(Debug)]
79 pub struct SymmetricIdentifierHandleRef<'a, R: Ratchet> {
81 pub(crate) user: VirtualTargetType,
82 pub(crate) remote: &'a NodeRemote<R>,
83 pub(crate) target_username: Option<String>,
84 }
85
86 impl<R: Ratchet> SymmetricIdentifierHandleRef<'_, R> {
87 pub fn into_owned(self) -> SymmetricIdentifierHandle<R> {
88 SymmetricIdentifierHandle {
89 user: self.user,
90 remote: self.remote.clone(),
91 target_username: self.target_username,
92 }
93 }
94 }
95
96 #[derive(Clone, Debug)]
97 pub struct SymmetricIdentifierHandle<R: Ratchet> {
99 user: VirtualTargetType,
100 remote: NodeRemote<R>,
101 target_username: Option<String>,
102 }
103
104 pub trait TargetLockedRemote<R: Ratchet>: Send + Sync {
105 fn user(&self) -> &VirtualTargetType;
106 fn remote(&self) -> &NodeRemote<R>;
107 fn target_username(&self) -> Option<&str>;
108 fn user_mut(&mut self) -> &mut VirtualTargetType;
109 fn session_security_settings(&self) -> Option<&SessionSecuritySettings>;
110 }
111
112 impl<R: Ratchet> TargetLockedRemote<R> for SymmetricIdentifierHandleRef<'_, R> {
113 fn user(&self) -> &VirtualTargetType {
114 &self.user
115 }
116 fn remote(&self) -> &NodeRemote<R> {
117 self.remote
118 }
119 fn target_username(&self) -> Option<&str> {
120 self.target_username.as_deref()
121 }
122 fn user_mut(&mut self) -> &mut VirtualTargetType {
123 &mut self.user
124 }
125
126 fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
127 None
128 }
129 }
130
131 impl<R: Ratchet> TargetLockedRemote<R> for SymmetricIdentifierHandle<R> {
132 fn user(&self) -> &VirtualTargetType {
133 &self.user
134 }
135 fn remote(&self) -> &NodeRemote<R> {
136 &self.remote
137 }
138 fn target_username(&self) -> Option<&str> {
139 self.target_username.as_deref()
140 }
141 fn user_mut(&mut self) -> &mut VirtualTargetType {
142 &mut self.user
143 }
144
145 fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
146 None
147 }
148 }
149
150 impl<R: Ratchet> From<SymmetricIdentifierHandleRef<'_, R>> for SymmetricIdentifierHandle<R> {
151 fn from(this: SymmetricIdentifierHandleRef<'_, R>) -> Self {
152 this.into_owned()
153 }
154 }
155
156 impl<R: Ratchet> Deref for SymmetricIdentifierHandle<R> {
157 type Target = NodeRemote<R>;
158
159 fn deref(&self) -> &Self::Target {
160 &self.remote
161 }
162 }
163
164 impl<R: Ratchet> Deref for SymmetricIdentifierHandleRef<'_, R> {
165 type Target = NodeRemote<R>;
166
167 fn deref(&self) -> &Self::Target {
168 self.remote
169 }
170 }
171}
172
/// Everything handed back by a successful `connect` call.
///
/// Dereferences to the contained [`ClientServerRemote`].
pub struct CitadelClientServerConnection<R: Ratchet> {
    /// The channel to the server. `None` once it has been removed through
    /// `take_channel`.
    pub(crate) channel: Option<PeerChannel<R>>,
    /// The remote bound to this client-to-server connection.
    pub remote: ClientServerRemote<R>,
    /// Receiver for the UDP channel, when the connect response supplied one.
    pub udp_channel_rx: Option<citadel_io::tokio::sync::oneshot::Receiver<UdpChannel<R>>>,
    /// The services object reported in the connect response.
    pub services: ServicesObject,
    /// The session CID reported in the connect response.
    pub cid: u64,
    /// The session security settings reported in the connect response.
    pub session_security_settings: SessionSecuritySettings,
}
185
186impl<R: Ratchet> CitadelClientServerConnection<R> {
187 pub fn split(self) -> (PeerChannelSendHalf<R>, PeerChannelRecvHalf<R>) {
193 self.channel.expect("Channel already taken").split()
194 }
195
196 pub fn take_channel(&mut self) -> Option<PeerChannel<R>> {
197 self.channel.take()
198 }
199}
200
/// Lets the connection be used wherever a `&ClientServerRemote` is expected
/// by forwarding to the `remote` field.
impl<R: Ratchet> Deref for CitadelClientServerConnection<R> {
    type Target = ClientServerRemote<R>;

    fn deref(&self) -> &Self::Target {
        &self.remote
    }
}
208
/// Mutable counterpart of the `Deref` impl: forwards to the `remote` field.
impl<R: Ratchet> DerefMut for CitadelClientServerConnection<R> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.remote
    }
}
214
/// Returned by a successful registration.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so callers can log and
/// compare registration results.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RegisterSuccess {
    /// The CID reported by the node for the newly registered account.
    pub cid: u64,
}
219
220#[async_trait]
221pub trait ProtocolRemoteExt<R: Ratchet>: Remote<R> {
223 async fn register<
226 T: std::net::ToSocketAddrs + Send,
227 P: Into<String> + Send,
228 V: Into<String> + Send,
229 K: Into<SecBuffer> + Send,
230 >(
231 &self,
232 addr: T,
233 full_name: P,
234 username: V,
235 proposed_password: K,
236 default_security_settings: SessionSecuritySettings,
237 server_password: Option<PreSharedKey>,
238 ) -> Result<RegisterSuccess, NetworkError> {
239 let creds =
240 ProposedCredentials::new_register(full_name, username, proposed_password.into())
241 .await?;
242 let register_request = NodeRequest::RegisterToHypernode(RegisterToHypernode {
243 remote_addr: addr
244 .to_socket_addrs()?
245 .next()
246 .ok_or(NetworkError::InternalError("Invalid socket addr"))?,
247 proposed_credentials: creds,
248 static_security_settings: default_security_settings,
249 session_password: server_password.unwrap_or_default(),
250 });
251
252 let mut subscription = self.send_callback_subscription(register_request).await?;
253 while let Some(status) = subscription.next().await {
254 match map_errors(status)? {
255 NodeResult::RegisterOkay(RegisterOkay { cid, .. }) => {
256 return Ok(RegisterSuccess { cid });
257 }
258 NodeResult::RegisterFailure(err) => {
259 return Err(NetworkError::Generic(err.error_message));
260 }
261 NodeResult::Disconnect(err) => {
262 return Err(NetworkError::Generic(err.message));
263 }
264 evt => {
265 log::warn!(target: "citadel", "Invalid NodeResult for Register request received: {evt:?}");
266 }
267 }
268 }
269
270 Err(NetworkError::InternalError(
271 "Internal kernel stream died (register)",
272 ))
273 }
274
275 async fn register_with_defaults<
278 T: std::net::ToSocketAddrs + Send,
279 P: Into<String> + Send,
280 V: Into<String> + Send,
281 K: Into<SecBuffer> + Send,
282 >(
283 &self,
284 addr: T,
285 full_name: P,
286 username: V,
287 proposed_password: K,
288 ) -> Result<RegisterSuccess, NetworkError> {
289 self.register(
290 addr,
291 full_name,
292 username,
293 proposed_password,
294 Default::default(),
295 Default::default(),
296 )
297 .await
298 }
299
300 async fn connect(
303 &self,
304 auth: AuthenticationRequest,
305 connect_mode: ConnectMode,
306 udp_mode: UdpMode,
307 keep_alive_timeout: Option<Duration>,
308 session_security_settings: SessionSecuritySettings,
309 server_password: Option<PreSharedKey>,
310 ) -> Result<CitadelClientServerConnection<R>, NetworkError> {
311 let connect_request = NodeRequest::ConnectToHypernode(ConnectToHypernode {
312 auth_request: auth,
313 connect_mode,
314 udp_mode,
315 keep_alive_timeout: keep_alive_timeout.map(|r| r.as_secs()),
316 session_security_settings,
317 session_password: server_password.unwrap_or_default(),
318 });
319
320 let mut subscription = self.send_callback_subscription(connect_request).await?;
321 let status = subscription
322 .next()
323 .await
324 .ok_or(NetworkError::InternalError(
325 "Internal kernel stream died (connect)",
326 ))?;
327
328 return match map_errors(status)? {
329 NodeResult::ConnectSuccess(ConnectSuccess {
330 ticket: _,
331 session_cid: cid,
332 remote_addr: _,
333 is_personal: _,
334 v_conn_type,
335 services,
336 welcome_message: _,
337 channel,
338 udp_rx_opt: udp_channel_rx,
339 session_security_settings,
340 }) => Ok(CitadelClientServerConnection {
341 remote: ClientServerRemote::new(
342 v_conn_type,
343 self.remote_ref().clone(),
344 session_security_settings,
345 None,
346 None,
347 ),
348 channel: Some(*channel),
349 udp_channel_rx,
350 services,
351 cid,
352 session_security_settings,
353 }),
354 NodeResult::ConnectFail(ConnectFail {
355 ticket: _,
356 cid_opt: _,
357 error_message: err,
358 }) => Err(NetworkError::Generic(err)),
359 NodeResult::Disconnect(err) => {
360 return Err(NetworkError::Generic(err.message));
361 }
362 res => Err(NetworkError::msg(format!(
363 "[connect] An unexpected response occurred: {res:?}"
364 ))),
365 };
366 }
367
368 async fn connect_with_defaults(
371 &self,
372 auth: AuthenticationRequest,
373 ) -> Result<CitadelClientServerConnection<R>, NetworkError> {
374 self.connect(
375 auth,
376 Default::default(),
377 Default::default(),
378 None,
379 Default::default(),
380 Default::default(),
381 )
382 .await
383 }
384
385 async fn find_target<T: Into<UserIdentifier> + Send, P: Into<UserIdentifier> + Send>(
398 &self,
399 local_user: T,
400 peer: P,
401 ) -> Result<SymmetricIdentifierHandleRef<'_, R>, NetworkError> {
402 let account_manager = self.account_manager();
403 account_manager
404 .find_target_information(local_user, peer)
405 .await?
406 .map(move |(cid, peer)| {
407 if peer.parent_icid != 0 {
408 SymmetricIdentifierHandleRef {
409 user: VirtualTargetType::ExternalGroupPeer {
410 session_cid: cid,
411 interserver_cid: peer.parent_icid,
412 peer_cid: peer.cid,
413 },
414 remote: self.remote_ref(),
415 target_username: None,
416 }
417 } else {
418 SymmetricIdentifierHandleRef {
419 user: VirtualTargetType::LocalGroupPeer {
420 session_cid: cid,
421 peer_cid: peer.cid,
422 },
423 remote: self.remote_ref(),
424 target_username: None,
425 }
426 }
427 })
428 .ok_or_else(|| NetworkError::msg("Target pair not found"))
429 }
430
431 async fn propose_target<T: Into<UserIdentifier> + Send, P: Into<UserIdentifier> + Send>(
434 &self,
435 local_user: T,
436 peer: P,
437 ) -> Result<SymmetricIdentifierHandleRef<'_, R>, NetworkError> {
438 let local_cid = self.get_session_cid(local_user).await?;
439 match peer.into() {
440 UserIdentifier::ID(peer_cid) => Ok(SymmetricIdentifierHandleRef {
441 user: VirtualTargetType::LocalGroupPeer {
442 session_cid: local_cid,
443 peer_cid,
444 },
445 remote: self.remote_ref(),
446 target_username: None,
447 }),
448 UserIdentifier::Username(uname) => {
449 let peer_cid = self
450 .remote_ref()
451 .account_manager()
452 .find_target_information(local_cid, uname.clone())
453 .await?
454 .map(|r| r.1.cid)
455 .unwrap_or(0);
456 Ok(SymmetricIdentifierHandleRef {
457 user: VirtualTargetType::LocalGroupPeer {
458 session_cid: local_cid,
459 peer_cid,
460 },
461 remote: self.remote_ref(),
462 target_username: Some(uname),
463 })
464 }
465 }
466 }
467
468 async fn get_local_group_peers<T: Into<UserIdentifier> + Send>(
471 &self,
472 local_user: T,
473 limit: Option<usize>,
474 ) -> Result<Vec<LocalGroupPeerFullInfo>, NetworkError> {
475 let local_cid = self.get_session_cid(local_user).await?;
476 let command = NodeRequest::PeerCommand(PeerCommand {
477 session_cid: local_cid,
478 command: PeerSignal::GetRegisteredPeers {
479 peer_conn_type: NodeConnectionType::LocalGroupPeerToLocalGroupServer(local_cid),
480 response: None,
481 limit: limit.map(|r| r as i32),
482 },
483 });
484
485 let mut stream = self.send_callback_subscription(command).await?;
486
487 while let Some(status) = stream.next().await {
488 if let NodeResult::PeerEvent(PeerEvent {
489 event:
490 PeerSignal::GetRegisteredPeers {
491 peer_conn_type: _,
492 response: Some(PeerResponse::RegisteredCids(peer_info, is_onlines)),
493 limit: _,
494 },
495 ticket: _,
496 ..
497 }) = map_errors(status)?
498 {
499 return Ok(peer_info
500 .into_iter()
501 .zip(is_onlines.into_iter())
502 .filter_map(|(peer_info, is_online)| {
503 peer_info.map(|info| LocalGroupPeerFullInfo {
504 cid: info.cid,
505 username: Some(info.username),
506 full_name: Some(info.full_name),
507 is_online,
508 })
509 })
510 .collect());
511 }
512 }
513
514 Err(NetworkError::InternalError(
515 "Internal kernel stream died (get_local_group_peers)",
516 ))
517 }
518
519 async fn get_local_group_mutual_peers<T: Into<UserIdentifier> + Send>(
521 &self,
522 local_user: T,
523 ) -> Result<Vec<LocalGroupPeerFullInfo>, NetworkError> {
524 let local_cid = self.get_session_cid(local_user).await?;
525 let command = NodeRequest::PeerCommand(PeerCommand {
526 session_cid: local_cid,
527 command: PeerSignal::GetMutuals {
528 v_conn_type: NodeConnectionType::LocalGroupPeerToLocalGroupServer(local_cid),
529 response: None,
530 },
531 });
532
533 let mut stream = self.send_callback_subscription(command).await?;
534
535 while let Some(status) = stream.next().await {
536 if let NodeResult::PeerEvent(PeerEvent {
537 event:
538 PeerSignal::GetMutuals {
539 v_conn_type: _,
540 response: Some(PeerResponse::RegisteredCids(peer_info, is_onlines)),
541 },
542 ticket: _,
543 ..
544 }) = map_errors(status)?
545 {
546 return Ok(peer_info
547 .into_iter()
548 .zip(is_onlines.into_iter())
549 .filter_map(|(peer_info, is_online)| {
550 peer_info.map(|info| LocalGroupPeerFullInfo {
551 cid: info.cid,
552 username: Some(info.username),
553 full_name: Some(info.full_name),
554 is_online,
555 })
556 })
557 .collect());
558 }
559 }
560
561 Err(NetworkError::InternalError(
562 "Internal kernel stream died (get_local_group_mutual_peers)",
563 ))
564 }
565
    /// Returns the underlying [`NodeRemote`]. This is the only method an
    /// implementor of this trait has to provide.
    #[doc(hidden)]
    fn remote_ref(&self) -> &NodeRemote<R>;
568
569 #[doc(hidden)]
570 async fn get_session_cid<T: Into<UserIdentifier> + Send>(
571 &self,
572 local_user: T,
573 ) -> Result<u64, NetworkError> {
574 let account_manager = self.account_manager();
575 Ok(account_manager
576 .find_local_user_information(local_user)
577 .await?
578 .ok_or(NetworkError::InvalidRequest("User does not exist"))?)
579 }
580}
581
582pub fn map_errors<R: Ratchet>(result: NodeResult<R>) -> Result<NodeResult<R>, NetworkError> {
583 match result {
584 NodeResult::ConnectFail(ConnectFail {
585 ticket: _,
586 cid_opt: _,
587 error_message: err,
588 }) => Err(NetworkError::Generic(err)),
589 NodeResult::RegisterFailure(RegisterFailure {
590 ticket: _,
591 error_message: err,
592 }) => Err(NetworkError::Generic(err)),
593 NodeResult::InternalServerError(InternalServerError {
594 ticket_opt: _,
595 cid_opt: _,
596 message: err,
597 }) => Err(NetworkError::Generic(err)),
598 NodeResult::PeerEvent(PeerEvent {
599 event:
600 PeerSignal::SignalError {
601 ticket: _,
602 error: err,
603 peer_connection_type: _,
604 },
605 ticket: _,
606 ..
607 }) => Err(NetworkError::Generic(err)),
608 res => Ok(res),
609 }
610}
611
/// A [`NodeRemote`] is its own underlying remote.
impl<R: Ratchet> ProtocolRemoteExt<R> for NodeRemote<R> {
    fn remote_ref(&self) -> &NodeRemote<R> {
        self
    }
}
617
/// A [`ClientServerRemote`] exposes the [`NodeRemote`] stored in its `inner`
/// field.
impl<R: Ratchet> ProtocolRemoteExt<R> for ClientServerRemote<R> {
    fn remote_ref(&self) -> &NodeRemote<R> {
        &self.inner
    }
}
623
624#[async_trait]
625pub trait ProtocolRemoteTargetExt<R: Ratchet>: TargetLockedRemote<R> {
627 async fn send_file_with_custom_opts<T: ObjectSource>(
629 &self,
630 source: T,
631 chunk_size: usize,
632 transfer_type: TransferType,
633 ) -> Result<(), NetworkError> {
634 let chunk_size = if chunk_size == 0 {
635 None
636 } else {
637 Some(chunk_size)
638 };
639 let session_cid = self.user().get_session_cid();
640 let user = *self.user();
641 let remote = self.remote();
642
643 let mut stream = remote
644 .send_callback_subscription(NodeRequest::SendObject(SendObject {
645 source: Box::new(source),
646 chunk_size,
647 session_cid,
648 v_conn_type: user,
649 transfer_type,
650 }))
651 .await?;
652
653 while let Some(event) = stream.next().await {
654 match map_errors(event)? {
655 NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) => {
656 return handle
657 .transfer_file()
658 .await
659 .map_err(|err| NetworkError::Generic(err.into_string()));
660 }
661
662 NodeResult::PeerEvent(PeerEvent {
663 event: PeerSignal::SignalReceived { .. },
664 ..
665 }) => {}
666
667 res => {
668 log::warn!(target: "citadel", "Invalid NodeResult for FileTransfer request received: {res:?}")
669 }
670 }
671 }
672
673 Err(NetworkError::InternalError("File transfer stream died"))
674 }
675
676 async fn send_file<T: ObjectSource>(&self, source: T) -> Result<(), NetworkError> {
678 self.send_file_with_custom_opts(source, 0, TransferType::FileTransfer)
679 .await
680 }
681
682 async fn remote_encrypted_virtual_filesystem_push_custom_chunking<
685 T: ObjectSource,
686 P: Into<PathBuf> + Send,
687 >(
688 &self,
689 source: T,
690 virtual_directory: P,
691 chunk_size: usize,
692 security_level: SecurityLevel,
693 ) -> Result<(), NetworkError> {
694 self.can_use_revfs()?;
695 let mut virtual_path = virtual_directory.into();
696 virtual_path = prepare_virtual_path(virtual_path);
697 validate_virtual_path(&virtual_path)
698 .map_err(|err| NetworkError::Generic(err.into_string()))?;
699 let tx_type = TransferType::RemoteEncryptedVirtualFilesystem {
700 virtual_path,
701 security_level,
702 };
703 self.send_file_with_custom_opts(source, chunk_size, tx_type)
704 .await
705 }
706
707 async fn remote_encrypted_virtual_filesystem_push<T: ObjectSource, P: Into<PathBuf> + Send>(
710 &self,
711 source: T,
712 virtual_directory: P,
713 security_level: SecurityLevel,
714 ) -> Result<(), NetworkError> {
715 self.remote_encrypted_virtual_filesystem_push_custom_chunking(
716 source,
717 virtual_directory,
718 0,
719 security_level,
720 )
721 .await
722 }
723
724 async fn remote_encrypted_virtual_filesystem_pull<P: Into<PathBuf> + Send>(
727 &self,
728 virtual_directory: P,
729 transfer_security_level: SecurityLevel,
730 delete_on_pull: bool,
731 ) -> Result<PathBuf, NetworkError> {
732 self.can_use_revfs()?;
733 let request = NodeRequest::PullObject(PullObject {
734 v_conn: *self.user(),
735 virtual_dir: virtual_directory.into(),
736 delete_on_pull,
737 transfer_security_level,
738 });
739
740 let mut stream = self.remote().send_callback_subscription(request).await?;
741
742 while let Some(event) = stream.next().await {
743 match map_errors(event)? {
744 NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) => {
745 return handle
746 .receive_file()
747 .await
748 .map_err(|err| NetworkError::Generic(err.into_string()));
749 }
750
751 NodeResult::PeerEvent(PeerEvent {
752 event: PeerSignal::SignalReceived { .. },
753 ..
754 }) => {}
755
756 res => {
757 log::error!(target: "citadel", "Invalid NodeResult for REVFS FileTransfer request received: {res:?}");
758 return Err(NetworkError::InternalError(
759 "Received invalid response from protocol",
760 ));
761 }
762 }
763 }
764
765 Err(NetworkError::InternalError(
766 "REVFS File transfer stream died",
767 ))
768 }
769
770 async fn remote_encrypted_virtual_filesystem_delete<P: Into<PathBuf> + Send>(
774 &self,
775 virtual_directory: P,
776 ) -> Result<(), NetworkError> {
777 self.can_use_revfs()?;
778 let request = NodeRequest::DeleteObject(DeleteObject {
779 v_conn: *self.user(),
780 virtual_dir: virtual_directory.into(),
781 security_level: Default::default(),
782 });
783
784 let mut stream = self.remote().send_callback_subscription(request).await?;
785 while let Some(event) = stream.next().await {
786 match map_errors(event)? {
787 NodeResult::ReVFS(result) => {
788 return if let Some(error) = result.error_message {
789 Err(NetworkError::Generic(error))
790 } else {
791 Ok(())
792 }
793 }
794
795 evt => {
796 log::error!(target: "citadel", "Invalid NodeResult for REVFS Delete request received: {evt:?}");
797 }
798 }
799 }
800
801 Err(NetworkError::InternalError("REVFS Delete stream died"))
802 }
803
804 async fn connect_to_peer_custom(
806 &self,
807 session_security_settings: SessionSecuritySettings,
808 udp_mode: UdpMode,
809 peer_session_password: Option<PreSharedKey>,
810 ) -> Result<PeerConnectSuccess<R>, NetworkError> {
811 let session_cid = self.user().get_session_cid();
812 let peer_target = self.try_as_peer_connection().await?;
813
814 let mut stream = self
815 .remote()
816 .send_callback_subscription(NodeRequest::PeerCommand(PeerCommand {
817 session_cid,
818 command: PeerSignal::PostConnect {
819 peer_conn_type: peer_target,
820 ticket_opt: None,
821 invitee_response: None,
822 session_security_settings,
823 udp_mode,
824 session_password: peer_session_password,
825 },
826 }))
827 .await?;
828
829 while let Some(status) = stream.next().await {
830 match map_errors(status)? {
831 NodeResult::PeerChannelCreated(PeerChannelCreated {
832 ticket: _,
833 channel,
834 udp_rx_opt,
835 }) => {
836 let username = self.target_username().map(ToString::to_string);
837 let remote = PeerRemote {
838 inner: self.remote().clone(),
839 peer: peer_target.as_virtual_connection(),
840 username,
841 session_security_settings,
842 };
843
844 return Ok(PeerConnectSuccess {
845 remote,
846 channel: *channel,
847 udp_channel_rx: udp_rx_opt,
848 incoming_object_transfer_handles: None,
849 });
850 }
851
852 NodeResult::PeerEvent(PeerEvent {
853 event:
854 PeerSignal::PostConnect {
855 invitee_response, ..
856 },
857 ..
858 }) => match invitee_response {
859 Some(PeerResponse::Timeout) => {
860 return Err(NetworkError::msg("Peer did not respond in time"))
861 }
862 Some(PeerResponse::Decline) => {
863 return Err(NetworkError::msg("Peer declined to connect"))
864 }
865 _ => {}
866 },
867
868 _ => {}
869 }
870 }
871
872 Err(NetworkError::InternalError(
873 "Internal kernel stream died (connect_to_peer_custom)",
874 ))
875 }
876
877 async fn connect_to_peer(&self) -> Result<PeerConnectSuccess<R>, NetworkError> {
879 self.connect_to_peer_custom(Default::default(), Default::default(), Default::default())
880 .await
881 }
882
883 async fn register_to_peer(&self) -> Result<PeerRegisterStatus, NetworkError> {
885 let session_cid = self.user().get_session_cid();
886 let peer_target = self.try_as_peer_connection().await?;
887 let local_username = self
889 .remote()
890 .account_manager()
891 .get_username_by_cid(session_cid)
892 .await?
893 .ok_or_else(|| NetworkError::msg("Unable to find username for local user"))?;
894 let peer_username_opt = self.target_username().map(ToString::to_string);
895
896 let mut stream = self
897 .remote()
898 .send_callback_subscription(NodeRequest::PeerCommand(PeerCommand {
899 session_cid,
900 command: PeerSignal::PostRegister {
901 peer_conn_type: peer_target,
902 inviter_username: local_username,
903 invitee_username: peer_username_opt,
904 ticket_opt: None,
905 invitee_response: None,
906 },
907 }))
908 .await?;
909
910 while let Some(status) = stream.next().await {
911 if let NodeResult::PeerEvent(PeerEvent {
912 event:
913 PeerSignal::PostRegister {
914 peer_conn_type: _,
915 inviter_username: _,
916 invitee_username: _,
917 ticket_opt: _,
918 invitee_response: Some(resp),
919 },
920 ticket: _,
921 ..
922 }) = map_errors(status)?
923 {
924 match resp {
925 PeerResponse::Accept(..) => return Ok(PeerRegisterStatus::Accepted),
926 PeerResponse::Decline => return Ok(PeerRegisterStatus::Declined),
927 PeerResponse::Timeout => return Ok(PeerRegisterStatus::Failed { reason: Some("Timeout on register request. Peer did not accept in time. Try again later".to_string()) }),
928 _ => {}
929 }
930 }
931 }
932
933 Err(NetworkError::Generic(format!(
934 "Internal kernel stream died (register_to_peer): {:?}",
935 stream.callback_key()
936 )))
937 }
938
939 async fn deregister(&self) -> Result<(), NetworkError> {
943 if let Ok(peer_conn) = self.try_as_peer_connection().await {
944 let peer_request = PeerSignal::Deregister {
945 peer_conn_type: peer_conn,
946 };
947 let session_cid = self.user().get_session_cid();
948 let request = NodeRequest::PeerCommand(PeerCommand {
949 session_cid,
950 command: peer_request,
951 });
952
953 let mut subscription = self.remote().send_callback_subscription(request).await?;
954 while let Some(result) = subscription.next().await {
955 if let NodeResult::PeerEvent(PeerEvent {
956 event: PeerSignal::DeregistrationSuccess { .. },
957 ticket: _,
958 ..
959 }) = map_errors(result)?
960 {
961 return Ok(());
962 }
963 }
964 } else {
965 let cid = self.user().get_session_cid();
967 let request = NodeRequest::DeregisterFromHypernode(DeregisterFromHypernode {
968 session_cid: cid,
969 v_conn_type: *self.user(),
970 });
971 let mut subscription = self.remote().send_callback_subscription(request).await?;
972 while let Some(result) = subscription.next().await {
973 match map_errors(result)? {
974 NodeResult::DeRegistration(DeRegistration {
975 session_cid: _,
976 ticket_opt: _,
977 success: true,
978 }) => return Ok(()),
979 NodeResult::DeRegistration(DeRegistration {
980 session_cid: _,
981 ticket_opt: _,
982 success: false,
983 }) => {
984 return Err(NetworkError::msg(
985 "Unable to deregister: status=false".to_string(),
986 ))
987 }
988
989 _ => {}
990 }
991 }
992 }
993
994 Err(NetworkError::InternalError("Deregister ended unexpectedly"))
995 }
996
997 async fn disconnect(&self) -> Result<(), NetworkError> {
998 if let Ok(peer_conn) = self.try_as_peer_connection().await {
999 if let PeerConnectionType::LocalGroupPeer {
1000 session_cid,
1001 peer_cid: _,
1002 } = peer_conn
1003 {
1004 let request = NodeRequest::PeerCommand(PeerCommand {
1005 session_cid,
1006 command: PeerSignal::Disconnect {
1007 peer_conn_type: peer_conn,
1008 disconnect_response: None,
1009 },
1010 });
1011
1012 let mut subscription = self.remote().send_callback_subscription(request).await?;
1013
1014 while let Some(event) = subscription.next().await {
1015 if let NodeResult::PeerEvent(PeerEvent {
1016 event:
1017 PeerSignal::Disconnect {
1018 peer_conn_type: _,
1019 disconnect_response: Some(_),
1020 },
1021 ticket: _,
1022 ..
1023 }) = map_errors(event)?
1024 {
1025 return Ok(());
1026 }
1027 }
1028
1029 Err(NetworkError::InternalError(
1030 "Unable to receive valid disconnect event",
1031 ))
1032 } else {
1033 Err(NetworkError::msg(
1034 "External group peer functionality not enabled",
1035 ))
1036 }
1037 } else {
1038 let cid = self.user().get_session_cid();
1040 let request =
1041 NodeRequest::DisconnectFromHypernode(DisconnectFromHypernode { session_cid: cid });
1042
1043 let mut subscription = self.remote().send_callback_subscription(request).await?;
1044 while let Some(event) = subscription.next().await {
1045 if let NodeResult::Disconnect(Disconnect {
1046 success, message, ..
1047 }) = map_errors(event)?
1048 {
1049 return if success {
1050 Ok(())
1051 } else {
1052 Err(NetworkError::msg(message))
1053 };
1054 }
1055 }
1056
1057 Err(NetworkError::InternalError(
1058 "Unable to receive valid disconnect event",
1059 ))
1060 }
1061 }
1062
1063 async fn create_group(
1064 &self,
1065 initial_users_to_invite: Option<Vec<UserIdentifier>>,
1066 ) -> Result<GroupChannel, NetworkError> {
1067 let session_cid = self.user().get_session_cid();
1068
1069 let mut initial_users = vec![];
1070 let options = MessageGroupOptions::default();
1072 if let Some(initial_users_to_invite) = initial_users_to_invite {
1076 for user in initial_users_to_invite {
1077 initial_users.push(
1078 self.remote()
1079 .account_manager()
1080 .find_target_information(session_cid, user.clone())
1081 .await
1082 .map_err(|err| NetworkError::msg(err.into_string()))?
1083 .ok_or_else(|| {
1084 NetworkError::msg(format!(
1085 "Account {user:?} not found for local user {session_cid:?}"
1086 ))
1087 })
1088 .map(|r| r.1.cid)?,
1089 )
1090 }
1091 }
1092
1093 let group_request = GroupBroadcast::Create {
1094 initial_invitees: initial_users,
1095 options,
1096 };
1097 let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand {
1098 session_cid,
1099 command: group_request,
1100 });
1101 let mut subscription = self.remote().send_callback_subscription(request).await?;
1102 while let Some(evt) = subscription.next().await {
1103 if let NodeResult::GroupChannelCreated(GroupChannelCreated {
1104 ticket: _,
1105 channel,
1106 session_cid: _,
1107 }) = evt
1108 {
1109 return Ok(channel);
1110 }
1111 }
1112
1113 Err(NetworkError::InternalError(
1114 "Create_group ended unexpectedly",
1115 ))
1116 }
1117
1118 async fn list_owned_groups(&self) -> Result<Vec<MessageGroupKey>, NetworkError> {
1120 let session_cid = self.user().get_session_cid();
1121 let cid_to_check_for = match self.try_as_peer_connection().await {
1122 Ok(res) => res.get_original_target_cid(),
1123 _ => session_cid,
1124 };
1125 let group_request = GroupBroadcast::ListGroupsFor {
1126 cid: cid_to_check_for,
1127 };
1128 let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand {
1129 session_cid,
1130 command: group_request,
1131 });
1132
1133 let mut subscription = self.remote().send_callback_subscription(request).await?;
1134
1135 while let Some(evt) = subscription.next().await {
1136 if let NodeResult::GroupEvent(GroupEvent {
1137 session_cid: _,
1138 ticket: _,
1139 event: GroupBroadcast::ListResponse { groups },
1140 }) = map_errors(evt)?
1141 {
1142 return Ok(groups);
1143 }
1144 }
1145
1146 Err(NetworkError::InternalError(
1147 "List_members ended unexpectedly",
1148 ))
1149 }
1150
1151 async fn rekey(&self) -> Result<Option<u32>, NetworkError> {
1155 let request = NodeRequest::ReKey(ReKey {
1156 v_conn_type: *self.user(),
1157 });
1158 let mut subscription = self.remote().send_callback_subscription(request).await?;
1159
1160 while let Some(evt) = subscription.next().await {
1161 if let NodeResult::ReKeyResult(result) = evt {
1162 return match result.status {
1163 ReKeyReturnType::Success { version } => Ok(Some(version)),
1164 ReKeyReturnType::AlreadyInProgress => Ok(None),
1165 ReKeyReturnType::Failure { err } => {
1166 Err(NetworkError::Generic(format!("Rekey failed: {err}")))
1167 }
1168 };
1169 }
1170 }
1171
1172 Err(NetworkError::InternalError("Rekey ended unexpectedly"))
1173 }
1174
1175 async fn is_peer_registered(&self) -> Result<bool, NetworkError> {
1177 let target = self.try_as_peer_connection().await?;
1178 if let PeerConnectionType::LocalGroupPeer {
1179 session_cid: local_cid,
1180 peer_cid,
1181 } = target
1182 {
1183 let peers = self.remote().get_local_group_peers(local_cid, None).await?;
1184 citadel_logging::info!(target: "citadel", "Checking to see if {target} is registered in {peers:?}");
1185 Ok(peers.iter().any(|p| p.cid == peer_cid))
1186 } else {
1187 Err(NetworkError::Generic(
1188 "External group peers are not supported yet".to_string(),
1189 ))
1190 }
1191 }
1192
1193 #[doc(hidden)]
1194 async fn try_as_peer_connection(&self) -> Result<PeerConnectionType, NetworkError> {
1195 let verified_return = |user: &VirtualTargetType| {
1196 user.try_as_peer_connection()
1197 .ok_or(NetworkError::InvalidRequest("Target is not a peer"))
1198 };
1199
1200 if self.user().get_target_cid() == 0 {
1201 let peer_username = self
1204 .target_username()
1205 .ok_or_else(|| NetworkError::msg("target_cid=0, yet, no username was provided"))?;
1206 let session_cid = self.user().get_session_cid();
1207 let expected_peer_cid = self
1208 .remote()
1209 .account_manager()
1210 .get_persistence_handler()
1211 .get_cid_by_username(peer_username);
1212 let peer_cid = self
1215 .remote()
1216 .account_manager()
1217 .find_target_information(session_cid, peer_username)
1218 .await
1219 .map_err(|err| NetworkError::Generic(err.into_string()))?
1220 .map(|r| r.1.cid)
1221 .unwrap_or(expected_peer_cid);
1222
1223 let mut user = *self.user();
1224 user.set_target_cid(peer_cid);
1225 verified_return(&user)
1226 } else {
1227 verified_return(self.user())
1228 }
1229 }
1230
1231 #[doc(hidden)]
1232 fn can_use_revfs(&self) -> Result<(), NetworkError> {
1233 if let Some(sess) = self.session_security_settings() {
1234 if sess.crypto_params.kem_algorithm == KemAlgorithm::Kyber {
1235 Ok(())
1236 } else {
1237 Err(NetworkError::InvalidRequest(
1238 "RE-VFS can only be used with Kyber KEM",
1239 ))
1240 }
1241 } else {
1242 Err(NetworkError::InvalidRequest(
1243 "RE-VFS cannot be used with this remote type",
1244 ))
1245 }
1246 }
1247}
1248
1249impl<T: TargetLockedRemote<R>, R: Ratchet> ProtocolRemoteTargetExt<R> for T {}
1250
1251pub mod results {
1252 use crate::prefabs::client::peer_connection::FileTransferHandleRx;
1253 use crate::prelude::{PeerChannel, UdpChannel};
1254 use crate::remote_ext::remote_specialization::PeerRemote;
1255 use citadel_io::tokio::sync::oneshot::Receiver;
1256 use citadel_proto::prelude::*;
1257 use std::fmt::Debug;
1258
1259 pub struct PeerConnectSuccess<R: Ratchet> {
1260 pub channel: PeerChannel<R>,
1261 pub udp_channel_rx: Option<Receiver<UdpChannel<R>>>,
1262 pub remote: PeerRemote<R>,
1263 pub(crate) incoming_object_transfer_handles: Option<FileTransferHandleRx>,
1266 }
1267
1268 impl<R: Ratchet> Debug for PeerConnectSuccess<R> {
1269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1270 f.debug_struct("PeerConnectSuccess")
1271 .field("channel", &self.channel)
1272 .field("udp_channel_rx", &self.udp_channel_rx)
1273 .finish()
1274 }
1275 }
1276
1277 impl<R: Ratchet> PeerConnectSuccess<R> {
1278 pub fn get_incoming_file_transfer_handle(
1280 &mut self,
1281 ) -> Result<FileTransferHandleRx, NetworkError> {
1282 self.incoming_object_transfer_handles
1283 .take()
1284 .ok_or(NetworkError::InternalError(
1285 "This function has already been called",
1286 ))
1287 }
1288 }
1289
/// Outcome of a peer registration attempt.
///
/// Derives `Clone` and `Debug` like the other public result types in this
/// module, plus `PartialEq`/`Eq` so callers can compare statuses directly
/// (e.g. `status == PeerRegisterStatus::Accepted`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PeerRegisterStatus {
    /// The registration was accepted.
    Accepted,
    /// The registration was declined.
    Declined,
    /// The registration failed; `reason` carries an optional explanation.
    Failed { reason: Option<String> },
}
1295
/// Compact peer entry: an identifier plus an online flag.
#[derive(Clone, Debug)]
pub struct LocalGroupPeer {
    /// The peer's CID.
    pub cid: u64,
    /// Whether the peer is flagged as online.
    pub is_online: bool,
}
1301
/// Peer entry carrying the same `cid`/`is_online` pair as `LocalGroupPeer`
/// plus optional naming information.
#[derive(Clone, Debug)]
pub struct LocalGroupPeerFullInfo {
    /// The peer's CID.
    pub cid: u64,
    /// The peer's username, when available.
    pub username: Option<String>,
    /// The peer's full name, when available.
    pub full_name: Option<String>,
    /// Whether the peer is flagged as online.
    pub is_online: bool,
}
1309}
1310
1311pub mod remote_specialization {
1312 use crate::prelude::*;
1313 use std::ops::{Deref, DerefMut};
1314
/// A `NodeRemote` bundled with a fixed target, an optional username and the
/// session security settings, so it can act as a `TargetLockedRemote`.
#[derive(Debug, Clone)]
pub struct PeerRemote<R: Ratchet> {
    /// Underlying node remote; also reachable through `Deref`/`DerefMut`.
    pub(crate) inner: NodeRemote<R>,
    /// Target returned by `TargetLockedRemote::user`.
    pub(crate) peer: VirtualTargetType,
    /// Username returned by `TargetLockedRemote::target_username`, if set.
    pub(crate) username: Option<String>,
    /// Settings returned by `TargetLockedRemote::session_security_settings`.
    pub(crate) session_security_settings: SessionSecuritySettings,
}
1322
1323 impl<R: Ratchet> Deref for PeerRemote<R> {
1324 type Target = NodeRemote<R>;
1325 fn deref(&self) -> &Self::Target {
1326 &self.inner
1327 }
1328 }
1329
1330 impl<R: Ratchet> DerefMut for PeerRemote<R> {
1331 fn deref_mut(&mut self) -> &mut Self::Target {
1332 &mut self.inner
1333 }
1334 }
1335
1336 impl<R: Ratchet> TargetLockedRemote<R> for PeerRemote<R> {
1337 fn user(&self) -> &VirtualTargetType {
1338 &self.peer
1339 }
1340 fn remote(&self) -> &NodeRemote<R> {
1341 &self.inner
1342 }
1343 fn target_username(&self) -> Option<&str> {
1344 self.username.as_deref()
1345 }
1346 fn user_mut(&mut self) -> &mut VirtualTargetType {
1347 &mut self.peer
1348 }
1349
1350 fn session_security_settings(&self) -> Option<&SessionSecuritySettings> {
1351 Some(&self.session_security_settings)
1352 }
1353 }
1354}
1355
1356#[cfg(test)]
1357mod tests {
1358 use crate::prefabs::client::single_connection::SingleClientServerConnectionKernel;
1359 use crate::prefabs::client::DefaultServerConnectionSettingsBuilder;
1360 use crate::prelude::*;
1361 use citadel_io::tokio;
1362 use rstest::rstest;
1363 use std::net::SocketAddr;
1364 use std::sync::atomic::{AtomicBool, Ordering};
1365 use std::sync::Arc;
1366 use uuid::Uuid;
1367
/// Test kernel for the receiving side of the file-transfer test.
///
/// * Field `0`: the node remote, filled in by `load_remote` (initially
///   `None` when built by `server_info`).
/// * Field `1`: flag set to `true` once a received file has been verified.
pub struct ReceiverFileTransferKernel<R: Ratchet>(
    pub Option<NodeRemote<R>>,
    pub Arc<AtomicBool>,
);
1372
// Kernel behaviour: remember the remote, accept any incoming object
// transfer, verify the received bytes, then flag success and shut down.
#[async_trait]
impl<R: Ratchet> NetKernel<R> for ReceiverFileTransferKernel<R> {
    /// Stores the node remote so the event handler can shut the node down.
    fn load_remote(&mut self, node_remote: NodeRemote<R>) -> Result<(), NetworkError> {
        self.0 = Some(node_remote);
        Ok(())
    }

    /// Nothing to do on start.
    async fn on_start(&self) -> Result<(), NetworkError> {
        Ok(())
    }

    /// Handles node events; only `ObjectTransferHandle` events are acted on.
    ///
    /// For such an event the transfer is accepted and its status stream is
    /// drained. Errors from `map_errors`, from accepting the transfer and
    /// from the shutdown call are propagated with `?`; content mismatches
    /// and file-read failures panic (this is test code).
    async fn on_node_event_received(&self, message: NodeResult<R>) -> Result<(), NetworkError> {
        log::trace!(target: "citadel", "SERVER received {:?}", message);
        // NOTE(review): `map_errors` is defined elsewhere; presumably it turns
        // error-type events into `Err` — confirm.
        if let NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) =
            map_errors(message)?
        {
            // Set by `ReceptionBeginning`, read by `ReceptionComplete`.
            let mut path = None;
            handle
                .accept()
                .map_err(|err| NetworkError::msg(err.into_string()))?;

            use citadel_types::proto::ObjectTransferStatus;
            use futures::StreamExt;
            while let Some(status) = handle.next().await {
                match status {
                    ObjectTransferStatus::ReceptionComplete => {
                        log::trace!(target: "citadel", "Server has finished receiving the file!");
                        // Reference bytes embedded at compile time.
                        let cmp = include_bytes!("../../resources/TheBridge.pdf");
                        // Panics if no `ReceptionBeginning` status set `path` first.
                        let streamed_data = citadel_io::tokio::fs::read(path.clone().unwrap())
                            .await
                            .unwrap();
                        assert_eq!(
                            cmp,
                            streamed_data.as_slice(),
                            "Original data and streamed data does not match"
                        );

                        // Signal success to the test, then stop the node.
                        self.1.store(true, Ordering::Relaxed);
                        self.0.clone().unwrap().shutdown().await?;
                    }

                    ObjectTransferStatus::ReceptionBeginning(file_path, vfm) => {
                        // Remember where the incoming file is being written.
                        path = Some(file_path);
                        assert_eq!(vfm.name, "TheBridge.pdf")
                    }

                    // All other statuses are ignored.
                    _ => {}
                }
            }
        }

        Ok(())
    }

    /// Nothing to do on stop.
    async fn on_stop(&mut self) -> Result<(), NetworkError> {
        Ok(())
    }
}
1432
1433 pub fn server_info<'a, R: Ratchet>(
1434 switch: Arc<AtomicBool>,
1435 ) -> (NodeFuture<'a, ReceiverFileTransferKernel<R>>, SocketAddr) {
1436 crate::test_common::server_test_node(ReceiverFileTransferKernel(None, switch), |_| {})
1437 }
1438
// Client-to-server file transfer test, run once per crypto parameter case
// below. Each case is aborted after 90 seconds. The client sends
// `TheBridge.pdf`; the server kernel verifies the bytes; both sides must
// have raised their success flag by the time the nodes finish.
#[rstest]
#[case(
    EncryptionAlgorithm::AES_GCM_256,
    KemAlgorithm::Kyber,
    SigAlgorithm::None
)]
#[case(
    EncryptionAlgorithm::KyberHybrid,
    KemAlgorithm::Kyber,
    SigAlgorithm::Dilithium65
)]
#[timeout(std::time::Duration::from_secs(90))]
#[tokio::test]
async fn test_c2s_file_transfer(
    #[case] enx: EncryptionAlgorithm,
    #[case] kem: KemAlgorithm,
    #[case] sig: SigAlgorithm,
) {
    citadel_logging::setup_log();
    // Success flags: one set by the client closure, one by the server kernel.
    let client_success = &AtomicBool::new(false);
    let server_success = &Arc::new(AtomicBool::new(false));
    // The server kernel receives its own `Arc` handle to the shared flag.
    let (server, server_addr) = server_info::<StackedRatchet>(server_success.clone());
    let uuid = Uuid::new_v4();

    // Combine the three case parameters into the session's crypto params.
    let session_security_settings = SessionSecuritySettingsBuilder::default()
        .with_crypto_params(enx + kem + sig)
        .build()
        .unwrap();

    let server_connection_settings =
        DefaultServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid)
            .with_session_security_settings(session_security_settings)
            .disable_udp()
            .build()
            .unwrap();

    let client_kernel = SingleClientServerConnectionKernel::new(
        server_connection_settings,
        |connection| async move {
            log::trace!(target: "citadel", "***CLIENT LOGIN SUCCESS :: File transfer next ***");
            // NOTE(review): `32 * 1024` is presumably the chunk size in
            // bytes — confirm against `send_file_with_custom_opts`.
            connection
                .send_file_with_custom_opts(
                    "../resources/TheBridge.pdf",
                    32 * 1024,
                    TransferType::FileTransfer,
                )
                .await
                .unwrap();
            log::trace!(target: "citadel", "***CLIENT FILE TRANSFER SUCCESS***");
            client_success.store(true, Ordering::Relaxed);
            connection.shutdown_kernel().await
        },
    );

    let client = DefaultNodeBuilder::default().build(client_kernel).unwrap();

    // Drive both nodes together; an error from either one fails the test.
    let joined = futures::future::try_join(server, client);

    let _ = joined.await.unwrap();

    assert!(client_success.load(Ordering::Relaxed));
    assert!(server_success.load(Ordering::Relaxed));
}
1502}