From 0eca0c477b15e0412497ca21847bd969e5e73fc2 Mon Sep 17 00:00:00 2001
From: Ben Sima <ben@bsima.me>
Date: Sun, 29 Mar 2020 20:19:29 -0700
Subject: Add polling and streaming to Que

---
 Com/Simatime/Que.hs | 56 ++++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 38 insertions(+), 18 deletions(-)

(limited to 'Com')

diff --git a/Com/Simatime/Que.hs b/Com/Simatime/Que.hs
index 58a41f9..2b56575 100644
--- a/Com/Simatime/Que.hs
+++ b/Com/Simatime/Que.hs
@@ -13,18 +13,23 @@ import           Com.Simatime.Alpha      hiding ( Text
                                                 , get
                                                 , gets
                                                 , modify
+                                                , poll
                                                 )
 import qualified Com.Simatime.Go               as Go
 import qualified Control.Concurrent.STM        as STM
 import qualified Control.Exception             as Exception
-import           GHC.Base                       ( String )
 import           Control.Monad.Reader           ( MonadTrans )
+import qualified Data.ByteString.Builder.Extra as Builder
+import qualified Data.ByteString.Lazy          as BSL
 import           Data.HashMap.Lazy              ( HashMap )
 import qualified Data.HashMap.Lazy             as HashMap
-import           Data.Text.Lazy                 ( Text )
 import qualified Data.Text.Lazy                as Text
-import qualified Data.Text.Lazy.Encoding       as Encoding
-import           Network.Wai                    ( Application )
+import           Data.Text.Lazy                 ( Text
+                                                , fromStrict
+                                                )
+import qualified Data.Text.Encoding            as Encoding
+import           GHC.Base                       ( String )
+import qualified Network.Wai                   as Wai
 import qualified Network.Wai.Handler.Warp      as Warp
 import           Network.Wai.Middleware.RequestLogger
                                                 ( logStdoutDev )
@@ -33,10 +38,10 @@ import qualified Web.Scotty.Trans              as Scotty
 main :: IO ()
 main = Exception.bracket startup shutdown run
  where
-  run :: Application -> IO ()
+  run :: Wai.Application -> IO ()
   run waiapp = Warp.run 8081 waiapp
   -- | TODO: startup/shutdown ekg server, katip scribes
-  startup :: IO Application
+  startup :: IO Wai.Application
   startup = do
     sync <- STM.newTVarIO <| AppState { ques = HashMap.empty }
     let runActionToIO m = runReaderT (runApp m) sync
@@ -48,14 +53,20 @@ routes :: Scotty.ScottyT Text App ()
 routes = do
   Scotty.middleware logStdoutDev
 
-  -- | Receive a value from a que. Blocks until a value is received.
+  -- | Receive a value from a que. Blocks until a value is received,
+  -- then returns. If 'poll=true', then stream data from the Que to the
+  -- client.
   Scotty.get (Scotty.regex quepath) <| do
     (ns, qp) <- extract
     -- ensure namespace exists
     app . modify <| upsertNamespace ns
-    q <- app <| que ns qp
-    r <- liftIO <| takeQue q
-    Scotty.text r
+    q    <- app <| que ns qp
+    poll <- Scotty.param "poll" :: Scotty.ActionT Text App Text
+    case poll of
+      "true" -> Scotty.stream $ streamQue q
+      _      -> do
+        r <- liftIO <| takeQue q
+        Scotty.text <| fromStrict <| Encoding.decodeUtf8 r
 
   -- | Put a value on a que. Returns immediately.
   Scotty.post (Scotty.regex quepath) <| do
@@ -64,9 +75,19 @@ routes = do
     -- ensure namespace exists
     app . modify <| upsertNamespace ns
     q <- app <| que ns qp
-    liftIO <| pushQue (Encoding.decodeUtf8 qdata) q
+    liftIO <| pushQue (BSL.toStrict qdata) q
     return ()
 
+-- | Forever write the data from 'Que' to 'Wai.StreamingBody'.
+streamQue :: Que -> Wai.StreamingBody
+streamQue q write _ = Go.mult q >>= loop
+ where
+  loop c =
+    Go.tap c
+      >>= (write . Builder.byteStringInsert)
+      >>  (write <| Builder.byteStringInsert "\n")
+      >>  loop c
+
 -- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist.
 grab :: (Eq k, Hashable k) => k -> HashMap k v -> v
 grab = flip (HashMap.!)
@@ -77,6 +98,7 @@ upsertNamespace ns as = if HashMap.member ns (ques as)
   then as
   else as { ques = HashMap.insert ns mempty (ques as) }
 
+-- | Inserts the que at the proper 'Namespace' and 'Quepath'.
 insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState
 insertQue ns qp q as = as { ques = newQues }
  where
@@ -90,7 +112,8 @@ extract :: Scotty.ActionT Text App (Namespace, Quepath)
 extract = do
   ns   <- Scotty.param "0"
   path <- Scotty.param "1"
-  return (ns, Text.split (== '/') path)
+  let p = Text.split (== '/') path |> filter (not . Text.null)
+  return (ns, p)
 
 newtype App a = App
   { runApp :: ReaderT (STM.TVar AppState) IO a
@@ -115,16 +138,13 @@ gets f = ask >>= liftIO . STM.readTVarIO >>= return . f
 modify :: (AppState -> AppState) -> App ()
 modify f = ask >>= liftIO . atomically . flip STM.modifyTVar' f
 
--- * functionality
-
 type Namespace = Text -- ^ housing for a set of que paths
---type Que = Go.Channel Quedata -- ^ a que is just a channel of json
-type Que = Go.Channel Quedata -- ^ a que is just a channel of json
+type Que = Go.Channel Quedata -- ^ a que is just a channel of bytes
 type Quepath = [Text] -- ^ any path can serve as an identifier for a que
-type Quedata = Text -- ^ any opaque data
+type Quedata = ByteString -- ^ any opaque data
 type Quebase = HashMap Quepath Que -- ^ a collection of ques
 
--- | lookup or create a que
+-- | Lookup or create a que
 que :: Namespace -> Quepath -> App Que
 que ns qp = do
   _ques <- gets ques
-- 
cgit v1.2.3