module Engine.Worker
  ( Versioned(..)
  , Var
  , newVar
  , readVar
  , stateVar
  , stateVarMap

  , HasInput(..)
  , pushInput
  , pushInputSTM
  , updateInput
  , updateInputSTM
  , getInputData
  , getInputDataSTM

  , HasConfig(..)
  , modifyConfig
  , modifyConfigSTM

  , HasOutput(..)
  , pushOutput
  , pushOutputSTM
  , updateOutput
  , updateOutputSTM
  , getOutputData
  , getOutputDataSTM

  , Cell
  , spawnCell

  , Timed(..)
  , spawnTimed
  , spawnTimed_

  , Merge(..)
  , spawnMerge1
  , spawnMerge2
  , spawnMerge3
  , spawnMerge4
  , spawnMergeT

  , ObserverIO
  , newObserverIO
  , observeIO
  , observeIO_
  , readObservedIO

  , Source
  , newSource
  , pubSource
  , subSource

  , HasWorker(..)
  , register
  , registerCollection
  , registered
  , registeredCollection
  ) where

import RIO

import Control.Concurrent.Chan.Unagi.Unboxed (UnagiPrim)
import Control.Concurrent.Chan.Unagi.Unboxed qualified as UnagiPrim
import Data.Vector.Unboxed qualified as Unboxed
import UnliftIO.Concurrent (forkIO, killThread)
import UnliftIO.Resource (MonadResource)
import UnliftIO.Resource qualified as Resource
import Data.StateVar qualified as StateVar

-- | A payload tagged with a version stamp.
--
-- The 'Eq' and 'Ord' instances in this module look only at the stamp,
-- never at the payload.
data Versioned a = Versioned
  { vVersion :: Unboxed.Vector Word64
    -- ^ Version stamp. 'newVar' starts it as a singleton @0@; the push and
    -- update helpers add one to every component.
  , vData :: a
    -- ^ The value carried at this version.
  }
  deriving (Show, Functor, Foldable, Traversable)

-- | Two 'Versioned' values are equal when their version stamps are equal;
-- the payloads are not inspected (hence no @Eq a@ constraint).
instance Eq (Versioned a) where
  lhs == rhs = vVersion lhs == vVersion rhs

-- | Ordering follows the version stamps only; the payloads are ignored.
instance Ord (Versioned a) where
  compare lhs rhs = vVersion lhs `compare` vVersion rhs

-- | A transactional variable holding a 'Versioned' value.
type Var a = TVar (Versioned a)

-- | Allocate a new 'Var' holding the given value with the version stamp
-- set to a singleton @0@.
--
-- Only 'MonadIO' is required: the body is a single 'newTVarIO' call.
-- Every 'MonadUnliftIO' monad is also a 'MonadIO', so existing callers
-- keep type-checking.
newVar :: MonadIO m => a -> m (Var a)
newVar initial =
  newTVarIO Versioned
    { vVersion = Unboxed.singleton 0
    , vData = initial
    }

-- | View a worker's input as a 'StateVar.StateVar'.
--
-- Reading returns the current input data; writing replaces the input with
-- the new value (via 'pushInput', so the version is bumped).
stateVar :: HasInput var => var -> StateVar.StateVar (GetInput var)
stateVar var = StateVar.makeStateVar readCurrent overwrite
  where
    readCurrent = getInputData var
    overwrite new = pushInput var (const new)

-- | Like 'stateVar', but exposing a projection of the input.
--
-- The first function extracts the exposed value from the input data; the
-- second merges a newly written value back into the current input data.
stateVarMap
  :: HasInput var
  => (GetInput var -> a)
  -> (a -> GetInput var -> GetInput var)
  -> var
  -> StateVar.StateVar a
stateVarMap mapGet mapSet var = StateVar.makeStateVar getter setter
  where
    getter = mapGet <$> getInputData var
    setter new = pushInput var (mapSet new)

