\section{RSAGL.ThreadedArrow}
A ThreadedArrow is an extension of the SwitchedArrow. In addition to switching, ThreadedArrow threads can spawn new threads and
terminate themselves. All threads recieve the same input.
ThreadedArrow also supports a model of thread management that allows an arbitrary, external function to view all threads and cull unwanted threads
either before they begin or sometime after, and even control the order in which threads exectute.
While the ThreadedArrow has conceptual similarities to the threading systems provided by an operating system,
it provides neither parallelism nor concurency.
\begin{code}
module RSAGL.ThreadedArrow
(ThreadIdentity,
nullaryThreadIdentity,
maybeThreadIdentity,
unionThreadIdentity,
ThreadedFunction,
ThreadedArrow,
RSAGL.ThreadedArrow.switchContinue,
RSAGL.ThreadedArrow.switchTerminate,
spawnThreads,
killThreadIf,
threadIdentity,
RSAGL.ThreadedArrow.statefulForm)
where
import Control.Arrow
import Control.Arrow.Operations
import Control.Arrow.Transformer
import Control.Arrow.Transformer.State
import RSAGL.SwitchedArrow as SwitchedArrow
import RSAGL.StatefulArrow as StatefulArrow
import Data.Maybe
import Data.List
type ThreadIdentity t = forall i x o. i -> [(t,x)] -> [(t,(o,x))] -> [x]
nullaryThreadIdentity :: ThreadIdentity ()
nullaryThreadIdentity _ news olds = map (snd . snd) olds ++ map snd news
maybeThreadIdentity :: ThreadIdentity t -> ThreadIdentity (Maybe t)
maybeThreadIdentity manageThreads i news olds = manageThreads i (justs news) (justs olds) ++ nothings
where nothings = map snd (filter (isNothing . fst) news) ++ map (snd . snd) (filter (isNothing . fst) olds)
justs = map (first fromJust) . filter (isJust . fst)
unionThreadIdentity :: (t -> t -> Bool) -> ThreadIdentity t
unionThreadIdentity predicate _ news olds_ = map snd $ unionBy (\x y -> fst x `predicate` fst y) olds news
where olds = map (second snd) olds_
newtype ThreadedArrow t i o a j p = ThreadedArrow (SwitchedArrow i (Maybe o) (StateArrow (t,[(t,ThreadedArrow t i o a i (Maybe o))]) a) j p)
type ThreadedFunction i o j p = ThreadedArrow () i o (->) j p
instance (ArrowChoice a) => Arrow (ThreadedArrow t i o a) where
(>>>) (ThreadedArrow ta1) (ThreadedArrow ta2) = ThreadedArrow $ ta1 >>> ta2
arr = ThreadedArrow . arr
first (ThreadedArrow f) = ThreadedArrow $ first f
instance (Arrow a,ArrowChoice a) => ArrowTransformer (ThreadedArrow t i o) a where
lift = ThreadedArrow . lift . lift
instance (ArrowChoice a) => ArrowChoice (ThreadedArrow t i o a) where
left (ThreadedArrow a) = ThreadedArrow $ left a
instance (ArrowChoice a,ArrowApply a) => ArrowApply (ThreadedArrow t i o a) where
app = ThreadedArrow $ proc (ThreadedArrow a,b) -> app -< (a,b)
statefulForm :: (ArrowChoice a,ArrowApply a) => (forall x. i -> [(t,x)] -> [(t,(o,x))] -> [x]) -> [(t,ThreadedArrow t i o a i o)] -> StatefulArrow a i [(t,o)]
statefulForm manageThreads initial_threads = flip stateContext (map (second (>>> arr Just)) initial_threads) $ proc i ->
do threads <- fetch -< ()
olds <- lift (runThreadsLoop manageThreads) -< (i,[],threads)
store -< map (second snd) olds
returnA -< map (second fst) olds
manageOldNewThreads :: (forall ex. i -> [(t,ex)] -> [(t,(o,ex))] -> [ex]) -> i -> [(t,x)] -> [(t,(o,x))] -> ([(t,x)],[(t,(o,x))])
manageOldNewThreads manageThreads i news_ olds_ = (concat *** concat) $ unzip $ map (either (\l -> ([l],[])) (\r -> ([],[r]))) result
where news = map (\x -> second (const $ Left x) x) news_
olds = map (\x -> second (second $ const $ Right x) x) olds_
result = manageThreads i news olds
runThreadsLoop :: (ArrowChoice a,ArrowApply a) =>
(forall x. i -> [(t,x)] -> [(t,(o,x))] -> [x]) ->
a (i,[(t,(o,ThreadedArrow t i o a i (Maybe o)))],[(t,ThreadedArrow t i o a i (Maybe o))])
[(t,(o,ThreadedArrow t i o a i (Maybe o)))]
runThreadsLoop manageThreads = proc (i,finished_threads,current_threads) ->
do (freshly_finished_threads,(_,freshly_spawned_threads)) <- runState runThreads -<
((i,current_threads),(error "runThreadsLoop: undefined thread identity",[]))
let (managed_spawned_threads,managed_finished_threads) = manageOldNewThreads manageThreads i
freshly_spawned_threads
(freshly_finished_threads ++ finished_threads)
if null managed_spawned_threads
then returnA -< managed_finished_threads
else runThreadsLoop manageThreads -< (i,managed_finished_threads,managed_spawned_threads)
runThreads :: (ArrowChoice a,ArrowApply a) => StateArrow (t,[(t,ThreadedArrow t i o a i (Maybe o))]) a
(i,[(t,ThreadedArrow t i o a i (Maybe o))])
[(t,(o,ThreadedArrow t i o a i (Maybe o)))]
runThreads = proc (i,threads) ->
do case threads of
[] -> returnA -< []
((ident,ThreadedArrow switchedA):rest_in) ->
do x <- fetch -< ()
store -< first (const ident) x
(m_o,newA) <- app -< (runStateful switchedA,i)
rest_out <- runThreads -< (i,rest_in)
returnA -< (maybe id (\o -> ((ident,(o,ThreadedArrow newA)) :)) m_o) rest_out
switchContinue :: (Arrow a,ArrowChoice a,ArrowApply a) => ThreadedArrow t i o a (Maybe (ThreadedArrow t i o a i o),i) i
switchContinue = (arr $ first $ fmap (\(ThreadedArrow thread) -> thread >>> arr Just)) >>> (ThreadedArrow SwitchedArrow.switchContinue)
switchTerminate :: (Arrow a,ArrowChoice a) => ThreadedArrow t i o a (Maybe (ThreadedArrow t i o a i o),o) o
switchTerminate = proc (m_thread,o) ->
do ThreadedArrow $ SwitchedArrow.switchTerminate -< (fmap (\(ThreadedArrow thread) -> thread >>> arr Just) m_thread,Just o)
returnA -< o
spawnThreads :: (Arrow a,ArrowChoice a) => ThreadedArrow t i o a [(t,ThreadedArrow t i o a i o)] ()
spawnThreads = ThreadedArrow $ lift $ proc new_spawned ->
do x <- fetch -< ()
store -< second (map (second (>>> arr Just)) new_spawned ++) x
returnA -< ()
killThreadIf :: (Arrow a,ArrowChoice a,ArrowApply a) => ThreadedArrow t i o a Bool ()
killThreadIf = proc b ->
do ThreadedArrow SwitchedArrow.switchContinue -< (if b then (Just $ arr (const Nothing)) else Nothing,error "ThreadedArrow.killThreadIf: this thread has been killed")
returnA -< ()
threadIdentity :: (ArrowChoice a) => ThreadedArrow t i o a () t
threadIdentity = arr fst <<< ThreadedArrow (lift fetch)
\end{code}