#!/usr/bin/env run.sh
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE NoImplicitPrelude #-}

-- | An EDSL to make working with concurrent in-process code a bit easier
--   to read.
--
-- This module is expected to be imported qualified as `Go`. Inspired by
-- Golang and Clojure's core.async.
--
-- \$example
-- : out go
module Control.Concurrent.Go
  ( -- * Running and forking
    fork,

    -- * Channels
    Channel,
    Mult,
    chan,
    read,
    write,
    mult,
    tap,

    -- * internal
    sleep,
    test,
    main,
  )
where

import Alpha
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Chan.Unagi.Bounded as Chan
import qualified Data.Aeson as Aeson
import qualified Omni.Cli as Cli
import Omni.Test ((@?=))
import qualified Omni.Test as Test
import qualified System.IO.Unsafe as Unsafe

-- | Command-line entry point: builds the 'Cli.Plan' for this module and
-- hands it to 'Cli.main'. The only usage pattern in the help text is
-- @go test@.
main :: IO ()
main = Cli.main plan
  where
    -- 'Cli.move' ignores its argument and does nothing; 'Cli.test' is this
    -- module's own test tree.
    plan =
      Cli.Plan
        { Cli.help = usage,
          Cli.move = \_ -> pure (),
          Cli.test = test,
          Cli.tidy = pure
        }
    usage =
      [Cli.docopt|
  go

Usage:
  go test
  |]

-- | A standard channel.
--
-- Bundles both ends of a bounded Unagi queue together with the size that
-- was requested when the channel was created with 'chan'.
data Channel a = Channel
  { -- | Write end; used by 'write', 'mult' and the 'Aeson.ToJSON' instance.
    _in :: !(Chan.InChan a),
    -- | Read end; used by 'read'.
    _out :: !(Chan.OutChan a),
    -- | The size argument that was passed to 'chan'.
    _size :: !Int
  }

-- | Renders a channel as the opaque string @#<channel N>@, where @N@ is
-- whatever 'Chan.estimatedLength' reports for the write end at the moment
-- the value is forced.
--
-- NOTE(review): the length is obtained with 'Unsafe.unsafePerformIO', so
-- this nominally pure 'Aeson.toJSON' performs IO. Presumably acceptable
-- because the output is only a debugging aid; confirm before relying on it.
instance Aeson.ToJSON (Channel a) where
  toJSON c = Aeson.String rendered
    where
      rendered :: Text
      rendered = "#<channel " <> queued <> ">"
      queued = show (Unsafe.unsafePerformIO (Chan.estimatedLength (_in c)))

-- | Starts a background process.
--
-- This is 'Concurrent.forkIO' under a shorter name: the action is handed
-- to a new thread and the 'Concurrent.ThreadId' of that thread is returned.
-- Nothing here waits for the action to finish or collects its result.
fork :: IO () -> IO Concurrent.ThreadId
fork = Concurrent.forkIO

-- | Create a fresh channel of the requested size.
--
-- The size is passed straight to 'Chan.newChan' and is also remembered in
-- the '_size' field of the result.
chan :: Int -> IO (Channel a)
chan size = do
  (writeEnd, readEnd) <- Chan.newChan size
  pure Channel {_in = writeEnd, _out = readEnd, _size = size}

-- | A channel for broadcasting to multiple consumers. See 'mult'.
--
-- This is a plain alias for the read end ('Chan.OutChan') that 'mult'
-- returns; values are taken from it with 'tap'.
type Mult a = Chan.OutChan a

-- | Create a 'Mult' for a channel by calling 'Chan.dupChan' on its write
-- end. The intent is that anything written to the source afterwards can be
-- read both from the source (with 'read') and from the result (with 'tap').
-- This is like Clojure's `core.async/mult`.
mult :: Channel a -> IO (Mult a)
mult source = Chan.dupChan (_in source)

-- | Read a value from a 'Mult'. This is like Clojure's `core.async/tap`.
--
-- You can use this to read from a channel in a background process. Create
-- the 'Mult' once and keep reading from that same value, e.g.:
--
-- >>> c <- Go.chan 1 :: IO (Go.Channel Text)
-- >>> m <- Go.mult c
-- >>> Go.fork <| forever (Go.tap m +> print)
tap :: Mult a -> IO a
tap = Chan.readChan

-- | Take the next value from a channel's read end, blocking until one is
-- available.
read :: Channel a -> IO a
read source = Chan.readChan (_out source)

-- | Try to write a value to a channel without blocking.
--
-- This goes through 'Chan.tryWriteChan', so a full channel does not make
-- the caller wait. The result is 'True' when the value was written and
-- 'False' when the write was abandoned because the channel looked full.
-- Callers that must not lose values have to check the result.
write :: Channel a -> a -> IO Bool
write c = Chan.tryWriteChan (_in c)

