module Data.Repa.Flow.Generic.Operator
(
project_i
, project_o
, funnel_o
, repeat_i
, replicate_i
, prepend_i, prependOn_i
, head_i
, groups_i
, pack_ii
, folds_ii
, watch_i
, watch_o
, trigger_o
, capture_o
, rcapture_o
, ignore_o
, abandon_o
, trace_o)
where
import Data.Repa.Flow.Generic.Eval
import Data.Repa.Flow.Generic.List
import Data.Repa.Flow.Generic.Connect
import Data.Repa.Flow.Generic.Base
import Data.Repa.Array.Generic.Index as A
import Data.Repa.Array.Generic as A
import Data.IORef
import Control.Monad
import Debug.Trace
import GHC.Exts
import Prelude as P
#include "repa-flow.h"
-- | Project out a single stream source from a bundle.
--
--   The result is a bundle indexed by unit. Its pull function ignores
--   the index it is handed and always pulls from stream @ix@ of the
--   original bundle.
project_i
        :: Monad m
        => i -> Sources i m a -> m (Sources () m a)
project_i ix (Sources _ pullX)
 = return (Sources () (\_ eat eject -> pullX ix eat eject))
-- | Project out a single stream sink from a bundle.
--
--   The result is a bundle indexed by unit. Pushing to it, or ejecting
--   it, forwards to stream @ix@ of the original bundle regardless of
--   the index supplied.
project_o
        :: Monad m
        => i -> Sinks i m a -> m (Sinks () m a)
project_o ix (Sinks _ pushX ejectX)
 = return (Sinks () (\_ x -> pushX ix x) (\_ -> ejectX ix))
-- | Yield sources that always produce the same value.
--
--   Every pull on stream @i@ yields @f i@. The eject continuation is
--   never invoked, so the streams do not end.
repeat_i
        :: Monad m
        => i -> (i -> a)
        -> m (Sources i m a)
repeat_i n f
 = return (Sources n (\i eat _ -> eat (f i)))
-- | Yield sources of the given length that always produce the same value.
--
--   Stream @i@ yields @f i@ exactly @len@ times and ejects on every pull
--   after that. A @len@ of zero or less gives streams that eject
--   immediately.
replicate_i
        :: States i m
        => i -> Int -> (i -> a)
        -> m (Sources i m a)
replicate_i n len f
 = do
        -- One counter per stream, holding the number of elements that
        -- stream has produced so far.
        refs    <- newRefs n 0

        let pull_replicate i eat eject
             = do !count <- readRefs refs i
                  if count >= len
                   then eject
                   else do
                        -- Bump the counter before yielding. Without this
                        -- the count stays at zero and the stream never ends.
                        writeRefs refs i (count + 1)
                        eat (f i)

        return $ Sources n pull_replicate
-- | Prepend some more elements into the front of every stream in a bundle.
--
--   Each stream keeps its own copy of the list still to be yielded.
--   Pulls are answered from that list until it is empty, after which
--   they are forwarded to the underlying source.
prepend_i
        :: States i m
        => [a] -> Sources i m a -> m (Sources i m a)
prepend_i xs (Sources n pullX)
 = do   pending <- newRefs n xs
        return (Sources n (pullWith pending))
 where
        -- Serve from the pending list first, then fall through to the source.
        pullWith pending i eat eject
         = readRefs pending i >>= \queued ->
           case queued of
            []          -> pullX i eat eject
            y : rest    -> writeRefs pending i rest >> eat y
-- | Like 'prepend_i', but only prepend the elements to those streams
--   whose index satisfies the given predicate.
--
--   Streams whose index fails the predicate pull straight from the
--   underlying source.
prependOn_i
        :: States i m
        => (i -> Bool) -> [a] -> Sources i m a -> m (Sources i m a)
prependOn_i p xs (Sources n pullX)
 = do   pending <- newRefs n xs

        let pullFrom i eat eject
             = if p i
                then do queued <- readRefs pending i
                        case queued of
                         []        -> pullX i eat eject
                         y : rest  -> do writeRefs pending i rest
                                         eat y
                else pullX i eat eject

        return (Sources n pullFrom)
