summaryrefslogtreecommitdiff
path: root/Biz
diff options
context:
space:
mode:
Diffstat (limited to 'Biz')
-rw-r--r--Biz/Kidcam.md35
-rw-r--r--Biz/Kidcam.nix20
-rwxr-xr-xBiz/Kidcam/Core.py271
-rwxr-xr-xBiz/Kidcam/Detector.py135
-rwxr-xr-xBiz/Kidcam/Notifier.py144
-rw-r--r--Biz/Kidcam/README.md403
-rwxr-xr-xBiz/Kidcam/Streamer.py313
-rwxr-xr-xBiz/Kidcam/deploy.sh218
-rw-r--r--Biz/Kidcam/kidcam.service31
9 files changed, 1570 insertions, 0 deletions
diff --git a/Biz/Kidcam.md b/Biz/Kidcam.md
new file mode 100644
index 0000000..7a9733a
--- /dev/null
+++ b/Biz/Kidcam.md
@@ -0,0 +1,35 @@
+# Kidcam
+
+Motion-activated streaming camera system for family notifications.
+
+## Purpose
+
+Detect when kids are playing and automatically stream video with Telegram notifications to family members.
+
+## Hardware
+
+- **Platform**: NVIDIA Jetson Nano
+- **Camera**: Logitech C920 webcam
+- **OS**: Ubuntu 20.04 + JetPack 4.6.x (Jetson Nano unsupported on NixOS)
+
+## Architecture
+
+```
+Person Detection → Video Streaming → Telegram Notifications
+```
+
+1. **Detector**: CV-based person/motion detection using Jetson GPU
+2. **Streamer**: RTSP/HTTP video stream serving
+3. **Notifier**: Telegram bot integration for family alerts
+4. **Core**: Main application loop and configuration
+
+## Deployment
+
+Runs on Ubuntu 20.04 with systemd services (not NixOS). The Nix environment is for development/testing only.
+
+## Components
+
+- `Detector.py` - Person/motion detection
+- `Streamer.py` - Video stream management
+- `Notifier.py` - Telegram notification handler
+- `Core.py` - Main application entry point
diff --git a/Biz/Kidcam.nix b/Biz/Kidcam.nix
new file mode 100644
index 0000000..f12a8d5
--- /dev/null
+++ b/Biz/Kidcam.nix
@@ -0,0 +1,20 @@
+{ pkgs ? import <nixpkgs> { } }:
+
+# Development environment only.
+# Actual deployment is Ubuntu 20.04 + JetPack 4.6.x on Jetson Nano.
+# Jetson Nano is unsupported on NixOS.
+
+pkgs.python3Packages.buildPythonApplication {
+ pname = "kidcam";
+ version = "0.1.0";
+ src = ./.;
+
+ propagatedBuildInputs = with pkgs.python3Packages; [
+ # Add dependencies as needed
+ ];
+
+ meta = {
+ description = "Motion-activated streaming camera for family";
+ platforms = pkgs.lib.platforms.linux;
+ };
+}
diff --git a/Biz/Kidcam/Core.py b/Biz/Kidcam/Core.py
new file mode 100755
index 0000000..5ac3247
--- /dev/null
+++ b/Biz/Kidcam/Core.py
@@ -0,0 +1,271 @@
+#!/usr/bin/env run.sh
+"""Main orchestration service for Kidcam."""
+
+# : out kidcam
+# : dep pytest
+# : dep pytest-asyncio
+import asyncio
+import enum
+import logging
+import Omni.Log as Log # type: ignore
+import os
+import pathlib
+import signal
+import sys
+import time
+import typing
+
+logger = logging.getLogger(__name__)
+
+
+class State(enum.Enum):
+ """State machine states for Kidcam service."""
+
+ IDLE = "idle"
+ ACTIVE = "active"
+ COOLDOWN = "cooldown"
+
+
+class Config(typing.TypedDict):
+ """Configuration for Kidcam service."""
+
+ camera_device: str
+ stream_port: int
+ detection_confidence: float
+ cooldown_minutes: int
+ telegram_bot_token: str
+ telegram_chat_id: str
+
+
+def load_config(config_path: str | None = None) -> Config:
+ """Load configuration from file or environment variables."""
+ if config_path and pathlib.Path(config_path).exists():
+ import json
+
+ with pathlib.Path(config_path).open(encoding="utf-8") as f:
+ data = json.load(f)
+ return typing.cast("Config", data)
+
+ return {
+ "camera_device": os.getenv("CAMERA_DEVICE", "/dev/video0"),
+ "stream_port": int(os.getenv("STREAM_PORT", "8554")),
+ "detection_confidence": float(os.getenv("DETECTION_CONFIDENCE", "0.5")),
+ "cooldown_minutes": int(os.getenv("COOLDOWN_MINUTES", "5")),
+ "telegram_bot_token": os.getenv("TELEGRAM_BOT_TOKEN", ""),
+ "telegram_chat_id": os.getenv("TELEGRAM_CHAT_ID", ""),
+ }
+
+
+class KidcamService:
+ """Main orchestration service for Kidcam."""
+
+ def __init__(
+ self: "KidcamService", config_path: str | None = None
+ ) -> None:
+ """Initialize the Kidcam service with configuration."""
+ self.config = load_config(config_path)
+ self.state = State.IDLE
+ self.last_detection_time = 0.0
+ self.running = False
+ self.shutdown_event = asyncio.Event()
+
+ self.detector: typing.Any | None = None
+ self.streamer: typing.Any | None = None
+ self.notifier: typing.Any | None = None
+
+ logger.info("Kidcam service initialized")
+ logger.debug(f"Config: {self.config}")
+
+ async def _setup_components(self: "KidcamService") -> None:
+ """Initialize detector, streamer, and notifier components."""
+ try:
+ import Biz.Kidcam.Detector as Detector
+ import Biz.Kidcam.Notifier as Notifier
+ import Biz.Kidcam.Streamer as Streamer
+
+ self.detector = Detector.PersonDetector(
+ model_path="yolov8n.pt",
+ confidence_threshold=self.config["detection_confidence"],
+ )
+ self.streamer = Streamer.VideoStreamer(
+ device=self.config["camera_device"],
+ port=self.config["stream_port"],
+ )
+ self.notifier = Notifier.TelegramNotifier(
+ bot_token=self.config["telegram_bot_token"],
+ chat_id=self.config["telegram_chat_id"],
+ )
+ logger.info("All components initialized successfully")
+ except ImportError as e:
+ logger.warning("Component not available: %s", e)
+ logger.warning("Running in stub mode")
+
+ async def _transition_to_idle(self: "KidcamService") -> None:
+ """Transition to IDLE state."""
+ logger.info("State: IDLE - Monitoring for person detection")
+ self.state = State.IDLE
+
+ async def _transition_to_active(self: "KidcamService") -> None:
+ """Transition to ACTIVE state."""
+ logger.info("State: ACTIVE - Person detected, starting stream")
+ self.state = State.ACTIVE
+ self.last_detection_time = time.time()
+
+ if self.streamer:
+ try:
+ await self.streamer.start()
+ logger.info(
+ f"Stream started on port {self.config['stream_port']}"
+ )
+ except Exception as e:
+ logger.exception("Failed to start stream: %s", e)
+
+ if self.notifier:
+ try:
+ stream_url = (
+ f"rtsp://kidcam:{self.config['stream_port']}/stream"
+ )
+ await self.notifier.send_notification(
+ f"šŸ‘¶ Person detected! Stream available at {stream_url}"
+ )
+ logger.info("Notification sent")
+ except Exception as e:
+ logger.exception("Failed to send notification: %s", e)
+
+ async def _transition_to_cooldown(self: "KidcamService") -> None:
+ """Transition to COOLDOWN state."""
+ logger.info(
+ f"State: COOLDOWN - {self.config['cooldown_minutes']} minute cooldown started"
+ )
+ self.state = State.COOLDOWN
+
+ if self.streamer:
+ try:
+ await self.streamer.stop()
+ logger.info("Stream stopped")
+ except Exception as e:
+ logger.exception("Failed to stop stream: %s", e)
+
+ async def _check_cooldown_expired(self: "KidcamService") -> bool:
+ """Check if cooldown period has expired."""
+ if self.state != State.COOLDOWN:
+ return False
+
+ elapsed_minutes = (time.time() - self.last_detection_time) / 60
+ if elapsed_minutes >= self.config["cooldown_minutes"]:
+ logger.info("Cooldown period expired")
+ return True
+ return False
+
+ async def _detect_person(self: "KidcamService") -> bool:
+ """Check if a person is detected by the detector."""
+ if not self.detector:
+ return False
+
+ try:
+ return await self.detector.detect()
+ except Exception as e:
+ logger.exception("Detection error: %s", e)
+ return False
+
+ async def run(self: "KidcamService") -> None:
+ """Main event loop for the Kidcam service."""
+ self.running = True
+ await self._setup_components()
+ await self._transition_to_idle()
+
+ logger.info("Starting main event loop")
+
+ try:
+ while self.running:
+ if await self.shutdown_event.wait():
+ break
+
+ if self.state == State.IDLE:
+ if await self._detect_person():
+ await self._transition_to_active()
+ await asyncio.sleep(1.0)
+
+ elif self.state == State.ACTIVE:
+ if not await self._detect_person():
+ await self._transition_to_cooldown()
+ await asyncio.sleep(1.0)
+
+ elif self.state == State.COOLDOWN:
+ if await self._check_cooldown_expired():
+ await self._transition_to_idle()
+ await asyncio.sleep(10.0)
+
+ except Exception as e:
+ logger.exception("Error in main loop: %s", e)
+ raise
+ finally:
+ await self.shutdown()
+
+ async def shutdown(self: "KidcamService") -> None:
+ """Cleanup and shutdown the service gracefully."""
+ logger.info("Shutting down Kidcam service")
+ self.running = False
+
+ if self.streamer:
+ try:
+ await self.streamer.stop()
+ except Exception as e:
+ logger.exception("Error stopping streamer: %s", e)
+
+ if self.detector:
+ try:
+ await self.detector.close()
+ except Exception as e:
+ logger.exception("Error closing detector: %s", e)
+
+ logger.info("Shutdown complete")
+
+
+def _setup_signal_handlers(service: KidcamService) -> None:
+ """Setup graceful shutdown on SIGTERM and SIGINT."""
+
+ def signal_handler(sig: int, frame: typing.Any) -> None:
+ logger.info("Received signal %s, initiating shutdown", sig)
+ service.shutdown_event.set()
+
+ signal.signal(signal.SIGTERM, signal_handler)
+ signal.signal(signal.SIGINT, signal_handler)
+
+
+async def main() -> None:
+ """Entrypoint to start the Kidcam service."""
+ _ = Log.setup(logger, level=logging.INFO)
+
+ config_path = sys.argv[1] if len(sys.argv) > 1 else None
+ service = KidcamService(config_path=config_path)
+
+ _setup_signal_handlers(service)
+
+ logger.info("Starting Kidcam service")
+ await service.run()
+
+
+def test() -> None:
+ """Test the Kidcam service."""
+ _ = Log.setup(logger, level=logging.DEBUG)
+
+ logger.info("Testing configuration loading")
+ config = load_config()
+ assert config["camera_device"] == "/dev/video0"
+ assert config["stream_port"] == 8554
+ assert config["detection_confidence"] == 0.5
+ assert config["cooldown_minutes"] == 5
+
+ logger.info("Testing state transitions")
+ service = KidcamService()
+ assert service.state == State.IDLE
+
+ logger.info("Testing shutdown")
+ asyncio.run(service.shutdown())
+
+ logger.info("All tests passed")
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/Biz/Kidcam/Detector.py b/Biz/Kidcam/Detector.py
new file mode 100755
index 0000000..f2af9ad
--- /dev/null
+++ b/Biz/Kidcam/Detector.py
@@ -0,0 +1,135 @@
+#!/usr/bin/env run.sh
+"""Person detection module for Kidcam using YOLOv8-nano."""
+
+# : out kidcam-detector
+# : dep opencv-python
+# : dep ultralytics
+# : dep numpy
+# : dep pytest
+import cv2 as cv
+import numpy as np
+import sys
+
+
+class PersonDetector:
+ """Detect persons in video frames using YOLOv8-nano."""
+
+ def __init__(
+ self,
+ model_path: str,
+ confidence_threshold: float = 0.5,
+ ) -> None:
+ """Initialize person detector.
+
+ Args:
+ model_path: Path to YOLOv8 model file
+ confidence_threshold: Minimum confidence for detection (0.0-1.0)
+ """
+ try:
+ import ultralytics as ul
+
+ self.model = ul.YOLO(model_path)
+ self.confidence_threshold = confidence_threshold
+ self.last_bbox: tuple[int, int, int, int] | None = None
+ except ImportError as e:
+ msg = "ultralytics library required for YOLOv8"
+ raise ImportError(msg) from e
+
+ def detect_person(self, frame: np.ndarray) -> bool:
+ """Detect if person is present in frame.
+
+ Args:
+ frame: BGR image from OpenCV
+
+ Returns:
+ True if person detected above confidence threshold
+ """
+ results = self.model(frame, verbose=False)
+ self.last_bbox = None
+
+ for result in results:
+ boxes = result.boxes
+ for box in boxes:
+ class_id = int(box.cls[0])
+ confidence = float(box.conf[0])
+
+ if class_id == 0 and confidence >= self.confidence_threshold:
+ x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
+ self.last_bbox = (int(x1), int(y1), int(x2), int(y2))
+ return True
+
+ return False
+
+ def get_last_detection_bbox(
+ self,
+ ) -> tuple[int, int, int, int] | None:
+ """Get bounding box from last detection.
+
+ Returns:
+ Tuple of (x1, y1, x2, y2) or None if no recent detection
+ """
+ return self.last_bbox
+
+
+def main() -> None:
+ """Test person detection with webcam."""
+ if "test" in sys.argv:
+ test()
+ return
+
+ try:
+ cap = cv.VideoCapture("/dev/video0")
+ if not cap.isOpened():
+ sys.exit(1)
+
+ detector = PersonDetector("yolov8n.pt", confidence_threshold=0.5)
+
+ while True:
+ ret, frame = cap.read()
+ if not ret:
+ break
+
+ if detector.detect_person(frame):
+ bbox = detector.get_last_detection_bbox()
+ if bbox:
+ x1, y1, x2, y2 = bbox
+ cv.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
+
+ cv.imshow("Kidcam - Person Detection", frame)
+
+ if cv.waitKey(1) & 0xFF == ord("q"):
+ break
+
+ except FileNotFoundError:
+ sys.exit(1)
+ except Exception:
+ sys.exit(1)
+ finally:
+ if "cap" in locals():
+ cap.release()
+ cv.destroyAllWindows()
+
+
+def test() -> None:
+ """Basic unit tests for PersonDetector."""
+ test_frame = np.zeros((480, 640, 3), dtype=np.uint8)
+
+ try:
+ detector = PersonDetector("yolov8n.pt", confidence_threshold=0.5)
+ assert detector.confidence_threshold == 0.5
+ assert detector.get_last_detection_bbox() is None
+
+ result = detector.detect_person(test_frame)
+ assert isinstance(result, bool)
+
+ bbox = detector.get_last_detection_bbox()
+ assert bbox is None or (isinstance(bbox, tuple) and len(bbox) == 4)
+
+ except ImportError:
+ pass
+ except Exception:
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/Biz/Kidcam/Notifier.py b/Biz/Kidcam/Notifier.py
new file mode 100755
index 0000000..50d5d8a
--- /dev/null
+++ b/Biz/Kidcam/Notifier.py
@@ -0,0 +1,144 @@
+#!/usr/bin/env run.sh
+"""Telegram notification module for Kidcam."""
+
+# : out kidcam-notifier
+# : dep python-telegram-bot
+# : dep pytest
+# : dep pytest-asyncio
+import asyncio
+import datetime as dt
+import logging
+import Omni.Test as Test
+import os
+import pytest
+import typing
+import unittest.mock as mock
+
+
+class TelegramNotifier:
+ def __init__(self, bot_token: str, chat_id: str) -> None:
+ self.bot_token = bot_token
+ self.chat_id = chat_id
+ self.logger = logging.getLogger(__name__)
+
+ async def send_notification(
+ self, stream_url: str, message: str = "Kids are playing!"
+ ) -> bool:
+ try:
+ import telegram as tg # type: ignore[import-not-found,unused-ignore]
+
+ bot = tg.Bot(token=self.bot_token)
+ timestamp = dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ full_message = (
+ f"{message}\n\nšŸŽ„ Watch: {stream_url}\nšŸ“… {timestamp}"
+ )
+
+ await bot.send_message(chat_id=self.chat_id, text=full_message)
+ self.logger.info("Notification sent: %s", message)
+ return True
+
+ except Exception as e:
+ self.logger.exception("Failed to send notification: %s", e)
+ return False
+
+ async def send_stream_ended(self) -> bool:
+ timestamp = dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ message = f"Stream ended\nšŸ“… {timestamp}"
+
+ try:
+ import telegram as tg # type: ignore[import-not-found,unused-ignore]
+
+ bot = tg.Bot(token=self.bot_token)
+ await bot.send_message(chat_id=self.chat_id, text=message)
+ self.logger.info("Stream ended notification sent")
+ return True
+
+ except Exception as e:
+ self.logger.exception("Failed to send stream ended notification: %s", e)
+ return False
+
+
+def from_env() -> TelegramNotifier:
+ bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
+ chat_id = os.getenv("TELEGRAM_CHAT_ID")
+
+ if not bot_token:
+ msg = "TELEGRAM_BOT_TOKEN environment variable not set"
+ raise ValueError(msg)
+ if not chat_id:
+ msg = "TELEGRAM_CHAT_ID environment variable not set"
+ raise ValueError(msg)
+
+ return TelegramNotifier(bot_token=bot_token, chat_id=chat_id)
+
+
+async def main() -> None:
+ notifier = from_env()
+ stream_url = "https://example.com/stream/123"
+ await notifier.send_notification(stream_url)
+
+
+class TestTelegramNotifier(Test.TestCase):
+ @mock.patch.dict(
+ os.environ,
+ {"TELEGRAM_BOT_TOKEN": "test_token", "TELEGRAM_CHAT_ID": "test_chat"},
+ )
+ def test_from_env(self) -> None:
+ notifier = from_env()
+ self.assertEqual(notifier.bot_token, "test_token")
+ self.assertEqual(notifier.chat_id, "test_chat")
+
+ def test_from_env_missing_token(self) -> None:
+ with pytest.raises(ValueError):
+ from_env()
+
+ @mock.patch("telegram.Bot")
+ async def test_send_notification(self, mock_bot_class: typing.Any) -> None:
+ mock_bot = mock.AsyncMock()
+ mock_bot_class.return_value = mock_bot
+
+ notifier = TelegramNotifier(bot_token="test_token", chat_id="test_chat")
+ result = await notifier.send_notification("https://example.com/stream")
+
+ self.assertTrue(result)
+ mock_bot.send_message.assert_called_once()
+ call_args = mock_bot.send_message.call_args
+ self.assertEqual(call_args.kwargs["chat_id"], "test_chat")
+ self.assertIn("Kids are playing!", call_args.kwargs["text"])
+ self.assertIn("https://example.com/stream", call_args.kwargs["text"])
+
+ @mock.patch("telegram.Bot")
+ async def test_send_stream_ended(self, mock_bot_class: typing.Any) -> None:
+ mock_bot = mock.AsyncMock()
+ mock_bot_class.return_value = mock_bot
+
+ notifier = TelegramNotifier(bot_token="test_token", chat_id="test_chat")
+ result = await notifier.send_stream_ended()
+
+ self.assertTrue(result)
+ mock_bot.send_message.assert_called_once()
+ call_args = mock_bot.send_message.call_args
+ self.assertIn("Stream ended", call_args.kwargs["text"])
+
+ @mock.patch("telegram.Bot")
+ async def test_send_notification_error(
+ self, mock_bot_class: typing.Any
+ ) -> None:
+ mock_bot = mock.AsyncMock()
+ mock_bot.send_message.side_effect = Exception("Network error")
+ mock_bot_class.return_value = mock_bot
+
+ notifier = TelegramNotifier(bot_token="test_token", chat_id="test_chat")
+ result = await notifier.send_notification("https://example.com/stream")
+
+ self.assertFalse(result)
+
+
+def test() -> None:
+ import Omni.App as App
+
+ Test.run(App.Area.Test, [TestTelegramNotifier])
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/Biz/Kidcam/README.md b/Biz/Kidcam/README.md
new file mode 100644
index 0000000..fa69c3c
--- /dev/null
+++ b/Biz/Kidcam/README.md
@@ -0,0 +1,403 @@
+# Kidcam - Motion-Activated Family Video Streaming
+
+Kidcam is an intelligent video monitoring system that detects people, sends Telegram notifications, and provides RTSP streaming. Designed for Jetson Nano and similar ARM-based systems.
+
+## Quick Start
+
+```bash
+# Clone repository
+git clone git@simatime.com:omni.git
+cd omni
+
+# Deploy (on Jetson Nano running Ubuntu 20.04)
+sudo Biz/Kidcam/deploy.sh
+
+# Configure Telegram credentials
+sudo nano /etc/kidcam/config.env
+
+# Start service
+sudo systemctl start kidcam
+
+# Check status
+sudo systemctl status kidcam
+sudo journalctl -u kidcam -f
+```
+
+## Hardware Requirements
+
+### Minimum
+- **Device**: NVIDIA Jetson Nano (4GB recommended)
+- **OS**: Ubuntu 20.04 LTS (ARM64)
+- **Camera**: USB webcam or CSI camera module
+- **Storage**: 16GB+ microSD card
+- **Network**: WiFi or Ethernet connection
+
+### Recommended
+- 32GB+ microSD card for logging and model caching
+- 5V 4A power supply for Jetson Nano
+- Camera with 1080p resolution
+
+## Installation
+
+### Automated Deployment
+
+The deployment script handles everything:
+
+```bash
+sudo Biz/Kidcam/deploy.sh
+```
+
+This script:
+1. Verifies system is Ubuntu 20.04 on aarch64
+2. Creates `kidcam` system user
+3. Installs system dependencies (Python, OpenCV, GStreamer)
+4. Sets up Python virtual environment at `/opt/kidcam/venv`
+5. Installs Python packages (ultralytics, telegram bot, opencv)
+6. Creates directories: `/opt/kidcam` and `/etc/kidcam`
+7. Installs and enables systemd service
+8. Creates example configuration file
+
+### Manual Installation
+
+If you prefer manual setup:
+
+```bash
+# 1. Install system dependencies
+sudo apt-get update
+sudo apt-get install -y python3 python3-venv python3-opencv \
+ gstreamer1.0-tools v4l-utils
+
+# 2. Create user and directories
+sudo useradd --system --no-create-home kidcam
+sudo usermod -aG video kidcam
+sudo mkdir -p /opt/kidcam /etc/kidcam
+
+# 3. Copy project files
+sudo rsync -av ./ /opt/kidcam/
+
+# 4. Setup Python environment
+sudo python3 -m venv /opt/kidcam/venv
+sudo /opt/kidcam/venv/bin/pip install ultralytics python-telegram-bot opencv-python
+
+# 5. Install systemd service
+sudo cp Biz/Kidcam/kidcam.service /etc/systemd/system/
+sudo systemctl daemon-reload
+sudo systemctl enable kidcam
+```
+
+## Configuration
+
+Edit `/etc/kidcam/config.env`:
+
+```bash
+# Camera Configuration
+CAMERA_DEVICE=/dev/video0 # USB camera (use /dev/video1 for CSI)
+STREAM_PORT=8554 # RTSP streaming port
+
+# Detection Parameters
+DETECTION_CONFIDENCE=0.5 # Confidence threshold (0.0-1.0)
+COOLDOWN_MINUTES=5 # Minutes before re-triggering
+
+# Telegram Bot Setup
+TELEGRAM_BOT_TOKEN=123456:ABC-DEF1234ghIkl-zyx57W2v1u123ew11
+TELEGRAM_CHAT_ID=123456789
+
+# Python Path
+PYTHONPATH=/opt/kidcam
+```
+
+### Getting Telegram Credentials
+
+1. **Create Bot**:
+ - Message [@BotFather](https://t.me/BotFather) on Telegram
+ - Send `/newbot` and follow instructions
+ - Copy the bot token
+
+2. **Get Chat ID**:
+ - Start a chat with your new bot
+ - Send any message
+ - Visit: `https://api.telegram.org/bot<YOUR_BOT_TOKEN>/getUpdates`
+ - Find `"chat":{"id":123456789}` in the JSON response
+
+### Camera Device Selection
+
+```bash
+# List available cameras
+v4l2-ctl --list-devices
+
+# Test camera
+ffmpeg -f v4l2 -i /dev/video0 -frames 1 test.jpg
+```
+
+Common devices:
+- `/dev/video0` - USB webcam
+- `/dev/video1` - CSI camera (Jetson)
+
+## Service Management
+
+```bash
+# Start service
+sudo systemctl start kidcam
+
+# Stop service
+sudo systemctl stop kidcam
+
+# Restart service
+sudo systemctl restart kidcam
+
+# View status
+sudo systemctl status kidcam
+
+# Enable auto-start on boot
+sudo systemctl enable kidcam
+
+# Disable auto-start
+sudo systemctl disable kidcam
+
+# View logs (live)
+sudo journalctl -u kidcam -f
+
+# View last 100 lines
+sudo journalctl -u kidcam -n 100
+
+# View logs since boot
+sudo journalctl -u kidcam -b
+```
+
+## Testing Individual Modules
+
+### Test Camera Access
+
+```bash
+# Check camera permissions
+ls -l /dev/video0
+groups kidcam # Should include 'video' group
+
+# Capture test image
+v4l2-ctl --device=/dev/video0 --stream-mmap --stream-count=1 \
+ --stream-to=test.jpg
+```
+
+### Test Person Detection
+
+```bash
+cd /opt/kidcam
+source venv/bin/activate
+python3 -c "
+import Biz.Kidcam.Detector as Detector
+detector = Detector.PersonDetector('/dev/video0', 0.5)
+print('Detector initialized')
+"
+```
+
+### Test Telegram Notifications
+
+```bash
+cd /opt/kidcam
+source venv/bin/activate
+python3 -c "
+import os
+import Biz.Kidcam.Notifier as Notifier
+notifier = Notifier.TelegramNotifier(
+ os.getenv('TELEGRAM_BOT_TOKEN'),
+ os.getenv('TELEGRAM_CHAT_ID')
+)
+import asyncio
+asyncio.run(notifier.send_notification('Test message'))
+"
+```
+
+### Test RTSP Streaming
+
+```bash
+cd /opt/kidcam
+source venv/bin/activate
+python3 -c "
+import Biz.Kidcam.Streamer as Streamer
+streamer = Streamer.VideoStreamer('/dev/video0', 8554)
+print('Streamer initialized')
+"
+```
+
+View stream with VLC:
+```bash
+vlc rtsp://localhost:8554/stream
+```
+
+## Troubleshooting
+
+### Service Won't Start
+
+```bash
+# Check detailed error logs
+sudo journalctl -u kidcam -n 50 --no-pager
+
+# Verify configuration
+sudo cat /etc/kidcam/config.env
+
+# Test manual start
+cd /opt/kidcam
+sudo -u kidcam venv/bin/python3 -m Biz.Kidcam.Core
+```
+
+### Camera Not Detected
+
+```bash
+# List all video devices
+v4l2-ctl --list-devices
+
+# Check permissions
+ls -l /dev/video*
+groups kidcam # Must include 'video'
+
+# Verify camera works
+ffplay /dev/video0
+```
+
+### Import Errors
+
+```bash
+# Verify PYTHONPATH
+grep PYTHONPATH /etc/kidcam/config.env
+
+# Check Python path in virtual environment
+cd /opt/kidcam
+source venv/bin/activate
+python3 -c "import sys; print('\n'.join(sys.path))"
+
+# Reinstall packages
+sudo -u kidcam venv/bin/pip install --upgrade ultralytics python-telegram-bot
+```
+
+### Telegram Not Working
+
+```bash
+# Test bot token
+curl "https://api.telegram.org/bot<YOUR_TOKEN>/getMe"
+
+# Check network connectivity
+ping api.telegram.org
+
+# Verify environment variables are loaded
+sudo systemctl show kidcam -p Environment
+```
+
+### High CPU/Memory Usage
+
+```bash
+# Check resource usage
+top -u kidcam
+
+# Increase cooldown period
+sudo nano /etc/kidcam/config.env
+# Set COOLDOWN_MINUTES=10 or higher
+
+# Lower detection confidence
+# Set DETECTION_CONFIDENCE=0.7 (fewer detections)
+```
+
+### RTSP Stream Issues
+
+```bash
+# Check if port is open
+sudo netstat -tulpn | grep 8554
+
+# Test with ffplay
+ffplay rtsp://localhost:8554/stream
+
+# Check GStreamer plugins
+gst-inspect-1.0 | grep rtsp
+```
+
+## Architecture
+
+### State Machine
+
+Kidcam operates in three states:
+
+1. **IDLE**: Monitoring for person detection
+2. **ACTIVE**: Person detected, streaming and sending notifications
+3. **COOLDOWN**: Waiting before re-triggering
+
+### Components
+
+- **Core.py**: Main orchestration service
+- **Detector.py**: YOLO-based person detection
+- **Streamer.py**: GStreamer RTSP video streaming
+- **Notifier.py**: Telegram bot integration
+
+### Data Flow
+
+```
+Camera → Detector → [Person?] → Core → Streamer + Notifier
+ ↓
+ State Machine
+```
+
+## Performance Tuning
+
+### Jetson Nano Optimization
+
+```bash
+# Max performance mode
+sudo nvpmodel -m 0
+sudo jetson_clocks
+
+# Check power mode
+sudo nvpmodel -q
+```
+
+### Reduce Resource Usage
+
+Edit `/etc/kidcam/config.env`:
+```bash
+# Lower resolution in Streamer.py configuration
+# Increase detection interval
+COOLDOWN_MINUTES=15
+
+# Higher confidence = fewer false positives
+DETECTION_CONFIDENCE=0.7
+```
+
+## Development
+
+### Local Testing (Non-Jetson)
+
+```bash
+# Build with bild
+bild Biz/Kidcam/Core.py
+
+# Run directly
+python3 -m Biz.Kidcam.Core
+
+# Run with custom config
+CAMERA_DEVICE=/dev/video0 python3 -m Biz.Kidcam.Core
+```
+
+### Linting and Type Checking
+
+```bash
+# Lint
+lint Biz/Kidcam/*.py
+
+# Type check
+typecheck.sh Biz/Kidcam/Core.py
+```
+
+## Security Considerations
+
+- Service runs as unprivileged `kidcam` user
+- Config file has restricted permissions (640)
+- Network access limited to Telegram API and RTSP
+- systemd hardening applied (NoNewPrivileges, ProtectSystem, etc.)
+
+## License
+
+See repository root for license information.
+
+## Support
+
+For issues or questions:
+1. Check logs: `sudo journalctl -u kidcam -f`
+2. Review this troubleshooting guide
+3. File issue in repository
diff --git a/Biz/Kidcam/Streamer.py b/Biz/Kidcam/Streamer.py
new file mode 100755
index 0000000..4779546
--- /dev/null
+++ b/Biz/Kidcam/Streamer.py
@@ -0,0 +1,313 @@
+#!/usr/bin/env run.sh
+"""Video streaming module for Kidcam using GStreamer with HLS output."""
+
+# : out kidcam-streamer
+# : dep pytest
+# NOTE: pygobject (GStreamer) only available on Jetson, skip for typecheck
+
+import gi # type: ignore[import-not-found]
+import http.server as http_server
+import os
+import pathlib
+import shutil
+import socketserver
+import subprocess
+import sys
+import threading
+import typing
+
+gi.require_version("Gst", "1.0")
+
+import contextlib
+import gi.repository.Gst as Gst # type: ignore[import-not-found]
+
+
+class VideoStreamer:
+ """Video streamer using GStreamer for HLS output with hardware encoding."""
+
+ def __init__(
+ self,
+ device: str = "/dev/video0",
+ width: int = 1280,
+ height: int = 720,
+ port: int = 8554,
+ ) -> None:
+ """
+ Initialize video streamer.
+
+ Args:
+ device: Video device path
+ width: Video width in pixels
+ height: Video height in pixels
+ port: HTTP server port for HLS streaming
+ """
+ self.device = device
+ self.width = width
+ self.height = height
+ self.port = port
+ self.hls_dir = pathlib.Path("/tmp/hls")
+ self.pipeline: Gst.Element | None = None
+ self.http_server: socketserver.TCPServer | None = None
+ self.server_thread: threading.Thread | None = None
+ self._streaming = False
+
+ Gst.init(None)
+
+ def _get_tailscale_ip(self) -> str:
+ """Get the Tailscale IP address of this device."""
+ try:
+ result = subprocess.run(
+ ["tailscale", "ip", "-4"],
+ capture_output=True,
+ text=True,
+ check=True,
+ timeout=5,
+ )
+ return result.stdout.strip()
+ except (
+ subprocess.CalledProcessError,
+ subprocess.TimeoutExpired,
+ FileNotFoundError,
+ ):
+ return "localhost"
+
+ def _check_gstreamer_elements(self) -> tuple[bool, list[str]]:
+ """
+ Check if required GStreamer elements are available.
+
+ Returns:
+ Tuple of (all_available, missing_elements)
+ """
+ required_elements = [
+ "v4l2src",
+ "nvv4l2h264enc",
+ "h264parse",
+ "hlssink2",
+ ]
+ missing = []
+
+ for elem_name in required_elements:
+ factory = Gst.ElementFactory.find(elem_name)
+ if factory is None:
+ missing.append(elem_name)
+
+ return (len(missing) == 0, missing)
+
+ def _setup_hls_directory(self) -> None:
+ """Create and clean HLS output directory."""
+ if self.hls_dir.exists():
+ shutil.rmtree(self.hls_dir)
+ self.hls_dir.mkdir(parents=True, exist_ok=True)
+
+ def _start_http_server(self) -> None:
+ """Start HTTP server to serve HLS segments."""
+ os.chdir(self.hls_dir)
+
+ class QuietHandler(http_server.SimpleHTTPRequestHandler):
+ def log_message(self, format: str, *args: typing.Any) -> None:
+ pass
+
+ self.http_server = socketserver.TCPServer(("", self.port), QuietHandler)
+ self.server_thread = threading.Thread(
+ target=self.http_server.serve_forever,
+ daemon=True,
+ )
+ self.server_thread.start()
+
+ def _stop_http_server(self) -> None:
+ """Stop HTTP server."""
+ if self.http_server:
+ self.http_server.shutdown()
+ self.http_server.server_close()
+ self.http_server = None
+
+ if self.server_thread:
+ self.server_thread.join(timeout=2.0)
+ self.server_thread = None
+
+ def start_stream(self) -> str:
+ """
+ Start video streaming.
+
+ Returns:
+ Stream URL (HTTP endpoint for HLS playlist)
+
+ Raises:
+ RuntimeError: If stream is already running or GStreamer elements missing
+ """
+ if self._streaming:
+ msg = "Stream is already running"
+ raise RuntimeError(msg)
+
+ available, missing = self._check_gstreamer_elements()
+ if not available:
+ msg = (
+ f"Missing required GStreamer elements: {', '.join(missing)}. "
+ "Install gst-plugins-good, gst-plugins-bad, and Jetson multimedia packages."
+ )
+ raise RuntimeError(
+ msg
+ )
+
+ self._setup_hls_directory()
+
+ pipeline_str = (
+ f"v4l2src device={self.device} ! "
+ f"video/x-raw,width={self.width},height={self.height},framerate=30/1 ! "
+ f"nvv4l2h264enc bitrate=2000000 ! "
+ f"h264parse ! "
+ f"hlssink2 "
+ f"location={self.hls_dir}/segment%05d.ts "
+ f"playlist-location={self.hls_dir}/stream.m3u8 "
+ f"max-files=10 "
+ f"target-duration=2 "
+ f"playlist-length=5"
+ )
+
+ self.pipeline = Gst.parse_launch(pipeline_str)
+
+ if not self.pipeline:
+ msg = "Failed to create GStreamer pipeline"
+ raise RuntimeError(msg)
+
+ ret = self.pipeline.set_state(Gst.State.PLAYING)
+ if ret == Gst.StateChangeReturn.FAILURE:
+ self.pipeline.set_state(Gst.State.NULL)
+ self.pipeline = None
+ msg = "Failed to start GStreamer pipeline"
+ raise RuntimeError(msg)
+
+ self._start_http_server()
+ self._streaming = True
+
+ return self.get_stream_url()
+
+ def stop_stream(self) -> None:
+ """Stop video streaming and cleanup resources."""
+ if not self._streaming:
+ return
+
+ if self.pipeline:
+ self.pipeline.set_state(Gst.State.NULL)
+ self.pipeline = None
+
+ self._stop_http_server()
+
+ if self.hls_dir.exists():
+ with contextlib.suppress(OSError):
+ shutil.rmtree(self.hls_dir)
+
+ self._streaming = False
+
+ def is_streaming(self) -> bool:
+ """
+ Check if currently streaming.
+
+ Returns:
+ True if streaming, False otherwise
+ """
+ return self._streaming
+
+ def get_stream_url(self) -> str:
+ """
+ Get the HLS stream URL.
+
+ Returns:
+ HTTP URL for HLS playlist
+ """
+ ip = self._get_tailscale_ip()
+ return f"http://{ip}:{self.port}/stream.m3u8"
+
+
+def test() -> None:
+ """Basic tests for VideoStreamer."""
+ streamer = VideoStreamer(
+ device="/dev/video0", width=640, height=480, port=8555
+ )
+
+ assert streamer.device == "/dev/video0"
+ assert streamer.width == 640
+ assert streamer.height == 480
+ assert streamer.port == 8555
+ assert not streamer.is_streaming()
+
+ available, _missing = streamer._check_gstreamer_elements()
+ if not available:
+ pass
+
+ streamer.get_stream_url()
+
+
+def main() -> None:
+ """Test the video streamer."""
+ import argparse
+ import signal
+ import time
+
+ parser = argparse.ArgumentParser(
+ description="Video streaming with GStreamer HLS"
+ )
+ parser.add_argument(
+ "--device",
+ default="/dev/video0",
+ help="Video device path (default: /dev/video0)",
+ )
+ parser.add_argument(
+ "--width",
+ type=int,
+ default=1280,
+ help="Video width (default: 1280)",
+ )
+ parser.add_argument(
+ "--height",
+ type=int,
+ default=720,
+ help="Video height (default: 720)",
+ )
+ parser.add_argument(
+ "--port",
+ type=int,
+ default=8554,
+ help="HTTP server port (default: 8554)",
+ )
+ parser.add_argument(
+ "--test",
+ action="store_true",
+ help="Run tests instead of starting stream",
+ )
+
+ args = parser.parse_args()
+
+ if args.test:
+ test()
+ return
+
+ streamer = VideoStreamer(
+ device=args.device,
+ width=args.width,
+ height=args.height,
+ port=args.port,
+ )
+
+ def signal_handler(sig: int, frame: typing.Any) -> None:
+ streamer.stop_stream()
+ sys.exit(0)
+
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ try:
+ streamer.start_stream()
+
+ while streamer.is_streaming():
+ time.sleep(1)
+
+ except RuntimeError:
+ sys.exit(1)
+ except Exception:
+ streamer.stop_stream()
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/Biz/Kidcam/deploy.sh b/Biz/Kidcam/deploy.sh
new file mode 100755
index 0000000..a272482
--- /dev/null
+++ b/Biz/Kidcam/deploy.sh
@@ -0,0 +1,218 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+# Kidcam Deployment Script for Ubuntu 20.04 (Jetson Nano)
+
+readonly SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+readonly PROJECT_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)"
+readonly KIDCAM_USER="kidcam"
+readonly INSTALL_DIR="/opt/kidcam"
+readonly CONFIG_DIR="/etc/kidcam"
+readonly VENV_DIR="${INSTALL_DIR}/venv"
+
+log_info() {
+ echo "[INFO] $*" >&2
+}
+
+log_error() {
+ echo "[ERROR] $*" >&2
+}
+
+log_success() {
+ echo "[SUCCESS] $*" >&2
+}
+
+check_system() {
+ log_info "Checking system requirements..."
+
+ if [[ "$(uname -s)" != "Linux" ]]; then
+ log_error "This script requires Linux"
+ exit 1
+ fi
+
+ if [[ "$(uname -m)" != "aarch64" ]]; then
+ log_error "This script is designed for aarch64 (ARM64) architecture"
+ log_error "Found: $(uname -m)"
+ exit 1
+ fi
+
+ if ! grep -q "Ubuntu" /etc/os-release; then
+ log_error "This script is designed for Ubuntu"
+ log_error "Found: $(cat /etc/os-release | grep PRETTY_NAME)"
+ exit 1
+ fi
+
+ if [[ $EUID -ne 0 ]]; then
+ log_error "This script must be run as root (use sudo)"
+ exit 1
+ fi
+
+ log_success "System requirements verified"
+}
+
+create_user() {
+ if id "${KIDCAM_USER}" &>/dev/null; then
+ log_info "User '${KIDCAM_USER}' already exists"
+ else
+ log_info "Creating user '${KIDCAM_USER}'..."
+ useradd --system --no-create-home --shell /usr/sbin/nologin \
+ --comment "Kidcam service account" "${KIDCAM_USER}"
+ usermod -aG video "${KIDCAM_USER}"
+ log_success "User '${KIDCAM_USER}' created"
+ fi
+}
+
+install_system_dependencies() {
+ log_info "Installing system dependencies..."
+
+ apt-get update
+ apt-get install -y \
+ python3 \
+ python3-pip \
+ python3-venv \
+ python3-dev \
+ libopencv-dev \
+ python3-opencv \
+ gstreamer1.0-tools \
+ gstreamer1.0-plugins-base \
+ gstreamer1.0-plugins-good \
+ gstreamer1.0-plugins-bad \
+ gstreamer1.0-plugins-ugly \
+ gstreamer1.0-libav \
+ libgstreamer1.0-dev \
+ libgstreamer-plugins-base1.0-dev \
+ v4l-utils \
+ build-essential \
+ cmake \
+ pkg-config
+
+ log_success "System dependencies installed"
+}
+
+setup_directories() {
+ log_info "Setting up directories..."
+
+ mkdir -p "${INSTALL_DIR}"
+ mkdir -p "${CONFIG_DIR}"
+ mkdir -p "${INSTALL_DIR}/_/tmp"
+
+ log_info "Copying project files..."
+ rsync -av --exclude='_/' --exclude='.git' --exclude='*.pyc' \
+ "${PROJECT_ROOT}/" "${INSTALL_DIR}/"
+
+ chown -R "${KIDCAM_USER}:${KIDCAM_USER}" "${INSTALL_DIR}"
+ chown -R "${KIDCAM_USER}:${KIDCAM_USER}" "${CONFIG_DIR}"
+ chmod 755 "${INSTALL_DIR}"
+ chmod 750 "${CONFIG_DIR}"
+
+ log_success "Directories configured"
+}
+
+setup_python_environment() {
+ log_info "Setting up Python virtual environment..."
+
+ if [[ ! -d "${VENV_DIR}" ]]; then
+ sudo -u "${KIDCAM_USER}" python3 -m venv "${VENV_DIR}"
+ log_success "Virtual environment created"
+ else
+ log_info "Virtual environment already exists"
+ fi
+
+ log_info "Installing Python packages..."
+ sudo -u "${KIDCAM_USER}" "${VENV_DIR}/bin/pip" install --upgrade pip wheel setuptools
+
+ # Install packages with careful dependency management
+ sudo -u "${KIDCAM_USER}" "${VENV_DIR}/bin/pip" install \
+ ultralytics \
+ python-telegram-bot \
+ opencv-python \
+ torch torchvision --index-url https://download.pytorch.org/whl/cpu \
+ numpy \
+ pillow
+
+ log_success "Python packages installed"
+}
+
+create_config() {
+ local config_file="${CONFIG_DIR}/config.env"
+
+ if [[ -f "${config_file}" ]]; then
+ log_info "Config file already exists at ${config_file}"
+ log_info "Skipping config creation. Review existing configuration."
+ else
+ log_info "Creating example configuration..."
+ cat > "${config_file}" <<'EOF'
+# Kidcam Configuration
+# Copy this file to /etc/kidcam/config.env and edit with your values
+
+# Camera device (default: /dev/video0)
+CAMERA_DEVICE=/dev/video0
+
+# RTSP stream port (default: 8554)
+STREAM_PORT=8554
+
+# Person detection confidence threshold (0.0-1.0, default: 0.5)
+DETECTION_CONFIDENCE=0.5
+
+# Cooldown period in minutes after detection (default: 5)
+COOLDOWN_MINUTES=5
+
+# Telegram Bot Configuration
+# Get your bot token from @BotFather on Telegram
+TELEGRAM_BOT_TOKEN=your-bot-token-here
+
+# Get your chat ID by messaging your bot and checking:
+# https://api.telegram.org/bot<YOUR_BOT_TOKEN>/getUpdates
+TELEGRAM_CHAT_ID=your-chat-id-here
+
+# Python path for imports
+PYTHONPATH=/opt/kidcam
+EOF
+
+ chown "${KIDCAM_USER}:${KIDCAM_USER}" "${config_file}"
+ chmod 640 "${config_file}"
+
+ log_success "Example config created at ${config_file}"
+ log_info "IMPORTANT: Edit ${config_file} with your Telegram credentials"
+ fi
+}
+
+install_systemd_service() {
+ log_info "Installing systemd service..."
+
+ cp "${SCRIPT_DIR}/kidcam.service" /etc/systemd/system/
+ chmod 644 /etc/systemd/system/kidcam.service
+
+ # Update ExecStart to use virtualenv python
+ sed -i "s|ExecStart=/usr/bin/python3|ExecStart=${VENV_DIR}/bin/python3|" \
+ /etc/systemd/system/kidcam.service
+
+ systemctl daemon-reload
+ systemctl enable kidcam.service
+
+ log_success "Systemd service installed and enabled"
+}
+
+main() {
+ log_info "Starting Kidcam deployment..."
+
+ check_system
+ create_user
+ install_system_dependencies
+ setup_directories
+ setup_python_environment
+ create_config
+ install_systemd_service
+
+ log_success "Deployment complete!"
+ echo ""
+ log_info "Next steps:"
+ echo " 1. Edit /etc/kidcam/config.env with your Telegram credentials"
+ echo " 2. Test camera access: v4l2-ctl --list-devices"
+ echo " 3. Start service: sudo systemctl start kidcam"
+ echo " 4. Check logs: sudo journalctl -u kidcam -f"
+ echo ""
+ log_info "For more information, see ${INSTALL_DIR}/Biz/Kidcam/README.md"
+}
+
+main "$@"
diff --git a/Biz/Kidcam/kidcam.service b/Biz/Kidcam/kidcam.service
new file mode 100644
index 0000000..4f65775
--- /dev/null
+++ b/Biz/Kidcam/kidcam.service
@@ -0,0 +1,31 @@
+[Unit]
+Description=Kidcam - Motion-activated family video streaming
+Documentation=file:///opt/kidcam/Biz/Kidcam/README.md
+After=network-online.target
+Requires=network-online.target
+
+[Service]
+Type=simple
+User=kidcam
+Group=kidcam
+WorkingDirectory=/opt/kidcam
+EnvironmentFile=/etc/kidcam/config.env
+ExecStart=/usr/bin/python3 -m Biz.Kidcam.Core
+Restart=on-failure
+RestartSec=10
+StandardOutput=journal
+StandardError=journal
+SyslogIdentifier=kidcam
+
+# Security hardening
+NoNewPrivileges=true
+PrivateTmp=true
+ProtectSystem=strict
+ProtectHome=true
+ReadWritePaths=/opt/kidcam/_
+DeviceAllow=/dev/video0 rw
+DeviceAllow=/dev/video1 rw
+SupplementaryGroups=video
+
+[Install]
+WantedBy=multi-user.target