import importlib.util import queue import time import unittest from pathlib import Path import RNS BRIDGE_PATH = Path(__file__).with_name("presence_bridge.py") def load_bridge(): spec = importlib.util.spec_from_file_location("presence_bridge_under_test", BRIDGE_PATH) module = importlib.util.module_from_spec(spec) assert spec.loader is not None spec.loader.exec_module(module) return module class FakeLink: def __init__(self): self.closed_callback = None self.packet_callback = None self.remote_identified_callback = None self.resource_strategy = None self.resource_callback = None self.resource_started_callback = None self.resource_concluded_callback = None self.teardown_called = False def get_mdu(self): return 4096 def set_link_closed_callback(self, callback): self.closed_callback = callback def set_packet_callback(self, callback): self.packet_callback = callback def set_remote_identified_callback(self, callback): self.remote_identified_callback = callback def set_resource_strategy(self, strategy): self.resource_strategy = strategy def set_resource_callback(self, callback): self.resource_callback = callback def set_resource_started_callback(self, callback): self.resource_started_callback = callback def set_resource_concluded_callback(self, callback): self.resource_concluded_callback = callback def teardown(self): self.teardown_called = True class FakePacket: def __init__(self, link): self.link = link class FakeRnsPacket: MDU = 500 ENCRYPTED_MDU = 500 sent_links = [] sent_payloads = [] def __init__(self, link, data, create_receipt=False): self.link = link self.data = data self.create_receipt = create_receipt def send(self): self.__class__.sent_links.append(self.link) self.__class__.sent_payloads.append(self.data) return True class FakeDestination: def __init__(self): self.hash = bytes.fromhex("44" * 16) class PresenceBridgeOverlayAudioPromotionTest(unittest.TestCase): def setUp(self): self.bridge = load_bridge() self.sender_peer_hash = "22" * 16 self.original_rns_packet = RNS.Packet def tearDown(self): RNS.Packet = self.original_rns_packet def drain_audio_queue(self): while True: try: self.bridge._audio_binary_out_queue.get_nowait() except queue.Empty: return def group_audio_wire(self): room = b"gcall-qortal-1" sender_hash = bytes.fromhex(self.sender_peer_hash) payload = b"opus" return ( self.bridge._GROUP_AUDIO_BINARY_MAGIC + bytes( ( self.bridge._GROUP_AUDIO_BINARY_VERSION, len(room), len(sender_hash), ) ) + len(payload).to_bytes(2, "big") + room + sender_hash + payload ) def group_audio_heartbeat_wire(self): return self.bridge.json.dumps( { "t": self.bridge._GROUP_AUDIO_HEARTBEAT_WIRE_TYPE, "R": "gcall-qortal-1", "c": "PING", "m": int(time.time() * 1000), "r": self.sender_peer_hash, } ).encode("utf-8") def group_audio_heartbeat_wire_without_sender(self): return self.bridge.json.dumps( { "t": self.bridge._GROUP_AUDIO_HEARTBEAT_WIRE_TYPE, "R": "gcall-qortal-1", "c": "PING", "m": int(time.time() * 1000), } ).encode("utf-8") def qchat_file_auth_wire(self, transfer_id="transfer-1", peer_hash=None): return self.bridge.json.dumps( { "type": "QCHAT_FILE_LINK_AUTH", "transferId": transfer_id, "senderAddress": "Q-sender", "downloaderAddress": "Q-downloader", "downloaderPublicKey": "pub-downloader", "downloaderReticulumDestinationHash": peer_hash or self.sender_peer_hash, "downloaderReticulumIdentityPublicKeyBase64": "identity", "timestamp": int(time.time() * 1000), "signature": "sig", } ).encode("utf-8") def install_overlay_state(self, incoming=True): link = FakeLink() link_id = "overlay-test-link" peer_hash = "11" * 16 now = time.time() self.bridge._overlay_links_by_id[link_id] = { "link": link, "peerPresenceHash": peer_hash, "incoming": incoming, "established": True, "established_at": now, "created_at": now, "pending_packets": self.bridge.deque(maxlen=4), "last_activity_at": now, "last_rx_at": None, } self.bridge._overlay_link_ids_by_object[id(link)] = link_id self.bridge._active_overlay_link_id_by_peer_hash[peer_hash] = link_id if incoming: self.bridge._inbound_overlay_neighbors[peer_hash] = now else: self.bridge._active_overlay_neighbors[peer_hash] = now return link, link_id, peer_hash def install_audio_state( self, link_id, peer_hash=None, established=True, link=None, last_activity_at=None, ): peer_hash = peer_hash or self.sender_peer_hash link = link or FakeLink() now = time.time() self.bridge._audio_links_by_id[link_id] = { "link": link, "peerPresenceHash": peer_hash, "peerDestinationHash": peer_hash, "incoming": False, "established": established, "established_at": now if established else None, "created_at": now - 10, "last_activity_at": last_activity_at if last_activity_at is not None else now, "last_rx_at": None, "last_send_ok_at": None, "send_lock": self.bridge.threading.RLock(), "generation": 0, "closing": False, } self.bridge._audio_link_ids_by_object[id(link)] = link_id return link def drain_json_events(self): events = [] while True: try: events.append(self.bridge._json_event_queue.get_nowait()) except queue.Empty: return events def drain_json_responses(self): responses = [] while True: try: responses.append(self.bridge._json_resp_queue.get_nowait()) except queue.Empty: return responses def install_fake_rns_packet(self): FakeRnsPacket.sent_links = [] FakeRnsPacket.sent_payloads = [] RNS.Packet = FakeRnsPacket self.bridge.RNS.Packet = FakeRnsPacket self.bridge._destination = FakeDestination() def test_incoming_overlay_group_audio_promotes_link_without_teardown(self): self.drain_audio_queue() link, overlay_link_id, overlay_peer_hash = self.install_overlay_state( incoming=True ) packet = FakePacket(link) self.bridge._known_peers[self.sender_peer_hash] = object() self.bridge._audio_link_desired_by_peer_hash[self.sender_peer_hash] = { "desired": True, } self.bridge.on_overlay_link_packet(self.group_audio_wire(), packet) self.assertNotIn(overlay_link_id, self.bridge._overlay_links_by_id) self.assertNotIn(id(link), self.bridge._overlay_link_ids_by_object) self.assertNotIn( overlay_peer_hash, self.bridge._active_overlay_link_id_by_peer_hash, ) self.assertNotIn(overlay_peer_hash, self.bridge._inbound_overlay_neighbors) self.assertFalse(link.teardown_called) audio_link_id = self.bridge.get_audio_link_id(link) self.assertIsInstance(audio_link_id, str) audio_state = self.bridge.get_audio_link_state(audio_link_id) self.assertIsNotNone(audio_state) self.assertTrue(audio_state["incoming"]) self.assertEqual(audio_state["peerPresenceHash"], self.sender_peer_hash) self.assertEqual(audio_state["peerDestinationHash"], self.sender_peer_hash) self.assertEqual(audio_state["promoted_from_overlay_link_id"], overlay_link_id) self.assertIs(link.packet_callback, self.bridge.on_audio_link_packet) self.assertGreater(self.bridge._audio_binary_out_queue.qsize(), 0) def test_incoming_overlay_group_audio_without_desired_audio_is_not_promoted(self): self.drain_audio_queue() link, overlay_link_id, _peer_hash = self.install_overlay_state(incoming=True) packet = FakePacket(link) self.bridge._known_peers[self.sender_peer_hash] = object() self.bridge.on_overlay_link_packet(self.group_audio_wire(), packet) self.assertIn(overlay_link_id, self.bridge._overlay_links_by_id) self.assertIsNone(self.bridge.get_audio_link_id(link)) self.assertFalse(link.teardown_called) self.assertEqual(self.bridge._audio_binary_out_queue.qsize(), 0) def test_incoming_overlay_gac_promotes_link_when_audio_is_desired(self): self.drain_audio_queue() link, overlay_link_id, _overlay_peer_hash = self.install_overlay_state( incoming=True ) packet = FakePacket(link) self.bridge._known_peers[self.sender_peer_hash] = object() self.bridge._audio_link_desired_by_peer_hash[self.sender_peer_hash] = { "desired": True, } self.bridge.on_overlay_link_packet(self.group_audio_heartbeat_wire(), packet) self.assertNotIn(overlay_link_id, self.bridge._overlay_links_by_id) audio_link_id = self.bridge.get_audio_link_id(link) self.assertIsInstance(audio_link_id, str) audio_state = self.bridge.get_audio_link_state(audio_link_id) self.assertIsNotNone(audio_state) self.assertTrue(audio_state["incoming"]) self.assertEqual(audio_state["peerPresenceHash"], self.sender_peer_hash) self.assertEqual(audio_state["peerDestinationHash"], self.sender_peer_hash) self.assertIs(link.packet_callback, self.bridge.on_audio_link_packet) def test_incoming_overlay_gac_without_desired_audio_is_not_promoted(self): self.drain_audio_queue() link, overlay_link_id, _peer_hash = self.install_overlay_state(incoming=True) packet = FakePacket(link) self.bridge._known_peers[self.sender_peer_hash] = object() self.bridge.on_overlay_link_packet(self.group_audio_heartbeat_wire(), packet) self.assertIn(overlay_link_id, self.bridge._overlay_links_by_id) self.assertIsNone(self.bridge.get_audio_link_id(link)) self.assertFalse(link.teardown_called) def test_incoming_overlay_gac_without_sender_is_not_promoted(self): self.drain_audio_queue() link, overlay_link_id, _peer_hash = self.install_overlay_state(incoming=True) packet = FakePacket(link) self.bridge._known_peers[self.sender_peer_hash] = object() self.bridge._audio_link_desired_by_peer_hash[self.sender_peer_hash] = { "desired": True, } self.bridge.on_overlay_link_packet( self.group_audio_heartbeat_wire_without_sender(), packet, ) self.assertIn(overlay_link_id, self.bridge._overlay_links_by_id) self.assertIsNone(self.bridge.get_audio_link_id(link)) self.assertFalse(link.teardown_called) def test_stale_audio_mapping_does_not_allow_overlay_promotion(self): self.drain_audio_queue() link, overlay_link_id, _peer_hash = self.install_overlay_state(incoming=True) packet = FakePacket(link) self.bridge._known_peers[self.sender_peer_hash] = object() self.bridge._active_audio_link_id_by_peer_hash[self.sender_peer_hash] = "stale-link" self.bridge.on_overlay_link_packet(self.group_audio_wire(), packet) self.assertIn(overlay_link_id, self.bridge._overlay_links_by_id) self.assertIsNone(self.bridge.get_audio_link_id(link)) self.assertFalse(link.teardown_called) self.assertEqual(self.bridge._audio_binary_out_queue.qsize(), 0) def test_audio_send_with_stale_link_id_uses_established_peer_link(self): self.install_fake_rns_packet() current_link = self.install_audio_state("current-audio-link") self.bridge._active_audio_link_id_by_peer_hash[self.sender_peer_hash] = "stale-audio-link" self.bridge._outgoing_audio_link_id_by_peer_hash[self.sender_peer_hash] = "stale-audio-link" self.bridge._process_audio_batch( [ ( "stale-audio-link", "gcall-qortal-1", self.sender_peer_hash, "", int(time.time() * 1000), b"opus", ) ] ) self.assertEqual(FakeRnsPacket.sent_links, [current_link]) self.assertEqual( self.bridge._active_audio_link_id_by_peer_hash[self.sender_peer_hash], "current-audio-link", ) failures = [ frame for frame in self.drain_json_events() if frame.get("event") == "group_audio_send_failed" ] self.assertEqual(failures, []) def test_audio_heartbeat_with_stale_link_id_uses_established_peer_link(self): self.install_fake_rns_packet() current_link = self.install_audio_state("current-audio-link") self.bridge._active_audio_link_id_by_peer_hash[self.sender_peer_hash] = "stale-audio-link" self.bridge._outgoing_audio_link_id_by_peer_hash[self.sender_peer_hash] = "stale-audio-link" self.bridge.handle_send_group_audio_link_heartbeat( "req-1", { "linkId": "stale-audio-link", "peerPresenceHash": self.sender_peer_hash, "roomId": "gcall-qortal-1", "command": "PING", }, ) self.assertEqual(FakeRnsPacket.sent_links, [current_link]) responses = self.drain_json_responses() self.assertEqual(len(responses), 1) self.assertTrue(responses[0].get("ok")) self.assertEqual( responses[0].get("payload", {}).get("linkId"), "current-audio-link", ) def test_audio_open_stops_after_max_establish_attempts(self): self.bridge._destination = FakeDestination() self.bridge._audio_link_desired_by_peer_hash[self.sender_peer_hash] = { "desired": True, "attempts": self.bridge._AUDIO_LINK_MAX_ESTABLISH_ATTEMPTS, "retry_delay": self.bridge._AUDIO_LINK_RETRY_MIN_SECONDS, "retry_timer": None, "last_failure_reason": "establish_timeout", } ok, payload, error = self.bridge._open_group_audio_link_for_peer( self.sender_peer_hash, retry_reason="establish_timeout", ) self.assertFalse(ok) self.assertEqual(payload.get("code"), "max_establish_attempts") self.assertEqual(error, "Max group audio link establish attempts reached") events = [ frame for frame in self.drain_json_events() if frame.get("event") == "group_audio_send_failed" ] self.assertEqual(len(events), 1) self.assertEqual(events[0].get("payload", {}).get("reason"), "max_establish_attempts") self.assertEqual(events[0].get("payload", {}).get("code"), "max_establish_attempts") def test_audio_retry_not_scheduled_after_max_establish_attempts(self): self.bridge._audio_link_desired_by_peer_hash[self.sender_peer_hash] = { "desired": True, "attempts": self.bridge._AUDIO_LINK_MAX_ESTABLISH_ATTEMPTS, "retry_delay": self.bridge._AUDIO_LINK_RETRY_MIN_SECONDS, "retry_timer": None, "last_failure_reason": "establish_timeout", } self.bridge._schedule_audio_link_retry(self.sender_peer_hash, "establish_timeout") self.bridge._schedule_audio_link_retry(self.sender_peer_hash, "establish_timeout") desired = self.bridge._audio_link_desired_by_peer_hash[self.sender_peer_hash] self.assertIsNone(desired.get("retry_timer")) self.assertEqual(desired.get("last_failure_reason"), "max_establish_attempts") events = [ frame for frame in self.drain_json_events() if frame.get("event") == "group_audio_send_failed" ] self.assertEqual(len(events), 1) self.assertEqual(events[0].get("payload", {}).get("reason"), "max_establish_attempts") self.assertEqual(events[0].get("payload", {}).get("code"), "max_establish_attempts") def test_audio_retry_timer_callback_does_not_enqueue_after_max_attempts(self): enqueued = [] original_enqueue = self.bridge._enqueue_scheduler_task original_timer = self.bridge.threading.Timer class FakeTimer: def __init__(self, delay, function): self.delay = delay self.function = function self.daemon = False self.started = False def start(self): self.started = True def cancel(self): self.started = False try: self.bridge._enqueue_scheduler_task = lambda *args, **kwargs: enqueued.append( (args, kwargs) ) self.bridge.threading.Timer = FakeTimer self.bridge._audio_link_desired_by_peer_hash[self.sender_peer_hash] = { "desired": True, "attempts": self.bridge._AUDIO_LINK_MAX_ESTABLISH_ATTEMPTS - 1, "retry_delay": self.bridge._AUDIO_LINK_RETRY_MIN_SECONDS, "retry_timer": None, "last_failure_reason": "establish_timeout", } self.bridge._schedule_audio_link_retry( self.sender_peer_hash, "establish_timeout", immediate=True, ) desired = self.bridge._audio_link_desired_by_peer_hash[self.sender_peer_hash] timer = desired.get("retry_timer") self.assertIsNotNone(timer) desired["attempts"] = self.bridge._AUDIO_LINK_MAX_ESTABLISH_ATTEMPTS timer.function() self.assertEqual(enqueued, []) events = [ frame for frame in self.drain_json_events() if frame.get("event") == "group_audio_send_failed" ] self.assertEqual(len(events), 1) self.assertEqual( events[0].get("payload", {}).get("code"), "max_establish_attempts", ) finally: self.bridge._enqueue_scheduler_task = original_enqueue self.bridge.threading.Timer = original_timer def test_audio_established_resets_establish_attempts(self): link = self.install_audio_state("current-audio-link", established=False) self.bridge._audio_link_desired_by_peer_hash[self.sender_peer_hash] = { "desired": True, "attempts": self.bridge._AUDIO_LINK_MAX_ESTABLISH_ATTEMPTS, "retry_delay": self.bridge._AUDIO_LINK_RETRY_MAX_SECONDS, "retry_timer": None, "last_failure_reason": "establish_timeout", "max_attempts_emitted": True, } self.bridge.on_outgoing_audio_link_established(link) desired = self.bridge._audio_link_desired_by_peer_hash[self.sender_peer_hash] self.assertEqual(desired.get("attempts"), 0) self.assertEqual(desired.get("retry_delay"), self.bridge._AUDIO_LINK_RETRY_MIN_SECONDS) self.assertEqual(desired.get("last_failure_reason"), "") self.assertFalse(desired.get("max_attempts_emitted")) def test_outbound_overlay_group_audio_is_not_promoted(self): self.drain_audio_queue() link, overlay_link_id, _peer_hash = self.install_overlay_state(incoming=False) packet = FakePacket(link) self.bridge._known_peers[self.sender_peer_hash] = object() self.bridge._audio_link_desired_by_peer_hash[self.sender_peer_hash] = { "desired": True, } self.bridge.on_overlay_link_packet(self.group_audio_wire(), packet) self.assertIn(overlay_link_id, self.bridge._overlay_links_by_id) self.assertIsNone(self.bridge.get_audio_link_id(link)) self.assertFalse(link.teardown_called) self.assertEqual(self.bridge._audio_binary_out_queue.qsize(), 0) def test_incoming_overlay_qchat_auth_promotes_when_transfer_is_pending(self): link, overlay_link_id, overlay_peer_hash = self.install_overlay_state( incoming=True ) packet = FakePacket(link) self.bridge._qchat_file_pending_sends_by_transfer["transfer-1"] = { "expires_at": time.time() + 60, } self.bridge.on_overlay_link_packet(self.qchat_file_auth_wire(), packet) self.assertNotIn(overlay_link_id, self.bridge._overlay_links_by_id) self.assertNotIn(id(link), self.bridge._overlay_link_ids_by_object) self.assertNotIn( overlay_peer_hash, self.bridge._active_overlay_link_id_by_peer_hash, ) self.assertNotIn(overlay_peer_hash, self.bridge._inbound_overlay_neighbors) self.assertFalse(link.teardown_called) file_link_id = self.bridge.get_qchat_file_link_id(link) self.assertIsInstance(file_link_id, str) file_state = self.bridge.get_qchat_file_link_state(file_link_id) self.assertIsNotNone(file_state) self.assertTrue(file_state["incoming"]) self.assertEqual(file_state["peerPresenceHash"], self.sender_peer_hash) self.assertEqual(file_state["transferId"], "transfer-1") self.assertIs(link.packet_callback, self.bridge.on_qchat_file_link_packet) def test_incoming_overlay_qchat_auth_without_pending_transfer_is_not_promoted(self): link, overlay_link_id, _peer_hash = self.install_overlay_state(incoming=True) packet = FakePacket(link) self.bridge.on_overlay_link_packet(self.qchat_file_auth_wire(), packet) self.assertIn(overlay_link_id, self.bridge._overlay_links_by_id) self.assertIsNone(self.bridge.get_qchat_file_link_id(link)) self.assertFalse(link.teardown_called) def test_incoming_overlay_qchat_auth_with_expired_transfer_is_not_promoted(self): link, overlay_link_id, _peer_hash = self.install_overlay_state(incoming=True) packet = FakePacket(link) self.bridge._qchat_file_pending_sends_by_transfer["transfer-1"] = { "expires_at": time.time() - 1, } self.bridge.on_overlay_link_packet(self.qchat_file_auth_wire(), packet) self.assertIn(overlay_link_id, self.bridge._overlay_links_by_id) self.assertIsNone(self.bridge.get_qchat_file_link_id(link)) self.assertFalse(link.teardown_called) def test_incoming_overlay_qchat_auth_with_invalid_peer_hash_is_not_promoted(self): link, overlay_link_id, _peer_hash = self.install_overlay_state(incoming=True) packet = FakePacket(link) self.bridge._qchat_file_pending_sends_by_transfer["transfer-1"] = { "expires_at": time.time() + 60, } self.bridge.on_overlay_link_packet( self.qchat_file_auth_wire(peer_hash="not-a-hash"), packet, ) self.assertIn(overlay_link_id, self.bridge._overlay_links_by_id) self.assertIsNone(self.bridge.get_qchat_file_link_id(link)) self.assertFalse(link.teardown_called) def test_outbound_overlay_qchat_auth_is_not_promoted(self): link, overlay_link_id, _peer_hash = self.install_overlay_state(incoming=False) packet = FakePacket(link) self.bridge._qchat_file_pending_sends_by_transfer["transfer-1"] = { "expires_at": time.time() + 60, } self.bridge.on_overlay_link_packet(self.qchat_file_auth_wire(), packet) self.assertIn(overlay_link_id, self.bridge._overlay_links_by_id) self.assertIsNone(self.bridge.get_qchat_file_link_id(link)) self.assertFalse(link.teardown_called) if __name__ == "__main__": unittest.main()