""" End-to-end tests for LiveKit RTC library. These tests verify core functionality of the LiveKit RTC library including: - Publishing and subscribing to audio & data tracks - Audio stream consumption and energy verification - Room lifecycle events (connect, disconnect, track publish/unpublish) - Connection state transitions Requirements: - LIVEKIT_URL: LiveKit server URL - LIVEKIT_API_KEY: API key for authentication - LIVEKIT_API_SECRET: API secret for authentication Tests will be skipped if these environment variables are not set. Usage: pytest test_e2e.py -v """ import asyncio import os import time import uuid from typing import Any, Callable, Dict, List, TypeVar import numpy as np import pytest from livekit import rtc, api from livekit.rtc.utils import sine_wave_generator SAMPLE_RATE = 48000 T = TypeVar("T") async def assert_eventually( condition: Callable[[], T], timeout: float = 5.0, interval: float = 0.1, message: str = "Condition not met within timeout", ) -> T: """ Poll a condition until it becomes truthy or timeout is reached. Returns immediately once condition is satisfied. """ deadline = asyncio.get_event_loop().time() + timeout last_result = None while asyncio.get_event_loop().time() < deadline: last_result = condition() if last_result: return last_result await asyncio.sleep(interval) raise AssertionError(f"{message} (last result: {last_result})") def skip_if_no_credentials() -> Any: required_vars = ["LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET"] missing = [var for var in required_vars if not os.getenv(var)] return pytest.mark.skipif( bool(missing), reason=f"Missing environment variables: {', '.join(missing)}" ) def create_token(identity: str, room_name: str) -> str: return ( api.AccessToken() .with_identity(identity) .with_name(identity) .with_grants( api.VideoGrants( room_join=True, room=room_name, ) ) .to_jwt() ) def unique_room_name(base: str) -> str: return f"{base}-{uuid.uuid4().hex[:8]}" @pytest.mark.asyncio @skip_if_no_credentials() # type: ignore[untyped-decorator] async def test_publish_track() -> None: """Test that a published track can be subscribed by another participant""" room_name = unique_room_name("test-publish-track") url = os.getenv("LIVEKIT_URL") assert url is not None publisher_room = rtc.Room() subscriber_room = rtc.Room() publisher_token = create_token("publisher", room_name) subscriber_token = create_token("subscriber", room_name) track_published_event = asyncio.Event() track_subscribed_event = asyncio.Event() subscribed_track = None @subscriber_room.on("track_published") def on_track_published( publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant ) -> None: track_published_event.set() @subscriber_room.on("track_subscribed") def on_track_subscribed( track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant, ) -> None: nonlocal subscribed_track if track.kind == rtc.TrackKind.KIND_AUDIO: subscribed_track = track track_subscribed_event.set() try: await subscriber_room.connect(url, subscriber_token) await publisher_room.connect(url, publisher_token) source = rtc.AudioSource(SAMPLE_RATE, 1) track = rtc.LocalAudioTrack.create_audio_track("test-audio", source) options = rtc.TrackPublishOptions() options.source = rtc.TrackSource.SOURCE_MICROPHONE publication = await publisher_room.local_participant.publish_track(track, options) assert publication is not None assert publication.sid is not None await asyncio.wait_for(track_published_event.wait(), timeout=5.0) await asyncio.wait_for(track_subscribed_event.wait(), timeout=5.0) assert subscribed_track is not None assert isinstance(subscribed_track, rtc.RemoteAudioTrack) finally: await publisher_room.disconnect() await subscriber_room.disconnect() @pytest.mark.asyncio @skip_if_no_credentials() # type: ignore[untyped-decorator] async def test_video_packet_trailer_metadata() -> None: """Test that packet trailer metadata can be sent and received on video frames.""" room_name = unique_room_name("test-video-packet-trailer") url = os.getenv("LIVEKIT_URL") assert url is not None publisher_room = rtc.Room() subscriber_room = rtc.Room() publisher_token = create_token("video-publisher", room_name) subscriber_token = create_token("video-subscriber", room_name) track_subscribed_event = asyncio.Event() subscribed_track = None subscribed_publication = None video_stream = None source = None @subscriber_room.on("track_subscribed") def on_track_subscribed( track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant, ) -> None: nonlocal subscribed_track, subscribed_publication if track.kind == rtc.TrackKind.KIND_VIDEO: subscribed_track = track subscribed_publication = publication track_subscribed_event.set() try: await subscriber_room.connect(url, subscriber_token) await publisher_room.connect(url, publisher_token) # Use a real video resolution: WebRTC encoders (VP8/H.264) enforce a # 16x16 minimum and assert internally on smaller inputs, which would # crash the FFI process via panic=abort. frame_width, frame_height = 320, 240 source = rtc.VideoSource(frame_width, frame_height) track = rtc.LocalVideoTrack.create_video_track("metadata-video", source) packet_trailer_features = [ rtc.PacketTrailerFeature.PTF_USER_TIMESTAMP, rtc.PacketTrailerFeature.PTF_FRAME_ID, ] options = rtc.TrackPublishOptions( source=rtc.TrackSource.SOURCE_CAMERA, packet_trailer_features=packet_trailer_features, ) publication = await publisher_room.local_participant.publish_track(track, options) assert publication.packet_trailer_features == packet_trailer_features await asyncio.wait_for(track_subscribed_event.wait(), timeout=5.0) assert subscribed_track is not None assert subscribed_publication is not None assert subscribed_publication.packet_trailer_features == packet_trailer_features video_stream = rtc.VideoStream.from_track(track=subscribed_track, capacity=1) frame = rtc.VideoFrame( width=frame_width, height=frame_height, type=rtc.VideoBufferType.RGBA, data=bytes([255, 0, 0, 255] * (frame_width * frame_height)), ) metadata = rtc.FrameMetadata(user_timestamp=123456789, frame_id=77) async def publish_frames() -> None: for _ in range(10): source.capture_frame(frame, metadata=metadata) await asyncio.sleep(0.2) publish_task = asyncio.create_task(publish_frames()) event = await asyncio.wait_for(video_stream.__anext__(), timeout=10.0) await publish_task assert event.metadata is not None assert event.metadata.HasField("user_timestamp") assert event.metadata.HasField("frame_id") assert event.metadata.user_timestamp == 123456789 assert event.metadata.frame_id == 77 finally: if video_stream is not None: await video_stream.aclose() if source is not None: await source.aclose() await publisher_room.disconnect() await subscriber_room.disconnect() @pytest.mark.asyncio @skip_if_no_credentials() # type: ignore[untyped-decorator] async def test_full_reconnect_preserves_local_publication_object() -> None: """Test that FFI local_track_republished updates the existing publication object.""" room_name = unique_room_name("test-local-republish") url = os.getenv("LIVEKIT_URL") assert url is not None room = rtc.Room() token = create_token("publisher", room_name) reconnected_event = asyncio.Event() source = None @room.on("reconnected") def on_reconnected() -> None: reconnected_event.set() try: await room.connect(url, token) source = rtc.VideoSource(2, 2) track = rtc.LocalVideoTrack.create_video_track("republish-video", source) packet_trailer_features = [ rtc.PacketTrailerFeature.PTF_USER_TIMESTAMP, rtc.PacketTrailerFeature.PTF_FRAME_ID, ] publication = await room.local_participant.publish_track( track, rtc.TrackPublishOptions( source=rtc.TrackSource.SOURCE_CAMERA, packet_trailer_features=packet_trailer_features, ), ) previous_sid = publication.sid await room.simulate_scenario(rtc.SimulateScenarioKind.SIMULATE_FULL_RECONNECT) await asyncio.wait_for(reconnected_event.wait(), timeout=10.0) await assert_eventually( lambda: ( publication.sid in room.local_participant.track_publications and room.local_participant.track_publications[publication.sid] is publication ), timeout=10.0, message="local publication was not rekeyed after full reconnect", ) assert publication.sid != previous_sid assert previous_sid not in room.local_participant.track_publications assert publication.packet_trailer_features == packet_trailer_features finally: if source is not None: await source.aclose() await room.disconnect() @pytest.mark.asyncio @skip_if_no_credentials() # type: ignore[untyped-decorator] async def test_audio_stream_subscribe() -> None: """Test that published audio can be consumed and has similar energy levels""" room_name = unique_room_name("test-audio-stream") url = os.getenv("LIVEKIT_URL") assert url is not None publisher_room = rtc.Room() subscriber_room = rtc.Room() publisher_token = create_token("audio-publisher", room_name) subscriber_token = create_token("audio-subscriber", room_name) track_subscribed_event = asyncio.Event() subscribed_track = None @subscriber_room.on("track_subscribed") def on_track_subscribed( track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant, ) -> None: nonlocal subscribed_track if track.kind == rtc.TrackKind.KIND_AUDIO: subscribed_track = track track_subscribed_event.set() try: await subscriber_room.connect(url, subscriber_token) await publisher_room.connect(url, publisher_token) source = rtc.AudioSource(SAMPLE_RATE, 1) track = rtc.LocalAudioTrack.create_audio_track("sine-wave", source) options = rtc.TrackPublishOptions() options.source = rtc.TrackSource.SOURCE_MICROPHONE await publisher_room.local_participant.publish_track(track, options) target_duration = 5.0 published_energy: List[Any] = [] async def publish_audio() -> None: async for frame in sine_wave_generator(440, target_duration, SAMPLE_RATE): data = np.frombuffer(frame.data.tobytes(), dtype=np.int16) energy = np.mean(np.abs(data.astype(np.float32))) published_energy.append(energy) await source.capture_frame(frame) publish_task = asyncio.create_task(publish_audio()) await asyncio.wait_for(track_subscribed_event.wait(), timeout=5.0) assert subscribed_track is not None audio_stream = rtc.AudioStream( subscribed_track, sample_rate=SAMPLE_RATE, num_channels=1, ) received_frames = [] target_frames = int(target_duration * SAMPLE_RATE / 480) frame_count = 0 async for event in audio_stream: frame = event.frame data = np.frombuffer(frame.data, dtype=np.int16) received_frames.append(data) frame_count += 1 if frame_count >= target_frames: break await audio_stream.aclose() await publish_task assert len(received_frames) > 0, "No audio frames were received" received_energy = [] for data in received_frames: energy = np.mean(np.abs(data.astype(np.float32))) received_energy.append(energy) avg_received_energy = np.mean(received_energy) avg_published_energy = np.mean(published_energy) assert avg_received_energy > 0, "Received audio has no energy" assert avg_published_energy > 0, "Published audio has no energy" assert ( avg_received_energy > avg_published_energy * 0.9 and avg_received_energy < avg_published_energy * 1.1 ), "Received audio energy is not within range" finally: await publisher_room.disconnect() await subscriber_room.disconnect() @pytest.mark.asyncio @skip_if_no_credentials() # type: ignore[untyped-decorator] async def test_room_lifecycle_events() -> None: """Test that room lifecycle and track events are fired properly""" room_name = unique_room_name("test-lifecycle-events") url = os.getenv("LIVEKIT_URL") assert url is not None room1 = rtc.Room() room2 = rtc.Room() token1 = create_token("participant-1", room_name) token2 = create_token("participant-2", room_name) events: Dict[str, List[str]] = { "disconnected": [], "participant_connected": [], "participant_disconnected": [], "local_track_published": [], "local_track_unpublished": [], "track_published": [], "track_unpublished": [], "track_subscribed": [], "track_unsubscribed": [], "room_updated": [], "connection_state_changed": [], } @room1.on("disconnected") def on_room1_disconnected(reason: Any) -> None: events["disconnected"].append("room1") @room1.on("participant_connected") def on_room1_participant_connected(participant: rtc.RemoteParticipant) -> None: events["participant_connected"].append(f"room1-{participant.identity}") @room1.on("participant_disconnected") def on_room1_participant_disconnected(participant: rtc.RemoteParticipant) -> None: events["participant_disconnected"].append(f"room1-{participant.identity}") @room1.on("local_track_published") def on_room1_local_track_published(publication: rtc.LocalTrackPublication, track: Any) -> None: events["local_track_published"].append(f"room1-{publication.sid}") @room1.on("local_track_unpublished") def on_room1_local_track_unpublished(publication: rtc.LocalTrackPublication) -> None: events["local_track_unpublished"].append(f"room1-{publication.sid}") @room1.on("room_updated") def on_room1_room_updated() -> None: events["room_updated"].append("room1") @room1.on("connection_state_changed") def on_room1_connection_state_changed(state: rtc.ConnectionState) -> None: events["connection_state_changed"].append(f"room1-{state}") @room2.on("track_published") def on_room2_track_published( publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant ) -> None: events["track_published"].append(f"room2-{publication.sid}") @room2.on("track_subscribed") def on_room2_track_subscribed( track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant, ) -> None: events["track_subscribed"].append(f"room2-{publication.sid}") @room2.on("track_unpublished") def on_room2_track_unpublished( publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant ) -> None: events["track_unpublished"].append(f"room2-{publication.sid}") try: await room1.connect(url, token1) await assert_eventually( lambda: ( len(events["connection_state_changed"]) > 0 and events["connection_state_changed"][-1] == f"room1-{rtc.ConnectionState.CONN_CONNECTED}" ), message="room1 connection_state_changed event not fired or did not reach CONN_CONNECTED state", ) await room2.connect(url, token2) await assert_eventually( lambda: "room1-participant-2" in events["participant_connected"], message="room1 did not receive participant_connected for participant-2", ) await assert_eventually( lambda: room2.remote_participants.get("participant-1") is not None, message="room2 did not see participant-1", ) source = rtc.AudioSource(SAMPLE_RATE, 1) track = rtc.LocalAudioTrack.create_audio_track("test-track", source) options = rtc.TrackPublishOptions() options.source = rtc.TrackSource.SOURCE_MICROPHONE publication = await room1.local_participant.publish_track(track, options) await assert_eventually( lambda: len(events["local_track_published"]) > 0, message="local_track_published event not fired", ) await assert_eventually( lambda: any("room2" in e for e in events["track_published"]), message="room2 did not receive track_published", ) await assert_eventually( lambda: len(events["track_subscribed"]) > 0, message="track_subscribed event not fired" ) await room1.local_participant.unpublish_track(publication.sid) await assert_eventually( lambda: len(events["local_track_unpublished"]) > 0, message="local_track_unpublished event not fired", ) await assert_eventually( lambda: len(events["track_unpublished"]) > 0, message="track_unpublished event not fired", ) await room2.disconnect() await assert_eventually( lambda: "room1-participant-2" in events["participant_disconnected"], message="participant_disconnected not fired for participant-2", ) await room1.disconnect() await assert_eventually( lambda: ( lambda: ( len(events["connection_state_changed"]) > 0 and events["connection_state_changed"][-1] == f"room1-{rtc.ConnectionState.CONN_DISCONNECTED}" ) ), message="room1 disconnected event not fired", ) print("\nEvent Summary:") for event_type, event_list in events.items(): if event_list: print(f" {event_type}: {len(event_list)} events") finally: if room1.isconnected(): await room1.disconnect() if room2.isconnected(): await room2.disconnect() @pytest.mark.asyncio @skip_if_no_credentials() # type: ignore[untyped-decorator] async def test_connection_state_transitions() -> None: """Test that connection state transitions work correctly""" room_name = unique_room_name("test-connection-state") url = os.getenv("LIVEKIT_URL") assert url is not None room = rtc.Room() token = create_token("state-test", room_name) states: List[rtc.ConnectionState] = [] @room.on("connection_state_changed") def on_state_changed(state: rtc.ConnectionState) -> None: states.append(state) try: assert room.connection_state == rtc.ConnectionState.CONN_DISCONNECTED await room.connect(url, token) await assert_eventually( lambda: room.connection_state == rtc.ConnectionState.CONN_CONNECTED, message="Room did not reach CONN_CONNECTED state", ) await assert_eventually( lambda: rtc.ConnectionState.CONN_CONNECTED in states, # type: ignore[comparison-overlap] message="CONN_CONNECTED state not in state change events", ) await room.disconnect() await assert_eventually( lambda: room.connection_state == rtc.ConnectionState.CONN_DISCONNECTED, message="Room did not reach CONN_DISCONNECTED state after disconnect", ) finally: if room.isconnected(): await room.disconnect() @pytest.mark.asyncio @skip_if_no_credentials() # type: ignore[untyped-decorator] @pytest.mark.skipif( os.getenv("RUN_DATA_TRACK_TESTS") != "1", reason="SFU support requires data tracks support to be enabled via config; remove once this is no longer the case.", ) async def test_data_track() -> None: """Test that a published data track delivers frames with correct payloads and timestamps.""" FRAME_COUNT = 5 PAYLOAD_SIZE = 64 TRACK_NAME = "test-track" PUBLISHER_IDENTITY = "dt-publisher" SUBSCRIBER_IDENTITY = "dt-subscriber" room_name = unique_room_name("test-data-track") url = os.getenv("LIVEKIT_URL") assert url is not None publisher_room = rtc.Room() subscriber_room = rtc.Room() publisher_token = create_token(PUBLISHER_IDENTITY, room_name) subscriber_token = create_token(SUBSCRIBER_IDENTITY, room_name) remote_track_event = asyncio.Event() remote_track: None | rtc.RemoteDataTrack = None unpublished_event = asyncio.Event() unpublished_sid = None @subscriber_room.on("data_track_published") def on_data_track_published(track: rtc.RemoteDataTrack) -> None: nonlocal remote_track remote_track = track remote_track_event.set() @subscriber_room.on("data_track_unpublished") def on_data_track_unpublished(sid: str) -> None: nonlocal unpublished_sid unpublished_sid = sid unpublished_event.set() try: assert url is not None await subscriber_room.connect(url, subscriber_token) await publisher_room.connect(url, publisher_token) local_track = await publisher_room.local_participant.publish_data_track(name=TRACK_NAME) assert local_track.info.sid is not None assert local_track.info.name == TRACK_NAME assert local_track.is_published() await asyncio.wait_for(remote_track_event.wait(), timeout=10.0) assert remote_track is not None assert remote_track.info.name == TRACK_NAME assert remote_track.publisher_identity == PUBLISHER_IDENTITY assert remote_track.is_published() stream = remote_track.subscribe() async def push_frames() -> None: for i in range(FRAME_COUNT): frame = rtc.DataTrackFrame( payload=bytes([i] * PAYLOAD_SIZE), user_timestamp=int(time.time() * 1000), ) local_track.try_push(frame) await asyncio.sleep(0.1) await local_track.unpublish() async def publish_and_receive(stream: rtc.DataTrackStream) -> int: push_task = asyncio.create_task(push_frames()) recv_count = 0 async for frame in stream: first_byte = frame.payload[0] assert all(b == first_byte for b in frame.payload), "Payload bytes are not uniform" assert len(frame.payload) == PAYLOAD_SIZE assert frame.user_timestamp is not None latency = (int(time.time() * 1000) - frame.user_timestamp) / 1000.0 assert latency < 5.0, f"Timestamp latency too high: {latency}" recv_count += 1 await push_task return recv_count recv_count = await asyncio.wait_for(publish_and_receive(stream), timeout=10.0) assert recv_count > 0, "No frames were received" await asyncio.wait_for(unpublished_event.wait(), timeout=5.0) assert unpublished_sid == local_track.info.sid finally: await publisher_room.disconnect() await subscriber_room.disconnect()