module Control.Concurrent.CHP.Arrow (ProcessPipeline, runPipeline, arrowProcess, arrStrict,
ProcessPipelineLabel, runPipelineLabel, arrowProcessLabel, arrLabel, arrStrictLabel,
(*>>>*), (*<<<*), (*&&&*), (*****)) where
#if __GLASGOW_HASKELL__ >= 609
import Control.Category
import Prelude hiding ((.), id)
#endif
import Control.Arrow
#if __GLASGOW_HASKELL__ < 610
hiding (pure)
#endif
import Control.DeepSeq
import Control.Monad
import Control.Concurrent.CHP
import qualified Control.Concurrent.CHP.Common as CHP
import Control.Concurrent.CHP.Connect
data ProcessPipelineLabel a b = ProcessPipelineLabel
{ runPipelineLabel :: Chanin a -> Chanout b -> CHP ()
, _pipelineLabels :: (String, String)
}
arrowProcessLabel :: String -> (Chanin a -> Chanout b -> CHP ()) -> ProcessPipelineLabel a b
arrowProcessLabel l p = ProcessPipelineLabel p (l, l)
arrLabel :: String -> (a -> b) -> ProcessPipelineLabel a b
arrLabel l = arrowProcessLabel l . CHP.map
arrStrictLabel :: NFData b => String -> (a -> b) -> ProcessPipelineLabel a b
arrStrictLabel l = arrowProcessLabel l . CHP.map'
(*>>>*) :: Show b => ProcessPipelineLabel a b -> ProcessPipelineLabel b c
-> ProcessPipelineLabel a c
(*>>>*) (ProcessPipelineLabel p (pl, pr)) (ProcessPipelineLabel q (ql, qr))
= ProcessPipelineLabel (connectWith (chanLabel $ pr ++ "->" ++ ql) p q) (pl, qr)
(*<<<*) :: Show b => ProcessPipelineLabel b c -> ProcessPipelineLabel a b
-> ProcessPipelineLabel a c
(*<<<*) = flip (*>>>*)
(*&&&*) :: (Show b, Show c, Show c') => ProcessPipelineLabel b c -> ProcessPipelineLabel b c' -> ProcessPipelineLabel b (c, c')
(*&&&*) (ProcessPipelineLabel p (pl, pr))
(ProcessPipelineLabel q (ql, qr))
= ProcessPipelineLabel proc (mix pl ql, mix pr qr)
where
mix a b = "(" ++ a ++ "*&&&*" ++ b ++ ")"
proc input output
= do deltaP <- oneToOneChannel' $ chanLabel $ pl ++ ".in"
deltaQ <- oneToOneChannel' $ chanLabel $ ql ++ ".in"
joinP <- oneToOneChannel' $ chanLabel $ pr ++ ".out"
joinQ <- oneToOneChannel' $ chanLabel $ qr ++ ".out"
runParallel_
[CHP.parDelta input (writers [deltaP, deltaQ])
,p (reader deltaP) (writer joinP)
,q (reader deltaQ) (writer joinQ)
,CHP.join (,) (reader joinP) (reader joinQ) output
]
(*****) :: (Show b, Show b', Show c, Show c') => ProcessPipelineLabel b c -> ProcessPipelineLabel b' c'
-> ProcessPipelineLabel (b, b') (c, c')
(*****) (ProcessPipelineLabel p (pl, pr))
(ProcessPipelineLabel q (ql, qr))
= ProcessPipelineLabel proc (mix pl ql, mix pr qr)
where
mix a b = "(" ++ a ++ "*****" ++ b ++ ")"
proc input output
= do deltaP <- oneToOneChannel' $ chanLabel $ mix pl ql ++ "->" ++ pl
deltaQ <- oneToOneChannel' $ chanLabel $ mix pl ql ++ "->" ++ ql
joinP <- oneToOneChannel' $ chanLabel $ pr ++ "->" ++ mix pr qr
joinQ <- oneToOneChannel' $ chanLabel $ qr ++ "->" ++ mix pr qr
runParallel_
[CHP.split input (writer deltaP) (writer deltaQ)
,p (reader deltaP) (writer joinP)
,q (reader deltaQ) (writer joinQ)
,CHP.join (,) (reader joinP) (reader joinQ) output
]
data ProcessPipeline a b = ProcessPipeline
{ runPipeline :: Chanin a -> Chanout b -> CHP ()
}
arrowProcess :: (Chanin a -> Chanout b -> CHP ()) -> ProcessPipeline a b
arrowProcess = ProcessPipeline
arrStrict :: NFData b => (a -> b) -> ProcessPipeline a b
arrStrict = ProcessPipeline . CHP.map'
instance Functor (ProcessPipeline a) where
fmap f x = x >>> arr f
#if __GLASGOW_HASKELL__ >= 609
instance Category ProcessPipeline where
(ProcessPipeline q) . (ProcessPipeline p) = ProcessPipeline (p <=> q)
id = ProcessPipeline CHP.id
#endif
instance Arrow ProcessPipeline where
#if __GLASGOW_HASKELL__ < 609
(ProcessPipeline p) >>> (ProcessPipeline q) = ProcessPipeline (p <=> q)
#endif
arr = ProcessPipeline . CHP.map
first (ProcessPipeline p) = ProcessPipeline $ \in_ out -> do
c <- newChannel
c' <- newChannel
d <- newChannel
runParallel_
[ CHP.split in_ (writer c) (writer d)
, p (reader c) (writer c')
, CHP.join (,) (reader c') (reader d) out
]
second (ProcessPipeline p) = ProcessPipeline $ \in_ out -> do
c <- newChannel
c' <- newChannel
d <- newChannel
runParallel_
[ CHP.split in_ (writer d) (writer c)
, p (reader c) (writer c')
, CHP.join (,) (reader d) (reader c') out
]
(ProcessPipeline p) *** (ProcessPipeline q) = ProcessPipeline $ \in_ out -> do
c <- newChannel
c' <- newChannel
d <- newChannel
d' <- newChannel
runParallel_
[ CHP.split in_ (writer c) (writer d)
, p (reader c) (writer c')
, q (reader d) (writer d')
, CHP.join (,) (reader c') (reader d') out
]
(ProcessPipeline p) &&& (ProcessPipeline q) = ProcessPipeline $ \in_ out -> do
c <- newChannel
c' <- newChannel
d <- newChannel
d' <- newChannel
runParallel_
[ CHP.parDelta in_ [writer c, writer d]
, p (reader c) (writer c')
, q (reader d) (writer d')
, CHP.join (,) (reader c') (reader d') out
]
instance ArrowChoice ProcessPipeline where
left (ProcessPipeline p) = ProcessPipeline $ \in_ out -> do
c <- oneToOneChannel
d <- oneToOneChannel
(forever $ do x <- readChannel in_
case x of
Left l -> do writeChannel (writer c) l
l' <- readChannel (reader d)
writeChannel out (Left l')
Right r -> writeChannel out (Right r)
) <|*|> p (reader c) (writer d)
return ()
right (ProcessPipeline p) = ProcessPipeline $ \in_ out -> do
c <- oneToOneChannel
d <- oneToOneChannel
(forever $ do x <- readChannel in_
case x of
Right r -> do writeChannel (writer c) r
r' <- readChannel (reader d)
writeChannel out (Right r')
Left l -> writeChannel out (Left l)
) <|*|> p (reader c) (writer d)
return ()
(ProcessPipeline p) ||| (ProcessPipeline q)
= ProcessPipeline $ \in_ out -> do
c <- oneToOneChannel
c' <- oneToOneChannel
d <- oneToOneChannel
d' <- oneToOneChannel
runParallel_
[ forever $ do x <- readChannel in_
x' <- case x of
Left l -> do writeChannel (writer c) l
readChannel (reader c')
Right r -> do writeChannel (writer d) r
readChannel (reader d')
writeChannel out x'
, p (reader c) (writer c')
, q (reader d) (writer d')
]
(ProcessPipeline p) +++ (ProcessPipeline q)
= ProcessPipeline $ \in_ out -> do
c <- oneToOneChannel
c' <- oneToOneChannel
d <- oneToOneChannel
d' <- oneToOneChannel
runParallel_
[ forever $ do x <- readChannel in_
x' <- case x of
Left l -> do writeChannel (writer c) l
l' <- readChannel (reader c')
return (Left l')
Right r -> do writeChannel (writer d) r
r' <- readChannel (reader d')
return (Right r')
writeChannel out x'
, p (reader c) (writer c')
, q (reader d) (writer d')
]