{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Hercules.Agent.Producer where

import Control.Applicative
import Control.Concurrent hiding (throwTo)
import Control.Concurrent.Async hiding (cancel)
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.IO.Unlift
import Control.Monad.State
import Data.Foldable
import Data.Traversable
import UnliftIO.Exception
import Prelude

-- | A thread producing zero or more payloads and a final value.
-- Handles exception propagation.
data Producer p r = Producer
  { forall p r. Producer p r -> STM (Msg p r)
producerQueueRead :: STM (Msg p r),
    forall p r. Producer p r -> ThreadId
producerThread :: ThreadId
  }
  deriving (forall a b. a -> Producer p b -> Producer p a
forall a b. (a -> b) -> Producer p a -> Producer p b
forall p a b. a -> Producer p b -> Producer p a
forall p a b. (a -> b) -> Producer p a -> Producer p b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: forall a b. a -> Producer p b -> Producer p a
$c<$ :: forall p a b. a -> Producer p b -> Producer p a
fmap :: forall a b. (a -> b) -> Producer p a -> Producer p b
$cfmap :: forall p a b. (a -> b) -> Producer p a -> Producer p b
Functor)

data ProducerCancelled = ProducerCancelled
  deriving (Int -> ProducerCancelled -> ShowS
[ProducerCancelled] -> ShowS
ProducerCancelled -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ProducerCancelled] -> ShowS
$cshowList :: [ProducerCancelled] -> ShowS
show :: ProducerCancelled -> String
$cshow :: ProducerCancelled -> String
showsPrec :: Int -> ProducerCancelled -> ShowS
$cshowsPrec :: Int -> ProducerCancelled -> ShowS
Show, Show ProducerCancelled
Typeable ProducerCancelled
SomeException -> Maybe ProducerCancelled
ProducerCancelled -> String
ProducerCancelled -> SomeException
forall e.
Typeable e
-> Show e
-> (e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> Exception e
displayException :: ProducerCancelled -> String
$cdisplayException :: ProducerCancelled -> String
fromException :: SomeException -> Maybe ProducerCancelled
$cfromException :: SomeException -> Maybe ProducerCancelled
toException :: ProducerCancelled -> SomeException
$ctoException :: ProducerCancelled -> SomeException
Exception, Typeable)

data Msg p r
  = -- | One of possibly many payloads from the producer
    Payload p
  | -- | The producer stopped due to an exception
    Exception SomeException
  | -- | The producer was done and produced a final value
    Close r
  deriving (forall a b. a -> Msg p b -> Msg p a
forall a b. (a -> b) -> Msg p a -> Msg p b
forall p a b. a -> Msg p b -> Msg p a
forall p a b. (a -> b) -> Msg p a -> Msg p b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: forall a b. a -> Msg p b -> Msg p a
$c<$ :: forall p a b. a -> Msg p b -> Msg p a
fmap :: forall a b. (a -> b) -> Msg p a -> Msg p b
$cfmap :: forall p a b. (a -> b) -> Msg p a -> Msg p b
Functor)

-- | @forkProducer f@ produces a computation that forks a thread for @f@, which
-- receives a function for returning payloads @p@.
--
-- @f@ may produce a final result value @r@ when it is done.
forkProducer :: forall m p r. (MonadUnliftIO m) => ((p -> m ()) -> m r) -> m (Producer p r)
forkProducer :: forall (m :: * -> *) p r.
MonadUnliftIO m =>
((p -> m ()) -> m r) -> m (Producer p r)
forkProducer (p -> m ()) -> m r
f = do
  TQueue (Msg p r)
q <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a. IO (TQueue a)
newTQueueIO
  let write :: MonadIO m' => Msg p r -> m' ()
      write :: forall (m' :: * -> *). MonadIO m' => Msg p r -> m' ()
write = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Msg p r)
q
  IO r
f' <- forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (IO a)
toIO ((p -> m ()) -> m r
f (forall (m' :: * -> *). MonadIO m' => Msg p r -> m' ()
write forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall p r. p -> Msg p r
Payload))
  ThreadId
t <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally IO r
f' (forall (m' :: * -> *). MonadIO m' => Msg p r -> m' ()
write forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall {r} {p}. Either SomeException r -> Msg p r
toResult)
  forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ Producer {producerQueueRead :: STM (Msg p r)
producerQueueRead = forall a. TQueue a -> STM a
readTQueue TQueue (Msg p r)
q, producerThread :: ThreadId
producerThread = ThreadId
t}
  where
    toResult :: Either SomeException r -> Msg p r
