#!/usr/bin/env run.sh
"""Main orchestration service for Kidcam."""

# : out kidcam
# : dep pytest
# : dep pytest-asyncio
import asyncio
import enum
import logging
import Omni.Log as Log  # type: ignore
import os
import pathlib
import signal
import sys
import time
import typing

logger = logging.getLogger(__name__)

# Seconds between detector polls while IDLE or ACTIVE.
_POLL_INTERVAL_SECONDS = 1.0
# Seconds between cooldown-expiry checks while in COOLDOWN.
_COOLDOWN_POLL_INTERVAL_SECONDS = 10.0
# Config keys whose values must never be written to the log.
_SECRET_CONFIG_KEYS = frozenset({"telegram_bot_token"})


class State(enum.Enum):
    """State machine states for Kidcam service."""

    IDLE = "idle"
    ACTIVE = "active"
    COOLDOWN = "cooldown"


class Config(typing.TypedDict):
    """Configuration for Kidcam service."""

    camera_device: str
    stream_port: int
    detection_confidence: float
    cooldown_minutes: int
    telegram_bot_token: str
    telegram_chat_id: str


def load_config(config_path: str | None = None) -> Config:
    """Load configuration from file or environment variables.

    Args:
        config_path: Optional path to a JSON file. When it names an existing
            file, the parsed JSON is returned as-is (it is not validated
            against ``Config``).

    Returns:
        The file contents when ``config_path`` exists; otherwise a config
        built from the ``CAMERA_DEVICE``, ``STREAM_PORT``,
        ``DETECTION_CONFIDENCE``, ``COOLDOWN_MINUTES``, ``TELEGRAM_BOT_TOKEN``
        and ``TELEGRAM_CHAT_ID`` environment variables with built-in defaults.

    Raises:
        ValueError: If a numeric environment variable cannot be parsed, or
            the file is not valid JSON (``json.JSONDecodeError``).
    """
    if config_path:
        path = pathlib.Path(config_path)
        if path.exists():
            import json

            with path.open(encoding="utf-8") as f:
                data = json.load(f)
            return typing.cast("Config", data)
        # An explicit path that is missing is most likely a mistake; say so
        # instead of silently switching to environment defaults.
        logger.warning(
            "Config file %s not found, falling back to environment",
            config_path,
        )

    return {
        "camera_device": os.getenv("CAMERA_DEVICE", "/dev/video0"),
        "stream_port": int(os.getenv("STREAM_PORT", "8554")),
        "detection_confidence": float(
            os.getenv("DETECTION_CONFIDENCE", "0.5")
        ),
        "cooldown_minutes": int(os.getenv("COOLDOWN_MINUTES", "5")),
        "telegram_bot_token": os.getenv("TELEGRAM_BOT_TOKEN", ""),
        "telegram_chat_id": os.getenv("TELEGRAM_CHAT_ID", ""),
    }