{-# INLINEABLE readVar #-}
-- | Read the payload of a 'Var', discarding the version stamp.
--
-- Only 'MonadIO' is required: the body is a single 'readTVarIO' call.
-- Every 'MonadUnliftIO' monad is also a 'MonadIO', so existing callers
-- keep type-checking.
readVar :: MonadIO m => Var a -> m a
readVar var = vData <$> readTVarIO var

{-# INLINEABLE pushVarSTM #-}
-- | Apply a function to the payload of a 'Var' and bump its version.
--
-- The version is incremented unconditionally, whether or not the function
-- actually changes the payload.
pushVarSTM :: Var a -> (a -> a) -> STM ()
pushVarSTM var f = modifyTVar' var bump
  where
    -- Matching on the constructor keeps the new fields referring to the
    -- old fields only, not to the whole old record.
    bump Versioned{vVersion = oldVersion, vData = oldData} =
      Versioned
        { vVersion = Unboxed.map (+ 1) oldVersion
        , vData = f oldData
        }

{-# INLINEABLE updateVarSTM #-}
-- | Conditionally replace the payload of a 'Var'.
--
-- When the function returns 'Nothing' the variable is left untouched and
-- the version stays the same; on 'Just' the new payload is written and the
-- version is bumped.
updateVarSTM :: Var a -> (a -> Maybe a) -> STM ()
updateVarSTM var f = do
  current <- readTVar var
  let
    commit newData =
      writeTVar var Versioned
        { vVersion = Unboxed.map (+ 1) (vVersion current)
        , vData = newData
        }
  maybe (pure ()) commit (f (vData current))

-- | Values that expose a versioned input variable.
class HasInput a where
  -- | Type of the data held in the input variable.
  type GetInput a

  -- | Project the input variable out of the value.
  getInput :: a -> Var (GetInput a)

-- | A bare 'Var' is its own input.
instance HasInput (Var a) where
  type GetInput (Var a) = a
  getInput var = var

-- | For a pair, the input is the input of the first component.
instance HasInput a => HasInput (a, b) where
  type GetInput (a, b) = GetInput a
  getInput pair = getInput (fst pair)

{-# INLINEABLE pushInput #-}
-- | Run 'pushInputSTM' as its own atomic transaction.
pushInput
  :: (MonadIO m, HasInput var)
  => var
  -> (GetInput var -> GetInput var)
  -> m ()
pushInput input f = atomically (pushInputSTM input f)

{-# INLINEABLE pushInputSTM #-}
-- | Modify the input data of a worker and bump its version.
pushInputSTM
  :: HasInput var
  => var
  -> (GetInput var -> GetInput var)
  -> STM ()
pushInputSTM input f = pushVarSTM (getInput input) f

{-# INLINEABLE updateInput #-}
-- | Run 'updateInputSTM' as its own atomic transaction.
updateInput
  :: ( MonadIO m
     , HasInput var
     )
  => var
  -> (GetInput var -> Maybe (GetInput var))
  -> m ()
updateInput input f = atomically (updateInputSTM input f)

-- | Conditionally replace the input data of a worker.
--
-- The version is bumped only when the function returns 'Just'.
updateInputSTM
  :: HasInput var
  => var
  -> (GetInput var -> Maybe (GetInput var))
  -> STM ()
updateInputSTM input f = updateVarSTM (getInput input) f

{-# INLINEABLE getInputData #-}
-- | Read the current input data of a worker, dropping the version stamp.
getInputData :: (HasInput worker, MonadIO m) => worker -> m (GetInput worker)
getInputData worker = vData <$> readTVarIO (getInput worker)

{-# INLINEABLE getInputDataSTM #-}
-- | Transactional variant of 'getInputData'.
getInputDataSTM :: (HasInput worker) => worker -> STM (GetInput worker)
getInputDataSTM worker = vData <$> readTVar (getInput worker)

-- | Values that expose a configuration variable.
--
-- Unlike inputs and outputs, the configuration is a plain 'TVar' without a
-- version stamp.
class HasConfig a where
  -- | Type of the configuration value.
  type GetConfig a

  -- | Project the configuration variable out of the value.
  getConfig :: a -> TVar (GetConfig a)

-- | A bare 'TVar' is its own configuration variable.
instance HasConfig (TVar a) where
  type GetConfig (TVar a) = a
  getConfig var = var

-- | Run 'modifyConfigSTM' as its own atomic transaction.
modifyConfig
  :: (MonadIO m, HasConfig var)
  => var
  -> (GetConfig var -> GetConfig var)
  -> m ()
modifyConfig config f = atomically (modifyConfigSTM config f)

-- | Strictly apply a function to the configuration value.
modifyConfigSTM
  :: HasConfig var
  => var
  -> (GetConfig var -> GetConfig var)
  -> STM ()
modifyConfigSTM config f = modifyTVar' (getConfig config) f

-- | Values that expose a versioned output variable.
class HasOutput a where
  -- | Type of the data held in the output variable.
  type GetOutput a

  -- | Project the output variable out of the value.
  getOutput :: a -> Var (GetOutput a)

-- | A bare 'Var' is its own output.
instance HasOutput (Var a) where
  type GetOutput (Var a) = a
  getOutput var = var

-- | For a pair, the output is the output of the second component.
instance HasOutput b => HasOutput (a, b) where
  type GetOutput (a, b) = GetOutput b
  getOutput pair = getOutput (snd pair)

{-# INLINEABLE pushOutput #-}
-- | Run 'pushOutputSTM' as its own atomic transaction.
pushOutput
  :: (MonadIO m, HasOutput var)
  => var
  -> (GetOutput var -> GetOutput var)
  -> m ()
pushOutput output f = atomically (pushOutputSTM output f)

{-# INLINEABLE pushOutputSTM #-}
-- | Modify the output data of a worker and bump its version.
--
-- Delegates to 'pushVarSTM', exactly as 'pushInputSTM' does for inputs,
-- so the version-bumping logic lives in one place. The version is
-- incremented unconditionally.
pushOutputSTM
  :: HasOutput var
  => var
  -> (GetOutput var -> GetOutput var)
  -> STM ()
pushOutputSTM output f = pushVarSTM (getOutput output) f

{-# INLINEABLE updateOutput #-}
-- | Run 'updateOutputSTM' as its own atomic transaction.
updateOutput
  :: (MonadIO m, HasOutput var)
  => var
  -> (GetOutput var -> Maybe (GetOutput var))
  -> m ()
updateOutput output f = atomically (updateOutputSTM output f)

-- | Conditionally replace the output data of a worker.
--
-- Delegates to 'updateVarSTM', exactly as 'updateInputSTM' does for
-- inputs: on 'Nothing' the variable is left untouched, on 'Just' the new
-- data is written and the version is bumped.
updateOutputSTM
  :: HasOutput var
  => var
  -> (GetOutput var -> Maybe (GetOutput var))
  -> STM ()
updateOutputSTM output f = updateVarSTM (getOutput output) f

{-# INLINEABLE getOutputData #-}
-- | Read the current output data of a worker, dropping the version stamp.
getOutputData :: (HasOutput worker, MonadIO m) => worker -> m (GetOutput worker)
getOutputData worker = vData <$> readTVarIO (getOutput worker)

{-# INLINEABLE getOutputDataSTM #-}
-- | Transactional variant of 'getOutputData'.
getOutputDataSTM :: (HasOutput worker) => worker -> STM (GetOutput worker)
getOutputDataSTM worker = vData <$> readTVar (getOutput worker)

-- * Supply

-- | Updatable cell for composite input or costly output: an input 'Var'
-- paired with the 'Merge' that holds the derived output.
type Cell input output = (Var input, Merge output)

-- | Create a 'Cell': allocate an input 'Var' with the initial value and
-- attach a 'spawnMerge1' worker that derives the output with the given
-- function.
spawnCell
  :: MonadUnliftIO m
  => (input -> output)
  -> input
  -> m (Cell input output)
spawnCell f initialInput = do
  inputVar <- newVar initialInput
  merge <- spawnMerge1 f inputVar
  pure (inputVar, merge)

-- | Timer-driven stateful producer.
data Timed config output = Timed
  { tWorker :: ThreadId
    -- ^ Thread running the step loop (forked by 'spawnTimed').
  , tActive :: TVar Bool
    -- ^ The step function only runs on ticks where this is 'True'.
  , tConfig :: TVar config
    -- ^ Configuration handed to the step function on every tick.
  , tOutput :: Var output
    -- ^ Versioned output, updated when a step yields a new value.
  }

-- | The configuration of a 'Timed' worker is its 'tConfig' variable.
instance HasConfig (Timed config output) where
  type GetConfig (Timed config output) = config
  getConfig timed = tConfig timed

-- | The output of a 'Timed' worker is its 'tOutput' variable.
instance HasOutput (Timed config output) where
  type GetOutput (Timed config output) = output
  getOutput timed = tOutput timed

-- | Simplified 'spawnTimed': fixed interval, no configuration and no
-- internal state.
--
-- Arguments, in order: whether the worker starts active, the delay passed
-- to 'threadDelay' between ticks, the initial output, and the action that
-- produces a fresh output on every active tick (its result always
-- replaces the output).
spawnTimed_
  :: MonadUnliftIO m
  => Bool
  -> Int
  -> output
  -> m output
  -> m (Timed () output)
spawnTimed_ startActive dt initialOutput stepF =
  spawnTimed startActive (Left dt) initUnit stepUnit ()
  where
    -- Both config and state are unit; only the output carries information.
    initUnit () = pure (initialOutput, ())

    stepUnit _old () = do
      produced <- stepF
      pure (Just produced, ())

-- | Fork a timer-driven worker.
--
-- Arguments, in order:
--
-- * whether the worker starts active;
-- * the tick delay: either a fixed value or a function of the current
--   config (the result is passed to 'threadDelay');
-- * the initialiser, run once in the calling thread, yielding the first
--   output and the initial state;
-- * the step function, given the current state and config; returning
--   'Nothing' for the output leaves it (and its version) untouched;
-- * the initial config.
--
-- Every iteration sleeps first and then checks the active flag, so an
-- inactive worker keeps waking up at the tick rate without stepping.
spawnTimed
  :: MonadUnliftIO m
  => Bool
  -> Either Int (config -> Int)
  -> (config -> m (output, state))
  -> (state -> config -> m (Maybe output, state))
  -> config
  -> m (Timed config output)
spawnTimed startActive dtF initF stepF initialConfig = do
  activeVar <- newTVarIO startActive
  configVar <- newTVarIO initialConfig
  (firstOutput, firstState) <- initF initialConfig
  outputVar <- newVar firstOutput
  worker <- forkIO (loop activeVar configVar outputVar firstState)
  pure Timed
    { tWorker = worker
    , tActive = activeVar
    , tConfig = configVar
    , tOutput = outputVar
    }
  where
    -- Sleep for the fixed delay, or for one computed from the config as it
    -- is at the moment of going to sleep.
    pause configVar =
      case dtF of
        Left static ->
          threadDelay static
        Right fromConfig -> do
          config <- readTVarIO configVar
          threadDelay (fromConfig config)

    loop activeVar configVar outputVar curState = do
      pause configVar
      active <- readTVarIO activeVar
      followingState <-
        if active then do
          -- The config is re-read after the sleep, so the step sees
          -- changes made while the worker was waiting.
          config <- readTVarIO configVar
          (nextOutput, steppedState) <- stepF curState config
          updateOutput outputVar (const nextOutput)
          pure steppedState
        else
          pure curState
      loop activeVar configVar outputVar followingState

-- | Supply-driven step cell.
data Merge o = Merge
  { mWorker   :: ThreadId
    -- ^ Thread that recomputes the output.
  , mOutput   :: TVar (Versioned o)
    -- ^ Latest merged value together with its version.
  }

-- | A 'Merge' exposes its versioned output variable directly.
instance HasOutput (Merge o) where
  type GetOutput (Merge o) = o
  getOutput = mOutput

-- | Spawn a merge that maps a single input's output through a function.
--
-- The initial value is computed from the input's current output and carries
-- the input's version. A forked worker then loops forever: whenever the
-- input's version compares greater than the version stored in the merge
-- output, the output is rewritten; otherwise the transaction retries.
spawnMerge1
  :: ( MonadUnliftIO m
     , HasOutput i
     )
  => (GetOutput i -> o) -- ^ Function applied to the input's data.
  -> i                  -- ^ Input process to follow.
  -> m (Merge o)
spawnMerge1 f i = do
  output <- atomically do
    initial <- readTVar (getOutput i)
    newTVar Versioned
      { vVersion = vVersion initial
      , vData    = f (vData initial)
      }

  worker <- forkIO $
    forever $ atomically do
      next <- readTVar (getOutput i)
      old <- readTVar output

      let nextVersion = vVersion next
      if nextVersion > vVersion old then
        writeTVar output Versioned
          { vVersion = nextVersion
          , vData    = f (vData next)
          }
      else
        -- Nothing new yet: retry the transaction.
        retrySTM

  pure Merge
    { mWorker = worker
    , mOutput = output
    }

-- | Spawn a merge combining the outputs of two inputs.
--
-- The merged version is the concatenation of both input versions, in
-- argument order. Both inputs are read in one transaction, so the function
-- always sees a consistent pair. A forked worker rewrites the output
-- whenever the combined version compares greater than the stored one and
-- retries otherwise.
spawnMerge2
  :: ( MonadUnliftIO m
     , HasOutput i1
     , HasOutput i2
     )
  => (GetOutput i1 -> GetOutput i2 -> o) -- ^ Combining function.
  -> i1                                  -- ^ First input.
  -> i2                                  -- ^ Second input.
  -> m (Merge o)
spawnMerge2 f i1 i2 = do
  output <- atomically do
    (initial1, initial2) <- (,)
      <$> readTVar (getOutput i1)
      <*> readTVar (getOutput i2)

    newTVar Versioned
      { vVersion = mkVersion initial1 initial2
      , vData    = f (vData initial1) (vData initial2)
      }

  worker <- forkIO $
    forever $ atomically $ do
      next1 <- readTVar (getOutput i1)
      next2 <- readTVar (getOutput i2)
      old <- readTVar output

      let nextVersion = mkVersion next1 next2

      if nextVersion > vVersion old then
        writeTVar output Versioned
          { vVersion = nextVersion
          , vData    = f (vData next1) (vData next2)
          }
      else
        -- No input is ahead of the output: retry the transaction.
        retrySTM

  pure Merge
    { mWorker = worker
    , mOutput = output
    }
  where
    -- Combined version: both input versions appended, in argument order.
    mkVersion a b = vVersion a <> vVersion b

-- | Spawn a merge combining the outputs of three inputs.
--
-- The merged version is the concatenation of the three input versions, in
-- argument order. All inputs are read in one transaction. A forked worker
-- rewrites the output whenever the combined version compares greater than
-- the stored one and retries otherwise.
spawnMerge3
  :: ( MonadUnliftIO m
     , HasOutput i1
     , HasOutput i2
     , HasOutput i3
     )
  => (GetOutput i1 -> GetOutput i2 -> GetOutput i3 -> o) -- ^ Combining function.
  -> i1                                                  -- ^ First input.
  -> i2                                                  -- ^ Second input.
  -> i3                                                  -- ^ Third input.
  -> m (Merge o)
spawnMerge3 f i1 i2 i3 = do
  output <- atomically do
    (initial1, initial2, initial3) <- (,,)
      <$> readTVar (getOutput i1)
      <*> readTVar (getOutput i2)
      <*> readTVar (getOutput i3)

    newTVar Versioned
      { vVersion = mkVersion initial1 initial2 initial3
      , vData    = f (vData initial1) (vData initial2) (vData initial3)
      }

  worker <- forkIO $
    forever $ atomically $ do
      next1 <- readTVar (getOutput i1)
      next2 <- readTVar (getOutput i2)
      next3 <- readTVar (getOutput i3)
      old <- readTVar output

      let nextVersion = mkVersion next1 next2 next3

      if nextVersion > vVersion old then
        writeTVar output Versioned
          { vVersion = nextVersion
          , vData    = f (vData next1) (vData next2) (vData next3)
          }
      else
        -- No input is ahead of the output: retry the transaction.
        retrySTM

  pure Merge
    { mWorker = worker
    , mOutput = output
    }
  where
    -- Combined version: all input versions appended, in argument order.
    mkVersion a b c = mconcat [vVersion a, vVersion b, vVersion c]

-- | Spawn a merge combining the outputs of four inputs.
--
-- The merged version is the concatenation of the four input versions, in
-- argument order. All inputs are read in one transaction. A forked worker
-- rewrites the output whenever the combined version compares greater than
-- the stored one and retries otherwise.
spawnMerge4
  :: ( MonadUnliftIO m
     , HasOutput i1
     , HasOutput i2
     , HasOutput i3
     , HasOutput i4
     )
  => (GetOutput i1 -> GetOutput i2 -> GetOutput i3 -> GetOutput i4 -> o)
     -- ^ Combining function.
  -> i1 -- ^ First input.
  -> i2 -- ^ Second input.
  -> i3 -- ^ Third input.
  -> i4 -- ^ Fourth input.
  -> m (Merge o)
spawnMerge4 f i1 i2 i3 i4 = do
  output <- atomically do
    (initial1, initial2, initial3, initial4) <- (,,,)
      <$> readTVar (getOutput i1)
      <*> readTVar (getOutput i2)
      <*> readTVar (getOutput i3)
      <*> readTVar (getOutput i4)

    newTVar Versioned
      { vVersion = mkVersion initial1 initial2 initial3 initial4
      , vData    = f (vData initial1) (vData initial2) (vData initial3) (vData initial4)
      }

  worker <- forkIO $
    forever $ atomically $ do
      next1 <- readTVar (getOutput i1)
      next2 <- readTVar (getOutput i2)
      next3 <- readTVar (getOutput i3)
      next4 <- readTVar (getOutput i4)
      old <- readTVar output

      let nextVersion = mkVersion next1 next2 next3 next4

      if nextVersion > vVersion old then
        writeTVar output Versioned
          { vVersion = nextVersion
          , vData    = f (vData next1) (vData next2) (vData next3) (vData next4)
          }
      else
        -- No input is ahead of the output: retry the transaction.
        retrySTM

  pure Merge
    { mWorker = worker
    , mOutput = output
    }
  where
    -- Combined version: all input versions appended, in argument order.
    mkVersion a b c d = mconcat [vVersion a, vVersion b, vVersion c, vVersion d]

{- |
  Spawn a merge over a homogeneous traversable collection of processes.

  A merging function will receive a collection of outputs to summarize.

  The merged version is the 'foldMap' of all input versions, in traversal
  order. All inputs are read in one transaction. A forked worker rewrites the
  output whenever the combined version compares greater than the stored one
  and retries otherwise.
-}
spawnMergeT
  :: ( Traversable t
     , HasOutput input
     , MonadUnliftIO m
     )
  => (t (GetOutput input) -> output) -- ^ Summarizing function.
  -> t input                         -- ^ Collection of inputs to follow.
  -> m (Merge output)
spawnMergeT f inputs = do
  output <- atomically do
    initial <- traverse (readTVar . getOutput) inputs

    newTVar Versioned
      { vVersion = foldMap vVersion initial
      , vData    = f (fmap vData initial)
      }

  worker <- forkIO $
    forever $ atomically do
      next <- traverse (readTVar . getOutput) inputs
      old <- readTVar output

      let nextVersion = foldMap vVersion next

      if nextVersion > vVersion old then
        writeTVar output Versioned
          { vVersion = nextVersion
          , vData    = f (fmap vData next)
          }
      else
        -- No input is ahead of the output: retry the transaction.
        retrySTM

  pure Merge
    { mWorker = worker
    , mOutput = output
    }

-- * Demand

-- | A mutable reference ('IORef') holding a 'Versioned' value of type @a@.
type ObserverIO a = IORef (Versioned a)

-- | Allocate a new 'ObserverIO' holding @initial@.
--
-- The stored version starts out as 'mempty' (an empty version vector).
newObserverIO :: MonadIO m => a -> m (ObserverIO a)
newObserverIO initial = newIORef Versioned
  { vVersion = mempty
  , vData    = initial
  }

-- | Bring an observer up to date with an output variable and return its data.
--
-- The current 'Versioned' value of @output@ (read through 'getOutput') is
-- compared against the value stored in @currentRef@:
--
-- * If the output's version is strictly greater than the observer's,
--   @action@ is run with the observer's current data and the output's data.
--   Its result is stored in @currentRef@ together with the output's version,
--   and returned.
--
-- * Otherwise @action@ is not run and the observer's stored data is returned.
--
-- NOTE: the reference is read and later written in two separate steps, so
-- the read-compare-write sequence as a whole is not a single atomic
-- operation on @currentRef@.
observeIO
  :: (MonadUnliftIO m, HasOutput output)
  => output
  -> ObserverIO a
  -> (a -> GetOutput output -> m a)
  -> m a
observeIO output currentRef action = do
  outputV <- readTVarIO (getOutput output)
  currentV <- readIORef currentRef
  if vVersion outputV > vVersion currentV then do
    derived <- action (vData currentV) (vData outputV)
    -- Record the version that was actually observed, not a freshly read one.
    atomicWriteIORef currentRef Versioned
      { vVersion = vVersion outputV
      , vData    = derived
      }
    pure derived
  else
    pure (vData currentV)

-- | Same as 'observeIO', but discards the resulting data.
observeIO_
  :: (MonadUnliftIO m, HasOutput output)
  => output
  -> ObserverIO a
  -> (a -> GetOutput output -> m a)
  -> m ()
observeIO_ output currentRef =
  void . observeIO output currentRef

-- | Read the data currently stored in an observer reference, dropping its
-- version. The reference is not modified.
{-# INLINEABLE readObservedIO #-}
readObservedIO :: (MonadUnliftIO m) => IORef (Versioned a) -> m a
readObservedIO = fmap vData . readIORef

-- * PubSub

-- | Wrapper around an unboxed Unagi 'UnagiPrim.InChan', the end of the
-- channel that values are written to.
newtype Source a = Source (UnagiPrim.InChan a)

-- | Create a new 'Source'.
--
-- A fresh channel is created with 'UnagiPrim.newChan'; only its
-- 'UnagiPrim.InChan' is kept, the accompanying 'UnagiPrim.OutChan' is
-- discarded.
newSource :: (MonadUnliftIO m, UnagiPrim a) => m (Source a)
newSource = fmap (Source . fst) $ liftIO UnagiPrim.newChan

-- | Publish a value by writing it to the source's 'UnagiPrim.InChan'.
pubSource :: (MonadUnliftIO m, UnagiPrim a) => Source a -> a -> m ()
pubSource (Source ic) = liftIO . UnagiPrim.writeChan ic

-- | Subscribe to a source, obtaining a new 'UnagiPrim.OutChan' produced by
-- 'UnagiPrim.dupChan' on the source's 'UnagiPrim.InChan'.
--
-- NOTE(review): presumably the new channel only sees values written after
-- this call; confirm against the unagi-chan documentation for @dupChan@.
subSource :: MonadUnliftIO m => Source a -> m (UnagiPrim.OutChan a)
subSource (Source ic) = liftIO $ UnagiPrim.dupChan ic

-- * Utils

-- | Types from which a 'ThreadId' can be obtained via 'getWorker'.
class HasWorker a where
  -- | The thread associated with this value.
  getWorker :: a -> ThreadId

-- | A 'Cell' is a pair whose second component is a 'Merge'; its worker is
-- that merge's worker thread.
instance HasWorker (Cell i o) where
  getWorker = mWorker . snd

-- | The worker of a 'Timed' process is its 'tWorker' field.
instance HasWorker (Timed c o) where
  getWorker = tWorker

-- | The worker of a 'Merge' process is its 'mWorker' field.
instance HasWorker (Merge o) where
  getWorker = mWorker

-- | Register a release action that kills the worker thread of @process@.
--
-- Returns the 'Resource.ReleaseKey' produced by 'Resource.register'.
register
  :: ( MonadResource m
     , HasWorker process
     )
  => process
  -> m Resource.ReleaseKey
register = Resource.register . killThread . getWorker

-- | Register a single release action that kills the worker thread of every
-- process in the collection.
--
-- All workers share the one returned 'Resource.ReleaseKey'.
registerCollection
  :: ( MonadResource m
     , HasWorker process
     , Foldable t
     )
  => t process
  -> m Resource.ReleaseKey
registerCollection = Resource.register . traverse_ (killThread . getWorker)

-- | Run @spawn@, then register a release action that kills the resulting
-- worker thread.
--
-- Returns the release key together with the spawned value.
registered :: (MonadResource m, HasWorker a) => m a -> m (Resource.ReleaseKey, a)
registered spawn = do
  worker <- spawn
  key <- Resource.register $ killThread (getWorker worker)
  pure (key, worker)

-- | Spawn one process per input and register a single release action that
-- kills all of their worker threads.
--
-- Returns the shared release key together with the spawned processes, in
-- the same shape as @inputs@.
--
-- NOTE: the release action is registered only after every @spawn@ call has
-- returned, so workers started before a failing @spawn@ are not covered by
-- it.
registeredCollection
  :: ( MonadResource m
     , HasWorker process
     , Traversable t
     )
  => (input -> m process)
  -> t input
  -> m (Resource.ReleaseKey, t process)
registeredCollection spawn inputs = do
  workers <- traverse spawn inputs

  key <- Resource.register $
    traverse_ (killThread . getWorker) workers

  pure (key, workers)