{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Hercules.Agent.Producer where

import Control.Applicative
import Control.Concurrent hiding (throwTo)
import Control.Concurrent.Async hiding (cancel)
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.IO.Unlift
import Control.Monad.State
import Data.Foldable
import Data.Traversable
import Katip
import UnliftIO.Exception
import Prelude

-- | A thread producing zero or more payloads and a final value.
-- Handles exception propagation.
data Producer p r = Producer
  { -- | Transaction that takes the next message sent by the producer thread.
    -- 'forkProducer' sets this to a read from the producer's queue, so it
    -- retries until a message is available.
    producerQueueRead :: STM (Msg p r),
    -- | The thread running the producer; this is the thread 'cancel'
    -- throws 'ProducerCancelled' to.
    producerThread :: ThreadId
  }
  deriving (Functor)

-- | Asynchronous exception thrown to a producer thread by 'cancel'.
data ProducerCancelled = ProducerCancelled
  deriving (Show, Exception, Typeable)

-- | A message sent from a producer thread to its consumer.
--
-- The 'Functor' instance maps over the final value @r@ only.
data Msg p r
  = -- | One of possibly many payloads from the producer
    Payload p
  | -- | The producer stopped due to an exception
    Exception SomeException
  | -- | The producer was done and produced a final value
    Close r
  deriving (Functor)

-- | @forkProducer f@ produces a computation that forks a thread for @f@, which
-- receives a function for returning payloads @p@.
--
-- @f@ may produce a final result value @r@ when it is done.
--
-- Every payload is enqueued as a 'Payload' message. When @f@ finishes, one
-- last message is enqueued: 'Close' with the result, or 'Exception' with
-- whatever exception terminated the thread.
forkProducer :: forall m p r. (MonadIO m, MonadUnliftIO m) => ((p -> m ()) -> m r) -> m (Producer p r)
forkProducer f = do
  q <- liftIO newTQueueIO
  let write :: MonadIO m' => Msg p r -> m' ()
      write = liftIO . atomically . writeTQueue q
  -- Capture the current monadic context so that @f@ can run on the new thread.
  f' <- toIO (f (write . Payload))
  -- 'forkFinally' reports the outcome exactly once, however the thread ends.
  t <- liftIO $ forkFinally f' (write . either Exception Close)
  pure $ Producer {producerQueueRead = readTQueue q, producerThread = t}

-- | Throws 'ProducerCancelled' as an async exception to the producer thread.
-- Blocks until exception is raised. See 'throwTo'.
cancel :: MonadIO m => Producer p r -> m ()
cancel p = throwTo (producerThread p) ProducerCancelled

-- | Perform a computation while @withProducer@ takes care of forking and cleaning up.
--
-- The producer thread is forked before the continuation runs and is
-- 'cancel'led when the continuation exits, whether normally or by exception.
--
-- @withProducer (\write -> write "a" >> write "b") $ \producer -> consume producer@
withProducer ::
  (MonadIO m, MonadUnliftIO m) =>
  ((p -> m ()) -> m r) ->
  (Producer p r -> m a) ->
  m a
withProducer f = bracket (forkProducer f) cancel

-- | Build a transaction that takes the next message from the producer and
-- returns the action that handles it.
--
-- A 'Payload' is passed to the first handler and a 'Close' result to the
-- second. An 'Exception' message becomes an action that rethrows the
-- producer's exception in the caller. Use 'joinSTM' to run the transaction and
-- the resulting action in one step.
listen ::
  MonadIO m =>
  Producer p r ->
  (p -> m a) ->
  (r -> m a) ->
  STM (m a)
listen p fPayload fResult = dispatch <$> producerQueueRead p
  where
    dispatch (Payload payload) = fPayload payload
    dispatch (Exception e) = throwIO e
    dispatch (Close r) = fResult r

-- | Run a transaction atomically, then run the action it returned.
joinSTM :: MonadIO m => STM (m a) -> m a
joinSTM = join . liftIO . atomically

-- | Either an ordinary value or a request for a completion notification.
--
-- 'Syncable' wraps a plain value. 'Syncer' carries a callback that 'withSync'
-- runs in STM once its computation has finished: with @Nothing@ after
-- success, or with @Just@ the exception after failure.
data Syncing a = Syncable a | Syncer (Maybe SomeException -> STM ())

-- | Sends sync notifications after the whole computation succeeds (or fails)
-- Note: not exception safe in the presence of pure exceptions.
--
-- Each 'Syncable' element is passed to the computation as @Just@ its value;
-- each 'Syncer' element is passed as @Nothing@ and its callback is collected.
-- If the computation throws, every collected callback receives @Just@ the
-- exception, which is then rethrown. Otherwise every callback receives
-- @Nothing@ and the computation's result is returned.
withSync ::
  (MonadIO m, MonadUnliftIO m, Traversable t) =>
  t (Syncing a) ->
  (t (Maybe a) -> m b) ->
  m b
withSync t f = do
  let (t', syncs) =
        runState
          ( for t $ \case
              Syncable a -> pure (Just a)
              -- Combine the callbacks pointwise. The state is a function, so
              -- a bare @(*> s)@ would pick the function Applicative, where
              -- @acc *> s == s@, and silently drop every earlier callback.
              Syncer s -> Nothing <$ modify (\notify e -> notify e *> s e)
          )
          (\_ -> pure ())
  b <- f t' `withException` (liftIO . atomically . syncs . Just)
  liftIO $ atomically $ syncs Nothing
  pure b

--  where trav =
--  deriving (Functor)
-- instance Applicative Syncing where
--  pure = Synced Nothing
--  Synced sf f <*> Synced af a = Synced (sf <> af) (f a)

-- Potential improvements:
--  - Performance: Get rid of the producer thread by doing the batching in STM.
--                 (pinging thread still required)
--  - Performance: Multiple elements as input
--  - Idle footprint: The pinger can be made to wait for the queue to be non-empty before starting the delay.
--     - Add a tryPeek function to Producer
--     - Make sure it is not woken up after the queue has become non-empty
--     - Alternatively, maybe use stm-delay (which uses GHC.Event for efficiency)
--       https://hackage.haskell.org/package/stm-delay-0.1.1.1/docs/Control-Concurrent-STM-Delay.html

-- | Wrap a producer of single items into a producer of batches.
--
-- A batch is emitted when the buffer reaches the item limit, when the flusher
-- thread ticks, or when the source producer closes or fails. The item limit
-- is clamped to at least 1. Empty batches are never emitted, and items within
-- a batch keep their arrival order.
--
-- When the source closes, its final value becomes the final value of the
-- batching producer. When the source reports an exception, the pending items
-- are emitted first and the exception is then rethrown on the batching
-- thread.
--
-- The flusher thread and the batching thread are both stopped when the
-- continuation returns. The source producer itself is not cancelled here.
withBoundedDelayBatchProducer ::
  (MonadIO m, MonadUnliftIO m, KatipContext m) =>
  -- | Max time before flushing in microseconds
  Int ->
  -- | Max number of items in batch
  Int ->
  Producer p r ->
  (Producer [p] r -> m a) ->
  m a
withBoundedDelayBatchProducer maxDelay maxItems sourceP f = do
  UnliftIO {unliftIO = unlift} <- askUnliftIO
  -- Each unit written to this queue asks the batcher to flush its buffer.
  flushes <- liftIO newTQueueIO
  let producer writeBatch =
        let beginReading = readItems (max 1 maxItems) []
            doPerformBatch [] = pure ()
            -- The buffer is accumulated newest-first; restore arrival order.
            doPerformBatch buf = writeBatch (reverse buf)
            readItems 0 buf = do
              -- logLocM DebugS "batch on full"
              doPerformBatch buf
              beginReading
            readItems bufferRemaining buf =
              -- Prefer an available source message over a pending flush.
              joinSTM
                ( (onQueueRead <$> producerQueueRead sourceP)
                    <|> (onFlush <$ readTQueue flushes)
                )
              where
                onQueueRead (Payload a) =
                  readItems (bufferRemaining - 1) (a : buf)
                onQueueRead (Close r) = do
                  -- logLocM DebugS $ "batch on close: " <> logStr (show (length buf))
                  doPerformBatch buf
                  pure r
                onQueueRead (Exception e) = do
                  -- logLocM DebugS $ "batch on exception: " <> logStr (show (length buf))
                  doPerformBatch buf
                  throwIO e
                onFlush = do
                  -- logLocM DebugS $ "batch on flush: " <> logStr (show (length buf))
                  doPerformBatch buf
                  beginReading
         in beginReading
  liftIO
    $ withAsync
      ( forever $ do
          threadDelay maxDelay
          atomically $ writeTQueue flushes ()
      )
    $ \_flusher -> unlift $ withProducer producer f

-- | Send a 'Syncer' through the given writer and wait for it to be answered.
--
-- Blocks until the callback carried by the 'Syncer' has been run. If the
-- callback was given an exception, that exception is rethrown here; otherwise
-- this returns normally.
syncer :: MonadIO m => (Syncing a -> m ()) -> m ()
syncer writer = do
  v <- liftIO newEmptyTMVarIO
  writer (Syncer $ putTMVar v)
  -- Retries until the callback has filled the variable.
  mexc <- liftIO $ atomically $ readTMVar v
  for_ mexc throwIO