summaryrefslogtreecommitdiff
path: root/Biz/Kidcam/Streamer.py
blob: 477954685c37c1beeae04d4f7a41a735d5f1394f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
#!/usr/bin/env run.sh
"""Video streaming module for Kidcam using GStreamer with HLS output."""

# : out kidcam-streamer
# : dep pytest
# NOTE: pygobject (GStreamer) only available on Jetson, skip for typecheck

import gi  # type: ignore[import-not-found]
import http.server as http_server
import os
import pathlib
import shutil
import socketserver
import subprocess
import sys
import threading
import typing

# Select version 1.0 of the Gst namespace. This call is deliberately placed
# ahead of the `gi.repository.Gst` import further down, which is why that
# import is not grouped with the others at the top of the file.
gi.require_version("Gst", "1.0")

import contextlib
import gi.repository.Gst as Gst  # type: ignore[import-not-found]


class VideoStreamer:
    """Video streamer using GStreamer for HLS output with hardware encoding.

    A GStreamer pipeline reads frames from a V4L2 device, encodes them with
    ``nvv4l2h264enc`` and writes HLS segments plus a playlist into
    ``hls_dir``. A small HTTP server running in a daemon thread serves that
    directory on ``port``.
    """

    def __init__(
        self,
        device: str = "/dev/video0",
        width: int = 1280,
        height: int = 720,
        port: int = 8554,
    ) -> None:
        """
        Initialize video streamer.

        Args:
            device: Video device path
            width: Video width in pixels
            height: Video height in pixels
            port: HTTP server port for HLS streaming
        """
        self.device = device
        self.width = width
        self.height = height
        self.port = port
        self.hls_dir = pathlib.Path("/tmp/hls")
        self.pipeline: Gst.Element | None = None
        self.http_server: socketserver.TCPServer | None = None
        self.server_thread: threading.Thread | None = None
        self._streaming = False

        Gst.init(None)

    def _get_tailscale_ip(self) -> str:
        """
        Get the Tailscale IPv4 address of this device.

        Returns:
            The first address printed by ``tailscale ip -4``, or
            ``"localhost"`` when the command is unavailable, fails, times
            out, or prints nothing.
        """
        try:
            result = subprocess.run(
                ["tailscale", "ip", "-4"],
                capture_output=True,
                text=True,
                check=True,
                timeout=5,
            )
        except (
            subprocess.CalledProcessError,
            subprocess.TimeoutExpired,
            OSError,  # covers a missing binary as well as permission errors
        ):
            return "localhost"

        # Keep only the first whitespace-separated token so that empty or
        # multi-line output can never produce a malformed URL.
        addresses = result.stdout.split()
        return addresses[0] if addresses else "localhost"

    def _check_gstreamer_elements(self) -> tuple[bool, list[str]]:
        """
        Check if required GStreamer elements are available.

        Returns:
            Tuple of (all_available, missing_elements)
        """
        required_elements = [
            "v4l2src",
            "nvv4l2h264enc",
            "h264parse",
            "hlssink2",
        ]
        missing = [
            elem_name
            for elem_name in required_elements
            if Gst.ElementFactory.find(elem_name) is None
        ]
        return (not missing, missing)

    def _setup_hls_directory(self) -> None:
        """Create the HLS output directory, discarding any previous contents."""
        if self.hls_dir.exists():
            shutil.rmtree(self.hls_dir)
        self.hls_dir.mkdir(parents=True, exist_ok=True)

    def _start_http_server(self) -> None:
        """
        Start HTTP server to serve HLS segments from ``hls_dir``.

        The served directory is handed to the request handler explicitly
        instead of changing the process working directory, so the rest of
        the process is unaffected and is never left inside a directory that
        ``stop_stream`` later removes.

        Raises:
            OSError: If the server socket cannot be bound to ``port``.
        """
        hls_root = str(self.hls_dir)

        class QuietHandler(http_server.SimpleHTTPRequestHandler):
            """Static file handler rooted at the HLS directory, no logging."""

            def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
                super().__init__(*args, directory=hls_root, **kwargs)

            def log_message(self, format: str, *args: typing.Any) -> None:
                pass

        class ReusableTCPServer(socketserver.TCPServer):
            """TCPServer that can rebind its port right after a restart."""

            allow_reuse_address = True

        server = ReusableTCPServer(("", self.port), QuietHandler)
        worker = threading.Thread(
            target=server.serve_forever,
            daemon=True,
        )
        try:
            worker.start()
        except RuntimeError:
            # The serving loop never started; release the socket directly
            # because shutdown() would wait for a loop that is not running.
            server.server_close()
            raise

        self.http_server = server
        self.server_thread = worker

    def _stop_http_server(self) -> None:
        """Stop the HTTP server and wait briefly for its thread to finish."""
        if self.http_server:
            self.http_server.shutdown()
            self.http_server.server_close()
            self.http_server = None

        if self.server_thread:
            self.server_thread.join(timeout=2.0)
            self.server_thread = None

    def start_stream(self) -> str:
        """
        Start video streaming.

        Returns:
            Stream URL (HTTP endpoint for HLS playlist)

        Raises:
            RuntimeError: If stream is already running, GStreamer elements
                are missing, or the pipeline cannot be created or started
            OSError: If the HTTP server cannot bind its port; the pipeline
                is torn down before the error propagates
        """
        if self._streaming:
            msg = "Stream is already running"
            raise RuntimeError(msg)

        available, missing = self._check_gstreamer_elements()
        if not available:
            msg = (
                f"Missing required GStreamer elements: {', '.join(missing)}. "
                "Install gst-plugins-good, gst-plugins-bad, and Jetson multimedia packages."
            )
            raise RuntimeError(msg)

        self._setup_hls_directory()

        pipeline_str = (
            f"v4l2src device={self.device} ! "
            f"video/x-raw,width={self.width},height={self.height},framerate=30/1 ! "
            "nvv4l2h264enc bitrate=2000000 ! "
            "h264parse ! "
            "hlssink2 "
            f"location={self.hls_dir}/segment%05d.ts "
            f"playlist-location={self.hls_dir}/stream.m3u8 "
            "max-files=10 "
            "target-duration=2 "
            "playlist-length=5"
        )

        self.pipeline = Gst.parse_launch(pipeline_str)

        if not self.pipeline:
            msg = "Failed to create GStreamer pipeline"
            raise RuntimeError(msg)

        ret = self.pipeline.set_state(Gst.State.PLAYING)
        if ret == Gst.StateChangeReturn.FAILURE:
            self.pipeline.set_state(Gst.State.NULL)
            self.pipeline = None
            msg = "Failed to start GStreamer pipeline"
            raise RuntimeError(msg)

        try:
            self._start_http_server()
        except Exception:
            # _streaming is still False here, so stop_stream() would be a
            # no-op; stop the already-playing pipeline before re-raising.
            self.pipeline.set_state(Gst.State.NULL)
            self.pipeline = None
            raise

        self._streaming = True

        return self.get_stream_url()

    def stop_stream(self) -> None:
        """Stop video streaming and cleanup resources.

        Does nothing when no stream is running. Otherwise the pipeline is
        set to NULL, the HTTP server is stopped, and the HLS directory is
        removed on a best-effort basis.
        """
        if not self._streaming:
            return

        if self.pipeline:
            self.pipeline.set_state(Gst.State.NULL)
            self.pipeline = None

        self._stop_http_server()

        if self.hls_dir.exists():
            # Failing to delete leftover segments must not block shutdown.
            with contextlib.suppress(OSError):
                shutil.rmtree(self.hls_dir)

        self._streaming = False

    def is_streaming(self) -> bool:
        """
        Check if currently streaming.

        Returns:
            True if streaming, False otherwise
        """
        return self._streaming

    def get_stream_url(self) -> str:
        """
        Get the HLS stream URL.

        Returns:
            HTTP URL for HLS playlist
        """
        ip = self._get_tailscale_ip()
        return f"http://{ip}:{self.port}/stream.m3u8"


