{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE NoMonomorphismRestriction #-}
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE Trustworthy #-}

-- | A record-of-actions abstraction over seekable token streams, together
-- with several concrete stream constructors (memory-mapped table, raw
-- handle, strict byte string, plain list) and 'ReaderT' wrappers around the
-- individual stream operations.
module Data.Columbia.SeekableStream
  ( newTable
  , SeekableStream
  , seekableStream
  , hoistStream
  , getWord8
    -- ** Stream forming functions
  , makeIoStream
  , makeIoStreamChar
  , unshimmedIOStream
  , makeByteStringStream
  , makeGenericStream
    -- ** Stream management functions
  , _getPosition
  , _seek
  , _consumeToken
  , _consumeIntegralToken
  , _seekAtEnd
  , _isLockLive
  , getPosition
  , seek
  , consumeToken
  , consumeIntegralToken
  , seekAtEnd
  , isLockLive
  , relSeek
  , peekStream
  , streamToList
  ) where

import Foreign.Marshal.Utils
import Foreign.Storable
import Data.Word
import Data.Maybe
import Data.Char
import Data.Bits
import Data.IORef
import System.IO
import System.IO.Error
import System.FileLock
import Control.Monad.State
import Control.Monad.Reader
import Control.Monad.Loops
import Control.Monad
import Control.Exception
import Control.Parallel.Strategies
import qualified Data.ByteString as B
import System.IO.Unsafe
import Unsafe.Coerce
import Data.Columbia.Mapper
import Data.Columbia.MyEndianness

-- | A stream of tokens of type @c@ whose operations run in the monad @m@.
-- Every field is strict, so the actions themselves (not their results) are
-- forced when the record is built.
data SeekableStream m c = SeekableStream
  { __consumeToken :: !(m (Maybe c))
    -- ^ Read the next token; the constructors in this module yield
    -- 'Nothing' when the position has reached the end of the data.
  , __consumeIntegralToken :: !(m (Maybe Word32))
    -- ^ Read the next 32-bit token.
  , __seek :: !(Word32 -> m ())
    -- ^ Move to an absolute position.
  , __getPosition :: !(m Word32)
    -- ^ Report the current absolute position.
  , __seekAtEnd :: !(m ())
    -- ^ Move to the end of the stream.
  , __isLockLive :: !(m Bool)
    -- ^ Report the flag associated with the stream's lock.
  }
  deriving Functor

-- | Build a stream from its six operations, in field order: consume token,
-- consume integral token, seek, get position, seek to end, lock liveness.
seekableStream ::
  m (Maybe c) ->
  m (Maybe Word32) ->
  (Word32 -> m ()) ->
  m Word32 ->
  m () ->
  m Bool ->
  SeekableStream m c
seekableStream = SeekableStream

-- | Transport every operation of a stream into another monad using the
-- given natural transformation.
hoistStream :: (forall t. m t -> m2 t) -> SeekableStream m c -> SeekableStream m2 c
hoistStream f s =
  SeekableStream
    (f (_consumeToken s))
    (f (_consumeIntegralToken s))
    (\n -> f (_seek s n))
    (f (_getPosition s))
    (f (_seekAtEnd s))
    (f (_isLockLive s))

{-# INLINE getWord8 #-}
-- | Read a single byte from a handle through a temporary one-byte buffer.
-- When 'hGetBuf' reports that nothing was read, 'hGetChar' is invoked purely
-- so that the usual end-of-file exception is raised.
getWord8 :: Handle -> IO Word8
getWord8 h = with 0 $ \p -> do
  n <- hGetBuf h p 1
  when (n == 0) $ void $ hGetChar h -- Cause EOF exception
  peek p

-- | Shape used to reinterpret a 'FileLock' value in 'getLiveReference'.
data LockLike = LockLike !() !(IORef Bool)

-- | Extract an @IORef Bool@ from a 'FileLock' by coercing it to 'LockLike'.
--
-- NOTE(review): this relies on the runtime representation of 'FileLock'
-- being a two-field constructor whose second field is an @IORef Bool@.
-- Nothing in this module checks that; confirm against the filelock version
-- in use before upgrading that dependency.
getLiveReference :: FileLock -> IORef Bool
getLiveReference lock =
  let LockLike _ r = unsafeCoerce lock
   in r

-- | Byte stream over a mapped 'Table'. The 'IORef' holds the pair
-- (current position, end position); consuming reads the byte found through
-- 'mapBlock' at the current position and advances by one.
makeIoStream :: IORef (Pointer, Pointer) -> Table -> FileLock -> SeekableStream IO Word8
makeIoStream ref table lock =
  SeekableStream
    -- Consume one byte, or report end of stream once position >= end.
    ( do
        (n, sz) <- readIORef ref
        if n >= sz
          then return mzero
          else do
            x <- mapBlock table n >>= peek
            writeIORef ref $! (succ n, sz)
            return $! return x
    )
    {- TODO: not complete yet:
    (readIORef ref>>= \(n,sz)->if n+3>=sz then return mzero else mapBlock table n>>=peek>>= \x->(writeIORef ref$!(n+4,sz))>>(return$!return$!swapEndian' x))-}
    (return $ error "makeIoStream: unimplemented")
    -- Seek: replace the position, keep the end marker.
    (\n -> modifyIORef' ref (\(_, sz) -> (n, sz)))
    (liftM fst $ readIORef ref)
    -- Seek to end: position becomes the end marker.
    (modifyIORef' ref (\(_, sz) -> (sz, sz)))
    (atomicModifyIORef' (getLiveReference lock) $ \b -> (b, b))

-- | Like 'makeIoStream', but each byte is converted to the 'Char' with the
-- same code point.
makeIoStreamChar :: IORef (Pointer, Pointer) -> Table -> FileLock -> SeekableStream IO Char
makeIoStreamChar ref t = fmap (chr . fromIntegral) . makeIoStream ref t

-- | Combine four values into one, first argument in the most significant
-- position (shifted left by 24, 16, 8 and 0 bits respectively), and wrap the
-- strictly evaluated result with 'return'.
bytesToIntegral :: (Integral a, Bits b, Num b, Monad m) => a -> a -> a -> a -> m b
bytesToIntegral x1 x2 x3 x4 =
  return $!
    shiftL (fromIntegral x1) 24
      .|. shiftL (fromIntegral x2) 16
      .|. shiftL (fromIntegral x3) 8
      .|. fromIntegral x4

-- | Byte stream that operates directly on a 'Handle'. Reads go through
-- 'getWord8', so running out of input surfaces as an exception from that
-- function rather than as 'Nothing'.
unshimmedIOStream :: Handle -> FileLock -> SeekableStream IO Word8
unshimmedIOStream handle lock =
  SeekableStream
    (liftM (return $!) (getWord8 handle))
    -- Four consecutive bytes, most significant first.
    ( liftM4
        bytesToIntegral
        (getWord8 handle)
        (getWord8 handle)
        (getWord8 handle)
        (getWord8 handle)
    )
    (hSeek handle AbsoluteSeek . toInteger)
    (liftM fromInteger (hTell handle))
    (hSeek handle SeekFromEnd 0)
    (atomicModifyIORef' (getLiveReference lock) $ \b -> (b, b))

-- | Pure byte stream over a strict 'B.ByteString'; the 'State' is the
-- current position.
makeByteStringStream :: B.ByteString -> SeekableStream (State Word32) Word8
makeByteStringStream b =
  SeekableStream
    ( do
        n <- get
        let n' = fromIntegral n
        -- '>=' rather than '==': seeking is an unchecked 'put', so the
        -- position may lie beyond the end, where 'B.index' would fail.
        if n' >= B.length b
          then return mzero
          else do
            put $! succ n
            return $! return $! B.index b n'
    )
    (return $ error "makeByteStringStream: unimplemented")
    put
    get
    (put $! fromIntegral $ B.length b)
    (return True)

-- | First component of a triple.
_fst3 :: (a, b, c) -> a
_fst3 (x, _, _) = x

-- | Pure stream over a list, kept as a zipper. The state is
-- (position, already consumed elements in reverse order, remaining elements).
makeGenericStream :: SeekableStream (State (Word32, [t], [t])) t
makeGenericStream =
  SeekableStream
    -- Consume: move the head of the remaining list onto the consumed list.
    ( do
        (m, ls, ls2) <- get
        case ls2 of
          x : xs -> do
            put $! (succ m, x : ls, xs)
            return $! return x
          [] -> return mzero
    )
    (return $ error "makeGenericStream: unimplemented")
    -- Seek: shift elements between the two lists by the signed distance to
    -- the target. The stored position becomes the target even when fewer
    -- elements than requested are available to shift.
    ( \n -> do
        (m, ls, ls2) <- get
        let n' = fromIntegral n - fromIntegral m
        if n' < 0
          then
            let (lsa, lsb) = splitAt (-n') ls
             in put $! using (n, lsb, reverse lsa ++ ls2) (evalTuple3 rseq rseq rseq)
          else
            let (ls2a, ls2b) = splitAt n' ls2
             in put $! using (n, reverse ls2a ++ ls, ls2b) (evalTuple3 rseq rseq rseq)
    )
    (liftM _fst3 get)
    -- Seek to end: everything remaining becomes consumed.
    ( modify
        ( \(n, ls, ls2) ->
            using
              (n + fromIntegral (length ls2), reverse ls2 ++ ls, [])
              (evalTuple3 rseq rseq rseq)
        )
    )
    (return True)

-- | Accessor for the token-consuming action of a stream.
_consumeToken :: SeekableStream m c -> m (Maybe c)
_consumeToken = __consumeToken

-- | Accessor for the 32-bit-token-consuming action of a stream.
_consumeIntegralToken :: SeekableStream m c -> m (Maybe Word32)
_consumeIntegralToken = __consumeIntegralToken

-- | Accessor for the absolute seek action of a stream.
_seek :: SeekableStream m c -> Word32 -> m ()
_seek = __seek

-- | Accessor for the position query of a stream.
_getPosition :: SeekableStream m c -> m Word32
_getPosition = __getPosition

-- | Accessor for the seek-to-end action of a stream.
_seekAtEnd :: SeekableStream m c -> m ()
_seekAtEnd = __seekAtEnd

-- | Accessor for the lock-flag query of a stream.
_isLockLive :: SeekableStream m c -> m Bool
_isLockLive = __isLockLive

{-# INLINE getPosition #-}
-- | Current position of the stream held in the reader environment.
getPosition :: (Monad m) => ReaderT (SeekableStream m c) m Word32
getPosition = ask >>= lift . _getPosition

{-# INLINE seek #-}
-- | Move the stream held in the reader environment to an absolute position.
seek :: (Monad m) => Word32 -> ReaderT (SeekableStream m c) m ()
seek n = ask >>= lift . (`_seek` n)

{-# INLINE consumeToken #-}
-- | Consume one token from the stream in the reader environment. When the
-- stream reports 'Nothing', the result is an 'error' value with the message
-- @consumeToken: end of stream@.
consumeToken :: (Monad m) => ReaderT (SeekableStream m c) m c
consumeToken = do
  s <- ask
  liftM (fromMaybe (error "consumeToken: end of stream")) $ lift $ _consumeToken s

{-# INLINE consumeIntegralToken #-}
-- | Consume one 32-bit token from the stream in the reader environment.
-- When the stream reports 'Nothing', the result is an 'error' value with
-- the message @consumeIntegralToken: end of stream@.
consumeIntegralToken :: (Monad m) => ReaderT (SeekableStream m c) m Word32
consumeIntegralToken = do
  s <- ask
  liftM (fromMaybe (error "consumeIntegralToken: end of stream")) $
    lift $ _consumeIntegralToken s

{-# INLINE seekAtEnd #-}
-- | Move the stream held in the reader environment to its end.
seekAtEnd :: (Monad m) => ReaderT (SeekableStream m c) m ()
seekAtEnd = ask >>= lift . _seekAtEnd

-- | Run the lock-flag query of the stream held in the reader environment.
isLockLive :: (Monad m) => ReaderT (SeekableStream m c) m Bool
isLockLive = ask >>= lift . _isLockLive

{-# INLINE relSeek #-}
-- | Seek relative to the current position. The offset is added with
-- 'Word32' wrap-around arithmetic, so a backwards move is expressed as a
-- negated literal such as @relSeek (-1)@.
relSeek :: (Monad m) => Word32 -> ReaderT (SeekableStream m c) m ()
relSeek n = do
  m <- getPosition
  seek $! m + n

-- | Return the next token without advancing: consume it, then step back by
-- one position.
peekStream :: (Monad m) => ReaderT (SeekableStream m c) m c
peekStream = do
  x <- consumeToken
  relSeek (-1)
  return x

-- | Consume tokens from the current position until the stream reports
-- 'Nothing', collecting them in order.
streamToList :: (Monad m) => SeekableStream m c -> m [c]
streamToList = unfoldM . _consumeToken