{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FunctionalDependencies #-}
module Util.Sink(
HasInvalidate(..),
SinkID,
newSinkID,
Sink,
newSink,
newSinkGeneral,
newParallelSink,
newParallelDelayedSink,
putSink,
putSinkMultiple,
coMapSink,
coMapSink',
coMapIOSink',
CanAddSinks(..),
addNewAction,
ParallelExec,
newParallelExec,
parallelExec,
parallelExecVSem,
) where
import Control.Concurrent
import Control.Exception (try)
import System.IO.Unsafe
import Data.IORef
import Util.Object
import Util.ExtendedPrelude
import Util.Thread
import Util.Computation (done)
import Util.VSem
-- | Things that can be switched off.  For the instances in this module,
-- invalidating marks the underlying 'SinkID' as no longer interested.
class HasInvalidate source where
   -- | Invalidate the given source.
   invalidate :: source -> IO ()
-- | Identity of a sink, together with a mutable flag recording whether
-- the sink still wants to receive values.
data SinkID = SinkID
   { oID :: ObjectID
     -- ^ Unique identifier; the 'Eq' and 'Ord' instances compare only this.
   , interested :: IORef Bool
     -- ^ 'True' until the 'SinkID' is invalidated.
   }
-- | Create a fresh 'SinkID' whose interest flag starts out 'True'.
newSinkID :: IO SinkID
newSinkID =
   do
      objectID <- newObject
      interestedRef <- newIORef True
      return (SinkID {oID = objectID,interested = interestedRef})
-- | Read the current value of a 'SinkID''s interest flag.
isInterested :: SinkID -> IO Bool
isInterested = readIORef . interested
-- | Invalidating a 'SinkID' sets its interest flag to 'False'.
instance HasInvalidate SinkID where
   invalidate sid = writeIORef (interested sid) False
-- | Two 'SinkID's are equal exactly when their object identifiers are;
-- the interest flag plays no part.
instance Eq SinkID where
   left == right = oID left == oID right
-- | 'SinkID's are ordered by their object identifiers.
instance Ord SinkID where
   left `compare` right = oID left `compare` oID right
-- | A consumer of values of type @x@: an action to run on each value,
-- tagged with the 'SinkID' that says whether it is still interested.
data Sink x = Sink
   { sinkID :: SinkID
     -- ^ Identity and interest flag of this sink.
   , action :: x -> IO ()
     -- ^ What to do with each value put into the sink.
   }
-- | Create a sink with a freshly allocated 'SinkID' and the given action.
newSink :: (x -> IO ()) -> IO (Sink x)
newSink sinkAction =
   newSinkID >>= \ freshID -> newSinkGeneral freshID sinkAction
-- | Create a sink from an existing 'SinkID' and an action.  The action is
-- stored as given; nothing is run here.
newSinkGeneral :: SinkID -> (x -> IO ()) -> IO (Sink x)
newSinkGeneral sid sinkAction = return (Sink sid sinkAction)
-- | Invalidating a sink invalidates its 'SinkID'.
instance HasInvalidate (Sink x) where
   invalidate = invalidate . sinkID
-- | Offer a value to a sink.  The sink's action is run only if the sink is
-- still interested when checked; the result is that interest flag, so
-- 'False' means the value was dropped.
putSink :: Sink x -> x -> IO Bool
putSink sink x =
   do
      wanted <- isInterested (sinkID sink)
      if wanted
         then action sink x >> return True
         else return False
-- | Offer a list of values to a sink in order, stopping at the first one
-- for which 'putSink' reports the sink is no longer interested.  Returns
-- 'True' if every value was accepted (including for the empty list).
putSinkMultiple :: Sink x -> [x] -> IO Bool
putSinkMultiple _ [] = return True
putSinkMultiple sink (first : rest) =
   do
      accepted <- putSink sink first
      if accepted
         then putSinkMultiple sink rest
         else return False
-- | Pre-compose a function onto a sink's action.  The resulting sink
-- shares the original's 'SinkID'.
coMapSink :: (y -> x) -> Sink x -> Sink y
coMapSink fn (Sink sid act) = Sink sid (act . fn)
-- | Like 'coMapSink', but the function may decline: when it returns
-- 'Nothing' the value is silently dropped and the original action is not
-- run.  The resulting sink shares the original's 'SinkID'.
coMapSink' :: (y -> Maybe x) -> Sink x -> Sink y
coMapSink' fn (Sink sid act) = Sink sid (maybe done act . fn)
-- | Like @coMapSink'@, but the mapping function is itself an 'IO' action.
-- It is run for every value reaching the new sink's action; a 'Nothing'
-- result drops the value.  The resulting sink shares the original's
-- 'SinkID'.
coMapIOSink' :: (y -> IO (Maybe x)) -> Sink x -> Sink y
coMapIOSink' actFn (Sink sid act) =
   Sink sid (\ y -> actFn y >>= maybe done act)
-- | Sources to which sinks can be attached.  The functional dependencies
-- say that the source type determines both @x@ (the value handed back by
-- 'addOldSink' when a sink is attached) and @delta@ (the values passed to
-- the sink).  'addOldSink' is the only method without a default; every
-- other method is ultimately defined in terms of it.
class CanAddSinks sinkSource x delta | sinkSource -> x,sinkSource -> delta
   where
   -- | Attach a new sink with a fresh 'SinkID'.  A new 'ParallelExec' is
   -- created and every delta's action is queued on it via 'parallelExec'
   -- instead of being run directly by the sink.
   addNewSink :: sinkSource -> (delta -> IO ()) -> IO (x,Sink delta)
   addNewSink sinkSource action =
      do
         parallelX <- newParallelExec
         addNewQuickSink sinkSource
            (\ delta -> parallelExec parallelX (action delta))

   -- | Like 'addNewSink', but with a caller-supplied 'SinkID'; a new
   -- 'ParallelExec' is still created.
   addNewSinkGeneral :: sinkSource -> (delta -> IO ()) -> SinkID
      -> IO (x,Sink delta)
   addNewSinkGeneral sinkSource action sinkID =
      do
         parallelX <- newParallelExec
         addNewSinkVeryGeneral sinkSource action sinkID parallelX

   -- | Like 'addNewSinkGeneral', but with a caller-supplied 'ParallelExec'
   -- as well.  Interest is checked again inside the queued action, so a
   -- delta queued before the 'SinkID' was invalidated is skipped if the
   -- invalidation happens before it runs.
   addNewSinkVeryGeneral :: sinkSource -> (delta -> IO ()) -> SinkID
      -> ParallelExec -> IO (x,Sink delta)
   addNewSinkVeryGeneral sinkSource action sinkID parallelX =
      addNewQuickSinkGeneral
         sinkSource
         (\ delta -> parallelExec parallelX (
            do
               interested <- isInterested sinkID
               if interested then action delta else done
            ))
         sinkID

   -- | Like 'addNewSinkVeryGeneral', but additionally runs @xAction@ on
   -- the @x@ value obtained when the sink is attached.  @xAction@ is
   -- queued on the 'ParallelExec' before the sink is attached, so on that
   -- executor it is ahead of any delta action queued through the new sink.
   addNewSinkWithInitial :: sinkSource -> (x -> IO ()) -> (delta -> IO ())
      -> SinkID -> ParallelExec -> IO (x,Sink delta)
   addNewSinkWithInitial sinkSource xAction deltaAction sinkID parallelX =
      do
         mVar <- newEmptyMVar
         let
            -- Blocks on the executor's thread until x is known.
            firstAct =
               do
                  x <- takeMVar mVar
                  xAction x

         parallelExec parallelX firstAct
         -- NB: no whitespace around '@'.  GHC >= 9.0 only accepts the
         -- tight form as an as-pattern.
         returnValue@(x,_)
            <- addNewSinkVeryGeneral sinkSource deltaAction sinkID parallelX
         putMVar mVar x
         return returnValue

   -- | Attach a new sink with a fresh 'SinkID' whose action is run
   -- directly, with no 'ParallelExec' in between.
   addNewQuickSink :: sinkSource -> (delta -> IO ()) -> IO (x,Sink delta)
   addNewQuickSink sinkSource action =
      do
         sink <- newSink action
         x <- addOldSink sinkSource sink
         return (x,sink)

   -- | Like 'addNewQuickSink', but with a caller-supplied 'SinkID'.
   addNewQuickSinkGeneral :: sinkSource -> (delta -> IO ()) -> SinkID
      -> IO (x,Sink delta)
   addNewQuickSinkGeneral sinkSource action sinkID =
      do
         sink <- newSinkGeneral sinkID action
         x <- addOldSink sinkSource sink
         return (x,sink)

   -- | Attach an already constructed sink to the source, returning the
   -- source's @x@ value.  Instances must define this.
   addOldSink :: sinkSource -> Sink delta -> IO x
-- | Attach an action to a sink source via 'addNewSink'.  The action is
-- called for each delta and returns whether it wants to continue.  Once
-- it returns 'False' the sink is invalidated and the computation running
-- the action is abandoned via 'simpleFallOut'.
--
-- Returns the @x@ value produced when the sink is attached.
addNewAction :: CanAddSinks sinkSource x delta
   => sinkSource -> (delta -> IO Bool) -> IO x
addNewAction sinkSource action =
   do
      -- The sink only exists after 'addNewSink' returns, but the action
      -- needs it in order to invalidate it, so it is passed via an MVar.
      sinkMVar <- newEmptyMVar
      let
         deltaAct delta =
            do
               continue <- action delta
               if continue
                  then
                     done
                  else
                     do
                        -- readMVar rather than takeMVar: it leaves the sink
                        -- in place, so reaching this branch again cannot
                        -- block forever on an emptied MVar.
                        sink <- readMVar sinkMVar
                        invalidate sink
                        simpleFallOut ""

      (x,sink) <- addNewSink sinkSource deltaAct
      putMVar sinkMVar sink
      return x
-- | Handle on a worker created by 'newParallelExec': the channel on which
-- the actions to be run are queued.
newtype ParallelExec
   = ParallelExec (Chan (IO ()))
-- NOINLINE is required: the unsafePerformIO below must be evaluated once
-- only, so that every use refers to the same VSem.
{-# NOINLINE parallelExecVSem #-}
-- | The single 'VSem' used by all 'ParallelExec's.  'parallelExec' does a
-- local acquire on it before queueing an action, and the worker thread
-- does a local release after each action has finished.
parallelExecVSem :: VSem
parallelExecVSem = unsafePerformIO newVSem
-- | Create a new 'ParallelExec': a channel plus a forked thread that runs
-- the queued actions one after another, in the order they were queued.
--
-- An exception raised by an action is caught and reported on stdout; the
-- worker then carries on with the next action.  After every action the
-- worker does a local release of 'parallelExecVSem'.
newParallelExec :: IO ParallelExec
newParallelExec =
   do
      queue <- newChan
      let
         -- Take the next queued action, run it, report any exception,
         -- release the VSem, and loop.
         workerLoop =
            do
               job <- readChan queue
               outcome <- try job
               case outcome of
                  Right () -> done
                  Left excep ->
                     putStrLn ("Exception detected: "
                        ++ showException2 excep)
               releaseLocal parallelExecVSem
               workerLoop

      -- The loop runs under addSimpleFallOut, whose result is discarded.
      forkIODebug (addSimpleFallOut workerLoop >> done)
      return (ParallelExec queue)
-- | Queue an action on a 'ParallelExec'.  'parallelExecVSem' is acquired
-- locally first; the matching release is done by the worker thread once
-- the action has been run.
parallelExec :: ParallelExec -> IO () -> IO ()
parallelExec (ParallelExec queue) job =
   acquireLocal parallelExecVSem >> writeChan queue job
-- | Create a sink, with its own 'ParallelExec' and a fresh 'SinkID', whose
-- action is queued on that executor instead of being run directly.
-- Interest is checked again when the queued action runs, so values queued
-- before the sink was invalidated are skipped if the invalidation happens
-- first.
newParallelSink :: (x -> IO ()) -> IO (Sink x)
newParallelSink handler =
   do
      executor <- newParallelExec
      sid <- newSinkID
      let
         -- Runs on the executor's thread.
         guardedHandler delta =
            do
               wanted <- isInterested sid
               if wanted then handler delta else done

      newSinkGeneral sid (parallelExec executor . guardedHandler)
-- | Like 'newParallelSink', except that the action is supplied later.
-- Returns the sink together with a function for supplying the action.
--
-- Values put into the sink before the action is supplied are queued on
-- the sink's 'ParallelExec'; its thread blocks on the first of them (if
-- the sink is still interested) until the action is available.  The
-- supplying function is a 'putMVar' on an initially empty 'MVar', so a
-- second call to it will block.
newParallelDelayedSink :: IO (Sink x,(x -> IO ()) -> IO ())
newParallelDelayedSink =
   do
      handlerMVar <- newEmptyMVar
      executor <- newParallelExec
      sid <- newSinkID
      let
         -- Runs on the executor's thread.
         deferred delta =
            do
               wanted <- isInterested sid
               if wanted
                  then readMVar handlerMVar >>= \ handler -> handler delta
                  else done

      sink <- newSinkGeneral sid (parallelExec executor . deferred)
      return (sink,putMVar handlerMVar)