#!/usr/bin/env run.sh
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}

-- | Interprocess communication
--
-- Prior art:
-- -
-- -
-- -
-- - sorta: and
--
-- : out que-server
module Biz.Que.Host
  ( main,
  )
where

import Alpha hiding (gets, modify, poll)
import qualified Control.Concurrent.Go as Go
import qualified Control.Concurrent.STM as STM
import qualified Control.Exception as Exception
import Data.HashMap.Lazy (HashMap)
import qualified Data.HashMap.Lazy as HashMap
import Network.HTTP.Media ((//), (/:))
import Network.Socket (SockAddr (..))
import qualified Network.Wai.Handler.Warp as Warp
import qualified Omni.Cli as Cli
import qualified Omni.Log as Log
import Omni.Test ((@=?))
import qualified Omni.Test as Test
import Servant
import Servant.Server.Generic (AsServerT, genericServeT)
import qualified Servant.Types.SourceT as Source
import qualified System.Envy as Envy

-- | Program entry point: hands the usage text, the server action and the test
-- tree to 'Cli.main'.
main :: IO ()
main = Cli.main <| Cli.Plan help move test pure

-- | Start the que server.
--
-- @startup@ decodes the 'Config' through Envy (with 'Envy.defConfig' as the
-- defaults), allocates an empty que table, logs the boot parameters and
-- returns the @(port, application)@ pair that 'Warp.run' is applied to.
-- @shutdown@ has nothing to release and simply returns its argument.
move :: Cli.Arguments -> IO ()
move _ = Exception.bracket startup shutdown <| uncurry Warp.run
  where
    startup =
      Envy.decodeWithDefaults Envy.defConfig +> \cfg@Config {..} -> do
        initialState <- atomically <| STM.newTVar mempty
        -- natural transformation from the app monad into Servant's handler
        let nt :: AppState -> App a -> Servant.Handler a
            nt s x = runReaderT x s
        let app :: AppState -> Application
            app s = genericServeT (nt s) (paths cfg)
        Log.info ["boot", "que"] >> Log.br
        Log.info ["boot", "port", show quePort] >> Log.br
        -- The secret key guards the '_' namespace; never write it to the log.
        Log.info ["boot", "skey", "<redacted>"] >> Log.br
        pure (quePort, app initialState)
    shutdown :: a -> IO a
    shutdown = pure

-- | Command line usage, parsed by docopt.
help :: Cli.Docopt
help =
  [Cli.docopt|
que-server

Usage:
  que-server
  que-server test
|]

-- | Test tree for this module.
test :: Test.Tree
test =
  Test.group
    "Biz.Que.Host"
    [ Test.unit "id" <| 1 @=? (1 :: Integer),
      Test.unit "putQue requires auth for '_'" <| do
        st <- atomically <| STM.newTVar mempty
        let cfg = Envy.defConfig :: Config
        let handlers = paths cfg
        -- Case 1: No auth, should fail
        let nonLocalHost = SockAddrInet 0 0
        let handler1 = putQue handlers nonLocalHost Nothing "_" "testq" "body"
        res1 <- Servant.runHandler (runReaderT handler1 st)
        case res1 of
          Left err ->
            if errHTTPCode err == 401
              then pure ()
              else Test.assertFailure ("Expected 401, got " <> show err)
          Right _ -> Test.assertFailure "Expected failure, got success"
        -- Case 2: Correct auth, should succeed. The key is read from the
        -- config under test so the case keeps working if the default changes.
        let handler2 = putQue handlers nonLocalHost (Just (queSkey cfg)) "_" "testq" "body"
        res2 <- Servant.runHandler (runReaderT handler2 st)
        case res2 of
          Left err -> Test.assertFailure (show err)
          Right _ -> pure ()
    ]

-- | Handler monad: a Servant handler with read access to the shared que table.
type App = ReaderT AppState Servant.Handler

-- | Every que, grouped by namespace.
type Ques = HashMap Namespace Quebase

-- | The mutable, shared que table.
type AppState = STM.TVar Ques

-- | Server configuration, decoded from the environment by Envy.
data Config = Config
  { -- | QUE_PORT
    quePort :: Warp.Port,
    -- | QUE_SKEY
    queSkey :: Text
  }
  deriving (Generic, Show)

-- | Defaults used when decoding the configuration and in the tests.
instance Envy.DefConfig Config where
  defConfig = Config 3001 "admin-key"

instance Envy.FromEnv Config

-- | A simple HTML type. This recognizes "content-type: text/html" but doesn't
-- do any conversion, rendering, or sanitization like the
-- 'Servant.HTML.Lucid.HTML' type would do.
data HTML deriving (Typeable)

-- | Advertises @text/html@, with and without an explicit utf-8 charset.
instance Accept HTML where
  contentTypes _ = withCharset :| [bare]
    where
      bare = "text" // "html"
      withCharset = bare /: ("charset", "utf-8")

-- The MIME instances below only convert between string types with 'str'.

instance MimeRender HTML ByteString where
  mimeRender _ bytes = str bytes

instance MimeUnrender HTML Text where
  mimeUnrender _ = Right <. str

instance MimeUnrender OctetStream Text where
  mimeUnrender _ = Right <. str

instance MimeRender PlainText ByteString where
  mimeRender _ bytes = str bytes

instance MimeUnrender PlainText ByteString where
  mimeUnrender _ = Right <. str

-- | The HTTP API, as a Servant generic record with one field per route.
data Paths path = Paths
  { home :: path :- Get '[JSON] NoContent,
    dash ::
      path
        :- RemoteHost
        :> Header "Authorization" Text
        :> "_"
        :> "dash"
        :> Get '[JSON] Ques,
    getQue ::
      path
        :- RemoteHost
        :> Header "Authorization" Text
        :> Capture "ns" Text
        :> Capture "quename" Text
        :> Get '[PlainText, HTML, OctetStream] Message,
    getStream ::
      path
        :- RemoteHost
        :> Header "Authorization" Text
        :> Capture "ns" Text
        :> Capture "quename" Text
        :> "stream"
        :> StreamGet NoFraming OctetStream (SourceIO Message),
    putQue ::
      path
        :- RemoteHost
        :> Header "Authorization" Text
        :> Capture "ns" Text
        :> Capture "quepath" Text
        :> ReqBody '[PlainText, HTML, OctetStream] Text
        :> Post '[PlainText, HTML, OctetStream] NoContent
  }
  deriving (Generic)

-- | Handlers for every route in 'Paths'.
--
-- Each que route first runs the same steps: authorize the caller, check the
-- namespace against the whitelist, make sure the namespace exists, then look
-- up (or create) the que.
paths :: Config -> Paths (AsServerT App)
paths Config {..} =
  Paths
    { home = throwError <| err301 {errHeaders = [("Location", "/_/index")]},
      dash = \rh mAuth -> checkAuth queSkey rh mAuth "_" >> gets,
      getQue = \rh mAuth ns qn -> do
        q <- admit rh mAuth ns qn
        liftIO (Go.mult q +> Go.tap),
      getStream = \rh mAuth ns qn -> do
        q <- admit rh mAuth ns qn
        -- peek chan instead of False?
        pure <| Source.fromAction (const False) (Go.mult q +> Go.tap),
      putQue = \rh mAuth ns qp body -> do
        q <- admit rh mAuth ns qp
        -- read right after writing to flush the que, otherwise msgs never clear
        _ <- liftIO (Go.write q (str body) >> Go.read q)
        -- TODO: detect number of readers, respond with "sent to N readers" or
        -- "no readers, msg lost"
        pure NoContent
    }
  where
    -- Shared prelude of the que routes; yields the que to operate on.
    admit :: SockAddr -> Maybe Text -> Namespace -> Quepath -> App Que
    admit rh mAuth ns qp = do
      checkAuth queSkey rh mAuth ns
      guardNs ns ["pub", "_"]
      modify <| upsertNamespace ns
      que ns qp

-- | Reject with a 401 any request to the @_@ namespace that neither carries
-- the secret key in its Authorization header nor comes from localhost.
-- Requests to every other namespace pass unchecked.
checkAuth :: Text -> SockAddr -> Maybe Text -> Text -> App ()
checkAuth skey rh mAuth ns
  | ns /= "_" = pure ()
  | mAuth == Just skey || isLocalhost rh = pure ()
  | otherwise = throwError err401 {errBody = "Authorized access only for '_' namespace"}

-- | Whether a peer address counts as local: the IPv4 or IPv6 loopback
-- address, or any unix socket.
--
-- NOTE(review): the IPv4 literal presumably relies on the host byte order
-- (it reads as 127.0.0.1 on little-endian machines) -- confirm before
-- deploying elsewhere.
isLocalhost :: SockAddr -> Bool
isLocalhost addr = case addr of
  SockAddrInet _ host -> host == 0x0100007f -- 127.0.0.1
  SockAddrInet6 _ _ (0, 0, 0, 1) _ -> True -- ::1
  SockAddrUnix _ -> True
  _ -> False

-- | @guardNs ns whitelist@ succeeds when @ns@ is one of the names in
-- @whitelist@ and throws a 405 error otherwise.
guardNs :: (Applicative a, MonadError ServerError a) => Text -> [Text] -> a ()
guardNs ns whitelist
  | ns `elem` whitelist = pure ()
  | otherwise = throwError <| err405 {errBody = str msg}
  where
    msg =
      "not allowed: use 'pub' namespace or signup to protect '"
        <> ns
        <> "' at https://que.run"

-- | Fetch the value stored under a key. Calls 'error' if the key doesn't
-- exist.
grab :: (Eq k, Hashable k) => k -> HashMap k v -> v
grab key table = table HashMap.! key

-- | Add an empty namespace to the table unless one is already there.
upsertNamespace :: Namespace -> HashMap Namespace Quebase -> HashMap Namespace Quebase
upsertNamespace ns table =
  case HashMap.lookup ns table of
    Just _ -> table
    Nothing -> HashMap.insert ns mempty table

-- | Inserts the que at the proper 'Namespace' and 'Quepath'.
--
-- A namespace that is not in the table yet is created on the fly, so this
-- never fails on a missing key.
insertQue :: Namespace -> Quepath -> Que -> HashMap Namespace Quebase -> HashMap Namespace Quebase
insertQue ns qp q =
  -- 'HashMap.insertWith' applies its function as @f new old@ and
  -- 'HashMap.union' is left-biased, so the new que replaces whatever was
  -- stored at @qp@ while the other ques of the namespace are kept.
  HashMap.insertWith HashMap.union ns (HashMap.singleton qp q)

-- | housing for a set of que paths
type Namespace = Text

-- | a que is just a channel of bytes
type Que = Go.Channel Message

-- | any path can serve as an identifier for a que
type Quepath = Text

-- | any opaque data
type Message = ByteString

-- | a collection of ques
type Quebase = HashMap Quepath Que

-- | Get app state
gets :: App Ques
gets = ask +> STM.readTVarIO .> liftIO

-- | Apply a function to the app state
modify :: (Ques -> Ques) -> App ()
modify f = ask +> flip STM.modifyTVar' f .> atomically .> liftIO

-- | Lookup or create a que
--
-- An existing que is returned as is. Otherwise a fresh channel is created
-- and registered in a single STM transaction that re-checks the table first:
-- when two requests race to create the same que, both end up with the channel
-- that was stored first, instead of one of them silently replacing the other.
-- A missing namespace is treated like a missing que rather than an error.
que :: Namespace -> Quepath -> App Que
que ns qp = do
  ques <- gets
  case HashMap.lookup ns ques +> HashMap.lookup qp of
    Just existing -> pure existing
    Nothing -> do
      -- Channels are created in IO, so allocate before entering STM; the
      -- fresh channel is simply dropped if another request won the race.
      fresh <- liftIO <| Go.chan 5
      state <- ask
      liftIO <. atomically <| do
        current <- STM.readTVar state
        case HashMap.lookup ns current +> HashMap.lookup qp of
          Just winner -> pure winner
          Nothing -> do
            STM.writeTVar state <| insertQue ns qp fresh current
            pure fresh