\section{RSAGL.ThreadedArrow} A ThreadedArrow is an extension of the SwitchedArrow. In addition to switching, ThreadedArrow threads can spawn new threads and terminate themselves. All threads recieve the same input. ThreadedArrow also supports a model of thread management that allows an arbitrary, external function to view all threads and cull unwanted threads either before they begin or sometime after, and even control the order in which threads exectute. While the ThreadedArrow has conceptual similarities to the threading systems provided by an operating system, it provides neither parallelism nor concurency. \begin{code} {-# OPTIONS_GHC -farrows -fglasgow-exts #-} module RSAGL.ThreadedArrow (ThreadIdentity, nullaryThreadIdentity, maybeThreadIdentity, unionThreadIdentity, ThreadedFunction, ThreadedArrow, RSAGL.ThreadedArrow.switchContinue, RSAGL.ThreadedArrow.switchTerminate, spawnThreads, killThreadIf, threadIdentity, RSAGL.ThreadedArrow.statefulForm) where import Control.Arrow import Control.Arrow.Operations import Control.Arrow.Transformer import Control.Arrow.Transformer.State import RSAGL.SwitchedArrow as SwitchedArrow import RSAGL.StatefulArrow as StatefulArrow import Data.Maybe import Data.List type ThreadIdentity t = forall i x o. i -> [(t,x)] -> [(t,(o,x))] -> [x] nullaryThreadIdentity :: ThreadIdentity () nullaryThreadIdentity _ news olds = map (snd . snd) olds ++ map snd news maybeThreadIdentity :: ThreadIdentity t -> ThreadIdentity (Maybe t) maybeThreadIdentity manageThreads i news olds = manageThreads i (justs news) (justs olds) ++ nothings where nothings = map snd (filter (isNothing . fst) news) ++ map (snd . snd) (filter (isNothing . fst) olds) justs = map (first fromJust) . filter (isJust . fst) unionThreadIdentity :: (t -> t -> Bool) -> ThreadIdentity t unionThreadIdentity predicate _ news olds_ = map snd $ unionBy (\x y -> fst x `predicate` fst y) olds news where olds = map (second snd) olds_ newtype ThreadedArrow t i o a j p = ThreadedArrow (SwitchedArrow i (Maybe o) (StateArrow (t,[(t,ThreadedArrow t i o a i (Maybe o))]) a) j p) type ThreadedFunction i o j p = ThreadedArrow () i o (->) j p instance (ArrowChoice a) => Arrow (ThreadedArrow t i o a) where (>>>) (ThreadedArrow ta1) (ThreadedArrow ta2) = ThreadedArrow $ ta1 >>> ta2 arr = ThreadedArrow . arr first (ThreadedArrow f) = ThreadedArrow $ first f instance (Arrow a,ArrowChoice a) => ArrowTransformer (ThreadedArrow t i o) a where lift = ThreadedArrow . lift . lift instance (ArrowChoice a) => ArrowChoice (ThreadedArrow t i o a) where left (ThreadedArrow a) = ThreadedArrow $ left a instance (ArrowChoice a,ArrowApply a) => ArrowApply (ThreadedArrow t i o a) where app = ThreadedArrow $ proc (ThreadedArrow a,b) -> app -< (a,b) statefulForm :: (ArrowChoice a,ArrowApply a) => (forall x. i -> [(t,x)] -> [(t,(o,x))] -> [x]) -> [(t,ThreadedArrow t i o a i o)] -> StatefulArrow a i [(t,o)] statefulForm manageThreads initial_threads = flip stateContext (map (second (>>> arr Just)) initial_threads) $ proc i -> do threads <- fetch -< () olds <- lift (runThreadsLoop manageThreads) -< (i,[],threads) store -< map (second snd) olds returnA -< map (second fst) olds manageOldNewThreads :: (forall ex. i -> [(t,ex)] -> [(t,(o,ex))] -> [ex]) -> i -> [(t,x)] -> [(t,(o,x))] -> ([(t,x)],[(t,(o,x))]) manageOldNewThreads manageThreads i news_ olds_ = (concat *** concat) $ unzip $ map (either (\l -> ([l],[])) (\r -> ([],[r]))) result where news = map (\x -> second (const $ Left x) x) news_ olds = map (\x -> second (second $ const $ Right x) x) olds_ result = manageThreads i news olds runThreadsLoop :: (ArrowChoice a,ArrowApply a) => (forall x. i -> [(t,x)] -> [(t,(o,x))] -> [x]) -> a (i,[(t,(o,ThreadedArrow t i o a i (Maybe o)))],[(t,ThreadedArrow t i o a i (Maybe o))]) [(t,(o,ThreadedArrow t i o a i (Maybe o)))] runThreadsLoop manageThreads = proc (i,finished_threads,current_threads) -> do (freshly_finished_threads,(_,freshly_spawned_threads)) <- runState runThreads -< ((i,current_threads),(error "runThreadsLoop: undefined thread identity",[])) let (managed_spawned_threads,managed_finished_threads) = manageOldNewThreads manageThreads i freshly_spawned_threads (freshly_finished_threads ++ finished_threads) if null managed_spawned_threads then returnA -< managed_finished_threads else runThreadsLoop manageThreads -< (i,managed_finished_threads,managed_spawned_threads) runThreads :: (ArrowChoice a,ArrowApply a) => StateArrow (t,[(t,ThreadedArrow t i o a i (Maybe o))]) a (i,[(t,ThreadedArrow t i o a i (Maybe o))]) [(t,(o,ThreadedArrow t i o a i (Maybe o)))] runThreads = proc (i,threads) -> do case threads of [] -> returnA -< [] ((ident,ThreadedArrow switchedA):rest_in) -> do x <- fetch -< () store -< first (const ident) x (m_o,newA) <- app -< (runStateful switchedA,i) rest_out <- runThreads -< (i,rest_in) returnA -< (maybe id (\o -> ((ident,(o,ThreadedArrow newA)) :)) m_o) rest_out switchContinue :: (Arrow a,ArrowChoice a,ArrowApply a) => ThreadedArrow t i o a (Maybe (ThreadedArrow t i o a i o),i) i switchContinue = (arr $ first $ fmap (\(ThreadedArrow thread) -> thread >>> arr Just)) >>> (ThreadedArrow SwitchedArrow.switchContinue) switchTerminate :: (Arrow a,ArrowChoice a) => ThreadedArrow t i o a (Maybe (ThreadedArrow t i o a i o),o) o switchTerminate = proc (m_thread,o) -> do ThreadedArrow $ SwitchedArrow.switchTerminate -< (fmap (\(ThreadedArrow thread) -> thread >>> arr Just) m_thread,Just o) returnA -< o spawnThreads :: (Arrow a,ArrowChoice a) => ThreadedArrow t i o a [(t,ThreadedArrow t i o a i o)] () spawnThreads = ThreadedArrow $ lift $ proc new_spawned -> do x <- fetch -< () store -< second (map (second (>>> arr Just)) new_spawned ++) x returnA -< () killThreadIf :: (Arrow a,ArrowChoice a,ArrowApply a) => ThreadedArrow t i o a Bool () killThreadIf = proc b -> do ThreadedArrow SwitchedArrow.switchContinue -< (if b then (Just $ arr (const Nothing)) else Nothing,error "ThreadedArrow.killThreadIf: this thread has been killed") returnA -< () threadIdentity :: (ArrowChoice a) => ThreadedArrow t i o a () t threadIdentity = arr fst <<< ThreadedArrow (lift fetch) \end{code}