module Control.Concurrent.SCC.ComponentTypes
(
Component (..), BranchComponent (combineBranches), LiftableComponent (liftComponent), Container (..),
AnyComponent (AnyComponent), Performer (..), Consumer (..), Producer(..), Splitter(..), Transducer(..),
ComponentConfiguration(..), Boundary(..), Markup(..), Parser,
liftPerformer, liftConsumer, liftAtomicConsumer, liftProducer, liftAtomicProducer,
liftTransducer, liftAtomicTransducer, lift121Transducer, liftStatelessTransducer, liftFoldTransducer, liftStatefulTransducer,
liftSplitter, liftAtomicSplitter, liftStatelessSplitter, liftStatefulSplitter,
showComponentTree, optimalTwoParallelConfigurations, optimalTwoSequentialConfigurations, optimalThreeParallelConfigurations,
splitToConsumers, splitInputToConsumers
)
where
import Control.Concurrent.SCC.Foundation
import Control.Monad (liftM, when)
import Data.List (minimumBy)
import Data.Maybe
import Data.Typeable (Typeable, cast)
-- | Existential wrapper that hides the concrete type of a 'Component' instance, so that components of different
-- types can be kept in one list (as in the result of 'subComponents').
data AnyComponent = forall a. Component a => AnyComponent a
-- | Interface shared by every component type in this module. It exposes the component's name, its children, and
-- the threading/cost figures used when choosing a configuration.
class Component c where
   -- | Name of the component, as printed by 'showComponentTree'.
   name :: c -> String
   -- | Immediate sub-components, each wrapped in 'AnyComponent'.
   subComponents :: c -> [AnyComponent]
   -- | Thread-count limit reported by the component (presumably the most threads it can put to use).
   maxUsableThreads :: c -> Int
   -- | Returns the component reconfigured for the given number of threads.
   usingThreads :: Int -> c -> c
   -- | Number of threads recorded in the component's current configuration.
   usedThreads :: c -> Int
   -- | Cost estimate of the component. The default is one plus the summed cost of all sub-components.
   cost :: c -> Int
   cost c = 1 + sum [cost child | child <- subComponents c]
-- | Every method simply unwraps the existential and delegates to the wrapped component; 'usingThreads' re-wraps
-- the reconfigured component.
instance Component AnyComponent where
   name (AnyComponent wrapped) = name wrapped
   subComponents (AnyComponent wrapped) = subComponents wrapped
   maxUsableThreads (AnyComponent wrapped) = maxUsableThreads wrapped
   usedThreads (AnyComponent wrapped) = usedThreads wrapped
   cost (AnyComponent wrapped) = cost wrapped
   usingThreads count (AnyComponent wrapped) = AnyComponent (usingThreads count wrapped)
-- | Renders a component and all of its descendants, one component per line, starting at indentation depth 1.
showComponentTree :: forall c. Component c => c -> String
showComponentTree = showIndentedComponent 1
-- | Renders one line for the component (cost right-aligned in 4 columns, used threads right-aligned in 3 columns,
-- @depth@ spaces, then the name) followed by the lines of all sub-components at one level deeper.
showIndentedComponent :: forall c. Component c => Int -> c -> String
showIndentedComponent depth c = concat (ownLine : childLines)
   where ownLine = showRightAligned 4 (cost c)
                   ++ showRightAligned 3 (usedThreads c)
                   ++ replicate depth ' '
                   ++ name c
                   ++ "\n"
         childLines = map (showIndentedComponent (succ depth)) (subComponents c)
-- | Shows a value and left-pads it with spaces up to the given width. A value whose textual form is already at
-- least @width@ characters long is returned unpadded and untruncated ('replicate' with a non-positive count
-- yields the empty string).
showRightAligned :: Show x => Int -> x -> String
showRightAligned width x = replicate (width - length str) ' ' ++ str
   where str = show x
-- | Snapshot of how a component is currently configured.
data ComponentConfiguration = ComponentConfiguration
   { componentChildren :: [AnyComponent]   -- ^ the configured sub-components
   , componentThreads :: Int               -- ^ number of threads used by this configuration
   , componentCost :: Int                  -- ^ cost estimate of this configuration
   }
-- | A component that takes neither a source nor a sink; running it is just a 'Pipe' computation with result @r@.
data Performer m r = Performer
   { performerName :: String
     -- ^ component name
   , performerMaxThreads :: Int
     -- ^ value reported by 'maxUsableThreads'
   , performerConfiguration :: ComponentConfiguration
     -- ^ the current configuration
   , performerUsingThreads :: Int -> (ComponentConfiguration, forall c. Pipe c m r)
     -- ^ maps a thread count to the matching configuration and computation
   , perform :: forall c. Pipe c m r
     -- ^ the computation belonging to the current configuration
   }
