module Control.Eff.LogWriter.Async
( withAsyncLogWriter
, withAsyncLogging
)
where
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.DeepSeq
import Control.Eff as Eff
import Control.Eff.Log
import Control.Eff.LogWriter.IO
import Control.Exception ( evaluate )
import Control.Monad ( unless )
import Control.Monad.Trans.Control ( MonadBaseControl
, liftBaseOp
)
import Data.Foldable ( traverse_ )
import Data.Kind ( )
import Data.Text as T
-- | Run an effect that uses 'Logs' and a 'LogWriterReader' for 'IO', handing
-- its log messages to a background thread through a bounded queue.
--
-- The queue and its consumer thread are set up by 'withAsyncLogChannel'; the
-- consumer deep-evaluates each message with 'force' and passes it to @lw@ via
-- 'runLogWriter'.  The effect itself is run by 'withIoLogging' with the
-- queue-backed writer produced by 'makeLogChannelWriter'.
withAsyncLogging
  :: (Lifted IO e, MonadBaseControl IO (Eff e), Integral len)
  => LogWriter IO -- ^ writer invoked by the background thread
  -> len -- ^ length of the message queue
  -> Text -- ^ forwarded unchanged to 'withIoLogging'
  -> Facility -- ^ forwarded unchanged to 'withIoLogging'
  -> LogPredicate -- ^ forwarded unchanged to 'withIoLogging'
  -> Eff (Logs : LogWriterReader IO : e) a
  -> Eff e a
withAsyncLogging lw queueLength a f p e =
  liftBaseOp (withAsyncLogChannel queueLength writeInBackground) runWithChannel
 where
  -- Consumer side: what the background thread does with a dequeued message.
  writeInBackground msg = runLogWriter lw (force msg)

  -- Producer side: the wrapped effect logs into the channel's queue.
  runWithChannel channel =
    withIoLogging (makeLogChannelWriter channel) a f p e
-- | Route the log output of an effect through a bounded queue.
--
-- The 'LogWriter' obtained from 'askLogWriter' becomes the sink used by the
-- background thread that 'withAsyncLogChannel' starts; each dequeued message
-- is deep-evaluated with 'force' before being passed to it.  The effect @e@
-- is run under 'setLogWriter' with the queue-backed writer from
-- 'makeLogChannelWriter'.
withAsyncLogWriter
  :: (LogsTo IO e, Lifted IO e, MonadBaseControl IO (Eff e), Integral len)
  => len -- ^ length of the message queue
  -> Eff e a
  -> Eff e a
withAsyncLogWriter queueLength e =
  askLogWriter >>= \currentWriter ->
    liftBaseOp
      (withAsyncLogChannel
        queueLength
        (\msg -> runLogWriter currentWriter (force msg))
      )
      (\channel -> setLogWriter (makeLogChannelWriter channel) e)
-- | Run @action@ with a 'LogChannel' whose bounded queue is drained by a
-- background thread that feeds every message to @ioWriter@.
--
-- The queue is created with room for @queueLen@ messages.  A non-positive
-- @queueLen@ is clamped to 1: a queue with no capacity is permanently full,
-- which would make the writer built by 'makeLogChannelWriter' discard every
-- single message.
--
-- The background thread is started with 'withAsync' and loops forever; it
-- ends only when it is cancelled or when @ioWriter@ throws.
--
-- NOTE(review): 'withAsync' cancels the drain thread once @action@ returns,
-- so messages still sitting in the queue at that point appear to be lost —
-- confirm whether a final flush is wanted.
withAsyncLogChannel
  :: forall a len
   . (Integral len)
  => len
  -> (LogMessage -> IO ())
  -> (LogChannel -> IO a)
  -> IO a
withAsyncLogChannel queueLen ioWriter action = do
  -- Clamp to the smallest usable capacity; see the note on non-positive
  -- lengths in the function documentation.
  msgQ <- newTBQueueIO (fromIntegral (max 1 queueLen))
  withAsync (logLoop msgQ) (action . ConcurrentLogChannel msgQ)
 where
  -- Block until at least one message is available, take everything else that
  -- is already queued within the same transaction, write the batch in queue
  -- order, then start over.
  logLoop tq = do
    ms <- atomically $ do
      h <- readTBQueue tq
      t <- flushTBQueue tq
      return (h : t)
    traverse_ ioWriter ms
    logLoop tq
-- | Build a 'LogWriter' that puts messages on the queue of a 'LogChannel'.
--
-- Every message is evaluated to normal form before it is offered to the
-- queue.  Enqueueing never blocks: the fullness check and the write happen in
-- one transaction, and when the queue is full the message is discarded.
makeLogChannelWriter :: LogChannel -> LogWriter IO
makeLogChannelWriter lc = mkLogWriterIO enqueueOrDrop
 where
  queue = fromLogChannel lc

  enqueueOrDrop msg = do
    -- Force the whole message here so that no unevaluated thunks end up in
    -- the queue.
    ready <- evaluate (force msg)
    atomically $
      isFullTBQueue queue >>= \full -> unless full (writeTBQueue queue ready)
-- | Handle for the asynchronous logging machinery created by
-- 'withAsyncLogChannel'.
data LogChannel = ConcurrentLogChannel
  { fromLogChannel :: TBQueue LogMessage
    -- ^ Bounded queue of pending messages: filled by the writer from
    -- 'makeLogChannelWriter', drained by the background thread.
  , _logChannelThread :: Async ()
    -- ^ The 'Async' handle of the background thread running the drain loop
    -- started in 'withAsyncLogChannel'.
  }