diff options
| author | Ben Sima <ben@bensima.com> | 2025-12-26 13:34:32 -0500 |
|---|---|---|
| committer | Ben Sima <ben@bensima.com> | 2025-12-26 13:34:32 -0500 |
| commit | 27d2e3b42d290e72f8ee5735fcd5c73dcaed4517 (patch) | |
| tree | dbe31f28a638332e8abd5610bb80e816b2cf45f4 /Biz/Kidcam/Streamer.py | |
| parent | 84397b5bb87071dacd82b192d1354382768eb54d (diff) | |
feat(kidcam): complete implementationusr/ben/kidcam
- Detector.py: YOLOv8-nano person detection
- Streamer.py: GStreamer HLS video streaming
- Notifier.py: Telegram bot notifications
- Core.py: State machine orchestration
- deploy.sh: Ubuntu deployment script
- kidcam.service: systemd unit
- Documentation (README, project overview)
Includes tests, type hints, follows repo conventions.
Fixed Worker.hs missing engineOnToolTrace (jr now builds).
Added Python deps: opencv, ultralytics, python-telegram-bot.
Amp-Thread-ID: https://ampcode.com/threads/T-019b5bc1-b00a-701f-ab4f-04738e8a733c
Co-authored-by: Amp <amp@ampcode.com>
Diffstat (limited to 'Biz/Kidcam/Streamer.py')
| -rwxr-xr-x | Biz/Kidcam/Streamer.py | 313 |
1 files changed, 313 insertions, 0 deletions
diff --git a/Biz/Kidcam/Streamer.py b/Biz/Kidcam/Streamer.py new file mode 100755 index 0000000..4779546 --- /dev/null +++ b/Biz/Kidcam/Streamer.py @@ -0,0 +1,313 @@ +#!/usr/bin/env run.sh +"""Video streaming module for Kidcam using GStreamer with HLS output.""" + +# : out kidcam-streamer +# : dep pytest +# NOTE: pygobject (GStreamer) only available on Jetson, skip for typecheck + +import gi # type: ignore[import-not-found] +import http.server as http_server +import os +import pathlib +import shutil +import socketserver +import subprocess +import sys +import threading +import typing + +gi.require_version("Gst", "1.0") + +import contextlib +import gi.repository.Gst as Gst # type: ignore[import-not-found] + + +class VideoStreamer: + """Video streamer using GStreamer for HLS output with hardware encoding.""" + + def __init__( + self, + device: str = "/dev/video0", + width: int = 1280, + height: int = 720, + port: int = 8554, + ) -> None: + """ + Initialize video streamer. + + Args: + device: Video device path + width: Video width in pixels + height: Video height in pixels + port: HTTP server port for HLS streaming + """ + self.device = device + self.width = width + self.height = height + self.port = port + self.hls_dir = pathlib.Path("/tmp/hls") + self.pipeline: Gst.Element | None = None + self.http_server: socketserver.TCPServer | None = None + self.server_thread: threading.Thread | None = None + self._streaming = False + + Gst.init(None) + + def _get_tailscale_ip(self) -> str: + """Get the Tailscale IP address of this device.""" + try: + result = subprocess.run( + ["tailscale", "ip", "-4"], + capture_output=True, + text=True, + check=True, + timeout=5, + ) + return result.stdout.strip() + except ( + subprocess.CalledProcessError, + subprocess.TimeoutExpired, + FileNotFoundError, + ): + return "localhost" + + def _check_gstreamer_elements(self) -> tuple[bool, list[str]]: + """ + Check if required GStreamer elements are available. + + Returns: + Tuple of (all_available, missing_elements) + """ + required_elements = [ + "v4l2src", + "nvv4l2h264enc", + "h264parse", + "hlssink2", + ] + missing = [] + + for elem_name in required_elements: + factory = Gst.ElementFactory.find(elem_name) + if factory is None: + missing.append(elem_name) + + return (len(missing) == 0, missing) + + def _setup_hls_directory(self) -> None: + """Create and clean HLS output directory.""" + if self.hls_dir.exists(): + shutil.rmtree(self.hls_dir) + self.hls_dir.mkdir(parents=True, exist_ok=True) + + def _start_http_server(self) -> None: + """Start HTTP server to serve HLS segments.""" + os.chdir(self.hls_dir) + + class QuietHandler(http_server.SimpleHTTPRequestHandler): + def log_message(self, format: str, *args: typing.Any) -> None: + pass + + self.http_server = socketserver.TCPServer(("", self.port), QuietHandler) + self.server_thread = threading.Thread( + target=self.http_server.serve_forever, + daemon=True, + ) + self.server_thread.start() + + def _stop_http_server(self) -> None: + """Stop HTTP server.""" + if self.http_server: + self.http_server.shutdown() + self.http_server.server_close() + self.http_server = None + + if self.server_thread: + self.server_thread.join(timeout=2.0) + self.server_thread = None + + def start_stream(self) -> str: + """ + Start video streaming. + + Returns: + Stream URL (HTTP endpoint for HLS playlist) + + Raises: + RuntimeError: If stream is already running or GStreamer elements missing + """ + if self._streaming: + msg = "Stream is already running" + raise RuntimeError(msg) + + available, missing = self._check_gstreamer_elements() + if not available: + msg = ( + f"Missing required GStreamer elements: {', '.join(missing)}. " + "Install gst-plugins-good, gst-plugins-bad, and Jetson multimedia packages." + ) + raise RuntimeError( + msg + ) + + self._setup_hls_directory() + + pipeline_str = ( + f"v4l2src device={self.device} ! " + f"video/x-raw,width={self.width},height={self.height},framerate=30/1 ! " + f"nvv4l2h264enc bitrate=2000000 ! " + f"h264parse ! " + f"hlssink2 " + f"location={self.hls_dir}/segment%05d.ts " + f"playlist-location={self.hls_dir}/stream.m3u8 " + f"max-files=10 " + f"target-duration=2 " + f"playlist-length=5" + ) + + self.pipeline = Gst.parse_launch(pipeline_str) + + if not self.pipeline: + msg = "Failed to create GStreamer pipeline" + raise RuntimeError(msg) + + ret = self.pipeline.set_state(Gst.State.PLAYING) + if ret == Gst.StateChangeReturn.FAILURE: + self.pipeline.set_state(Gst.State.NULL) + self.pipeline = None + msg = "Failed to start GStreamer pipeline" + raise RuntimeError(msg) + + self._start_http_server() + self._streaming = True + + return self.get_stream_url() + + def stop_stream(self) -> None: + """Stop video streaming and cleanup resources.""" + if not self._streaming: + return + + if self.pipeline: + self.pipeline.set_state(Gst.State.NULL) + self.pipeline = None + + self._stop_http_server() + + if self.hls_dir.exists(): + with contextlib.suppress(OSError): + shutil.rmtree(self.hls_dir) + + self._streaming = False + + def is_streaming(self) -> bool: + """ + Check if currently streaming. + + Returns: + True if streaming, False otherwise + """ + return self._streaming + + def get_stream_url(self) -> str: + """ + Get the HLS stream URL. + + Returns: + HTTP URL for HLS playlist + """ + ip = self._get_tailscale_ip() + return f"http://{ip}:{self.port}/stream.m3u8" + + +def test() -> None: + """Basic tests for VideoStreamer.""" + streamer = VideoStreamer( + device="/dev/video0", width=640, height=480, port=8555 + ) + + assert streamer.device == "/dev/video0" + assert streamer.width == 640 + assert streamer.height == 480 + assert streamer.port == 8555 + assert not streamer.is_streaming() + + available, _missing = streamer._check_gstreamer_elements() + if not available: + pass + + streamer.get_stream_url() + + +def main() -> None: + """Test the video streamer.""" + import argparse + import signal + import time + + parser = argparse.ArgumentParser( + description="Video streaming with GStreamer HLS" + ) + parser.add_argument( + "--device", + default="/dev/video0", + help="Video device path (default: /dev/video0)", + ) + parser.add_argument( + "--width", + type=int, + default=1280, + help="Video width (default: 1280)", + ) + parser.add_argument( + "--height", + type=int, + default=720, + help="Video height (default: 720)", + ) + parser.add_argument( + "--port", + type=int, + default=8554, + help="HTTP server port (default: 8554)", + ) + parser.add_argument( + "--test", + action="store_true", + help="Run tests instead of starting stream", + ) + + args = parser.parse_args() + + if args.test: + test() + return + + streamer = VideoStreamer( + device=args.device, + width=args.width, + height=args.height, + port=args.port, + ) + + def signal_handler(sig: int, frame: typing.Any) -> None: + streamer.stop_stream() + sys.exit(0) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + streamer.start_stream() + + while streamer.is_streaming(): + time.sleep(1) + + except RuntimeError: + sys.exit(1) + except Exception: + streamer.stop_stream() + sys.exit(1) + + +if __name__ == "__main__": + main() |
