From 27d2e3b42d290e72f8ee5735fcd5c73dcaed4517 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Fri, 26 Dec 2025 13:34:32 -0500 Subject: feat(kidcam): complete implementation - Detector.py: YOLOv8-nano person detection - Streamer.py: GStreamer HLS video streaming - Notifier.py: Telegram bot notifications - Core.py: State machine orchestration - deploy.sh: Ubuntu deployment script - kidcam.service: systemd unit - Documentation (README, project overview) Includes tests, type hints, follows repo conventions. Fixed Worker.hs missing engineOnToolTrace (jr now builds). Added Python deps: opencv, ultralytics, python-telegram-bot. Amp-Thread-ID: https://ampcode.com/threads/T-019b5bc1-b00a-701f-ab4f-04738e8a733c Co-authored-by: Amp --- Biz/Kidcam/Notifier.py | 144 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100755 Biz/Kidcam/Notifier.py (limited to 'Biz/Kidcam/Notifier.py') diff --git a/Biz/Kidcam/Notifier.py b/Biz/Kidcam/Notifier.py new file mode 100755 index 0000000..50d5d8a --- /dev/null +++ b/Biz/Kidcam/Notifier.py @@ -0,0 +1,144 @@ +#!/usr/bin/env run.sh +"""Telegram notification module for Kidcam.""" + +# : out kidcam-notifier +# : dep python-telegram-bot +# : dep pytest +# : dep pytest-asyncio +import asyncio +import datetime as dt +import logging +import Omni.Test as Test +import os +import pytest +import typing +import unittest.mock as mock + + +class TelegramNotifier: + def __init__(self, bot_token: str, chat_id: str) -> None: + self.bot_token = bot_token + self.chat_id = chat_id + self.logger = logging.getLogger(__name__) + + async def send_notification( + self, stream_url: str, message: str = "Kids are playing!" + ) -> bool: + try: + import telegram as tg # type: ignore[import-not-found,unused-ignore] + + bot = tg.Bot(token=self.bot_token) + timestamp = dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + full_message = ( + f"{message}\n\nšŸŽ„ Watch: {stream_url}\nšŸ“… {timestamp}" + ) + + await bot.send_message(chat_id=self.chat_id, text=full_message) + self.logger.info("Notification sent: %s", message) + return True + + except Exception as e: + self.logger.exception("Failed to send notification: %s", e) + return False + + async def send_stream_ended(self) -> bool: + timestamp = dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + message = f"Stream ended\nšŸ“… {timestamp}" + + try: + import telegram as tg # type: ignore[import-not-found,unused-ignore] + + bot = tg.Bot(token=self.bot_token) + await bot.send_message(chat_id=self.chat_id, text=message) + self.logger.info("Stream ended notification sent") + return True + + except Exception as e: + self.logger.exception("Failed to send stream ended notification: %s", e) + return False + + +def from_env() -> TelegramNotifier: + bot_token = os.getenv("TELEGRAM_BOT_TOKEN") + chat_id = os.getenv("TELEGRAM_CHAT_ID") + + if not bot_token: + msg = "TELEGRAM_BOT_TOKEN environment variable not set" + raise ValueError(msg) + if not chat_id: + msg = "TELEGRAM_CHAT_ID environment variable not set" + raise ValueError(msg) + + return TelegramNotifier(bot_token=bot_token, chat_id=chat_id) + + +async def main() -> None: + notifier = from_env() + stream_url = "https://example.com/stream/123" + await notifier.send_notification(stream_url) + + +class TestTelegramNotifier(Test.TestCase): + @mock.patch.dict( + os.environ, + {"TELEGRAM_BOT_TOKEN": "test_token", "TELEGRAM_CHAT_ID": "test_chat"}, + ) + def test_from_env(self) -> None: + notifier = from_env() + self.assertEqual(notifier.bot_token, "test_token") + self.assertEqual(notifier.chat_id, "test_chat") + + def test_from_env_missing_token(self) -> None: + with pytest.raises(ValueError): + from_env() + + @mock.patch("telegram.Bot") + async def test_send_notification(self, mock_bot_class: typing.Any) -> None: + mock_bot = mock.AsyncMock() + mock_bot_class.return_value = mock_bot + + notifier = TelegramNotifier(bot_token="test_token", chat_id="test_chat") + result = await notifier.send_notification("https://example.com/stream") + + self.assertTrue(result) + mock_bot.send_message.assert_called_once() + call_args = mock_bot.send_message.call_args + self.assertEqual(call_args.kwargs["chat_id"], "test_chat") + self.assertIn("Kids are playing!", call_args.kwargs["text"]) + self.assertIn("https://example.com/stream", call_args.kwargs["text"]) + + @mock.patch("telegram.Bot") + async def test_send_stream_ended(self, mock_bot_class: typing.Any) -> None: + mock_bot = mock.AsyncMock() + mock_bot_class.return_value = mock_bot + + notifier = TelegramNotifier(bot_token="test_token", chat_id="test_chat") + result = await notifier.send_stream_ended() + + self.assertTrue(result) + mock_bot.send_message.assert_called_once() + call_args = mock_bot.send_message.call_args + self.assertIn("Stream ended", call_args.kwargs["text"]) + + @mock.patch("telegram.Bot") + async def test_send_notification_error( + self, mock_bot_class: typing.Any + ) -> None: + mock_bot = mock.AsyncMock() + mock_bot.send_message.side_effect = Exception("Network error") + mock_bot_class.return_value = mock_bot + + notifier = TelegramNotifier(bot_token="test_token", chat_id="test_chat") + result = await notifier.send_notification("https://example.com/stream") + + self.assertFalse(result) + + +def test() -> None: + import Omni.App as App + + Test.run(App.Area.Test, [TestTelegramNotifier]) + + +if __name__ == "__main__": + asyncio.run(main()) -- cgit v1.2.3