summaryrefslogtreecommitdiff
path: root/Biz/Kidcam/Streamer.py
diff options
context:
space:
mode:
Diffstat (limited to 'Biz/Kidcam/Streamer.py')
-rwxr-xr-xBiz/Kidcam/Streamer.py313
1 files changed, 313 insertions, 0 deletions
diff --git a/Biz/Kidcam/Streamer.py b/Biz/Kidcam/Streamer.py
new file mode 100755
index 0000000..4779546
--- /dev/null
+++ b/Biz/Kidcam/Streamer.py
@@ -0,0 +1,313 @@
+#!/usr/bin/env run.sh
+"""Video streaming module for Kidcam using GStreamer with HLS output."""
+
+# : out kidcam-streamer
+# : dep pytest
+# NOTE: pygobject (GStreamer) only available on Jetson, skip for typecheck
+
+import gi # type: ignore[import-not-found]
+import http.server as http_server
+import os
+import pathlib
+import shutil
+import socketserver
+import subprocess
+import sys
+import threading
+import typing
+
+gi.require_version("Gst", "1.0")
+
+import contextlib
+import gi.repository.Gst as Gst # type: ignore[import-not-found]
+
+
+class VideoStreamer:
+ """Video streamer using GStreamer for HLS output with hardware encoding."""
+
+ def __init__(
+ self,
+ device: str = "/dev/video0",
+ width: int = 1280,
+ height: int = 720,
+ port: int = 8554,
+ ) -> None:
+ """
+ Initialize video streamer.
+
+ Args:
+ device: Video device path
+ width: Video width in pixels
+ height: Video height in pixels
+ port: HTTP server port for HLS streaming
+ """
+ self.device = device
+ self.width = width
+ self.height = height
+ self.port = port
+ self.hls_dir = pathlib.Path("/tmp/hls")
+ self.pipeline: Gst.Element | None = None
+ self.http_server: socketserver.TCPServer | None = None
+ self.server_thread: threading.Thread | None = None
+ self._streaming = False
+
+ Gst.init(None)
+
+ def _get_tailscale_ip(self) -> str:
+ """Get the Tailscale IP address of this device."""
+ try:
+ result = subprocess.run(
+ ["tailscale", "ip", "-4"],
+ capture_output=True,
+ text=True,
+ check=True,
+ timeout=5,
+ )
+ return result.stdout.strip()
+ except (
+ subprocess.CalledProcessError,
+ subprocess.TimeoutExpired,
+ FileNotFoundError,
+ ):
+ return "localhost"
+
+ def _check_gstreamer_elements(self) -> tuple[bool, list[str]]:
+ """
+ Check if required GStreamer elements are available.
+
+ Returns:
+ Tuple of (all_available, missing_elements)
+ """
+ required_elements = [
+ "v4l2src",
+ "nvv4l2h264enc",
+ "h264parse",
+ "hlssink2",
+ ]
+ missing = []
+
+ for elem_name in required_elements:
+ factory = Gst.ElementFactory.find(elem_name)
+ if factory is None:
+ missing.append(elem_name)
+
+ return (len(missing) == 0, missing)
+
+ def _setup_hls_directory(self) -> None:
+ """Create and clean HLS output directory."""
+ if self.hls_dir.exists():
+ shutil.rmtree(self.hls_dir)
+ self.hls_dir.mkdir(parents=True, exist_ok=True)
+
+ def _start_http_server(self) -> None:
+ """Start HTTP server to serve HLS segments."""
+ os.chdir(self.hls_dir)
+
+ class QuietHandler(http_server.SimpleHTTPRequestHandler):
+ def log_message(self, format: str, *args: typing.Any) -> None:
+ pass
+
+ self.http_server = socketserver.TCPServer(("", self.port), QuietHandler)
+ self.server_thread = threading.Thread(
+ target=self.http_server.serve_forever,
+ daemon=True,
+ )
+ self.server_thread.start()
+
+ def _stop_http_server(self) -> None:
+ """Stop HTTP server."""
+ if self.http_server:
+ self.http_server.shutdown()
+ self.http_server.server_close()
+ self.http_server = None
+
+ if self.server_thread:
+ self.server_thread.join(timeout=2.0)
+ self.server_thread = None
+
+ def start_stream(self) -> str:
+ """
+ Start video streaming.
+
+ Returns:
+ Stream URL (HTTP endpoint for HLS playlist)
+
+ Raises:
+ RuntimeError: If stream is already running or GStreamer elements missing
+ """
+ if self._streaming:
+ msg = "Stream is already running"
+ raise RuntimeError(msg)
+
+ available, missing = self._check_gstreamer_elements()
+ if not available:
+ msg = (
+ f"Missing required GStreamer elements: {', '.join(missing)}. "
+ "Install gst-plugins-good, gst-plugins-bad, and Jetson multimedia packages."
+ )
+ raise RuntimeError(
+ msg
+ )
+
+ self._setup_hls_directory()
+
+ pipeline_str = (
+ f"v4l2src device={self.device} ! "
+ f"video/x-raw,width={self.width},height={self.height},framerate=30/1 ! "
+ f"nvv4l2h264enc bitrate=2000000 ! "
+ f"h264parse ! "
+ f"hlssink2 "
+ f"location={self.hls_dir}/segment%05d.ts "
+ f"playlist-location={self.hls_dir}/stream.m3u8 "
+ f"max-files=10 "
+ f"target-duration=2 "
+ f"playlist-length=5"
+ )
+
+ self.pipeline = Gst.parse_launch(pipeline_str)
+
+ if not self.pipeline:
+ msg = "Failed to create GStreamer pipeline"
+ raise RuntimeError(msg)
+
+ ret = self.pipeline.set_state(Gst.State.PLAYING)
+ if ret == Gst.StateChangeReturn.FAILURE:
+ self.pipeline.set_state(Gst.State.NULL)
+ self.pipeline = None
+ msg = "Failed to start GStreamer pipeline"
+ raise RuntimeError(msg)
+
+ self._start_http_server()
+ self._streaming = True
+
+ return self.get_stream_url()
+
+ def stop_stream(self) -> None:
+ """Stop video streaming and cleanup resources."""
+ if not self._streaming:
+ return
+
+ if self.pipeline:
+ self.pipeline.set_state(Gst.State.NULL)
+ self.pipeline = None
+
+ self._stop_http_server()
+
+ if self.hls_dir.exists():
+ with contextlib.suppress(OSError):
+ shutil.rmtree(self.hls_dir)
+
+ self._streaming = False
+
+ def is_streaming(self) -> bool:
+ """
+ Check if currently streaming.
+
+ Returns:
+ True if streaming, False otherwise
+ """
+ return self._streaming
+
+ def get_stream_url(self) -> str:
+ """
+ Get the HLS stream URL.
+
+ Returns:
+ HTTP URL for HLS playlist
+ """
+ ip = self._get_tailscale_ip()
+ return f"http://{ip}:{self.port}/stream.m3u8"
+
+
+def test() -> None:
+ """Basic tests for VideoStreamer."""
+ streamer = VideoStreamer(
+ device="/dev/video0", width=640, height=480, port=8555
+ )
+
+ assert streamer.device == "/dev/video0"
+ assert streamer.width == 640
+ assert streamer.height == 480
+ assert streamer.port == 8555
+ assert not streamer.is_streaming()
+
+ available, _missing = streamer._check_gstreamer_elements()
+ if not available:
+ pass
+
+ streamer.get_stream_url()
+
+
+def main() -> None:
+ """Test the video streamer."""
+ import argparse
+ import signal
+ import time
+
+ parser = argparse.ArgumentParser(
+ description="Video streaming with GStreamer HLS"
+ )
+ parser.add_argument(
+ "--device",
+ default="/dev/video0",
+ help="Video device path (default: /dev/video0)",
+ )
+ parser.add_argument(
+ "--width",
+ type=int,
+ default=1280,
+ help="Video width (default: 1280)",
+ )
+ parser.add_argument(
+ "--height",
+ type=int,
+ default=720,
+ help="Video height (default: 720)",
+ )
+ parser.add_argument(
+ "--port",
+ type=int,
+ default=8554,
+ help="HTTP server port (default: 8554)",
+ )
+ parser.add_argument(
+ "--test",
+ action="store_true",
+ help="Run tests instead of starting stream",
+ )
+
+ args = parser.parse_args()
+
+ if args.test:
+ test()
+ return
+
+ streamer = VideoStreamer(
+ device=args.device,
+ width=args.width,
+ height=args.height,
+ port=args.port,
+ )
+
+ def signal_handler(sig: int, frame: typing.Any) -> None:
+ streamer.stop_stream()
+ sys.exit(0)
+
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ try:
+ streamer.start_stream()
+
+ while streamer.is_streaming():
+ time.sleep(1)
+
+ except RuntimeError:
+ sys.exit(1)
+ except Exception:
+ streamer.stop_stream()
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ main()