-- | Take elements from the front of one stream of a bundle.
--
--   The source bundle is first passed through 'connect_i', giving two
--   bundles. 'takeList1' is run with @len@ and stream index @i@ on the
--   first; its result is returned together with the second bundle.
--
--   NOTE(review): presumably the returned bundle yields the elements that
--   follow the taken prefix. That depends on 'connect_i' and 'takeList1',
--   which are defined in other modules; confirm there.
head_i  :: States i m
        => Int -> Sources i m a -> i -> m ([a], Sources i m a)
head_i len s0 i
 = connect_i s0 >>= \(sTaken, sRest) ->
   liftM (\prefix -> (prefix, sRest)) (takeList1 len i sTaken)
-- | From a stream of values which has consecutive runs of identical values,
--   produce a stream of the lengths of these runs.
--
--   Each pull on the result pulls repeatedly from the same stream of the
--   argument until the run ends, then yields the run length.
--
--   NOTE(review): when a pulled element differs from the current run it has
--   already been consumed from the source, but it is not stored anywhere.
--   The next pull on the result starts again from 'Nothing', so that
--   element is not counted towards any run. Fixing this needs per-stream
--   state, which the 'Monad' constraint here does not provide.
groups_i
        :: (Ord i, Monad m, Eq a)
        => Sources i m a -> m (Sources i m Int)
