module Engine.Worker
  ( Versioned(..)
  , Var
  , newVar
  , readVar
  , stateVar
  , stateVarMap

  , HasInput(..)
  , pushInput
  , pushInputSTM
  , updateInput
  , updateInputSTM
  , getInputData
  , getInputDataSTM

  , HasConfig(..)
  , modifyConfig
  , modifyConfigSTM

  , HasOutput(..)
  , pushOutput
  , pushOutputSTM
  , updateOutput
  , updateOutputSTM
  , getOutputData
  , getOutputDataSTM

  , Cell
  , spawnCell

  , Timed(..)
  , spawnTimed
  , spawnTimed_

  , Merge(..)
  , spawnMerge1
  , spawnMerge2
  , spawnMerge3
  , spawnMerge4
  , spawnMergeT

  , ObserverIO
  , newObserverIO
  , observeIO
  , observeIO_
  , readObservedIO

  , Source
  , newSource
  , pubSource
  , subSource
  ) where

import RIO

import Control.Concurrent.Chan.Unagi.Unboxed (UnagiPrim)
import Control.Concurrent.Chan.Unagi.Unboxed qualified as UnagiPrim
import Data.Vector.Unboxed qualified as Unboxed
import UnliftIO.Concurrent (forkIO, killThread)
import UnliftIO.Resource (MonadResource)
import UnliftIO.Resource qualified as Resource
import Data.StateVar qualified as StateVar

-- | A payload tagged with a version stamp.
--
-- The stamp is what the 'Eq' and 'Ord' instances of this type look at;
-- the payload itself is never compared.
data Versioned a = Versioned
  { vVersion :: Unboxed.Vector Word64
    -- ^ Version stamp. 'newVar' starts it as a single @0@, the push/update
    -- helpers increment every component, and merge workers derive it from
    -- the stamps of their inputs.
  , vData    :: a
    -- ^ The value being versioned.
  }
  deriving (Show, Functor, Foldable, Traversable)

-- | Equality compares version stamps only, which is why no @Eq a@
-- constraint is needed: two values with the same stamp are considered equal
-- regardless of their payloads.
instance Eq (Versioned a) where
  a == b = vVersion a == vVersion b

-- | Ordering compares version stamps only (using the ordering of the
-- underlying unboxed vector); payloads do not participate.
instance Ord (Versioned a) where
  compare a b = compare (vVersion a) (vVersion b)

-- | A transactional variable holding a 'Versioned' payload.
type Var a = TVar (Versioned a)

-- | Create a 'Var' holding the given payload.
--
-- The version stamp starts as a one-element vector containing @0@.
newVar :: MonadUnliftIO m => a -> m (Var a)
newVar initial = newTVarIO Versioned
  { vVersion = Unboxed.singleton 0
  , vData    = initial
  }

-- | View the input of a worker as a 'StateVar.StateVar'.
--
-- Reading goes through 'getInputData'; writing replaces the whole payload
-- via 'pushInput' (so every write bumps the version).
stateVar :: HasInput var => var -> StateVar.StateVar (GetInput var)
stateVar var = StateVar.makeStateVar
  (getInputData var)
  (\new -> pushInput var (const new))

-- | Like 'stateVar', but focused on a part of the input.
--
-- The first function extracts the exposed value from the input payload when
-- reading; the second folds a newly written value back into the current
-- payload, which is then stored with 'pushInput'.
stateVarMap
  :: HasInput var
  => (GetInput var -> a)
  -> (a -> GetInput var -> GetInput var)
  -> var
  -> StateVar.StateVar a
stateVarMap mapGet mapSet var = StateVar.makeStateVar
  (fmap mapGet $ getInputData var)
  (\new -> pushInput var (mapSet new))

