module Data.Repa.Flow.Chunked.Base
( Sources, Sinks
, Flow
, Data.Repa.Flow.Chunked.Base.fromList
, fromLists
, toList1
, toLists1
, head_i
, finalize_i, finalize_o)
where
import Data.Repa.Flow.States as F
import Data.Repa.Array.Generic as A
import Data.Repa.Array.Generic.Index as A
import Data.Repa.Array.Meta.Window as A
import Control.Monad
import qualified Data.Repa.Flow.Generic as G
import qualified Data.Sequence as Q
import qualified Data.Foldable as D
import Prelude as P
#include "repa-flow.h"
type Sources i m l e
= G.Sources i m (A.Array l e)
type Sinks i m l e
= G.Sinks i m (A.Array l e)
type Flow i m l a
= (Ord i, Monad m, BulkI l a, States i m)
fromList :: (States i m, A.TargetI l a)
=> Name l -> i -> [a] -> m (Sources i m l a)
fromList nDst n xs
= G.fromList n [A.fromList nDst xs]
fromLists :: (States i m, A.TargetI l a)
=> Name l -> i -> [[a]] -> m (Sources i m l a)
fromLists nDst n xs
= G.fromList n $ P.map (A.fromList nDst) xs
toList1 :: (States i m, A.BulkI l a)
=> i -> Sources i m l a -> m [a]
toList1 i sources
= do chunks <- G.toList1 i sources
return $ P.concat $ P.map A.toList chunks
toLists1 :: (States i m, A.BulkI l a)
=> i -> Sources i m l a -> m [[a]]
toLists1 i sources
= do chunks <- G.toList1 i sources
return $ P.map A.toList chunks
head_i :: (States i m, A.Windowable l a, A.Index l ~ Int)
=> Int -> Sources i m l a -> i -> m ([a], Sources i m l a)
head_i len s0 i
= do
(s1, s2) <- G.connect_i s0
let G.Sources n pull_chunk = s1
refsList <- newRefs n Q.empty
refsChunk <- newRefs n Nothing
let loop_takeList1 !has !acc !mchunk
| has >= len
= do writeRefs refsList i acc
writeRefs refsChunk i mchunk
| otherwise
= pull_chunk i eat_toList eject_toList
where
eat_toList x
= loop_takeList1
(has + A.length x)
(acc Q.>< (Q.fromList $ A.toList x))
(Just x)
eject_toList
= do writeRefs refsList i acc
writeRefs refsChunk i mchunk
loop_takeList1 0 Q.empty Nothing
has <- readRefs refsList i
mFinal <- readRefs refsChunk i
let (here, rest) = Q.splitAt len has
let start = Q.length has Q.length rest
let stash = case mFinal of
Nothing -> []
Just c -> [A.window start (Q.length rest) c]
s2' <- G.prependOn_i (\i' -> i' == i) stash s2
return (D.toList here, s2')
finalize_i
:: States i m
=> (i -> m ())
-> Sources i m l a -> m (Sources i m l a)
finalize_i = G.finalize_i
finalize_o
:: States i m
=> (i -> m ())
-> Sinks i m l a -> m (Sinks i m l a)
finalize_o = G.finalize_o