class KidcamService:
    """Main orchestration service for Kidcam.

    Drives a three-state machine (IDLE -> ACTIVE -> COOLDOWN -> IDLE) from
    the detector's results, starting the stream and sending a notification
    on entering ACTIVE and stopping the stream on entering COOLDOWN.
    """

    def __init__(
        self: "KidcamService", config_path: str | None = None
    ) -> None:
        """Initialize the Kidcam service with configuration.

        Args:
            config_path: Optional JSON config path, passed to ``load_config``.
        """
        self.config = load_config(config_path)
        self.state = State.IDLE
        # Wall-clock time (time.time()) of the most recent positive detection.
        self.last_detection_time = 0.0
        self.running = False
        self.shutdown_event = asyncio.Event()
        # Components stay None until _setup_components succeeds (stub mode).
        self.detector: typing.Any | None = None
        self.streamer: typing.Any | None = None
        self.notifier: typing.Any | None = None
        logger.info("Kidcam service initialized")
        # Redact secrets so the bot token never lands in debug logs.
        safe_config = {
            key: "***" if key in _SECRET_CONFIG_KEYS and value else value
            for key, value in self.config.items()
        }
        logger.debug("Config: %s", safe_config)

    async def _setup_components(self: "KidcamService") -> None:
        """Initialize detector, streamer, and notifier components.

        If any of the component modules cannot be imported, a warning is
        logged and the components are left as ``None`` (stub mode).
        """
        try:
            import Biz.Kidcam.Detector as Detector
            import Biz.Kidcam.Notifier as Notifier
            import Biz.Kidcam.Streamer as Streamer

            self.detector = Detector.PersonDetector(
                model_path="yolov8n.pt",
                confidence_threshold=self.config["detection_confidence"],
            )
            self.streamer = Streamer.VideoStreamer(
                device=self.config["camera_device"],
                port=self.config["stream_port"],
            )
            self.notifier = Notifier.TelegramNotifier(
                bot_token=self.config["telegram_bot_token"],
                chat_id=self.config["telegram_chat_id"],
            )
            logger.info("All components initialized successfully")
        except ImportError as e:
            logger.warning("Component not available: %s", e)
            logger.warning("Running in stub mode")

    async def _transition_to_idle(self: "KidcamService") -> None:
        """Transition to IDLE state."""
        logger.info("State: IDLE - Monitoring for person detection")
        self.state = State.IDLE

    async def _transition_to_active(self: "KidcamService") -> None:
        """Transition to ACTIVE state.

        Records the detection time, then starts the stream and sends a
        notification. Failures of either step are logged and do not prevent
        the state change.
        """
        logger.info("State: ACTIVE - Person detected, starting stream")
        self.state = State.ACTIVE
        self.last_detection_time = time.time()

        if self.streamer:
            try:
                await self.streamer.start()
                logger.info(
                    "Stream started on port %s", self.config["stream_port"]
                )
            except Exception as e:
                logger.exception("Failed to start stream: %s", e)

        if self.notifier:
            try:
                stream_url = (
                    f"rtsp://kidcam:{self.config['stream_port']}/stream"
                )
                await self.notifier.send_notification(
                    f"👶 Person detected! Stream available at {stream_url}"
                )
                logger.info("Notification sent")
            except Exception as e:
                logger.exception("Failed to send notification: %s", e)

    async def _transition_to_cooldown(self: "KidcamService") -> None:
        """Transition to COOLDOWN state and stop the stream.

        A failure to stop the stream is logged and otherwise ignored.
        """
        logger.info(
            "State: COOLDOWN - %s minute cooldown started",
            self.config["cooldown_minutes"],
        )
        self.state = State.COOLDOWN

        if self.streamer:
            try:
                await self.streamer.stop()
                logger.info("Stream stopped")
            except Exception as e:
                logger.exception("Failed to stop stream: %s", e)

    async def _check_cooldown_expired(self: "KidcamService") -> bool:
        """Check if cooldown period has expired.

        Returns:
            True when the service is in COOLDOWN and at least
            ``cooldown_minutes`` have passed since ``last_detection_time``;
            False otherwise (including when not in COOLDOWN).
        """
        if self.state != State.COOLDOWN:
            return False

        elapsed_minutes = (time.time() - self.last_detection_time) / 60
        if elapsed_minutes >= self.config["cooldown_minutes"]:
            logger.info("Cooldown period expired")
            return True
        return False

    async def _detect_person(self: "KidcamService") -> bool:
        """Check if a person is detected by the detector.

        Returns:
            The detector's result, or False when no detector is configured
            or the detector raises (the error is logged).
        """
        if not self.detector:
            return False

        try:
            return await self.detector.detect()
        except Exception as e:
            logger.exception("Detection error: %s", e)
            return False

    async def _sleep_or_shutdown(self: "KidcamService", seconds: float) -> None:
        """Sleep for up to ``seconds``, waking early if shutdown is requested."""
        try:
            await asyncio.wait_for(
                self.shutdown_event.wait(), timeout=seconds
            )
        except asyncio.TimeoutError:
            # Timeout is the normal case: no shutdown was requested.
            pass

    async def run(self: "KidcamService") -> None:
        """Main event loop for the Kidcam service.

        Sets up components, enters IDLE, then polls until ``running`` is
        cleared or ``shutdown_event`` is set. ``shutdown()`` is always called
        on exit from the loop.

        Raises:
            Exception: Any unexpected error from the loop is logged and
                re-raised after shutdown.
        """
        self.running = True

        await self._setup_components()
        await self._transition_to_idle()

        logger.info("Starting main event loop")

        try:
            # Event.wait() would block until shutdown; is_set() only peeks,
            # so the state machine actually gets to run.
            while self.running and not self.shutdown_event.is_set():
                if self.state == State.IDLE:
                    if await self._detect_person():
                        await self._transition_to_active()
                    delay = _POLL_INTERVAL_SECONDS
                elif self.state == State.ACTIVE:
                    if await self._detect_person():
                        # Keep the timestamp fresh so the cooldown is measured
                        # from the last sighting, not the first one.
                        self.last_detection_time = time.time()
                    else:
                        await self._transition_to_cooldown()
                    delay = _POLL_INTERVAL_SECONDS
                else:
                    if await self._check_cooldown_expired():
                        await self._transition_to_idle()
                    delay = _COOLDOWN_POLL_INTERVAL_SECONDS

                await self._sleep_or_shutdown(delay)
        except Exception as e:
            logger.exception("Error in main loop: %s", e)
            raise
        finally:
            await self.shutdown()

    async def shutdown(self: "KidcamService") -> None:
        """Cleanup and shutdown the service gracefully.

        Clears ``running``, stops the streamer and closes the detector when
        present. Errors from either are logged and do not abort the shutdown.
        """
        logger.info("Shutting down Kidcam service")
        self.running = False

        if self.streamer:
            try:
                await self.streamer.stop()
            except Exception as e:
                logger.exception("Error stopping streamer: %s", e)

        if self.detector:
            try:
                await self.detector.close()
            except Exception as e:
                logger.exception("Error closing detector: %s", e)

        logger.info("Shutdown complete")


