-- | Conduit sources, sinks and conduits for generic vectors
-- ("Data.Vector.Generic") and their mutable counterparts
-- ("Data.Vector.Generic.Mutable").
--
-- Exports: sources that emit the elements of a vector, sinks that collect
-- all (or the first @n@) upstream elements into a vector, and conduits that
-- convert between immutable and mutable vectors.
module Data.Conduit.Vector
(
sourceVector,
sourceMVector,
consumeVector,
consumeMVector,
takeVector,
takeMVector,
thawConduit,
freezeConduit
)
where
import Control.Monad.Primitive
import Control.Monad.ST
import Data.Conduit
import qualified Data.Conduit.List as L
import Data.Conduit.Util
import qualified Data.Vector.Generic as V
import qualified Data.Vector.Generic.Mutable as M
import qualified Data.Vector.Fusion.Stream as S
import qualified Data.Vector.Fusion.Stream.Monadic as SM
-- | Emit the elements of an immutable vector downstream, in index order.
--
-- The source state is simply the index of the next element to emit, so
-- every pull is O(1). Walking a fusion stream with repeated 'S.tail' would
-- instead stack one wrapper around the step function per element already
-- emitted, making a full traversal quadratic in the vector length.
sourceVector :: (Monad m, V.Vector v a) => v a -> Source m a
sourceVector vec = sourceState 0 step
  where
    len = V.length vec

    step i
      | i >= len  = return StateClosed
      | otherwise = do
          -- The guard above keeps @i@ in bounds, so the unchecked lookup is
          -- fine. The monadic variant does the indexing as part of the
          -- action rather than yielding a thunk that refers to the vector.
          x <- V.unsafeIndexM vec i
          return $ StateOpen (i + 1) x
-- | Emit the elements of a mutable vector downstream, in index order.
--
-- Each element is read from the vector at the moment it is pulled, so
-- writes performed between pulls are observed by later reads.
--
-- The source state is the index of the next element to read, which keeps
-- every pull O(1). Repeatedly taking 'SM.tail' of a monadic stream would
-- nest one wrapper per emitted element and make the traversal quadratic.
sourceMVector :: (PrimMonad m, M.MVector v a)
              => v (PrimState m) a
              -> Source m a
sourceMVector vec = sourceState 0 step
  where
    len = M.length vec

    step i
      | i >= len  = return StateClosed
      | otherwise = do
          -- In bounds thanks to the guard above.
          x <- M.unsafeRead vec i
          return $ StateOpen (i + 1) x
-- | Sink that gathers every upstream element into an immutable vector.
--
-- Elements are written into a mutable buffer that is allocated with room
-- for 10 elements on the first push and doubled each time it fills up.
-- When upstream is exhausted, the filled prefix of the buffer is frozen
-- with 'V.unsafeFreeze' and returned. If no element ever arrives, the
-- result is the empty vector.
consumeVector :: (PrimMonad m, V.Vector v a)
              => Sink a m (v a)
consumeVector = sinkState (Nothing, 0) push close
  where
    -- Store one element, allocating or enlarging the buffer as needed.
    push (mbuf, used) x = do
        buf    <- maybe (M.new 10) return mbuf
        roomy  <- ensureRoom buf used
        M.write roomy used x
        return $ StateProcessing (Just roomy, used + 1)

    -- Grow by the current capacity (i.e. double) once the buffer is full.
    ensureRoom buf used
        | used >= cap = M.grow buf cap
        | otherwise   = return buf
      where
        cap = M.length buf

    -- Upstream is done: hand back only the slots that were written.
    close (mbuf, used) = case mbuf of
        Nothing  -> return $ V.fromList []
        Just buf -> V.unsafeFreeze $ M.take used buf
-- | Sink that collects at most @n@ upstream elements into an immutable
-- vector.
--
-- * The sink finishes as soon as the @n@-th element has been stored, so it
--   never consumes (and silently drops) an @n+1@-th element.
-- * If upstream ends before @n@ elements arrived, only the elements that
--   were actually received are returned; the unused, uninitialised tail of
--   the buffer is sliced off.
-- * For @n <= 0@ the result is the empty vector. The state-based sink has
--   to see one input before it can finish, so that input is handed back
--   as a leftover instead of being discarded.
takeVector :: (PrimMonad m, V.Vector v a)
           => Int -> Sink a m (v a)