toResult (Left SomeException
e) = forall p r. SomeException -> Msg p r
Exception SomeException
e
    toResult (Right r
r) = forall p r. r -> Msg p r
Close r
r

-- | Throws 'ProducerCancelled' as an async exception to the producer thread.
-- Blocks until exception is raised. See 'throwTo'.
cancel :: MonadIO m => Producer p r -> m ()
cancel :: forall (m :: * -> *) p r. MonadIO m => Producer p r -> m ()
cancel Producer p r
p = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall e (m :: * -> *).
(Exception e, MonadIO m) =>
ThreadId -> e -> m ()
throwTo (forall p r. Producer p r -> ThreadId
producerThread Producer p r
p) ProducerCancelled
ProducerCancelled

-- | Perform an computation while @withProducer@ takes care of forking and cleaning up.
--
-- @withProducer (\write -> write "a" >> write "b") $ \producer -> consume producer@
withProducer ::
  (MonadUnliftIO m) =>
  ((p -> m ()) -> m r) ->
  (Producer p r -> m a) ->
  m a
withProducer :: forall (m :: * -> *) p r a.
MonadUnliftIO m =>
((p -> m ()) -> m r) -> (Producer p r -> m a) -> m a
withProducer (p -> m ()) -> m r
f = forall (m :: * -> *) a b c.
MonadUnliftIO m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (forall (m :: * -> *) p r.
MonadUnliftIO m =>
((p -> m ()) -> m r) -> m (Producer p r)
forkProducer (p -> m ()) -> m r
f) forall (m :: * -> *) p r. MonadIO m => Producer p r -> m ()
cancel

listen ::
  MonadIO m =>
  Producer p r ->
  (p -> m a) ->
  (r -> m a) ->
  STM (m a)
listen :: forall (m :: * -> *) p r a.
MonadIO m =>
Producer p r -> (p -> m a) -> (r -> m a) -> STM (m a)
listen Producer p r
p p -> m a
fPayload r -> m a
fResult =
  forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Msg p r -> m a
f (forall p r. Producer p r -> STM (Msg p r)
producerQueueRead Producer p r
p)
  where
    f :: Msg p r -> m a
f (Payload p
payload) = p -> m a
fPayload p
payload
    f (Exception SomeException
e) = forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
e
    f (Close r
r) = r -> m a
fResult r
r

joinSTM :: MonadIO m => STM (m a) -> m a
joinSTM :: forall (m :: * -> *) a. MonadIO m => STM (m a) -> m a
joinSTM = forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically

data Syncing a = Syncable a | Syncer (Maybe SomeException -> STM ())

-- | Sends sync notifications after the whole computation succeeds (or fails)
-- Note: not exception safe in the presence of pure exceptions.
withSync ::
  (MonadUnliftIO m, Traversable t) =>
  t (Syncing a) ->
  (t (Maybe a) -> m b) ->
  m b
withSync :: forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
t (Syncing a) -> (t (Maybe a) -> m b) -> m b
withSync t (Syncing a)
t t (Maybe a) -> m b
f = do
  let (t (Maybe a)
t', Maybe SomeException -> STM ()
syncs) =
        forall s a. State s a -> s -> (a, s)
runState
          ( forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for t (Syncing a)
t forall a b. (a -> b) -> a -> b
$ \case
              Syncable a
a -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. a -> Maybe a
Just a
a)
              Syncer Maybe SomeException -> STM ()