-- | A component that reads items of type @x@ from a 'Source' and returns a result of type @r@.
data Consumer m x r = Consumer
   { consumerName :: String
     -- ^ component name
   , consumerMaxThreads :: Int
     -- ^ value reported by 'maxUsableThreads'
   , consumerConfiguration :: ComponentConfiguration
     -- ^ the current configuration
   , consumerUsingThreads :: Int -> (ComponentConfiguration, forall c. Source c x -> Pipe c m r)
     -- ^ maps a thread count to the matching configuration and consuming function
   , consume :: forall c. Source c x -> Pipe c m r
     -- ^ the consuming function belonging to the current configuration
   }
-- | A component that writes items of type @x@ into a 'Sink' and returns a result of type @r@.
data Producer m x r = Producer
   { producerName :: String
     -- ^ component name
   , producerMaxThreads :: Int
     -- ^ value reported by 'maxUsableThreads'
   , producerConfiguration :: ComponentConfiguration
     -- ^ the current configuration
   , producerUsingThreads :: Int -> (ComponentConfiguration, forall c. Sink c x -> Pipe c m r)
     -- ^ maps a thread count to the matching configuration and producing function
   , produce :: forall c. Sink c x -> Pipe c m r
     -- ^ the producing function belonging to the current configuration
   }
-- | A component that reads items of type @x@ from a 'Source' and writes items of type @y@ into a 'Sink'. The
-- computation returns a list of input items (presumably the ones it could not process; confirm against callers).
data Transducer m x y = Transducer
   { transducerName :: String
     -- ^ component name
   , transducerMaxThreads :: Int
     -- ^ value reported by 'maxUsableThreads'
   , transducerConfiguration :: ComponentConfiguration
     -- ^ the current configuration
   , transducerUsingThreads :: Int -> (ComponentConfiguration,
                                       forall c. Source c x -> Sink c y -> Pipe c m [x])
     -- ^ maps a thread count to the matching configuration and transducing function
   , transduce :: forall c. Source c x -> Sink c y -> Pipe c m [x]
     -- ^ the transducing function belonging to the current configuration
   }
-- | A component that reads items of type @x@ from a 'Source' and distributes them between two sinks of the same
-- item type, with a third sink carrying values of type @b@. The lifting functions in this module name the three
-- sinks @true@, @false@ and @edge@, in that order.
data Splitter m x b = Splitter
   { splitterName :: String
     -- ^ component name
   , splitterMaxThreads :: Int
     -- ^ value reported by 'maxUsableThreads'
   , splitterConfiguration :: ComponentConfiguration
     -- ^ the current configuration
   , splitterUsingThreads :: Int -> (ComponentConfiguration,
                                     forall c. Source c x -> Sink c x -> Sink c x -> Sink c b
                                               -> Pipe c m [x])
     -- ^ maps a thread count to the matching configuration and splitting function
   , split :: forall c. Source c x -> Sink c x -> Sink c x -> Sink c b -> Pipe c m [x]
     -- ^ the splitting function belonging to the current configuration
   }
-- | A marker carrying a value of type @y@; it is either the start of something, its end, or a single point.
data Boundary y
   = Start y
   | End y
   | Point y
   deriving (Eq, Show, Typeable)

-- | A stream item that is either plain content of type @x@ or a 'Boundary' marker carrying a @y@.
data Markup x y
   = Content x
   | Markup (Boundary y)
   deriving (Eq, Typeable)

-- | A parser is a 'Transducer' that turns a stream of @x@ into a stream of @x@ content interleaved with markup.
type Parser m x b = Transducer m x (Markup x b)
-- | Maps over the value carried by the marker, keeping the kind of marker unchanged.
instance Functor Boundary where
   fmap f boundary = case boundary of
                        Start b -> Start (f b)
                        End b -> End (f b)
                        Point b -> Point (f b)
-- | Character content is shown as the bare character; a markup boundary is shown inside square brackets.
instance (Show y) => Show (Markup Char y) where
   showsPrec _ (Content ch) rest = ch : rest
   showsPrec _ (Markup boundary) rest = showChar '[' (shows boundary (showChar ']' rest))