def _setup_signal_handlers(service: KidcamService) -> None:
    """Setup graceful shutdown on SIGTERM and SIGINT.

    When called from inside a running event loop the handlers are registered
    through the loop, so a signal wakes the loop immediately. Otherwise (no
    running loop, or the platform does not support it) plain
    ``signal.signal`` handlers are installed.

    Args:
        service: The service whose ``shutdown_event`` is set on a signal.
    """

    def signal_handler(sig: int, frame: typing.Any) -> None:
        logger.info("Received signal %s, initiating shutdown", sig)
        service.shutdown_event.set()

    try:
        loop: asyncio.AbstractEventLoop | None = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    for signum in (signal.SIGTERM, signal.SIGINT):
        if loop is not None:
            try:
                # Pass the plain int so the log line matches the
                # signal.signal() path.
                loop.add_signal_handler(
                    signum, signal_handler, int(signum), None
                )
                continue
            except NotImplementedError:
                # Event loops without signal support (e.g. on Windows).
                pass
        signal.signal(signum, signal_handler)


async def main() -> None:
    """Entrypoint to start the Kidcam service.

    Reads an optional config path from ``sys.argv[1]``, installs signal
    handlers and runs the service until it shuts down.
    """
    _ = Log.setup(logger, level=logging.INFO)

    config_path = sys.argv[1] if len(sys.argv) > 1 else None
    service = KidcamService(config_path=config_path)
    _setup_signal_handlers(service)

    logger.info("Starting Kidcam service")
    await service.run()


def test() -> None:
    """Test the Kidcam service.

    Checks the environment-default configuration, the initial state and that
    ``shutdown()`` completes. Assumes none of the config environment
    variables are overridden.
    """
    _ = Log.setup(logger, level=logging.DEBUG)

    logger.info("Testing configuration loading")
    config = load_config()
    assert config["camera_device"] == "/dev/video0"
    assert config["stream_port"] == 8554
    assert config["detection_confidence"] == 0.5
    assert config["cooldown_minutes"] == 5

    logger.info("Testing state transitions")
    service = KidcamService()
    assert service.state == State.IDLE

    logger.info("Testing shutdown")
    asyncio.run(service.shutdown())

    logger.info("All tests passed")


if __name__ == "__main__":
    asyncio.run(main())