def test() -> None:
    """Basic tests for VideoStreamer.

    Checks that constructor arguments are stored, that a fresh streamer is
    not streaming, that the element check returns a consistent
    ``(available, missing)`` pair, and that the stream URL has the expected
    scheme, port and playlist path. No stream is started, so no camera is
    needed.
    """
    streamer = VideoStreamer(
        device="/dev/video0", width=640, height=480, port=8555
    )

    assert streamer.device == "/dev/video0"
    assert streamer.width == 640
    assert streamer.height == 480
    assert streamer.port == 8555
    assert not streamer.is_streaming()

    # Elements may legitimately be missing on this machine; only verify that
    # the flag agrees with the list of missing names.
    available, missing = streamer._check_gstreamer_elements()
    assert available == (not missing)

    # The host part depends on whether Tailscale is reachable, so only the
    # fixed parts of the URL are checked.
    url = streamer.get_stream_url()
    assert url.startswith("http://")
    assert url.endswith(":8555/stream.m3u8")


def main() -> None:
    """Command-line entry point: run the self-test or stream until stopped.

    With ``--test`` the basic tests run and the function returns. Otherwise
    a stream is started with the given device, size and port, its URL is
    logged, and the process idles until SIGINT/SIGTERM stops the stream.

    Exits with status 0 after a signal-triggered shutdown and status 1 when
    the stream cannot be started or an unexpected error occurs; the reason
    is logged before exiting.
    """
    import argparse
    import logging
    import signal
    import time

    logger = logging.getLogger(__name__)

    parser = argparse.ArgumentParser(
        description="Video streaming with GStreamer HLS"
    )
    parser.add_argument(
        "--device",
        default="/dev/video0",
        help="Video device path (default: /dev/video0)",
    )
    parser.add_argument(
        "--width",
        type=int,
        default=1280,
        help="Video width (default: 1280)",
    )
    parser.add_argument(
        "--height",
        type=int,
        default=720,
        help="Video height (default: 720)",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=8554,
        help="HTTP server port (default: 8554)",
    )
    parser.add_argument(
        "--test",
        action="store_true",
        help="Run tests instead of starting stream",
    )

    args = parser.parse_args()

    if args.test:
        test()
        return

    logging.basicConfig(
        level=logging.INFO, format="%(levelname)s: %(message)s"
    )

    streamer = VideoStreamer(
        device=args.device,
        width=args.width,
        height=args.height,
        port=args.port,
    )

    def signal_handler(sig: int, frame: typing.Any) -> None:
        streamer.stop_stream()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        stream_url = streamer.start_stream()
        # Without this the operator has no way to learn where to connect.
        logger.info("Streaming at %s", stream_url)

        while streamer.is_streaming():
            time.sleep(1)

    except RuntimeError as err:
        logger.error("Could not start stream: %s", err)
        sys.exit(1)
    except Exception:
        logger.exception("Streaming aborted by an unexpected error")
        streamer.stop_stream()
        sys.exit(1)


if __name__ == "__main__":
    main()