takeVector n = sinkState (Nothing, 0) push close
  where
    push (mbuf, used) x
      -- Nothing to take: return the element we were forced to look at.
      | n <= 0    = return $ StateDone (Just x) V.empty
      | otherwise = do
          -- The buffer is allocated lazily, on the first element.
          buf <- maybe (M.new n) return mbuf
          M.write buf used x
          let used' = used + 1
          if used' >= n
            then do frozen <- V.unsafeFreeze buf
                    return $ StateDone Nothing frozen
            else return $ StateProcessing (Just buf, used')

    -- Upstream ended early: expose only the filled prefix.
    close (Nothing, _)      = return V.empty
    close (Just buf, used)  = V.unsafeFreeze $ M.take used buf
-- | Sink that gathers every upstream element into a mutable vector.
--
-- The buffer is allocated with room for 10 elements on the first push and
-- doubled each time it fills up. When upstream is exhausted, the result is
-- the slice of the buffer covering the elements that were written. If no
-- element ever arrives, a fresh zero-length vector is returned.
consumeMVector :: (PrimMonad m, M.MVector v a)
               => Sink a m (v (PrimState m) a)
consumeMVector = sinkState (Nothing, 0) push close
  where
    -- Store one element, allocating or enlarging the buffer as needed.
    push (mbuf, used) x = do
        buf   <- maybe (M.new 10) return mbuf
        roomy <- ensureRoom buf used
        M.write roomy used x
        return $ StateProcessing (Just roomy, used + 1)

    -- Grow by the current capacity (i.e. double) once the buffer is full.
    ensureRoom buf used
        | used >= cap = M.grow buf cap
        | otherwise   = return buf
      where
        cap = M.length buf

    -- Upstream is done: hand back only the slots that were written.
    close (mbuf, used) = case mbuf of
        Nothing  -> M.new 0
        Just buf -> return $ M.take used buf
-- | Sink that collects at most @n@ upstream elements into a mutable vector.
--
-- * The sink finishes as soon as the @n@-th element has been stored, so it
--   never consumes (and silently drops) an @n+1@-th element.
-- * If upstream ends before @n@ elements arrived, the result is the slice
--   holding only the elements that were actually received; the
--   uninitialised tail of the buffer is not exposed.
-- * For @n <= 0@ the result is a fresh zero-length vector. The state-based
--   sink has to see one input before it can finish, so that input is
--   handed back as a leftover instead of being discarded.
takeMVector :: (PrimMonad m, M.MVector v a)
            => Int -> Sink a m (v (PrimState m) a)
takeMVector n = sinkState (Nothing, 0) push close
  where
    push (mbuf, used) x
      -- Nothing to take: return the element we were forced to look at.
      | n <= 0    = do
          none <- M.new 0
          return $ StateDone (Just x) none
      | otherwise = do
          -- The buffer is allocated lazily, on the first element.
          buf <- maybe (M.new n) return mbuf
          M.write buf used x
          let used' = used + 1
          if used' >= n
            then return $ StateDone Nothing buf
            else return $ StateProcessing (Just buf, used')

    -- Upstream ended early: expose only the filled prefix.
    close (Nothing, _)      = M.new 0
    close (Just buf, used)  = return $ M.take used buf
-- | Conduit that turns each incoming immutable vector into a mutable one.
--
-- The conversion uses 'V.unsafeThaw'. Per the vector documentation that
-- operation does not copy, so an immutable vector fed into this conduit
-- must not be used again once it has passed through.
thawConduit :: (PrimMonad m, V.Vector v a)
            => Conduit (v a) m (V.Mutable v (PrimState m) a)
thawConduit = L.mapM thawOne
  where
    thawOne frozen = V.unsafeThaw frozen
-- | Conduit that turns each incoming mutable vector into an immutable one.
--
-- The conversion uses 'V.unsafeFreeze'. Per the vector documentation that
-- operation does not copy, so a mutable vector fed into this conduit must
-- not be modified again once it has passed through.
freezeConduit :: (PrimMonad m, V.Vector v a)
              => Conduit (V.Mutable v (PrimState m) a) m (v a)
freezeConduit = L.mapM freezeOne
  where
    freezeOne thawed = V.unsafeFreeze thawed