s -> forall a. Maybe a
Nothing forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify (forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Maybe SomeException -> STM ()
s)
          )
          (\Maybe SomeException
_ -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
  b
b <- t (Maybe a) -> m b
f t (Maybe a)
t' forall (m :: * -> *) e a b.
(MonadUnliftIO m, Exception e) =>
m a -> (e -> m b) -> m a
`withException` (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe SomeException -> STM ()
syncs forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. a -> Maybe a
Just)
  forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ Maybe SomeException -> STM ()
syncs forall a. Maybe a
Nothing
  forall (f :: * -> *) a. Applicative f => a -> f a
pure b
b

--  where trav =
--  deriving (Functor)
-- instance Applicative Syncing where
--  pure = Synced Nothing
--  Synced sf f <*> Synced af a = Synced (sf <> af) (f a)

-- Potential improvements:
--  - Performance: Get rid of the producer thread by doing the batching in STM.
--                 (pinging thread still required)
--  - Performance: Multiple elements as input
--  - Idle footprint: The pinger can be made to wait for the queue to be non-empty before starting the delay.
--     - Add a tryPeek function to Producer
--     - Make sure it is not woken up after the queue has become non-empty
--     - Alternatively, maybe use stm-delay (which uses GHC.Event for efficiency)
--       https://hackage.haskell.org/package/stm-delay-0.1.1.1/docs/Control-Concurrent-STM-Delay.html
withBoundedDelayBatchProducer ::
  (MonadUnliftIO m) =>
  -- | Max time before flushing in microseconds
  Int ->
  -- | Max number of items in batch
  Int ->
  Producer p r ->
  (Producer [p] r -> m a) ->
  m a
withBoundedDelayBatchProducer :: forall (m :: * -> *) p r a.
MonadUnliftIO m =>
Int -> Int -> Producer p r -> (Producer [p] r -> m a) -> m a
withBoundedDelayBatchProducer Int
maxDelay Int
maxItems Producer p r
sourceP Producer [p] r -> m a
f = do
  UnliftIO {unliftIO :: forall (m :: * -> *). UnliftIO m -> forall a. m a -> IO a
unliftIO = forall a. m a -> IO a
unlift} <- forall (m :: * -> *). MonadUnliftIO m => m (UnliftIO m)
askUnliftIO
  TQueue ()
flushes <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a. IO (TQueue a)
newTQueueIO
  let producer :: ([p] -> f ()) -> f r
producer [p] -> f ()
writeBatch =
        let beginReading :: f r
beginReading = Int -> [p] -> f r
readItems (forall a. Ord a => a -> a -> a
max Int
1 Int
maxItems) []
            doPerformBatch :: [p] -> f ()
doPerformBatch [] = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
            doPerformBatch [p]
buf = [p] -> f ()
writeBatch (forall a. [a] -> [a]
reverse [p]
buf)
            readItems :: Int -> [p] -> f r
readItems Int
0 [p]
buf = do
              -- logLocM DebugS "batch on full"
              [p] -> f ()
doPerformBatch [p]
buf
              f r
beginReading
            readItems Int
bufferRemaining [p]
buf =
              forall (m :: * -> *) a. MonadIO m => STM (m a) -> m a
joinSTM
                ( Msg p r -> f r
onQueueRead forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall p r. Producer p r -> STM (Msg p r)
producerQueueRead Producer p r
sourceP
                    forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> f r
onFlush
                      forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ forall a. TQueue a -> STM a
readTQueue TQueue ()
flushes
                )
              where
                onQueueRead :: Msg p r -> f r
onQueueRead (Payload p
a) =
                  Int -> [p] -> f r
readItems (Int
bufferRemaining forall a. Num a => a -> a -> a
- Int
1) (p
a forall a. a -> [a] -> [a]
: [p]
buf)
                onQueueRead (Close r
r) = do
                  -- logLocM DebugS $ "batch on close: " <> logStr (show (length buf))
                  [p] -> f ()
doPerformBatch [p]
buf
                  forall (f :: * -> *) a. Applicative f => a -> f a
pure r
r
                onQueueRead (Exception SomeException
e) = do
                  -- logLocM DebugS $ "batch on exception: " <> logStr (show (length buf))
                  [p] -> f ()
doPerformBatch [p]
buf
                  forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
e
                onFlush :: f r
onFlush = do
                  -- logLocM DebugS $ "batch on flush: " <> logStr (show (length buf))
                  [p] -> f ()
doPerformBatch [p]
buf
                  f r
beginReading
         in f r
beginReading
  forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    forall a b. (a -> b) -> a -> b
$ forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync
      ( forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
          Int -> IO ()
threadDelay Int
maxDelay
          forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> a -> STM ()
writeTQueue TQueue ()
flushes ()
      )
    forall a b. (a -> b) -> a -> b
$ \Async Any
_flusher -> forall a. m a -> IO a
unlift forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) p r a.
MonadUnliftIO m =>
((p -> m ()) -> m r) -> (Producer p r -> m a) -> m a
withProducer forall {f :: * -> *}. MonadIO f => ([p] -> f ()) -> f r
producer Producer [p] r -> m a
f

syncer :: MonadIO m => (Syncing a -> m ()) -> m ()
syncer :: forall (m :: * -> *) a. MonadIO m => (Syncing a -> m ()) -> m ()
syncer Syncing a -> m ()
writer = do
  TMVar (Maybe SomeException)
v <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a. IO (TMVar a)
newEmptyTMVarIO
  Syncing a -> m ()
writer (forall a. (Maybe SomeException -> STM ()) -> Syncing a
Syncer forall a b. (a -> b) -> a -> b
$ forall a. TMVar a -> a -> STM ()
putTMVar TMVar (Maybe SomeException)
v)
  Maybe SomeException
mexc <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TMVar a -> STM a
readTMVar TMVar (Maybe SomeException)
v
  forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Maybe SomeException
mexc (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO)