#!/usr/bin/env run.sh """Video streaming module for Kidcam using GStreamer with HLS output.""" # : out kidcam-streamer # : dep pytest # NOTE: pygobject (GStreamer) only available on Jetson, skip for typecheck import gi # type: ignore[import-not-found] import http.server as http_server import os import pathlib import shutil import socketserver import subprocess import sys import threading import typing gi.require_version("Gst", "1.0") import contextlib import gi.repository.Gst as Gst # type: ignore[import-not-found] class VideoStreamer: """Video streamer using GStreamer for HLS output with hardware encoding.""" def __init__( self, device: str = "/dev/video0", width: int = 1280, height: int = 720, port: int = 8554, ) -> None: """ Initialize video streamer. Args: device: Video device path width: Video width in pixels height: Video height in pixels port: HTTP server port for HLS streaming """ self.device = device self.width = width self.height = height self.port = port self.hls_dir = pathlib.Path("/tmp/hls") self.pipeline: Gst.Element | None = None self.http_server: socketserver.TCPServer | None = None self.server_thread: threading.Thread | None = None self._streaming = False Gst.init(None) def _get_tailscale_ip(self) -> str: """Get the Tailscale IP address of this device.""" try: result = subprocess.run( ["tailscale", "ip", "-4"], capture_output=True, text=True, check=True, timeout=5, ) return result.stdout.strip() except ( subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError, ): return "localhost" def _check_gstreamer_elements(self) -> tuple[bool, list[str]]: """ Check if required GStreamer elements are available. Returns: Tuple of (all_available, missing_elements) """ required_elements = [ "v4l2src", "nvv4l2h264enc", "h264parse", "hlssink2", ] missing = [] for elem_name in required_elements: factory = Gst.ElementFactory.find(elem_name) if factory is None: missing.append(elem_name) return (len(missing) == 0, missing) def _setup_hls_directory(self) -> None: """Create and clean HLS output directory.""" if self.hls_dir.exists(): shutil.rmtree(self.hls_dir) self.hls_dir.mkdir(parents=True, exist_ok=True) def _start_http_server(self) -> None: """Start HTTP server to serve HLS segments.""" os.chdir(self.hls_dir) class QuietHandler(http_server.SimpleHTTPRequestHandler): def log_message(self, format: str, *args: typing.Any) -> None: pass self.http_server = socketserver.TCPServer(("", self.port), QuietHandler) self.server_thread = threading.Thread( target=self.http_server.serve_forever, daemon=True, ) self.server_thread.start() def _stop_http_server(self) -> None: """Stop HTTP server.""" if self.http_server: self.http_server.shutdown() self.http_server.server_close() self.http_server = None if self.server_thread: self.server_thread.join(timeout=2.0) self.server_thread = None def start_stream(self) -> str: """ Start video streaming. Returns: Stream URL (HTTP endpoint for HLS playlist) Raises: RuntimeError: If stream is already running or GStreamer elements missing """ if self._streaming: msg = "Stream is already running" raise RuntimeError(msg) available, missing = self._check_gstreamer_elements() if not available: msg = ( f"Missing required GStreamer elements: {', '.join(missing)}. " "Install gst-plugins-good, gst-plugins-bad, and Jetson multimedia packages." ) raise RuntimeError( msg ) self._setup_hls_directory() pipeline_str = ( f"v4l2src device={self.device} ! " f"video/x-raw,width={self.width},height={self.height},framerate=30/1 ! " f"nvv4l2h264enc bitrate=2000000 ! " f"h264parse ! " f"hlssink2 " f"location={self.hls_dir}/segment%05d.ts " f"playlist-location={self.hls_dir}/stream.m3u8 " f"max-files=10 " f"target-duration=2 " f"playlist-length=5" ) self.pipeline = Gst.parse_launch(pipeline_str) if not self.pipeline: msg = "Failed to create GStreamer pipeline" raise RuntimeError(msg) ret = self.pipeline.set_state(Gst.State.PLAYING) if ret == Gst.StateChangeReturn.FAILURE: self.pipeline.set_state(Gst.State.NULL) self.pipeline = None msg = "Failed to start GStreamer pipeline" raise RuntimeError(msg) self._start_http_server() self._streaming = True return self.get_stream_url() def stop_stream(self) -> None: """Stop video streaming and cleanup resources.""" if not self._streaming: return if self.pipeline: self.pipeline.set_state(Gst.State.NULL) self.pipeline = None self._stop_http_server() if self.hls_dir.exists(): with contextlib.suppress(OSError): shutil.rmtree(self.hls_dir) self._streaming = False def is_streaming(self) -> bool: """ Check if currently streaming. Returns: True if streaming, False otherwise """ return self._streaming def get_stream_url(self) -> str: """ Get the HLS stream URL. Returns: HTTP URL for HLS playlist """ ip = self._get_tailscale_ip() return f"http://{ip}:{self.port}/stream.m3u8" def test() -> None: """Basic tests for VideoStreamer.""" streamer = VideoStreamer( device="/dev/video0", width=640, height=480, port=8555 ) assert streamer.device == "/dev/video0" assert streamer.width == 640 assert streamer.height == 480 assert streamer.port == 8555 assert not streamer.is_streaming() available, _missing = streamer._check_gstreamer_elements() if not available: pass streamer.get_stream_url() def main() -> None: """Test the video streamer.""" import argparse import signal import time parser = argparse.ArgumentParser( description="Video streaming with GStreamer HLS" ) parser.add_argument( "--device", default="/dev/video0", help="Video device path (default: /dev/video0)", ) parser.add_argument( "--width", type=int, default=1280, help="Video width (default: 1280)", ) parser.add_argument( "--height", type=int, default=720, help="Video height (default: 720)", ) parser.add_argument( "--port", type=int, default=8554, help="HTTP server port (default: 8554)", ) parser.add_argument( "--test", action="store_true", help="Run tests instead of starting stream", ) args = parser.parse_args() if args.test: test() return streamer = VideoStreamer( device=args.device, width=args.width, height=args.height, port=args.port, ) def signal_handler(sig: int, frame: typing.Any) -> None: streamer.stop_stream() sys.exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) try: streamer.start_stream() while streamer.is_streaming(): time.sleep(1) except RuntimeError: sys.exit(1) except Exception: streamer.stop_stream() sys.exit(1) if __name__ == "__main__": main()