diff options
Diffstat (limited to 'Omni/Task/Core.hs')
| -rw-r--r-- | Omni/Task/Core.hs | 74 |
1 files changed, 60 insertions, 14 deletions
diff --git a/Omni/Task/Core.hs b/Omni/Task/Core.hs index 3a71900..ffecd60 100644 --- a/Omni/Task/Core.hs +++ b/Omni/Task/Core.hs @@ -93,7 +93,12 @@ data TaskActivity = TaskActivity activityTimestamp :: UTCTime, activityStage :: ActivityStage, activityMessage :: Maybe Text, - activityMetadata :: Maybe Text -- JSON for extra data + activityMetadata :: Maybe Text, -- JSON for extra data + activityAmpThreadUrl :: Maybe Text, -- Link to amp thread + activityStartedAt :: Maybe UTCTime, -- When work started + activityCompletedAt :: Maybe UTCTime, -- When work completed + activityCostCents :: Maybe Int, -- API cost in cents + activityTokensUsed :: Maybe Int -- Total tokens used } deriving (Show, Eq, Generic) @@ -241,6 +246,11 @@ instance SQL.FromRow TaskActivity where <*> SQL.field <*> SQL.field <*> SQL.field + <*> SQL.field + <*> SQL.field + <*> SQL.field + <*> SQL.field + <*> SQL.field instance SQL.ToRow TaskActivity where toRow a = @@ -249,7 +259,12 @@ instance SQL.ToRow TaskActivity where SQL.toField (activityTimestamp a), SQL.toField (activityStage a), SQL.toField (activityMessage a), - SQL.toField (activityMetadata a) + SQL.toField (activityMetadata a), + SQL.toField (activityAmpThreadUrl a), + SQL.toField (activityStartedAt a), + SQL.toField (activityCompletedAt a), + SQL.toField (activityCostCents a), + SQL.toField (activityTokensUsed a) ] -- | Case-insensitive ID comparison @@ -352,6 +367,11 @@ initTaskDb = do \ stage TEXT NOT NULL, \ \ message TEXT, \ \ metadata TEXT, \ + \ amp_thread_url TEXT, \ + \ started_at DATETIME, \ + \ completed_at DATETIME, \ + \ cost_cents INTEGER, \ + \ tokens_used INTEGER, \ \ FOREIGN KEY (task_id) REFERENCES tasks(id) \ \)" @@ -1011,21 +1031,47 @@ logActivity tid stage metadata = "INSERT INTO task_activity (task_id, stage, message, metadata) VALUES (?, ?, ?, ?)" (tid, show stage :: String, Nothing :: Maybe Text, metadata) +-- | Log activity with worker metrics (amp thread URL, timing, cost) +logActivityWithMetrics :: Text -> ActivityStage -> Maybe Text -> Maybe Text -> Maybe UTCTime -> Maybe UTCTime -> Maybe Int -> Maybe Int -> IO Int +logActivityWithMetrics tid stage metadata ampUrl startedAt completedAt costCents tokens = + withDb <| \conn -> do + SQL.execute + conn + "INSERT INTO task_activity (task_id, stage, message, metadata, amp_thread_url, started_at, completed_at, cost_cents, tokens_used) \ + \VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" + (tid, show stage :: String, Nothing :: Maybe Text, metadata, ampUrl, startedAt, completedAt, costCents, tokens) + [SQL.Only actId] <- SQL.query_ conn "SELECT last_insert_rowid()" :: IO [SQL.Only Int] + pure actId + +-- | Update an existing activity record with metrics +updateActivityMetrics :: Int -> Maybe Text -> Maybe UTCTime -> Maybe Int -> Maybe Int -> IO () +updateActivityMetrics actId ampUrl completedAt costCents tokens = + withDb <| \conn -> + SQL.execute + conn + "UPDATE task_activity SET amp_thread_url = COALESCE(?, amp_thread_url), \ + \completed_at = COALESCE(?, completed_at), \ + \cost_cents = COALESCE(?, cost_cents), \ + \tokens_used = COALESCE(?, tokens_used) \ + \WHERE id = ?" + (ampUrl, completedAt, costCents, tokens, actId) + -- | Get all activities for a task, ordered by timestamp descending getActivitiesForTask :: Text -> IO [TaskActivity] getActivitiesForTask tid = - withDb <| \conn -> do - rows <- - SQL.query - conn - "SELECT id, task_id, timestamp, stage, message, metadata \ - \FROM task_activity WHERE task_id = ? ORDER BY timestamp DESC" - (SQL.Only tid) :: - IO [(Int, Text, UTCTime, Text, Maybe Text, Maybe Text)] - pure [TaskActivity (Just i) taskId ts (readStage stg) msg meta | (i, taskId, ts, stg, msg, meta) <- rows] - where - readStage :: Text -> ActivityStage - readStage s = fromMaybe Claiming (readMaybe (T.unpack s)) + withDb <| \conn -> + SQL.query + conn + "SELECT id, task_id, timestamp, stage, message, metadata, \ + \amp_thread_url, started_at, completed_at, cost_cents, tokens_used \ + \FROM task_activity WHERE task_id = ? ORDER BY timestamp DESC" + (SQL.Only tid) + +-- | Get the most recent running activity for a task (for metrics display) +getLatestRunningActivity :: Text -> IO (Maybe TaskActivity) +getLatestRunningActivity tid = do + activities <- getActivitiesForTask tid + pure <| List.find (\a -> activityStage a == Running) activities -- | Get tasks with unmet blocking dependencies (not ready, not done) getBlockedTasks :: IO [Task] |