groups_i (Sources n pullV)
 = return $ Sources n pull_n
 where
        -- Pull a whole run from the source so that one output element
        -- (the run length) can be produced.
        pull_n i eat eject
         = loop_groups Nothing 1#
         where
                -- mx    is the value of the current run, if one has started.
                -- count is the unboxed length of the current run so far.
                loop_groups !mx !count
                 = pullV i eat_v eject_v
                 where  eat_v v
                         = case mx of
                            -- First element read for this run.
                            Nothing -> loop_groups (Just v) count

                            -- Same value extends the run; a different
                            -- value ends it and the count is yielded.
                            Just x  -> if x == v
                                        then loop_groups (Just x) (count +# 1#)
                                        else eat (I# count)

                        eject_v
                         = case mx of
                            -- Source ended before any element was read.
                            Nothing -> eject

                            -- Source ended mid-run: yield what we have.
                            Just _  -> eat (I# count)
-- | Given a bundle of flag sources and a bundle of element sources,
--   produce a bundle that yields only those elements whose flag is 'True'.
--
--   The result has the smaller of the two arities. A pull takes one flag
--   and then one element. If the flag is 'False' the element is discarded
--   and the process repeats. The result ejects as soon as either input
--   ejects.
pack_ii :: (Ord i, Monad m)
        => Sources i m Bool -> Sources i m a -> m (Sources i m a)
pack_ii (Sources nF pullF) (Sources nX pullX)
 = return (Sources (min nF nX) pullPacked)
 where
        pullPacked i eat eject
         = nextFlag
         where
                -- Pull a flag, then the element that it governs.
                nextFlag
                 = pullF i withFlag eject

                -- Keep the element if the flag is set, otherwise retry.
                withFlag keep
                 = pullX i (\x -> if keep then eat x else nextFlag) eject
-- | Segmented fold.
--
--   Each pull on the result takes a segment length from the first bundle,
--   then folds that many elements from the same stream of the second
--   bundle with @f@, starting from @z@, and yields the total. A length of
--   zero yields @z@ without touching the element source.
--
--   The result has the smaller of the two arities. It ejects when either
--   input ejects. If the element source ejects part way through a
--   segment, the partial accumulator is discarded.
folds_ii
        :: (Ord i, Monad m)
        => (a -> a -> a) -> a
        -> Sources i m Int
        -> Sources i m a
        -> m (Sources i m a)
folds_ii f z (Sources nL pullLen) (Sources nX pullX)
 = return (Sources (min nL nX) pullFolded)
 where
        pullFolded i eat eject
         = pullLen i (\(I# len) -> go len z) eject
         where
                -- Both the remaining count and the accumulator are kept
                -- strict so no thunks build up across the segment.
                go !remaining !acc
                 = case remaining of
                    0# -> eat acc
                    _  -> pullX i
                                (\x -> go (remaining -# 1#) (f acc x))
                                eject
-- | Apply a monadic function to every element pulled from some sources,
--   producing new sources.
--
--   The function is run with the stream index and the element just before
--   the element is handed to the consumer. Ejection passes through
--   unchanged.
watch_i :: Monad m
        => (i -> a -> m ())
        -> Sources i m a -> m (Sources i m a)
watch_i f (Sources n pullX)
 = return $ Sources n $ \i eat eject
        -> pullX i (\x -> f i x >> eat x) eject
-- | Pass elements to the provided action as they are pushed into the sinks.
--
--   The action runs first, with the stream index and the element, and the
--   element is then pushed to the underlying sink. Both the index and the
--   element are forced before the action runs. Ejection is forwarded
--   unchanged, after forcing the index.
watch_o :: Monad m
        => (i -> a -> m ())
        -> Sinks i m a -> m (Sinks i m a)
watch_o f (Sinks n pushX ejectX)
 = return (Sinks n pushWatched ejectWatched)
 where
        pushWatched !i !x
         = do   f i x
                pushX i x

        ejectWatched !i
         = ejectX i
-- | Create a bundle of sinks of the given arity, pass it to the consumer,
--   and collect everything pushed to any of the sinks into an array of
--   (stream index, element) pairs.
--
--   This is 'rcapture_o' with the consumer's own result thrown away.
capture_o
        :: (Target lDst (i, a), Index lDst ~ Int)
        => Name lDst
        -> i
        -> (Sinks i IO a -> IO ())
        -> IO (Array lDst (i, a))
capture_o name arity consume
 = liftM fst (rcapture_o name arity consume)
-- | Like 'capture_o', but also return the result of the consumer.
--
--   A bundle of sinks of arity @n@ is built and handed to @use@. Every
--   element pushed to any sink is recorded together with its stream
--   index. Once @use@ returns, the recorded pairs are written, in the
--   order they were pushed, to an array of the layout named by @nDst@.
--   That array is returned alongside the result of @use@.
rcapture_o
        :: (Target lDst (i, a), Index lDst ~ Int)
        => Name lDst
        -> i
        -> (Sinks i IO a -> IO b)
        -> IO (Array lDst (i, a), b)
rcapture_o nDst n use
 = do
        -- Captured pairs, most recently pushed first.
        ref     <- newIORef []

        -- Use the strict modify so each cons cell is built as the element
        -- arrives. The lazy variant would leave a chain of suspended
        -- updates in the ref until it is finally read.
        let capture_eat i x
             = atomicModifyIORef' ref (\old -> ((i, x) : old, ()))

        k0      <- ignore_o n
        k1      <- watch_o capture_eat k0
        x       <- use k1
        result  <- readIORef ref

        -- The list was accumulated in reverse, so restore push order.
        let !arr = A.fromList nDst $ P.reverse result
        return (arr, x)
-- | Create a bundle of sinks of the given arity that pass every pushed
--   element, together with its stream index, to the provided action.
--
--   Built by wrapping an 'ignore_o' sink with 'watch_o'.
trigger_o
        :: Monad m
        => i -> (i -> a -> m ()) -> m (Sinks i m a)
trigger_o i f
 = do   sink <- ignore_o i
        watch_o f sink
-- | Create a bundle of sinks of the given arity that discard all
--   incoming data.
--
--   These sinks are strict: the index and the element are both forced
--   before being thrown away. See 'abandon_o' for the lazy variant.
ignore_o
        :: Monad m
        => i -> m (Sinks i m a)
ignore_o n
 = return (Sinks n discardPush discardEject)
 where
        discardPush  !_ !_ = return ()
        discardEject !_    = return ()
-- | Create a bundle of sinks of the given arity that drop all data on
--   the floor.
--
--   These sinks are lazy: neither the index nor the element is forced,
--   so thunks attached to them are never demanded. See 'ignore_o' for
--   the strict variant.
abandon_o
        :: Monad m
        => i -> m (Sinks i m a)
abandon_o n
 = return $ Sinks n (\_ _ -> return ()) (\_ -> return ())
-- | Create a bundle of sinks of the given arity that report each pushed
--   element, with its stream index, through 'trace' from "Debug.Trace".
--
--   Intended for debugging only. The elements themselves are discarded.
trace_o :: (Show i, Show a, Monad m)
        => i -> m (Sinks i m a)
trace_o nSinks
 = trigger_o nSinks report
 where
        report i x
         = trace message (return ())
         where  message = "repa-flow trace_o: " ++ show i ++ "; " ++ show x