import asyncio import logging from signal import SIGINT, SIGTERM import os import numpy as np from livekit import rtc, api SAMPLE_RATE = 48000 NUM_CHANNELS = 1 # ensure LIVEKIT_URL, LIVEKIT_API_KEY, and LIVEKIT_API_SECRET are set async def main(room: rtc.Room) -> None: @room.on("participant_disconnected") def on_participant_disconnect(participant: rtc.Participant, *_): logging.info("participant disconnected: %s", participant.identity) token = ( api.AccessToken() .with_identity("python-publisher") .with_name("Python Publisher") .with_grants( api.VideoGrants( room_join=True, room="my-room", ) ) .to_jwt() ) url = os.getenv("LIVEKIT_URL") logging.info("connecting to %s", url) try: await room.connect( url, token, options=rtc.RoomOptions( auto_subscribe=True, ), ) logging.info("connected to room %s", room.name) except rtc.ConnectError as e: logging.error("failed to connect to the room: %s", e) return # publish a track source = rtc.AudioSource(SAMPLE_RATE, NUM_CHANNELS) track = rtc.LocalAudioTrack.create_audio_track("sinewave", source) options = rtc.TrackPublishOptions() options.source = rtc.TrackSource.SOURCE_MICROPHONE publication = await room.local_participant.publish_track(track, options) logging.info("published track %s", publication.sid) asyncio.ensure_future(publish_frames(source, 440)) async def publish_frames(source: rtc.AudioSource, frequency: int): amplitude = 32767 # for 16-bit audio samples_per_channel = 480 # 10ms at 48kHz time = np.arange(samples_per_channel) / SAMPLE_RATE total_samples = 0 audio_frame = rtc.AudioFrame.create(SAMPLE_RATE, NUM_CHANNELS, samples_per_channel) audio_data = np.frombuffer(audio_frame.data, dtype=np.int16) while True: time = (total_samples + np.arange(samples_per_channel)) / SAMPLE_RATE sine_wave = (amplitude * np.sin(2 * np.pi * frequency * time)).astype(np.int16) np.copyto(audio_data, sine_wave) await source.capture_frame(audio_frame) total_samples += samples_per_channel if __name__ == "__main__": logging.basicConfig( level=logging.INFO, handlers=[logging.FileHandler("publish_wave.log"), logging.StreamHandler()], ) loop = asyncio.get_event_loop() room = rtc.Room(loop=loop) async def cleanup(): await room.disconnect() loop.stop() asyncio.ensure_future(main(room)) for signal in [SIGINT, SIGTERM]: loop.add_signal_handler(signal, lambda: asyncio.ensure_future(cleanup())) try: loop.run_forever() finally: loop.close()