-- | Pause the calling thread for the given number of milliseconds.
sleep :: Int -> IO ()
sleep millis = threadDelay micros
  where
    -- 'threadDelay' counts in microseconds.
    micros = millis * 1_000

-- $example
--
-- A simple example from ghci:
--
-- >>> import qualified Control.Concurrent.Go as Go
-- >>> c <- Go.chan 1 :: IO (Go.Channel Text)
-- >>> Go.write c "test"
-- True
-- >>> Go.read c
-- "test"
--
-- An example with tap and mult:
--
-- >>> c <- Go.chan 1 :: IO (Go.Channel Text)
-- >>> Go.write c "hi"
-- True
-- >>> Go.read c
-- "hi"
-- >>> m <- Go.mult c
-- >>> Go.fork <| forever (Go.tap m +> \t -> print ("one: " <> t))
-- ThreadId 810
-- >>> Go.fork <| forever (Go.tap m +> \t -> print ("two: " <> t))
-- ThreadId 825
-- >>> Go.write c "test"
-- "two: t"eosnte":
--  test"
--
-- The text is garbled because both threads print at the same time and
-- their output is interleaved on its way to the terminal, but you get the
-- idea.
--
-- | Test tree for this module.
--
-- Two tests are live: one checks that 'write' reports success on a fresh
-- channel while a forked reader is taking from a 'Mult', the other is a
-- plain 'MVar' counter. The remaining tests are kept commented out below
-- because they do not work yet.
test :: Test.Tree
test =
  Test.group
    "Control.Concurrent.Go"
    [ Test.unit "simple example" <| do
        c <- chan 1 :: IO (Channel Text)
        recv <- mult c
        -- Background reader that keeps taking values from the 'Mult'.
        _ <- fork (forever (tap recv +> pure))
        ret <- write c "simple example"
        -- Actual value on the left, expected value on the right, matching
        -- every other assertion in this file.
        ret @?= True,
      Test.unit "simple MVar counter" <| do
        counter <- newEmptyMVar
        putMVar counter (0 :: Integer)
        modifyMVar_ counter (pure <. (+ 1))
        modifyMVar_ counter (pure <. (+ 1))
        modifyMVar_ counter (pure <. (+ 1))
        r <- takeMVar counter
        r @?= 3
        {- Why don't these work?

         NOTE(review): every test below waits with `threadDelay 1`, which is
         one microsecond ('sleep' multiplies by 1_000 to get milliseconds).
         That is probably too short for the forked readers to run before the
         result is taken. Unconfirmed: try a longer wait or an explicit
         completion signal.

         Test.unit "subscription counter" <| do
          counter <- newEmptyMVar :: IO (MVar Integer)
          putMVar counter 0
          let dec = modifyMVar_ counter (\x -> pure <| x -1)
          let inc = modifyMVar_ counter (pure <. (+ 1))
          c <- chan 10 :: IO (Channel Bool)
          c1 <- mult c
          _ <- fork (forever (tap c1 +> bool dec inc))
          _ <- write c True
          _ <- write c True
          _ <- write c True
          threadDelay 1
          r1 <- takeMVar counter
          r1 @?= 3,
        Test.unit "SPMC" <| do
          out1 <- newEmptyMVar
          out2 <- newEmptyMVar
          putMVar out1 "init"
          putMVar out2 "init"
          c <- chan 10 :: IO (Channel Text)
          c1 <- mult c
          c2 <- mult c
          _ <- fork <| forever (tap c1 +> swapMVar out1 >> pure ())
          _ <- fork <| forever (tap c2 +> swapMVar out2 >> pure ())
          _ <- write c "test1"
          _ <- write c "test2"
          threadDelay 1
          r1 <- takeMVar out1
          r2 <- takeMVar out2
          r1 @?= r2
          r1 @?= "test2",
        Test.unit "Unagi SPMC" <| do
          out1 <- newEmptyMVar
          out2 <- newEmptyMVar
          putMVar out1 "init"
          putMVar out2 "init"
          (i, _) <- Chan.newChan 10 :: IO (Chan.InChan Text, Chan.OutChan Text)
          o1 <- Chan.dupChan i
          o2 <- Chan.dupChan i
          _ <- forkIO <| forever (Chan.readChan o1 +> swapMVar out1 >> pure ())
          _ <- forkIO <| forever (Chan.readChan o2 +> swapMVar out2 >> pure ())
          _ <- Chan.writeChan i "test1"
          _ <- Chan.writeChan i "test2"
          threadDelay 1
          r1 <- takeMVar out1
          r2 <- takeMVar out2
          r1 @?= r2
          r1 @?= "test2"
        -}
    ]