From 8a95a5838b992f51611d89971cf6ad8cabe68970 Mon Sep 17 00:00:00 2001
From: Ben Sima <ben@bsima.me>
Date: Mon, 30 Mar 2020 07:28:42 -0700
Subject: Rename Com.Simatime.Que to Run.Que

Now that I have the domain name que.run! Aw yeah.
---
 Run/Que.hs | 167 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 167 insertions(+)
 create mode 100644 Run/Que.hs

(limited to 'Run')

diff --git a/Run/Que.hs b/Run/Que.hs
new file mode 100644
index 0000000..f1d0e28
--- /dev/null
+++ b/Run/Que.hs
@@ -0,0 +1,167 @@
+{-# LANGUAGE NoImplicitPrelude #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE OverloadedStrings #-}
+
+{- | Interprocess communication
+-}
+module Run.Que
+  ( main
+  )
+where
+
+import           Com.Simatime.Alpha      hiding ( Text
+                                                , get
+                                                , gets
+                                                , modify
+                                                , poll
+                                                )
+import qualified Com.Simatime.Go               as Go
+import qualified Control.Concurrent.STM        as STM
+import qualified Control.Exception             as Exception
+import           Control.Monad.Reader           ( MonadTrans )
+import qualified Data.ByteString.Builder.Extra as Builder
+import qualified Data.ByteString.Lazy          as BSL
+import           Data.HashMap.Lazy              ( HashMap )
+import qualified Data.HashMap.Lazy             as HashMap
+import qualified Data.Text.Lazy                as Text
+import           Data.Text.Lazy                 ( Text
+                                                , fromStrict
+                                                )
+import qualified Data.Text.Encoding            as Encoding
+import           GHC.Base                       ( String )
+import qualified Network.Wai                   as Wai
+import qualified Network.Wai.Handler.Warp      as Warp
+import           Network.Wai.Middleware.RequestLogger
+                                                ( logStdoutDev )
+import qualified Web.Scotty.Trans              as Scotty
+
+main :: IO ()
+main = Exception.bracket startup shutdown run
+ where
+  run :: Wai.Application -> IO ()
+  run waiapp = Warp.run 8081 waiapp
+  -- | TODO: startup/shutdown ekg server, katip scribes
+  startup :: IO Wai.Application
+  startup = do
+    sync <- STM.newTVarIO <| AppState { ques = HashMap.empty }
+    let runActionToIO m = runReaderT (runApp m) sync
+    Scotty.scottyAppT runActionToIO routes
+  shutdown :: a -> IO a
+  shutdown = pure . identity
+
+routes :: Scotty.ScottyT Text App ()
+routes = do
+  Scotty.middleware logStdoutDev
+
+  -- | Receive a value from a que. Blocks until a value is received,
+  -- then returns. If 'poll=true', then stream data from the Que to the
+  -- client.
+  Scotty.get (Scotty.regex quepath) <| do
+    (ns, qp) <- extract
+    -- ensure namespace exists
+    app . modify <| upsertNamespace ns
+    q    <- app <| que ns qp
+    poll <- Scotty.param "poll" :: Scotty.ActionT Text App Text
+    case poll of
+      "true" -> Scotty.stream $ streamQue q
+      _      -> do
+        r <- liftIO <| takeQue q
+        Scotty.text <| fromStrict <| Encoding.decodeUtf8 r
+
+  -- | Put a value on a que. Returns immediately.
+  Scotty.post (Scotty.regex quepath) <| do
+    (ns, qp) <- extract
+    qdata    <- Scotty.body
+    -- ensure namespace exists
+    app . modify <| upsertNamespace ns
+    q <- app <| que ns qp
+    liftIO <| pushQue (BSL.toStrict qdata) q
+    return ()
+
+-- | Forever write the data from 'Que' to 'Wai.StreamingBody'.
+streamQue :: Que -> Wai.StreamingBody
+streamQue q write _ = Go.mult q >>= loop
+ where
+  loop c =
+    Go.tap c
+      >>= (write . Builder.byteStringInsert)
+      >>  (write <| Builder.byteStringInsert "\n")
+      >>  loop c
+
+-- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist.
+grab :: (Eq k, Hashable k) => k -> HashMap k v -> v
+grab = flip (HashMap.!)
+
+-- | Inserts the namespace in 'AppState' if it doesn't exist.
+upsertNamespace :: Namespace -> AppState -> AppState
+upsertNamespace ns as = if HashMap.member ns (ques as)
+  then as
+  else as { ques = HashMap.insert ns mempty (ques as) }
+
+-- | Inserts the que at the proper 'Namespace' and 'Quepath'.
+insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState
+insertQue ns qp q as = as { ques = newQues }
+ where
+  newQues  = HashMap.insert ns newQbase (ques as)
+  newQbase = HashMap.insert qp q <| grab ns <| ques as
+
+quepath :: GHC.Base.String
+quepath = "^/([[:alnum:]_]*)/([[:alnum:]_/]*)$"
+
+extract :: Scotty.ActionT Text App (Namespace, Quepath)
+extract = do
+  ns   <- Scotty.param "0"
+  path <- Scotty.param "1"
+  let p = Text.split (== '/') path |> filter (not . Text.null)
+  return (ns, p)
+
+newtype App a = App
+  { runApp :: ReaderT (STM.TVar AppState) IO a
+  }
+  deriving (Applicative, Functor, Monad, MonadIO, MonadReader
+    (STM.TVar AppState))
+
+data AppState = AppState
+  { ques :: HashMap Namespace Quebase
+  }
+
+-- | A synonym for 'lift' in order to be explicit about when we are
+-- operating at the 'App' layer.
+app :: MonadTrans t => App a -> t App a
+app = lift
+
+-- | Get something from the app state
+gets :: (AppState -> b) -> App b
+gets f = ask >>= liftIO . STM.readTVarIO >>= return . f
+
+-- | Apply a function to the app state
+modify :: (AppState -> AppState) -> App ()
+modify f = ask >>= liftIO . atomically . flip STM.modifyTVar' f
+
+type Namespace = Text -- ^ housing for a set of que paths
+type Que = Go.Channel Quedata -- ^ a que is just a channel of bytes
+type Quepath = [Text] -- ^ any path can serve as an identifier for a que
+type Quedata = ByteString -- ^ any opaque data
+type Quebase = HashMap Quepath Que -- ^ a collection of ques
+
+-- | Lookup or create a que
+que :: Namespace -> Quepath -> App Que
+que ns qp = do
+  _ques <- gets ques
+  let qbase     = grab ns _ques
+      queExists = HashMap.member qp qbase
+  if queExists
+    then return <| grab qp qbase
+    else do
+      c <- liftIO Go.chan
+      modify (insertQue ns qp c)
+      gets ques /> grab ns /> grab qp
+
+-- | Put data on the que.
+pushQue :: Quedata -> Que -> IO ()
+pushQue = flip Go.write
+
+-- | Tap and read from the Que. Tap first because a Que is actually a
+-- broadcast channel. This allows for multiconsumer Ques.
+takeQue :: Que -> IO Quedata
+takeQue ch = Go.mult ch >>= Go.tap
-- 
cgit v1.2.3