{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FunctionalDependencies #-}
module Util.Sink(
HasInvalidate(..),
SinkID,
newSinkID,
Sink,
newSink,
newSinkGeneral,
newParallelSink,
newParallelDelayedSink,
putSink,
putSinkMultiple,
coMapSink,
coMapSink',
coMapIOSink',
CanAddSinks(..),
addNewAction,
ParallelExec,
newParallelExec,
parallelExec,
parallelExecVSem,
) where
import Control.Concurrent
import Control.Exception (try)
import System.IO.Unsafe
import Data.IORef
import Util.Object
import Util.ExtendedPrelude
import Util.Thread
import Util.Computation (done)
import Util.VSem
class HasInvalidate source where
invalidate :: source -> IO ()
data SinkID = SinkID {
SinkID -> ObjectID
oID :: ObjectID,
SinkID -> IORef Bool
interested :: IORef Bool
}
newSinkID :: IO SinkID
newSinkID :: IO SinkID
newSinkID =
do
ObjectID
oID <- IO ObjectID
newObject
IORef Bool
interested <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
True
SinkID -> IO SinkID
forall (m :: * -> *) a. Monad m => a -> m a
return (SinkID :: ObjectID -> IORef Bool -> SinkID
SinkID {
oID :: ObjectID
oID = ObjectID
oID,
interested :: IORef Bool
interested = IORef Bool
interested
})
isInterested :: SinkID -> IO Bool
isInterested :: SinkID -> IO Bool
isInterested SinkID
sinkID = IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef (SinkID -> IORef Bool
interested SinkID
sinkID)
instance HasInvalidate SinkID where
invalidate :: SinkID -> IO ()
invalidate SinkID
sinkID = IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SinkID -> IORef Bool
interested SinkID
sinkID) Bool
False
instance Eq SinkID where
== :: SinkID -> SinkID -> Bool
(==) SinkID
sinkID1 SinkID
sinkID2 = (SinkID -> ObjectID
oID SinkID
sinkID1) ObjectID -> ObjectID -> Bool
forall a. Eq a => a -> a -> Bool
== (SinkID -> ObjectID
oID SinkID
sinkID2)
instance Ord SinkID where
compare :: SinkID -> SinkID -> Ordering
compare SinkID
sinkID1 SinkID
sinkID2 = ObjectID -> ObjectID -> Ordering
forall a. Ord a => a -> a -> Ordering
compare (SinkID -> ObjectID
oID SinkID
sinkID1) (SinkID -> ObjectID
oID SinkID
sinkID2)
data Sink x = Sink {
Sink x -> SinkID
sinkID :: SinkID,
Sink x -> x -> IO ()
action :: x -> IO ()
}
newSink :: (x -> IO ()) -> IO (Sink x)
newSink :: (x -> IO ()) -> IO (Sink x)
newSink x -> IO ()
action =
do
SinkID
sinkID <- IO SinkID
newSinkID
SinkID -> (x -> IO ()) -> IO (Sink x)
forall x. SinkID -> (x -> IO ()) -> IO (Sink x)
newSinkGeneral SinkID
sinkID x -> IO ()
action
newSinkGeneral :: SinkID -> (x -> IO ()) -> IO (Sink x)
newSinkGeneral :: SinkID -> (x -> IO ()) -> IO (Sink x)
newSinkGeneral SinkID
sinkID x -> IO ()
action = Sink x -> IO (Sink x)
forall (m :: * -> *) a. Monad m => a -> m a
return (Sink :: forall x. SinkID -> (x -> IO ()) -> Sink x
Sink {sinkID :: SinkID
sinkID = SinkID
sinkID,action :: x -> IO ()
action = x -> IO ()
action})
instance HasInvalidate (Sink x) where
invalidate :: Sink x -> IO ()
invalidate Sink x
sink = SinkID -> IO ()
forall source. HasInvalidate source => source -> IO ()
invalidate (Sink x -> SinkID
forall x. Sink x -> SinkID
sinkID Sink x
sink)
putSink :: Sink x -> x -> IO Bool
putSink :: Sink x -> x -> IO Bool
putSink Sink x
sink x
x =
do
Bool
interested <- SinkID -> IO Bool
isInterested (Sink x -> SinkID
forall x. Sink x -> SinkID
sinkID Sink x
sink)
if Bool
interested then (Sink x -> x -> IO ()
forall x. Sink x -> x -> IO ()
action Sink x
sink x
x) else IO ()
forall (m :: * -> *). Monad m => m ()
done
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
interested
putSinkMultiple :: Sink x -> [x] -> IO Bool
putSinkMultiple :: Sink x -> [x] -> IO Bool
putSinkMultiple Sink x
sink [] = Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
putSinkMultiple Sink x
sink (x
x:[x]
xs) =
do
Bool
interested <- Sink x -> x -> IO Bool
forall x. Sink x -> x -> IO Bool
putSink Sink x
sink x
x
if Bool
interested
then
Sink x -> [x] -> IO Bool
forall x. Sink x -> [x] -> IO Bool
putSinkMultiple Sink x
sink [x]
xs
else
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
interested
coMapSink :: (y -> x) -> Sink x -> Sink y
coMapSink :: (y -> x) -> Sink x -> Sink y
coMapSink y -> x
fn (Sink {sinkID :: forall x. Sink x -> SinkID
sinkID = SinkID
sinkID,action :: forall x. Sink x -> x -> IO ()
action = x -> IO ()
action}) =
Sink :: forall x. SinkID -> (x -> IO ()) -> Sink x
Sink {sinkID :: SinkID
sinkID = SinkID
sinkID,action :: y -> IO ()
action = x -> IO ()
action (x -> IO ()) -> (y -> x) -> y -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. y -> x
fn}
coMapSink' :: (y -> Maybe x) -> Sink x -> Sink y
coMapSink' :: (y -> Maybe x) -> Sink x -> Sink y
coMapSink' y -> Maybe x
fn (Sink {sinkID :: forall x. Sink x -> SinkID
sinkID = SinkID
sinkID,action :: forall x. Sink x -> x -> IO ()
action = x -> IO ()
action}) =
let
action' :: y -> IO ()
action' y
y = case y -> Maybe x
fn y
y of
Maybe x
Nothing -> IO ()
forall (m :: * -> *). Monad m => m ()
done
Just x
x -> x -> IO ()
action x
x
in
Sink :: forall x. SinkID -> (x -> IO ()) -> Sink x
Sink {sinkID :: SinkID
sinkID = SinkID
sinkID,action :: y -> IO ()
action = y -> IO ()
action'}
coMapIOSink' :: (y -> IO (Maybe x)) -> Sink x -> Sink y
coMapIOSink' :: (y -> IO (Maybe x)) -> Sink x -> Sink y
coMapIOSink' y -> IO (Maybe x)
actFn (Sink {sinkID :: forall x. Sink x -> SinkID
sinkID = SinkID
sinkID,action :: forall x. Sink x -> x -> IO ()
action = x -> IO ()
action}) =
let
action' :: y -> IO ()
action' y
y =
do
Maybe x
xOpt <- y -> IO (Maybe x)
actFn y
y
case Maybe x
xOpt of
Maybe x
Nothing -> IO ()
forall (m :: * -> *). Monad m => m ()
done
Just x
x -> x -> IO ()
action x
x
in
Sink :: forall x. SinkID -> (x -> IO ()) -> Sink x
Sink {sinkID :: SinkID
sinkID = SinkID
sinkID,action :: y -> IO ()
action = y -> IO ()
action'}
class CanAddSinks sinkSource x delta | sinkSource -> x,sinkSource -> delta
where
addNewSink :: sinkSource -> (delta -> IO ()) -> IO (x,Sink delta)
addNewSink sinkSource
sinkSource delta -> IO ()
action =
do
ParallelExec
parallelX <- IO ParallelExec
newParallelExec
sinkSource -> (delta -> IO ()) -> IO (x, Sink delta)
forall sinkSource x delta.
CanAddSinks sinkSource x delta =>
sinkSource -> (delta -> IO ()) -> IO (x, Sink delta)
addNewQuickSink sinkSource
sinkSource
(\ delta
delta -> ParallelExec -> IO () -> IO ()
parallelExec ParallelExec
parallelX (delta -> IO ()
action delta
delta))
addNewSinkGeneral :: sinkSource -> (delta -> IO ()) -> SinkID
-> IO (x,Sink delta)
addNewSinkGeneral sinkSource
sinkSource delta -> IO ()
action SinkID
sinkID =
do
ParallelExec
parallelX <- IO ParallelExec
newParallelExec
sinkSource
-> (delta -> IO ()) -> SinkID -> ParallelExec -> IO (x, Sink delta)
forall sinkSource x delta.
CanAddSinks sinkSource x delta =>
sinkSource
-> (delta -> IO ()) -> SinkID -> ParallelExec -> IO (x, Sink delta)
addNewSinkVeryGeneral sinkSource
sinkSource delta -> IO ()
action SinkID
sinkID ParallelExec
parallelX
addNewSinkVeryGeneral :: sinkSource -> (delta -> IO ()) -> SinkID
-> ParallelExec -> IO (x,Sink delta)
addNewSinkVeryGeneral sinkSource
sinkSource delta -> IO ()
action SinkID
sinkID ParallelExec
parallelX =
sinkSource -> (delta -> IO ()) -> SinkID -> IO (x, Sink delta)
forall sinkSource x delta.
CanAddSinks sinkSource x delta =>
sinkSource -> (delta -> IO ()) -> SinkID -> IO (x, Sink delta)
addNewQuickSinkGeneral
sinkSource
sinkSource
(\ delta
delta -> ParallelExec -> IO () -> IO ()
parallelExec ParallelExec
parallelX (
do
Bool
interested <- SinkID -> IO Bool
isInterested SinkID
sinkID
if Bool
interested then delta -> IO ()
action delta
delta else IO ()
forall (m :: * -> *). Monad m => m ()
done
))
SinkID
sinkID
addNewSinkWithInitial :: sinkSource -> (x -> IO ()) -> (delta -> IO ())
-> SinkID -> ParallelExec -> IO (x,Sink delta)
addNewSinkWithInitial sinkSource
sinkSource x -> IO ()
xAction delta -> IO ()
deltaAction SinkID
sinkID ParallelExec
parallelX =
do
MVar x
mVar <- IO (MVar x)
forall a. IO (MVar a)
newEmptyMVar
let
firstAct :: IO ()
firstAct =
do
x
x <- MVar x -> IO x
forall a. MVar a -> IO a
takeMVar MVar x
mVar
x -> IO ()
xAction x
x
ParallelExec -> IO () -> IO ()
parallelExec ParallelExec
parallelX IO ()
firstAct
(returnValue :: (x, Sink delta)
returnValue @ (x
x,Sink delta
sink))
<- sinkSource
-> (delta -> IO ()) -> SinkID -> ParallelExec -> IO (x, Sink delta)
forall sinkSource x delta.
CanAddSinks sinkSource x delta =>
sinkSource
-> (delta -> IO ()) -> SinkID -> ParallelExec -> IO (x, Sink delta)
addNewSinkVeryGeneral sinkSource
sinkSource delta -> IO ()
deltaAction SinkID
sinkID ParallelExec
parallelX
MVar x -> x -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar x
mVar x
x
(x, Sink delta) -> IO (x, Sink delta)
forall (m :: * -> *) a. Monad m => a -> m a
return (x, Sink delta)
returnValue
addNewQuickSink :: sinkSource -> (delta -> IO ()) -> IO (x,Sink delta)
addNewQuickSink sinkSource
sinkSource delta -> IO ()
action =
do
Sink delta
sink <- (delta -> IO ()) -> IO (Sink delta)
forall x. (x -> IO ()) -> IO (Sink x)
newSink delta -> IO ()
action
x
x <- sinkSource -> Sink delta -> IO x
forall sinkSource x delta.
CanAddSinks sinkSource x delta =>
sinkSource -> Sink delta -> IO x
addOldSink sinkSource
sinkSource Sink delta
sink
(x, Sink delta) -> IO (x, Sink delta)
forall (m :: * -> *) a. Monad m => a -> m a
return (x
x,Sink delta
sink)
addNewQuickSinkGeneral :: sinkSource -> (delta -> IO ()) -> SinkID
-> IO (x,Sink delta)
addNewQuickSinkGeneral sinkSource
sinkSource delta -> IO ()
action SinkID
sinkID =
do
Sink delta
sink <- SinkID -> (delta -> IO ()) -> IO (Sink delta)
forall x. SinkID -> (x -> IO ()) -> IO (Sink x)
newSinkGeneral SinkID
sinkID delta -> IO ()
action
x
x <- sinkSource -> Sink delta -> IO x
forall sinkSource x delta.
CanAddSinks sinkSource x delta =>
sinkSource -> Sink delta -> IO x
addOldSink sinkSource
sinkSource Sink delta
sink
(x, Sink delta) -> IO (x, Sink delta)
forall (m :: * -> *) a. Monad m => a -> m a
return (x
x,Sink delta
sink)
addOldSink :: sinkSource -> Sink delta -> IO x
addNewAction :: CanAddSinks sinkSource x delta
=> sinkSource -> (delta -> IO Bool) -> IO x
addNewAction :: sinkSource -> (delta -> IO Bool) -> IO x
addNewAction sinkSource
sinkSource delta -> IO Bool
action =
do
MVar (Sink delta)
sinkMVar <- IO (MVar (Sink delta))
forall a. IO (MVar a)
newEmptyMVar
let
deltaAct :: delta -> IO ()
deltaAct delta
delta =
do
Bool
continue <- delta -> IO Bool
action delta
delta
if Bool
continue
then
IO ()
forall (m :: * -> *). Monad m => m ()
done
else
do
Sink delta
sink <- MVar (Sink delta) -> IO (Sink delta)
forall a. MVar a -> IO a
takeMVar MVar (Sink delta)
sinkMVar
Sink delta -> IO ()
forall source. HasInvalidate source => source -> IO ()
invalidate Sink delta
sink
String -> IO ()
BreakFn
simpleFallOut String
""
(x
x,Sink delta
sink) <- sinkSource -> (delta -> IO ()) -> IO (x, Sink delta)
forall sinkSource x delta.
CanAddSinks sinkSource x delta =>
sinkSource -> (delta -> IO ()) -> IO (x, Sink delta)
addNewSink sinkSource
sinkSource delta -> IO ()
deltaAct
MVar (Sink delta) -> Sink delta -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (Sink delta)
sinkMVar Sink delta
sink
x -> IO x
forall (m :: * -> *) a. Monad m => a -> m a
return x
x
newtype ParallelExec = ParallelExec (Chan (IO ()))
parallelExecVSem :: VSem
parallelExecVSem :: VSem
parallelExecVSem = IO VSem -> VSem
forall a. IO a -> a
unsafePerformIO IO VSem
newVSem
{-# NOINLINE parallelExecVSem #-}
newParallelExec :: IO ParallelExec
newParallelExec :: IO ParallelExec
newParallelExec =
do
Chan (IO ())
chan <- IO (Chan (IO ()))
forall a. IO (Chan a)
newChan
let
parallelExecThread0 :: IO b
parallelExecThread0 =
do
IO ()
act <- Chan (IO ()) -> IO (IO ())
forall a. Chan a -> IO a
readChan Chan (IO ())
chan
Either Dyn ()
result <- IO () -> IO (Either Dyn ())
forall e a. Exception e => IO a -> IO (Either e a)
try IO ()
act
case Either Dyn ()
result of
Left Dyn
excep -> String -> IO ()
putStrLn (String
"Exception detected: "
String -> String -> String
forall a. [a] -> [a] -> [a]
++ Dyn -> String
showException2 Dyn
excep)
Right () -> IO ()
forall (m :: * -> *). Monad m => m ()
done
VSem -> IO ()
releaseLocal VSem
parallelExecVSem
IO b
parallelExecThread0
parallelExecThread :: IO ()
parallelExecThread =
do
IO Any -> IO (Either String Any)
forall a. IO a -> IO (Either String a)
addSimpleFallOut IO Any
forall b. IO b
parallelExecThread0
IO ()
forall (m :: * -> *). Monad m => m ()
done
IO () -> IO ThreadId
forkIODebug IO ()
parallelExecThread
ParallelExec -> IO ParallelExec
forall (m :: * -> *) a. Monad m => a -> m a
return (Chan (IO ()) -> ParallelExec
ParallelExec Chan (IO ())
chan)
parallelExec :: ParallelExec -> IO () -> IO ()
parallelExec :: ParallelExec -> IO () -> IO ()
parallelExec (ParallelExec Chan (IO ())
chan) IO ()
act =
do
VSem -> IO ()
acquireLocal VSem
parallelExecVSem
Chan (IO ()) -> IO () -> IO ()
forall a. Chan a -> a -> IO ()
writeChan Chan (IO ())
chan IO ()
act
newParallelSink :: (x -> IO ()) -> IO (Sink x)
newParallelSink :: (x -> IO ()) -> IO (Sink x)
newParallelSink x -> IO ()
action =
do
ParallelExec
parallelX <- IO ParallelExec
newParallelExec
SinkID
sinkID <- IO SinkID
newSinkID
SinkID -> (x -> IO ()) -> IO (Sink x)
forall x. SinkID -> (x -> IO ()) -> IO (Sink x)
newSinkGeneral SinkID
sinkID (\ x
delta -> ParallelExec -> IO () -> IO ()
parallelExec ParallelExec
parallelX (
do
Bool
interested <- SinkID -> IO Bool
isInterested SinkID
sinkID
if Bool
interested then x -> IO ()
action x
delta else IO ()
forall (m :: * -> *). Monad m => m ()
done
))
newParallelDelayedSink :: IO (Sink x,(x -> IO ()) -> IO ())
newParallelDelayedSink :: IO (Sink x, (x -> IO ()) -> IO ())
newParallelDelayedSink =
do
MVar (x -> IO ())
actionMVar <- IO (MVar (x -> IO ()))
forall a. IO (MVar a)
newEmptyMVar
ParallelExec
parallelX <- IO ParallelExec
newParallelExec
SinkID
sinkID <- IO SinkID
newSinkID
Sink x
sink <- SinkID -> (x -> IO ()) -> IO (Sink x)
forall x. SinkID -> (x -> IO ()) -> IO (Sink x)
newSinkGeneral SinkID
sinkID (\ x
delta -> ParallelExec -> IO () -> IO ()
parallelExec ParallelExec
parallelX (
do
Bool
interested <- SinkID -> IO Bool
isInterested SinkID
sinkID
if Bool
interested
then
do
x -> IO ()
action <- MVar (x -> IO ()) -> IO (x -> IO ())
forall a. MVar a -> IO a
readMVar MVar (x -> IO ())
actionMVar
x -> IO ()
action x
delta
else
IO ()
forall (m :: * -> *). Monad m => m ()
done
))
(Sink x, (x -> IO ()) -> IO ())
-> IO (Sink x, (x -> IO ()) -> IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Sink x
sink,MVar (x -> IO ()) -> (x -> IO ()) -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (x -> IO ())
actionMVar)