{-# LANGUAGE GeneralizedNewtypeDeriving #-}

{- Defines a Pulsar Monad, which wraps a ReaderT and runs internal computations in the background -}
module Pulsar.Internal.Core where

import           Control.Concurrent.Async       ( cancel )
import           Control.Concurrent.MVar
import qualified Control.Logging               as L
import           Control.Monad.Catch            ( MonadThrow
                                                , finally
                                                , throwM
                                                )
import           Control.Monad.Managed
import           Control.Monad.Reader
import           Data.Foldable                  ( traverse_ )
import           Data.IORef                     ( readIORef )
import           Pulsar.AppState                ( AppState(..) )
import           Pulsar.Connection              ( PulsarCtx(..) )

{- | Pulsar connection monad, which abstracts over a 'Managed' monad.

   All instances are obtained from the wrapped 'Managed' value through
   @GeneralizedNewtypeDeriving@, so a 'Connection' behaves exactly like the
   'Managed' computation it contains.
-}
newtype Connection a = Connection (Managed a)
  deriving (Functor, Applicative, Monad, MonadIO, MonadManaged)

{- | Exceptions thrown inside a 'Connection' are raised with the 'IO'
   instance of 'throwM' and then lifted into 'Connection' via 'liftIO'.
-}
instance MonadThrow Connection where
  throwM = liftIO . throwM

{- | Alias for @'Connection' 'PulsarCtx'@: a connection that yields a 'PulsarCtx'. This is the connection type accepted by 'runPulsar' and @runPulsar'@. -}
type PulsarConnection = Connection PulsarCtx

{- | The main Pulsar monad, which abstracts over a 'ReaderT' monad.

   It is a newtype over @'ReaderT' 'PulsarCtx' 'IO'@; every instance is
   derived from that stack through @GeneralizedNewtypeDeriving@, including
   'MonadReader' access to the 'PulsarCtx' environment.
-}
newtype Pulsar a = Pulsar (ReaderT PulsarCtx IO a)
  deriving (Functor, Applicative, Monad, MonadIO, MonadReader PulsarCtx)

{- | Runs a Pulsar computation with default logging to standard output.

   Equivalent to @runPulsar'@ applied to @LogOptions Debug StdOut@, i.e. the
   'Debug' log level with output sent to 'StdOut'.
-}
runPulsar :: PulsarConnection -> Pulsar a -> IO ()
runPulsar = runPulsar' (LogOptions Debug StdOut)

{- | Runs a Pulsar computation with the supplied logging options.

   In order, this function:

   1. sets the log level (converted with 'fromLogLevel') and the log
      timestamp format;
   2. selects standard-output or file logging according to the 'LogOutput';
   3. acquires the 'PulsarCtx' from the managed connection and runs the
      computation against it, discarding its result;
   4. runs the finalizers afterwards, whether or not the computation threw.
-}
runPulsar' :: LogOptions -> PulsarConnection -> Pulsar a -> IO ()
runPulsar' (LogOptions lvl out) (Connection mgd) (Pulsar mr) = do
  L.setLogLevel $ fromLogLevel lvl
  L.setLogTimeFormat "%H:%M:%S%Q"
  case out of
    StdOut  -> L.withStdoutLogging runner
    File fp -> L.withFileLogging fp runner
 where
  -- Acquire the context, run the reader computation and always finalize.
  -- `finally` binds tighter than `$`, so it wraps only `runReaderT mr ctx`.
  runner :: IO ()
  runner = runManaged $ do
    ctx <- mgd
    void . liftIO $ runReaderT mr ctx `finally` finalizers ctx

  -- Shutdown sequence:
  --   * for every registered worker, fill its MVar and then cancel it
  --     (presumably the MVar signals the worker to stop; confirm in the
  --     code that registers `_appWorkers`);
  --   * fill the connection MVar even if the step above throws;
  --   * cancel the connection worker.
  finalizers :: PulsarCtx -> IO ()
  finalizers ctx = do
    let (worker, connVar) = ctxConnWorker ctx
    app <- readIORef (ctxState ctx)
    traverse_ (\(a, v) -> putMVar v () >> cancel a) (_appWorkers app)
      `finally` putMVar connVar ()
    cancel worker

{- | Internal logging options. Can be used together with `runPulsar'`.

   Pairs the minimum 'LogLevel' with the 'LogOutput' destination.
-}
data LogOptions = LogOptions
  { logLevel :: LogLevel    -- ^ level passed to the logger via 'fromLogLevel'
  , logOutput :: LogOutput  -- ^ where log lines are written
  } deriving Show

{- | Internal logging level, part of 'LogOptions'. Can be used together with `runPulsar'`.

   Each constructor is mapped to the logging backend's level of the same
   name by 'fromLogLevel'.
-}
data LogLevel = Error | Warn | Info | Debug deriving Show

{- | Internal logging output, part of 'LogOptions'. Can be used together with `runPulsar'`.

   'StdOut' selects standard-output logging; 'File' selects logging to the
   file at the given path.
-}
data LogOutput = StdOut | File FilePath deriving Show

{- | Converts the internal 'LogLevel' into the corresponding level of the
   logging backend (imported qualified as @L@). The mapping is one-to-one
   and total over all four constructors.
-}
fromLogLevel :: LogLevel -> L.LogLevel
fromLogLevel Error = L.LevelError
fromLogLevel Warn  = L.LevelWarn
fromLogLevel Info  = L.LevelInfo
fromLogLevel Debug = L.LevelDebug