-- | Relates a container item type @x@ to the item type @y@ that can be extracted from it.
class Container x y where
   -- | A splitter over container items paired with a transducer from container items to contained items.
   unwrap :: ParallelizableMonad m
          => (Splitter m x (), Transducer m x y)
   -- | A transducer that turns contained items back into container items.
   rewrap :: ParallelizableMonad m
          => Transducer m y x
-- | 'Markup' items contain their 'Content' values: the splitter tests for 'Content', unwrapping emits the content
-- value and drops everything else, and rewrapping applies the 'Content' constructor.
instance (Typeable x, Typeable y) => Container (Markup x y) x where
   unwrap = (liftStatelessSplitter "isContent" isContent, liftStatelessTransducer "unwrapContent" unwrapContent)
      where isContent item = case item of
                                Content _ -> True
                                _ -> False
            unwrapContent item = [x | Content x <- [item]]
   rewrap = lift121Transducer "wrapContent" Content
-- | Components of type @cy@ (working on items of type @y@) that can be lifted to components of type @cx@ (working
-- on items of type @x@). The functional dependencies state that each component type determines its item type and
-- that a component type together with the other item type determines the other component type.
class LiftableComponent cx cy x y
      | cx -> x, cy -> y, cx y -> cy, cy x -> cx
   where
   -- | Lifts a component over @y@ to the corresponding component over @x@.
   liftComponent :: cy -> cx