{-# INLINEABLE readVar #-}
-- | Read the current payload of a 'Var', discarding its version stamp.
readVar :: (MonadUnliftIO m) => Var a -> m a
readVar = fmap vData . readTVarIO

{-# INLINEABLE pushVarSTM #-}
-- | Unconditionally transform the payload of a 'Var'.
--
-- Every component of the version stamp is incremented, whether or not the
-- function actually changed the payload.
pushVarSTM :: Var a -> (a -> a) -> STM ()
pushVarSTM var f =
  modifyTVar' var \Versioned{vVersion, vData} ->
    Versioned
      { vVersion = Unboxed.map (+ 1) vVersion
      , vData    = f vData
      }

{-# INLINEABLE updateVarSTM #-}
-- | Conditionally replace the payload of a 'Var'.
--
-- The function inspects the current payload. On 'Nothing' the variable is
-- left untouched (the version stamp stays the same); on 'Just' the new
-- payload is stored and every component of the stamp is incremented.
updateVarSTM :: Var a -> (a -> Maybe a) -> STM ()
updateVarSTM var f = do
  current <- readTVar var
  case f (vData current) of
    Nothing ->
      -- No update requested: do not write, so the version is not bumped.
      pure ()
    Just newData ->
      writeTVar var Versioned
        { vVersion = Unboxed.map (+ 1) (vVersion current)
        , vData    = newData
        }

-- | Things that expose a 'Var' to be used as their input.
class HasInput a where
  -- | Payload type of the input variable.
  type GetInput a
  -- | Project the input variable.
  getInput :: a -> Var (GetInput a)

-- | A bare 'Var' is its own input.
instance HasInput (Var a) where
  type GetInput (Var a) = a
  getInput = id

-- | For a pair, the input is taken from the first component.
instance HasInput a => HasInput (a, b) where
  type GetInput (a, b) = GetInput a
  getInput = getInput . fst

{-# INLINEABLE pushInput #-}
-- | Transform the input payload and bump its version, running
-- 'pushInputSTM' in its own transaction.
pushInput
  :: (MonadIO m, HasInput var)
  => var
  -> (GetInput var -> GetInput var)
  -> m ()
pushInput input =
  atomically . pushInputSTM input

{-# INLINEABLE pushInputSTM #-}
-- | Transform the input payload and bump its version inside an existing
-- transaction. The version is incremented unconditionally.
pushInputSTM
  :: HasInput var
  => var
  -> (GetInput var -> GetInput var)
  -> STM ()
pushInputSTM input = pushVarSTM (getInput input)

{-# INLINEABLE updateInput #-}
-- | Conditionally replace the input payload, running 'updateInputSTM' in
-- its own transaction.
updateInput
  :: ( MonadIO m
     , HasInput var
     )
  => var
  -> (GetInput var -> Maybe (GetInput var))
  -> m ()
updateInput input =
  atomically . updateInputSTM input

{-# INLINEABLE updateInputSTM #-}
-- | Conditionally replace the input payload inside an existing transaction.
--
-- When the function returns 'Nothing' the input (and its version) is left
-- untouched; on 'Just' the new payload is stored and the version is bumped.
updateInputSTM
  :: HasInput var
  => var
  -> (GetInput var -> Maybe (GetInput var))
  -> STM ()
updateInputSTM input = updateVarSTM (getInput input)

{-# INLINEABLE getInputData #-}
-- | Read the current input payload, without its version stamp.
getInputData :: (HasInput worker, MonadIO m) => worker -> m (GetInput worker)
getInputData = fmap vData . readTVarIO . getInput

{-# INLINEABLE getInputDataSTM #-}
-- | Read the current input payload (without its version stamp) as part of
-- an existing transaction.
getInputDataSTM :: (HasInput worker) => worker -> STM (GetInput worker)
getInputDataSTM = fmap vData . readTVar . getInput

-- | Things that expose a configuration variable.
--
-- Unlike inputs and outputs, the configuration is a plain 'TVar' and
-- carries no version stamp.
class HasConfig a where
  -- | Type of the configuration value.
  type GetConfig a
  -- | Project the configuration variable.
  getConfig :: a -> TVar (GetConfig a)

-- | A bare 'TVar' is its own configuration variable.
instance HasConfig (TVar a) where
  type GetConfig (TVar a) = a
  getConfig = id

-- | Apply a function to the configuration value, running
-- 'modifyConfigSTM' in its own transaction.
modifyConfig
  :: (MonadIO m, HasConfig var)
  => var
  -> (GetConfig var -> GetConfig var)
  -> m ()
modifyConfig config =
  atomically . modifyConfigSTM config

-- | Apply a function to the configuration value inside an existing
-- transaction. Uses the strict 'modifyTVar'', so the new value is forced to
-- weak head normal form when it is stored.
modifyConfigSTM
  :: HasConfig var
  => var
  -> (GetConfig var -> GetConfig var)
  -> STM ()
modifyConfigSTM config =
  modifyTVar' (getConfig config)

-- | Things that expose a 'Var' holding their output.
class HasOutput a where
  -- | Payload type of the output variable.
  type GetOutput a
  -- | Project the output variable.
  getOutput  :: a -> Var (GetOutput a)

-- | A bare 'Var' is its own output.
instance HasOutput (Var a) where
  type GetOutput (Var a) = a
  getOutput = id

-- | For a pair, the output is taken from the second component.
instance HasOutput b => HasOutput (a, b) where
  type GetOutput (a, b) = GetOutput b
  getOutput = getOutput . snd

{-# INLINEABLE pushOutput #-}
-- | Transform the output payload and bump its version, running
-- 'pushOutputSTM' in its own transaction.
pushOutput
  :: (MonadIO m, HasOutput var)
  => var
  -> (GetOutput var -> GetOutput var)
  -> m ()
pushOutput output =
  atomically . pushOutputSTM output

{-# INLINEABLE pushOutputSTM #-}
-- | Transform the output payload and bump its version inside an existing
-- transaction. The version is incremented unconditionally.
--
-- Shares its implementation with 'pushInputSTM' through 'pushVarSTM'
-- instead of repeating the record update.
pushOutputSTM
  :: HasOutput var
  => var
  -> (GetOutput var -> GetOutput var)
  -> STM ()
pushOutputSTM output = pushVarSTM (getOutput output)

{-# INLINEABLE updateOutput #-}
-- | Conditionally replace the output payload, running 'updateOutputSTM' in
-- its own transaction.
updateOutput
  :: (MonadIO m, HasOutput var)
  => var
  -> (GetOutput var -> Maybe (GetOutput var))
  -> m ()
updateOutput output =
  atomically . updateOutputSTM output

{-# INLINEABLE updateOutputSTM #-}
-- | Conditionally replace the output payload inside an existing transaction.
--
-- When the function returns 'Nothing' the output (and its version) is left
-- untouched; on 'Just' the new payload is stored and the version is bumped.
--
-- Shares its implementation with 'updateInputSTM' through 'updateVarSTM'
-- instead of repeating the read/case/write sequence.
updateOutputSTM
  :: HasOutput var
  => var
  -> (GetOutput var -> Maybe (GetOutput var))
  -> STM ()
updateOutputSTM output = updateVarSTM (getOutput output)

{-# INLINEABLE getOutputData #-}
-- | Read the current output payload, without its version stamp.
getOutputData :: (HasOutput worker, MonadIO m) => worker -> m (GetOutput worker)
getOutputData = fmap vData . readTVarIO . getOutput

{-# INLINEABLE getOutputDataSTM #-}
-- | Read the current output payload (without its version stamp) as part of
-- an existing transaction.
getOutputDataSTM :: (HasOutput worker) => worker -> STM (GetOutput worker)
getOutputDataSTM = fmap vData . readTVar . getOutput

-- * Supply

-- | Updatable cell for composite input or costly output.
--
-- A pair of the input 'Var' and the 'Merge' worker that derives the output
-- from it. Through the pair instances, 'HasInput' resolves to the first
-- component and 'HasOutput' to the second.
type Cell input output = (Var input, Merge output)

-- | Create a 'Cell': a fresh input 'Var' holding the initial input, plus a
-- 'spawnMerge1' worker that applies the function to the input to produce
-- the output.
spawnCell
  :: ( MonadUnliftIO m
     , MonadResource m
     )
  => (input -> output)
  -> input
  -> m (Cell input output)
spawnCell f initialInput = do
  input <- newVar initialInput
  output <- spawnMerge1 f input
  pure (input, output)

-- | Timer-driven stateful producer.
data Timed config output = Timed
  { tWorker :: ThreadId
    -- ^ Thread running the timer loop.
  , tKey    :: Resource.ReleaseKey
    -- ^ Release key of the registered action that kills 'tWorker'.
  , tActive :: TVar Bool
    -- ^ While 'False' the loop keeps waiting but does not run the step.
  , tConfig :: TVar config
    -- ^ Configuration handed to every step.
  , tOutput :: Var output
    -- ^ Latest published output.
  }

-- | The configuration of a 'Timed' worker is its 'tConfig' variable.
instance HasConfig (Timed config output) where
  type GetConfig (Timed config output) = config
  getConfig = tConfig

-- | The output of a 'Timed' worker is its 'tOutput' variable.
instance HasOutput (Timed config output) where
  type GetOutput (Timed config output) = output
  getOutput = tOutput

-- | Simplified 'spawnTimed' with a fixed delay and neither configuration
-- nor state (both are @()@).
--
-- Arguments, in order: whether the worker starts active, the delay passed
-- to 'threadDelay' between steps, the initial output, and the action that
-- produces each subsequent output. Every result of the action is published
-- (it is always wrapped in 'Just').
spawnTimed_
  :: ( MonadUnliftIO m
     , MonadResource m
     )
  => Bool
  -> Int
  -> output
  -> m output
  -> m (Timed () output)
spawnTimed_ startActive dt initialOutput stepF =
  spawnTimed
    startActive
    (Left dt)
    (\() -> pure (initialOutput, ()))
    (\_old () -> do
        res <- stepF
        pure (Just res, ())
    )
    ()

-- | Spawn a timer-driven worker thread.
--
-- Arguments, in order:
--
-- * whether the worker starts active;
-- * the delay before each iteration, passed to 'threadDelay': either a
--   fixed value ('Left') or a function of the current configuration
--   ('Right'), re-read on every iteration;
-- * the initialiser, run once in the calling thread with the initial
--   configuration, yielding the initial output and the initial state;
-- * the step, run with the current state and configuration whenever the
--   worker is active; a 'Just' output is published to 'tOutput' (bumping its
--   version), 'Nothing' leaves the output untouched; the returned state is
--   threaded into the next iteration;
-- * the initial configuration.
--
-- The worker thread is killed by an action registered with
-- 'Resource.register'; its key is available as 'tKey'.
spawnTimed
  :: ( MonadUnliftIO m
     , MonadResource m
     )
  => Bool
  -> Either Int (config -> Int)
  -> (config -> m (output, state))
  -> (state -> config -> m (Maybe output, state))
  -> config
  -> m (Timed config output)
spawnTimed startActive dtF initF stepF initialConfig = do
  tActive <- newTVarIO startActive
  tConfig <- newTVarIO initialConfig
  (initialOutput, initialState) <- initF initialConfig
  tOutput <- newVar initialOutput
  tWorker <- forkIO $
    step tActive tConfig tOutput initialState
  tKey <- Resource.register $ killThread tWorker
  pure Timed{..}
  where
    -- One iteration of the timer loop: wait, then either run the step
    -- function or, when inactive, loop again with the state unchanged.
    step activeVar configVar output curState = do
      case dtF of
        Left static ->
          threadDelay static
        Right fromConfig ->
          readTVarIO configVar >>=
            threadDelay . fromConfig

      active <- readTVarIO activeVar
      if active then do
        -- The configuration is re-read after the delay, so the step sees
        -- changes made while the worker was waiting.
        config <- readTVarIO configVar
        (nextOutput, nextState) <- stepF curState config
        updateOutput output $ const nextOutput
        step activeVar configVar output nextState
      else
        step activeVar configVar output curState

-- | Supply-driven step cell.
data Merge o = Merge
  { mWorker :: ThreadId
    -- ^ Thread that recomputes the output when an input version advances.
  , mKey    :: Resource.ReleaseKey
    -- ^ Release key of the registered action that kills 'mWorker'.
  , mOutput :: TVar (Versioned o)
    -- ^ Latest computed output together with its version stamp.
  }

-- | The output of a 'Merge' worker is its 'mOutput' variable.
instance HasOutput (Merge o) where
  type GetOutput (Merge o) = o
  getOutput = mOutput

-- | Spawn a worker that keeps its output equal to a function of the output
-- of a single source.
--
-- The initial output is computed from the current source value and carries
-- the same version stamp. The worker thread then loops forever: in one
-- transaction it reads the source and its own output, and if the source
-- version is greater it stores the recomputed payload under that version;
-- otherwise the transaction retries, which blocks until one of the
-- variables it read changes.
--
-- The worker thread is killed by an action registered with
-- 'Resource.register'; its key is available as 'mKey'.
spawnMerge1
  :: ( MonadUnliftIO m
     , MonadResource m
     , HasOutput i
     )
  => (GetOutput i -> o)
  -> i
  -> m (Merge o)
spawnMerge1 f i = do
  output <- atomically do
    initial <- readTVar (getOutput i)
    newTVar Versioned
      { vVersion = vVersion initial
      , vData    = f (vData initial)
      }

  worker <- forkIO $
    forever $ atomically do
      next <- readTVar (getOutput i)
      old <- readTVar output

      let nextVersion = vVersion next
      if nextVersion > vVersion old then
        writeTVar output Versioned
          { vVersion = nextVersion
          , vData    = f (vData next)
          }
      else
        -- Nothing new to process: block until the source (or output) changes.
        retrySTM

  key <- Resource.register $ killThread worker

  pure Merge
    { mWorker = worker
    , mKey    = key
    , mOutput = output
    }

-- | Spawn a worker that keeps its output equal to a function of the outputs
-- of two sources.
--
-- The version stamp of the result is the concatenation of both source
-- stamps. The initial output is computed from a consistent snapshot of the
-- sources. The worker thread then loops forever: in one transaction it
-- reads both sources and its own output, and if the combined version is
-- greater than the stored one it writes the recomputed payload; otherwise
-- the transaction retries, which blocks until one of the variables it read
-- changes.
--
-- The worker thread is killed by an action registered with
-- 'Resource.register'; its key is available as 'mKey'.
spawnMerge2
  :: ( MonadUnliftIO m
     , MonadResource m
     , HasOutput i1
     , HasOutput i2
     )
  => (GetOutput i1 -> GetOutput i2 -> o)
  -> i1
  -> i2
  -> m (Merge o)
spawnMerge2 f i1 i2 = do
  output <- atomically do
    (initial1, initial2) <- (,)
      <$> readTVar (getOutput i1)
      <*> readTVar (getOutput i2)

    newTVar Versioned
      { vVersion = mkVersion initial1 initial2
      , vData    = f (vData initial1) (vData initial2)
      }

  worker <- forkIO $
    forever $ atomically $ do
      next1 <- readTVar (getOutput i1)
      next2 <- readTVar (getOutput i2)
      old <- readTVar output

      let nextVersion = mkVersion next1 next2

      if nextVersion > vVersion old then
        writeTVar output Versioned
          { vVersion = nextVersion
          , vData    = f (vData next1) (vData next2)
          }
      else
        -- Nothing new to process: block until a source (or output) changes.
        retrySTM

  key <- Resource.register $ killThread worker

  pure Merge
    { mWorker = worker
    , mKey    = key
    , mOutput = output
    }
  where
    -- Combined stamp: the first source's components followed by the second's.
    mkVersion a b = vVersion a <> vVersion b

{- |
  Spawn a merge over three processes.

  The merged output is seeded from the current outputs of @i1@, @i2@ and @i3@:
  its version is the concatenation of the three input versions and its data is
  @f@ applied to the three input values.

  A forked worker then repeats a single STM transaction: it re-reads all three
  inputs and the merged output, and when the concatenated input version is
  greater than the stored one it writes the new version together with the
  recomputed data. Otherwise it calls 'retrySTM', so the transaction is only
  re-run after one of the variables it has read changes.

  The worker thread is killed by the cleanup action registered with
  'Resource.register'; the thread, the release key and the output variable
  are all returned in the resulting 'Merge'.
-}
spawnMerge3
  :: ( MonadUnliftIO m
     , MonadResource m
     , HasOutput i1
     , HasOutput i2
     , HasOutput i3
     )
  => (GetOutput i1 -> GetOutput i2 -> GetOutput i3 -> o)
  -> i1
  -> i2
  -> i3
  -> m (Merge o)
spawnMerge3 f i1 i2 i3 = do
  output <- atomically do
    (version, merged) <- readMerged
    newTVar Versioned
      { vVersion = version
      , vData    = merged
      }

  worker <- forkIO $
    forever $ atomically do
      (nextVersion, merged) <- readMerged
      old <- readTVar output

      -- Only publish when at least one input has moved past what is stored.
      if nextVersion > vVersion old then
        writeTVar output Versioned
          { vVersion = nextVersion
          , vData    = merged
          }
      else
        retrySTM

  key <- Resource.register $ killThread worker

  pure Merge
    { mWorker = worker
    , mKey    = key
    , mOutput = output
    }
  where
    -- Read every input in the current transaction and pair the combined
    -- version with the merged value. The result stays a plain pair so that
    -- a 'Versioned' is only built at the point where it is actually stored.
    readMerged = do
      v1 <- readTVar (getOutput i1)
      v2 <- readTVar (getOutput i2)
      v3 <- readTVar (getOutput i3)
      pure
        ( mconcat [vVersion v1, vVersion v2, vVersion v3]
        , f (vData v1) (vData v2) (vData v3)
        )

{- |
  Spawn a merge over four processes.

  The merged output is seeded from the current outputs of @i1@ .. @i4@:
  its version is the concatenation of the four input versions and its data is
  @f@ applied to the four input values.

  A forked worker then repeats a single STM transaction: it re-reads all four
  inputs and the merged output, and when the concatenated input version is
  greater than the stored one it writes the new version together with the
  recomputed data. Otherwise it calls 'retrySTM', so the transaction is only
  re-run after one of the variables it has read changes.

  The worker thread is killed by the cleanup action registered with
  'Resource.register'; the thread, the release key and the output variable
  are all returned in the resulting 'Merge'.
-}
spawnMerge4
  :: ( MonadUnliftIO m
     , MonadResource m
     , HasOutput i1
     , HasOutput i2
     , HasOutput i3
     , HasOutput i4
     )
  => (GetOutput i1 -> GetOutput i2 -> GetOutput i3 -> GetOutput i4 -> o)
  -> i1
  -> i2
  -> i3
  -> i4
  -> m (Merge o)
spawnMerge4 f i1 i2 i3 i4 = do
  output <- atomically do
    (version, merged) <- readMerged
    newTVar Versioned
      { vVersion = version
      , vData    = merged
      }

  worker <- forkIO $
    forever $ atomically do
      (nextVersion, merged) <- readMerged
      old <- readTVar output

      -- Only publish when at least one input has moved past what is stored.
      if nextVersion > vVersion old then
        writeTVar output Versioned
          { vVersion = nextVersion
          , vData    = merged
          }
      else
        retrySTM

  key <- Resource.register $ killThread worker

  pure Merge
    { mWorker = worker
    , mKey    = key
    , mOutput = output
    }
  where
    -- Read every input in the current transaction and pair the combined
    -- version with the merged value. The result stays a plain pair so that
    -- a 'Versioned' is only built at the point where it is actually stored.
    readMerged = do
      v1 <- readTVar (getOutput i1)
      v2 <- readTVar (getOutput i2)
      v3 <- readTVar (getOutput i3)
      v4 <- readTVar (getOutput i4)
      pure
        ( mconcat [vVersion v1, vVersion v2, vVersion v3, vVersion v4]
        , f (vData v1) (vData v2) (vData v3) (vData v4)
        )

{- |
  Spawn a merge over a homogeneous traversable collection of processes.

  A merging function will receive a collection of outputs to summarize.

  The merged version is the 'foldMap' of the input versions, in traversal
  order. A forked worker re-reads every input and the merged output in one STM
  transaction and publishes a new value only when the combined input version
  is greater than the stored one; otherwise it calls 'retrySTM'.

  With an empty collection the combined version is 'mempty' on every pass, so
  the worker never finds a greater version and only the seeded value is ever
  published.

  The worker thread is killed by the cleanup action registered with
  'Resource.register'; the thread, the release key and the output variable
  are all returned in the resulting 'Merge'.
-}
spawnMergeT
  :: ( Traversable t
     , HasOutput input
     , MonadUnliftIO m
     , MonadResource m
     )
  => (t (GetOutput input) -> output)
  -> t input
  -> m (Merge output)
spawnMergeT f inputs = do
  output <- atomically do
    (version, merged) <- readMerged
    newTVar Versioned
      { vVersion = version
      , vData    = merged
      }

  worker <- forkIO $
    forever $ atomically do
      (nextVersion, merged) <- readMerged
      old <- readTVar output

      -- Only publish when at least one input has moved past what is stored.
      if nextVersion > vVersion old then
        writeTVar output Versioned
          { vVersion = nextVersion
          , vData    = merged
          }
      else
        retrySTM

  key <- Resource.register $ killThread worker

  pure Merge
    { mWorker = worker
    , mKey    = key
    , mOutput = output
    }
  where
    -- Read the whole collection in the current transaction and pair the
    -- combined version with the summarized value. The result stays a plain
    -- pair so that a 'Versioned' is only built where it is actually stored.
    readMerged = do
      current <- traverse (readTVar . getOutput) inputs
      pure
        ( foldMap vVersion current
        , f (fmap vData current)
        )

-- * Demand

-- | Observer state: an 'IORef' holding a derived value together with the
-- version of the output it was last derived from.
type ObserverIO a = IORef (Versioned a)

{- |
  Create an observer holding @initial@ as its data.

  The stored version starts out as 'mempty'.
-}
newObserverIO :: MonadIO m => a -> m (ObserverIO a)
newObserverIO initial =
  newIORef Versioned
    { vVersion = mempty
    , vData    = initial
    }

{- |
  Update an observer from a process output, but only when the output is newer.

  The output's current value is read with 'readTVarIO' and compared against
  the version stored in the observer:

  * When the output version is greater, @action@ is run with the observer's
    current data and the output's data. Its result is stored in the observer
    together with the output's version, and returned.

  * Otherwise @action@ is not run and the observer's current data is returned.

  Reading the observer and writing it back are two separate 'IORef'
  operations, not a single atomic read-modify-write.
-}
observeIO
  :: (MonadUnliftIO m, HasOutput output)
  => output
  -> ObserverIO a
  -> (a -> GetOutput output -> m a)
  -> m a
observeIO output currentRef action = do
  outputV <- readTVarIO (getOutput output)
  currentV <- readIORef currentRef
  if vVersion outputV > vVersion currentV
    then do
      derived <- action (vData currentV) (vData outputV)
      -- Remember which output version this value was derived from.
      atomicWriteIORef currentRef Versioned
        { vVersion = vVersion outputV
        , vData    = derived
        }
      pure derived
    else
      pure (vData currentV)

{- |
  Same as 'observeIO', with the resulting value discarded.
-}
observeIO_
  :: (MonadUnliftIO m, HasOutput output)
  => output
  -> ObserverIO a
  -> (a -> GetOutput output -> m a)
  -> m ()
observeIO_ output currentRef action =
  void (observeIO output currentRef action)

{- |
  Read the data currently stored in an observer, ignoring its version.

  This only reads the 'IORef'; it does not look at any process output.
-}
{-# INLINEABLE readObservedIO #-}
readObservedIO :: (MonadUnliftIO m) => IORef (Versioned a) -> m a
readObservedIO observer = vData <$> readIORef observer

-- * PubSub

-- | Publishing side of a pub/sub channel: a wrapper around the write end
-- ('UnagiPrim.InChan') of an unboxed Unagi channel.
newtype Source a = Source (UnagiPrim.InChan a)

{- |
  Create a new 'Source'.

  Only the write end of the freshly created channel is kept; the read end
  returned by 'UnagiPrim.newChan' is dropped. Readers are obtained separately
  through 'subSource'.
-}
newSource :: (MonadUnliftIO m, UnagiPrim a) => m (Source a)
newSource = liftIO $ do
  (inChan, _outChan) <- UnagiPrim.newChan
  pure (Source inChan)

{- |
  Publish a value by writing it to the source's channel.
-}
pubSource :: (MonadUnliftIO m, UnagiPrim a) => Source a -> a -> m ()
pubSource (Source ic) value =
  liftIO (UnagiPrim.writeChan ic value)

{- |
  Subscribe to a source: duplicate its channel with 'UnagiPrim.dupChan' and
  return the resulting read end.
-}
subSource :: MonadUnliftIO m => Source a -> m (UnagiPrim.OutChan a)
subSource (Source ic) =
  liftIO (UnagiPrim.dupChan ic)