"""Publish a continuous 440 Hz sine wave as an audio track to a LiveKit room.

The script connects to the room at ``URL`` using ``TOKEN``, publishes a local
audio track backed by an ``rtc.AudioSource`` and feeds it generated frames
until the process receives SIGINT or SIGTERM.
"""

import asyncio
import logging
from signal import SIGINT, SIGTERM

import numpy as np
from livekit import rtc

URL = "ws://localhost:7880"
# NOTE(review): hard-coded access token; presumably meant for local
# development only -- confirm before reusing this file elsewhere.
TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE5MDY2MTMyODgsImlzcyI6IkFQSVRzRWZpZFpqclFvWSIsIm5hbWUiOiJuYXRpdmUiLCJuYmYiOjE2NzI2MTMyODgsInN1YiI6Im5hdGl2ZSIsInZpZGVvIjp7InJvb20iOiJ0ZXN0Iiwicm9vbUFkbWluIjp0cnVlLCJyb29tQ3JlYXRlIjp0cnVlLCJyb29tSm9pbiI6dHJ1ZSwicm9vbUxpc3QiOnRydWV9fQ.uSNIangMRu8jZD5mnRYoCHjcsQWCrJXgHCs0aNIgBFY"  # noqa

SAMPLE_RATE = 48000
NUM_CHANNELS = 1

# Strong references to fire-and-forget tasks. The event loop itself only keeps
# weak references, so a task nobody else holds may be garbage-collected before
# it finishes.
_background_tasks = set()


def _spawn(coro, loop=None):
    """Schedule *coro* as a task and keep it referenced until it completes.

    Args:
        coro: The coroutine to run.
        loop: Event loop to schedule on; ``None`` uses the current loop.

    Returns:
        The scheduled task.
    """
    task = asyncio.ensure_future(coro, loop=loop)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


async def publish_frames(source: rtc.AudioSource, frequency: int):
    """Feed *source* an endless sine wave of the given frequency.

    Frames of 480 samples per channel (10 ms at ``SAMPLE_RATE``) are generated
    one after another. The sample counter carries over between frames so the
    waveform's phase is continuous across frame boundaries. The coroutine
    never returns on its own; it runs until it is cancelled or
    ``capture_frame`` raises.

    Args:
        source: Audio source that receives the generated frames.
        frequency: Tone frequency in Hz.
    """
    amplitude = 32767  # for 16-bit audio
    samples_per_channel = 480  # 10ms at 48kHz
    total_samples = 0

    # A single frame is reused for every capture; `audio_data` is an int16
    # view over the frame's buffer, so writing into it updates the frame.
    audio_frame = rtc.AudioFrame.create(SAMPLE_RATE, NUM_CHANNELS, samples_per_channel)
    audio_data = np.frombuffer(audio_frame.data, dtype=np.int16)

    # Loop invariants, computed once.
    sample_offsets = np.arange(samples_per_channel)
    angular_frequency = 2 * np.pi * frequency

    while True:
        t = (total_samples + sample_offsets) / SAMPLE_RATE
        sine_wave = (amplitude * np.sin(angular_frequency * t)).astype(np.int16)
        np.copyto(audio_data, sine_wave)
        await source.capture_frame(audio_frame)
        total_samples += samples_per_channel


async def main(room: rtc.Room) -> None:
    """Connect *room* to ``URL``, publish the sine-wave track and start feeding it.

    A connection failure is logged and the coroutine returns without
    publishing anything. On success the frame generator is started as a
    background task and this coroutine returns immediately.

    Args:
        room: The room object to connect and publish through.
    """

    @room.on("participant_disconnected")
    def on_participant_disconnect(participant: rtc.Participant, *_):
        logging.info("participant disconnected: %s", participant.identity)

    logging.info("connecting to %s", URL)
    try:
        await room.connect(
            URL,
            TOKEN,
            options=rtc.RoomOptions(
                auto_subscribe=True,
            ),
        )
        logging.info("connected to room %s", room.name)
    except rtc.ConnectError as e:
        logging.error("failed to connect to the room: %s", e)
        return

    # publish a track
    source = rtc.AudioSource(SAMPLE_RATE, NUM_CHANNELS)
    track = rtc.LocalAudioTrack.create_audio_track("sinewave", source)
    options = rtc.TrackPublishOptions()
    options.source = rtc.TrackSource.SOURCE_MICROPHONE
    publication = await room.local_participant.publish_track(track, options)
    logging.info("published track %s", publication.sid)

    # Keep a reference to the generator task so it cannot be collected
    # while it is still running.
    _spawn(publish_frames(source, 440))


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        handlers=[logging.FileHandler("publish_wave.log"), logging.StreamHandler()],
    )

    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated and fails on recent Python versions. It is also
    # installed as the current loop so that code calling get_event_loop()
    # before the loop runs sees the same one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    room = rtc.Room(loop=loop)

    async def cleanup():
        """Disconnect from the room, then stop the event loop."""
        await room.disconnect()
        loop.stop()

    _spawn(main(room), loop=loop)
    for sig in [SIGINT, SIGTERM]:
        loop.add_signal_handler(sig, lambda: _spawn(cleanup(), loop=loop))

    try:
        loop.run_forever()
    finally:
        loop.close()