-- | Lifts a transducer over contained items @y@ to a transducer over container items @x@, using the 'Container'
-- instance to take items apart and put them back together.
instance forall m x y. (Container x y, ParallelizableMonad m, Typeable x, Typeable y)
   => LiftableComponent (Transducer m x x) (Transducer m y y) x y where
   liftComponent t = liftTransducer "liftComponent" (maxUsableThreads t + maxUsableThreads (rewrap :: Transducer m y x)) $
                     -- The argument transducer and the wrapper splitter are configured together for the given
                     -- thread count; the resulting 'parallel' flag is not used below.
                     \threads-> let (configuration, t', w', parallel) = optimalTwoParallelConfigurations threads t wrapper
                                    (wrapper :: Splitter m x (), unwrap' :: Transducer m x y) = unwrap
                                    -- Always reports an empty list, whatever the inner pipes returned.
                                    tx source sink = liftM (const []) $
                                                     pipe
                                                        -- The wrapper splitter sends its "true" output down the
                                                        -- pipe and its "false" output straight to the result
                                                        -- sink; its edge output is consumed and discarded.
                                                        (\true-> pipe
                                                                    (split w' source true sink)
                                                                    consumeAndSuppress)
                                                        -- The "true" items are unwrapped, run through the
                                                        -- configured transducer, and rewrapped into the sink.
                                                        (\wrapped-> pipe
                                                                       (transduce unwrap' wrapped)
                                                                       (\unwrapped-> pipe
                                                                                        (transduce t' unwrapped)
                                                                                        (\out-> transduce rewrap out sink)))
                                in (configuration, tx)
-- | Lifts a splitter over contained items @y@ to a splitter over container items @x@, using the 'Container'
-- instance to take items apart and put them back together.
instance forall m x y. (Container x y, ParallelizableMonad m, Typeable x, Typeable y)
   => LiftableComponent (Splitter m x ()) (Splitter m y ()) x y where
   liftComponent splitter = liftSplitter "liftComponent" (maxUsableThreads splitter + maxUsableThreads (rewrap :: Transducer m y x)) $
      -- The argument splitter and the wrapper splitter are configured together for the given thread count; the
      -- resulting 'parallel' flag is not used below.
      \threads-> let (configuration, s', w', parallel) = optimalTwoParallelConfigurations threads splitter wrapper
                     (wrapper :: Splitter m x (), unwrap' :: Transducer m x y) = unwrap
                     -- The lifted split function: both outputs of the inner splitter are rewrapped before they
                     -- reach the caller's sinks, and only the innermost, first result component is returned.
                     split' :: forall c. Source c x -> Sink c x -> Sink c x -> Sink c () -> Pipe c m [x]
                     split' source true false edge
                        = liftM (fst . fst . fst) $
                          pipe
                             (\rewrappedTrue-> pipe
                                                  (\rewrappedFalse-> split'' source rewrappedTrue rewrappedFalse false edge)
                                                  (flip (transduce rewrap) false))
                             (flip (transduce rewrap) true)
                     -- Runs the wrapper splitter, then unwraps its "true" output and feeds it to the configured
                     -- inner splitter. The wrapper's "false" output goes directly to @false2@.
                     split'' :: forall c. Source c x -> Sink c y -> Sink c y -> Sink c x -> Sink c () -> Pipe c m ([x], ([x], [y]))
                     split'' source true1 false1 false2 edge = pipe
                                                                  (\sink-> split''' source sink false2 edge)
                                                                  (\source-> pipe
                                                                                (transduce unwrap' source)
                                                                                (\source-> split s' source true1 false1 edge))
                     -- The configured wrapper splitter with its type pinned down by the signature.
                     split''' :: forall c. Source c x -> Sink c x -> Sink c x -> Sink c ()
                              -> Pipe c m [x]
                     split''' source true false edge = split w' source true false edge
                 in (configuration, split')
-- | All figures are read from the record fields. 'usingThreads' asks 'performerUsingThreads' for the given thread
-- count and stores the resulting configuration and computation in the record.
instance Component (Performer m r) where
   name performer = performerName performer
   subComponents performer = componentChildren (performerConfiguration performer)
   maxUsableThreads performer = performerMaxThreads performer
   usedThreads performer = componentThreads (performerConfiguration performer)
   cost performer = componentCost (performerConfiguration performer)
   usingThreads threads performer = performer{performerConfiguration= configuration', perform= perform'}
      where (configuration', perform' :: forall c. Pipe c m r) = performerUsingThreads performer threads
-- | All figures are read from the record fields. 'usingThreads' asks 'consumerUsingThreads' for the given thread
-- count and stores the resulting configuration and consuming function in the record.
instance Component (Consumer m x r) where
   name consumer = consumerName consumer
   subComponents consumer = componentChildren (consumerConfiguration consumer)
   maxUsableThreads consumer = consumerMaxThreads consumer
   usedThreads consumer = componentThreads (consumerConfiguration consumer)
   cost consumer = componentCost (consumerConfiguration consumer)
   usingThreads threads consumer = consumer{consumerConfiguration= configuration', consume= consume'}
      where (configuration',
             consume' :: forall c. Source c x -> Pipe c m r) = consumerUsingThreads consumer threads
-- | All figures are read from the record fields. 'usingThreads' asks 'producerUsingThreads' for the given thread
-- count and stores the resulting configuration and producing function in the record.
instance Component (Producer m x r) where
   name producer = producerName producer
   subComponents producer = componentChildren (producerConfiguration producer)
   maxUsableThreads producer = producerMaxThreads producer
   usedThreads producer = componentThreads (producerConfiguration producer)
   cost producer = componentCost (producerConfiguration producer)
   usingThreads threads producer = producer{producerConfiguration= configuration', produce= produce'}
      where (configuration',
             produce' :: forall c. Sink c x -> Pipe c m r) = producerUsingThreads producer threads
-- | All figures are read from the record fields. 'usingThreads' asks 'transducerUsingThreads' for the given thread
-- count and stores the resulting configuration and transducing function in the record.
instance Component (Transducer m x y) where
   name transducer = transducerName transducer
   subComponents transducer = componentChildren (transducerConfiguration transducer)
   maxUsableThreads transducer = transducerMaxThreads transducer
   usedThreads transducer = componentThreads (transducerConfiguration transducer)
   cost transducer = componentCost (transducerConfiguration transducer)
   usingThreads threads transducer = transducer{transducerConfiguration= configuration', transduce= transduce'}
      where (configuration', transduce' :: forall c. Source c x -> Sink c y -> Pipe c m [x])
               = transducerUsingThreads transducer threads
-- | All figures are read from the record fields. 'usingThreads' asks 'splitterUsingThreads' for the given thread
-- count and stores the resulting configuration and splitting function in the record.
instance Component (Splitter m x b) where
   name splitter = splitterName splitter
   subComponents splitter = componentChildren (splitterConfiguration splitter)
   maxUsableThreads splitter = splitterMaxThreads splitter
   usedThreads splitter = componentThreads (splitterConfiguration splitter)
   cost splitter = componentCost (splitterConfiguration splitter)
   usingThreads threads splitter = splitter{splitterConfiguration= configuration', split= split'}
      where (configuration',
             split' :: forall c. Source c x -> Sink c x -> Sink c x -> Sink c b -> Pipe c m [x])
               = splitterUsingThreads splitter threads
-- | Component types of which two instances can be merged into one by a combinator over consuming functions. The
-- component type determines the monad @m@ and the item type @x@.
class BranchComponent cc m x r | cc -> m x where
   -- | Combines two components into one. The arguments are the name of the new component, a cost figure, the
   -- combinator (which receives a 'Bool' flag followed by the two branches as consuming functions), and finally
   -- the two components to combine.
   combineBranches
      :: String
      -> Int
      -> (forall c. Bool -> (Source c x -> Pipe c m r) -> (Source c x -> Pipe c m r) -> (Source c x -> Pipe c m r))
      -> cc
      -> cc
      -> cc
-- | Two consumers are combined into a single-threaded consumer: the combinator is always given 'False' as its
-- flag, and the configuration lists both consumers as children with the supplied cost.
instance forall m x r. Monad m => BranchComponent (Consumer m x r) m x r where
   combineBranches name cost combinator c1 c2 = liftConsumer name 1 $
      \_threads-> (branchConfiguration, combinator False (consume c1) (consume c2))
      where branchConfiguration = ComponentConfiguration [AnyComponent c1, AnyComponent c2] 1 cost
-- | Variant for consumers without a result, combined by a combinator over list-returning consuming functions:
-- each branch is adapted to return the empty list, and the list returned by the combinator is discarded.
instance forall m x. Monad m => BranchComponent (Consumer m x ()) m x [x] where
   combineBranches name cost combinator c1 c2 = liftConsumer name 1 $
      \_threads-> (branchConfiguration,
                   liftM (const ())
                   . combinator False
                        (\input-> consume c1 input >> return [])
                        (\input-> consume c2 input >> return []))
      where branchConfiguration = ComponentConfiguration [AnyComponent c1, AnyComponent c2] 1 cost
-- | Two transducers are combined into one that feeds both branches into the same sink. For every requested thread
-- count the pair is configured through 'optimalTwoParallelConfigurations', whose flag is handed to the combinator.
--
-- The branches run the /configured/ transducers @t1'@ and @t2'@, i.e. the very components listed in the returned
-- configuration; running the unconfigured @t1@ and @t2@ would ignore the thread allocation just computed.
--
-- Note that the @cost@ argument is not used by this instance: the cost comes from the computed configuration.
instance forall m x y. BranchComponent (Transducer m x y) m x [x] where
   combineBranches name cost combinator t1 t2
      = liftTransducer name (maxUsableThreads t1 + maxUsableThreads t2) $
        \threads-> let (configuration, t1', t2', parallel) = optimalTwoParallelConfigurations threads t1 t2
                       transduce' source sink = combinator parallel
                                                   (\branchSource-> transduce t1' branchSource sink)
                                                   (\branchSource-> transduce t2' branchSource sink)
                                                   source
                   in (configuration, transduce')
-- | Two splitters are combined into one that feeds both branches into the same three sinks. For every requested
-- thread count the pair is configured through 'optimalTwoParallelConfigurations', whose flag is handed to the
-- combinator.
--
-- The branches run the /configured/ splitters @s1'@ and @s2'@, i.e. the very components listed in the returned
-- configuration; running the unconfigured @s1@ and @s2@ would ignore the thread allocation just computed.
--
-- Note that the @cost@ argument is not used by this instance: the cost comes from the computed configuration.
instance forall m x b. (ParallelizableMonad m, Typeable x) => BranchComponent (Splitter m x b) m x [x] where
   combineBranches name cost combinator s1 s2
      = liftSplitter name (maxUsableThreads s1 + maxUsableThreads s2) $
        \threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
                       split' source true false edge = combinator parallel
                                                          (\branchSource-> split s1' branchSource true false edge)
                                                          (\branchSource-> split s2' branchSource true false edge)
                                                          source
                   in (configuration, split')
-- | Builds a 'Performer' from its name, maximum thread count, and a function from thread count to configuration
-- and computation. The performer starts out in the configuration obtained for a single thread.
liftPerformer :: String -> Int -> (Int -> (ComponentConfiguration, forall c. Pipe c m r)) -> Performer m r
liftPerformer name maxThreads usingThreads =
   case usingThreads 1 of
      (initialConfiguration, initialPerform) ->
         Performer name maxThreads initialConfiguration usingThreads initialPerform
-- | Builds a 'Consumer' from its name, maximum thread count, and a function from thread count to configuration
-- and consuming function. The consumer starts out in the configuration obtained for a single thread.
liftConsumer :: String -> Int -> (Int -> (ComponentConfiguration, forall c. Source c x -> Pipe c m r)) -> Consumer m x r
liftConsumer name maxThreads usingThreads =
   case usingThreads 1 of
      (initialConfiguration, initialConsume) ->
         Consumer name maxThreads initialConfiguration usingThreads initialConsume
-- | Builds a 'Producer' from its name, maximum thread count, and a function from thread count to configuration
-- and producing function. The producer starts out in the configuration obtained for a single thread.
liftProducer :: String -> Int -> (Int -> (ComponentConfiguration, forall c. Sink c x -> Pipe c m r)) -> Producer m x r
liftProducer name maxThreads usingThreads =
   case usingThreads 1 of
      (initialConfiguration, initialProduce) ->
         Producer name maxThreads initialConfiguration usingThreads initialProduce
-- | Builds a 'Transducer' from its name, maximum thread count, and a function from thread count to configuration
-- and transducing function. The transducer starts out in the configuration obtained for a single thread.
liftTransducer :: String -> Int -> (Int -> (ComponentConfiguration, forall c. Source c x -> Sink c y -> Pipe c m [x]))
                  -> Transducer m x y
liftTransducer name maxThreads usingThreads =
   case usingThreads 1 of
      (initialConfiguration, initialTransduce) ->
         Transducer name maxThreads initialConfiguration usingThreads initialTransduce
-- | Builds a 'Consumer' without sub-components from a name, a cost and a consuming function. It always uses
-- exactly one thread, whatever thread count is requested.
liftAtomicConsumer :: String -> Int -> (forall c. Source c x -> Pipe c m r) -> Consumer m x r
liftAtomicConsumer name cost consume = liftConsumer name 1 (\_threads-> (atomicConfiguration, consume))
   where atomicConfiguration = ComponentConfiguration [] 1 cost
-- | Builds a 'Producer' without sub-components from a name, a cost and a producing function. It always uses
-- exactly one thread, whatever thread count is requested.
liftAtomicProducer :: String -> Int -> (forall c. Sink c x -> Pipe c m r) -> Producer m x r
liftAtomicProducer name cost produce = liftProducer name 1 (\_threads-> (atomicConfiguration, produce))
   where atomicConfiguration = ComponentConfiguration [] 1 cost
-- | Builds a 'Transducer' without sub-components from a name, a cost and a transducing function. It always uses
-- exactly one thread, whatever thread count is requested.
liftAtomicTransducer :: String -> Int -> (forall c. Source c x -> Sink c y -> Pipe c m [x]) -> Transducer m x y
liftAtomicTransducer name cost transduce = liftTransducer name 1 (\_threads-> (atomicConfiguration, transduce))
   where atomicConfiguration = ComponentConfiguration [] 1 cost
-- | Builds an atomic transducer of cost 1 that applies the function to every input item and puts the result into
-- the sink. Before each item the sink is asked whether it can still be written to; the loop stops as soon as it
-- cannot. The transducer always returns the empty list.
lift121Transducer :: (Monad m, Typeable x, Typeable y) => String -> (x -> y) -> Transducer m x y
lift121Transducer name f = liftAtomicTransducer name 1 $
   \source sink->
      let loop = do writable <- canPut sink
                    when writable (getSuccess source (\x-> put sink (f x) >> loop))
      in loop >> return []
-- | Builds an atomic transducer of cost 1 that maps every input item to a list of output items and puts that
-- list into the sink. Before each item the sink is asked whether it can still be written to; the loop stops as
-- soon as it cannot. The transducer always returns the empty list.
liftStatelessTransducer :: (Monad m, Typeable x, Typeable y) => String -> (x -> [y]) -> Transducer m x y
liftStatelessTransducer name f = liftAtomicTransducer name 1 $
   \source sink->
      let loop = do writable <- canPut sink
                    when writable (getSuccess source (\x-> putList (f x) sink >> loop))
      in loop >> return []
-- | Builds an atomic transducer of cost 1 that folds the input with the given step function, starting from the
-- given state. When 'get' yields 'Nothing', the final state is converted by the last argument and put into the
-- sink as the only output item. The sink is asked whether it can be written to before each 'get'; if it cannot,
-- the loop stops without producing any output. The transducer always returns the empty list.
liftFoldTransducer :: (Monad m, Typeable x, Typeable y) => String -> (s -> x -> s) -> s -> (s -> y) -> Transducer m x y
liftFoldTransducer name f s0 w = liftAtomicTransducer name 1 $
   \source sink->
      let loop acc = do writable <- canPut sink
                        when writable (get source >>= maybe (emit acc) (loop . f acc))
          emit acc = put sink (w acc) >> return ()
      in loop s0 >> return []
-- | Builds an atomic transducer of cost 1 that threads a state through the input: the step function receives the
-- current state and an input item and yields the next state together with the output items for that input.
-- Before each item the sink is asked whether it can still be written to; the loop stops as soon as it cannot. The
-- transducer always returns the empty list.
liftStatefulTransducer :: (Monad m, Typeable x, Typeable y) => String -> (state -> x -> (state, [y])) -> state -> Transducer m x y
liftStatefulTransducer name f s0 = liftAtomicTransducer name 1 $
   \source sink->
      let loop current = do writable <- canPut sink
                            when writable (getSuccess source (step current))
          step current x = let (next, outputs) = f current x
                           in putList outputs sink >> loop next
      in loop s0 >> return []
-- | Builds an atomic splitter of cost 1 from a predicate: every input item is put into the first (true) sink if
-- the predicate holds for it and into the second (false) sink otherwise. The third (edge) sink is never written
-- to. The outcome of each 'put' is passed to 'cond' to choose between continuing the loop and returning the
-- singleton list of the current item; at the end of input the result is the empty list.
liftStatelessSplitter :: (ParallelizableMonad m, Typeable x) => String -> (x -> Bool) -> Splitter m x b
liftStatelessSplitter name f = liftAtomicSplitter name 1 $
   \source true false _edge->
      let loop = get source >>= maybe (return []) route
          route x = put (if f x then true else false) x
                    >>= cond loop (return [x])
      in loop
-- | Builds an atomic splitter of cost 1 from a state transition function: for the current state and an input item
-- the function yields the next state and a flag telling whether the item goes into the first (true) or the second
-- (false) sink. The third (edge) sink is never written to. The outcome of each 'put' is passed to 'cond' to
-- choose between continuing with the next state and returning the singleton list of the current item; at the end
-- of input the result is the empty list.
liftStatefulSplitter :: (ParallelizableMonad m, Typeable x) => String -> (state -> x -> (state, Bool)) -> state -> Splitter m x ()
liftStatefulSplitter name f s0 = liftAtomicSplitter name 1 $
   \source true false _edge->
      let loop current = get source >>= maybe (return []) (route current)
          route current x = let (next, truth) = f current x
                            in put (if truth then true else false) x
                               >>= cond (loop next) (return [x])
      in loop s0
-- | Builds a 'Splitter' from its name, maximum thread count, and a function from thread count to configuration
-- and splitting function. The splitter starts out in the configuration obtained for a single thread.
liftSplitter :: forall m x b. (Monad m, Typeable x) =>
                String -> Int
                -> (Int -> (ComponentConfiguration, forall c. Source c x -> Sink c x -> Sink c x -> Sink c b -> Pipe c m [x]))
                -> Splitter m x b
liftSplitter name maxThreads usingThreads =
   case usingThreads 1 of
      (initialConfiguration, initialSplit) ->
         Splitter name maxThreads initialConfiguration usingThreads initialSplit
-- | Builds a 'Splitter' without sub-components from a name, a cost and a splitting function. It always uses
-- exactly one thread, whatever thread count is requested.
liftAtomicSplitter :: forall m x b. (Monad m, Typeable x) =>
                      String -> Int -> (forall c. Source c x -> Sink c x -> Sink c x -> Sink c b -> Pipe c m [x])
                      -> Splitter m x b
liftAtomicSplitter name cost split = liftSplitter name 1 (\_threads-> (atomicConfiguration, split))
   where atomicConfiguration = ComponentConfiguration [] 1 cost
-- | Configures two components that run one after the other: each one is given the full thread count. The
-- resulting configuration uses as many threads as the more demanding of the two and costs the sum of both.
optimalTwoSequentialConfigurations :: (Component c1, Component c2) => Int -> c1 -> c2 -> (ComponentConfiguration, c1, c2)
optimalTwoSequentialConfigurations threads c1 c2 =
   let first = usingThreads threads c1
       second = usingThreads threads c2
       children = [AnyComponent first, AnyComponent second]
       threadCount = max (usedThreads first) (usedThreads second)
       totalCost = cost first + cost second
   in (ComponentConfiguration children threadCount totalCost, first, second)
-- | Chooses between running two components sequentially and running them in parallel for a given thread budget.
--
-- The sequential alternative gives each component the whole budget and costs the sum of both. The parallel
-- alternative tries every way of giving the first component between 1 and @threads - 1@ threads (never more than
-- it reports as usable) and the second component the remaining threads (again capped by what it reports as
-- usable); the cost of a distribution is the larger of the two component costs, and the cheapest distribution is
-- kept. Parallel execution is chosen only if there is more than one thread, at least one distribution exists, and
-- its cost plus an overhead of 1 is strictly below the sequential cost.
--
-- The result holds the chosen configuration, both components as configured for it, and the flag telling whether
-- the parallel alternative was chosen.
optimalTwoParallelConfigurations :: (Component c1, Component c2) => Int -> c1 -> c2 -> (ComponentConfiguration, c1, c2, Bool)
optimalTwoParallelConfigurations threads c1 c2 = (configuration, c1', c2', parallelize)
   where -- The emptiness check keeps 'minimumBy' from ever being forced on an empty candidate list.
         parallelize = threads > 1 && not (null candidates) && parallelCost + 1 < sequentialCost
         configuration = ComponentConfiguration
                            [AnyComponent c1', AnyComponent c2']
                            (if parallelize then usedThreads c1' + usedThreads c2' else usedThreads c1' `max` usedThreads c2')
                            (if parallelize then parallelCost + 1 else sequentialCost)
         (c1', c2') = if parallelize then (c1p, c2p) else (c1s, c2s)
         (c1p, c2p, parallelCost) = minimumBy
                                       (\(_, _, cost1) (_, _, cost2)-> compare cost1 cost2)
                                       candidates
         -- The subtractions are parenthesised on purpose: a backquoted `min` binds tighter than (-), so without
         -- the parentheses the cap would apply to the subtrahend instead of to the difference.
         candidates = [let c2threads = (threads - c1threads) `min` maxUsableThreads c2
                           c1i = usingThreads c1threads c1
                           c2i = usingThreads c2threads c2
                       in (c1i, c2i, cost c1i `max` cost c2i)
                       | c1threads <- [1 .. (threads - 1) `min` maxUsableThreads c1]]
         c1s = usingThreads threads c1
         c2s = usingThreads threads c2
         sequentialCost = cost c1s + cost c2s
-- | Three-component counterpart of 'optimalTwoParallelConfigurations'.
--
-- NOTE: this function is not implemented. Applying it to all four arguments raises an error that names the
-- function, instead of the anonymous failure of a bare 'undefined'.
optimalThreeParallelConfigurations :: (Component c1, Component c2, Component c3) =>
                                      Int -> c1 -> c2 -> c3 -> (ComponentConfiguration, (c1, Bool), (c2, Bool), (c3, Bool))
optimalThreeParallelConfigurations _threadCount _c1 _c2 _c3
   = error "Control.Concurrent.SCC.ComponentTypes.optimalThreeParallelConfigurations: not implemented"
-- | Runs a splitter on the given source and connects each of its three outputs (true, false, edge) to its own
-- consuming function. The result collects the list returned by the splitter followed by the results of the true,
-- false and edge consumers, in that order.
splitToConsumers :: forall c m x b r1 r2 r3. (ParallelizableMonad m, Typeable x, Typeable b)
                    => Splitter m x b -> Source c x -> (Source c x -> Pipe c m r1) -> (Source c x -> Pipe c m r2)
                    -> (Source c b -> Pipe c m r3) -> Pipe c m ([x], r1, r2, r3)
splitToConsumers s source trueConsumer falseConsumer edgeConsumer = do
   (((extra, r3), r2), r1) <- pipe
                                 (\true-> pipe
                                             (\false-> pipe
                                                          (split s source true false)
                                                          edgeConsumer)
                                             falseConsumer)
                                 trueConsumer
   return (extra, r1, r2, r3)
-- | Runs a splitter on the given source, connecting its true and false outputs to the two consuming functions and
-- discarding its edge output. The flag selects 'pipeP' over 'pipe' for the two outer connections. The result is
-- the longest prefix of the true consumer's list that is matched position by position by the false consumer's
-- list (only the lengths are compared, not the items), followed by the list returned by the splitter itself.
splitInputToConsumers :: forall c m x b r1 r2. (ParallelizableMonad m, Typeable x, Typeable b)
                         => Bool -> Splitter m x b -> Source c x -> (Source c x -> Pipe c m [x]) -> (Source c x -> Pipe c m [x])
                         -> Pipe c m [x]
splitInputToConsumers parallel s source trueConsumer falseConsumer = do
   (((extra, _), xs1), xs2) <- pipe'
                                  (\false-> pipe'
                                               (\true-> pipe
                                                           (split s source true false)
                                                           consumeAndSuppress)
                                               trueConsumer)
                                  falseConsumer
   return (prependCommonPrefix xs1 xs2 extra)
   where pipe' = if parallel then pipeP else pipe
         prependCommonPrefix xs ys rest = zipWith const xs ys ++ rest