module Data.Columbia.SeekableStream (newTable, SeekableStream, seekableStream, hoistStream, getWord8,
makeIoStream, makeIoStreamChar, unshimmedIOStream, makeByteStringStream, makeGenericStream,
_getPosition, _seek, _consumeToken, _consumeIntegralToken, _seekAtEnd, _isLockLive, getPosition, seek, consumeToken, consumeIntegralToken, seekAtEnd, isLockLive, relSeek, peekStream, streamToList) where
import Foreign.Marshal.Utils
import Foreign.Storable
import Data.Word
import Data.Maybe
import Data.Char
import Data.Bits
import Data.IORef
import System.IO
import System.IO.Error
import System.FileLock
import Control.Monad.State
import Control.Monad.Reader
import Control.Monad.Loops
import Control.Monad
import Control.Exception
import Control.Parallel.Strategies
import qualified Data.ByteString as B
import System.IO.Unsafe
import Unsafe.Coerce
import Data.Columbia.Mapper
import Data.Columbia.MyEndianness
-- | A bundle of stream operations running in a monad @m@ and yielding tokens
-- of type @c@. Every field is strict, so each operation is evaluated to weak
-- head normal form when the record is built.
data SeekableStream m c = SeekableStream
  { __consumeToken :: !(m (Maybe c))
    -- ^ Produce the next token, or 'Nothing' when the stream has none left.
  , __consumeIntegralToken :: !(m (Maybe Word32))
    -- ^ Produce the next token interpreted as a 'Word32'.
  , __seek :: !(Word32 -> m ())
    -- ^ Move the stream to the given position.
  , __getPosition :: !(m Word32)
    -- ^ Report the current position.
  , __seekAtEnd :: !(m ())
    -- ^ Move the stream to its end.
  , __isLockLive :: !(m Bool)
    -- ^ Report whether the lock associated with the stream is live.
  } deriving Functor
-- | Build a 'SeekableStream' from its six operations, given in field order:
-- consume a token, consume an integral token, seek, get position, seek to the
-- end, and query lock liveness.
seekableStream :: m(Maybe c) -> m(Maybe Word32) -> (Word32 -> m()) -> m Word32 -> m() -> m Bool -> SeekableStream m c
seekableStream consume consumeIntegral seekTo position seekEnd lockLive =
  SeekableStream consume consumeIntegral seekTo position seekEnd lockLive
-- | Transport every operation of a stream from monad @m@ into monad @m2@ using
-- the supplied natural transformation.
hoistStream :: (forall t. m t -> m2 t) -> SeekableStream m c -> SeekableStream m2 c
hoistStream f s = SeekableStream
  { __consumeToken = f (__consumeToken s)
  , __consumeIntegralToken = f (__consumeIntegralToken s)
  , __seek = \offset -> f (__seek s offset)
  , __getPosition = f (__getPosition s)
  , __seekAtEnd = f (__seekAtEnd s)
  , __isLockLive = f (__isLockLive s)
  }
-- | Read a single byte from a handle.
--
-- Raises an end-of-file 'IOError' (one for which 'isEOFError' holds) when
-- 'hGetBuf' could not read a byte.
getWord8 :: Handle -> IO Word8
getWord8 h = with 0 $ \p -> do
  n <- hGetBuf h p 1
  -- Report end of input explicitly. Probing with a second read (hGetChar)
  -- could succeed if data arrived in between, consuming a byte and letting the
  -- zero-initialised buffer be returned as if it had been read.
  when (n == 0) $ ioError (mkIOError eofErrorType "getWord8" (Just h) Nothing)
  peek p
-- | Local stand-in type with two strict fields: a unit and an 'IORef' 'Bool'.
-- Within this file it is only used as the target of the 'unsafeCoerce' in
-- 'getLiveReference'.
-- NOTE(review): presumably this is meant to mirror the runtime layout of
-- 'FileLock' from the filelock package — confirm against the version in use.
data LockLike = LockLike !() !(IORef Bool)
-- | Reinterpret a 'FileLock' as a 'LockLike' with 'unsafeCoerce' and return the
-- 'IORef' 'Bool' found in its second field.
-- NOTE(review): this is only sound if 'FileLock' really has a constructor whose
-- second field is an 'IORef' 'Bool'; nothing in this file checks that — verify
-- whenever the filelock dependency changes.
getLiveReference :: FileLock -> IORef Bool
getLiveReference lock = let LockLike _ r = unsafeCoerce lock in r
-- | Byte stream backed by a 'Table'. The 'IORef' holds a pair of the current
-- position and the end position; tokens are fetched by mapping the block at
-- the current position and peeking it.
--
-- The integral-token operation is not implemented: it yields a value that
-- raises an error when evaluated. Lock liveness is read from the reference
-- obtained through 'getLiveReference'.
makeIoStream :: IORef(Pointer,Pointer) -> Table -> FileLock -> SeekableStream IO Word8
makeIoStream ref table lock = SeekableStream
  { __consumeToken = do
      (pos, end) <- readIORef ref
      if pos >= end
        then return mzero
        else do
          byte <- mapBlock table pos >>= peek
          -- Advance the cursor only after the byte was read successfully.
          writeIORef ref $! (succ pos, end)
          return $! return byte
  , __consumeIntegralToken = return $ error "makeIoStream: unimplemented"
  , __seek = \target -> modifyIORef' ref (\(_, end) -> (target, end))
  , __getPosition = do
      (pos, _) <- readIORef ref
      return pos
  , __seekAtEnd = modifyIORef' ref (\(_, end) -> (end, end))
    -- An atomic modify that writes back the same value acts as an atomic read.
  , __isLockLive = atomicModifyIORef' (getLiveReference lock) $ \alive -> (alive, alive)
  }
-- | Same stream as 'makeIoStream', with every byte token converted to the
-- 'Char' that has that code point.
makeIoStreamChar :: IORef(Pointer,Pointer) -> Table -> FileLock -> SeekableStream IO Char
makeIoStreamChar ref t lock = fmap byteToChar (makeIoStream ref t lock)
  where
    byteToChar byte = chr (fromIntegral byte)
-- | Combine four integral values into one, most significant first: the first
-- argument is shifted left by 24 bits, the second by 16, the third by 8 and
-- the fourth is used as is. The strictly evaluated result is returned in an
-- arbitrary monad.
bytesToIntegral
  :: (Integral a, Integral b, Integral c, Integral d, Bits r, Num r, Monad m)
  => a -> b -> c -> d -> m r
bytesToIntegral x1 x2 x3 x4 = return $! (top .|. upper .|. lower .|. bottom)
  where
    top    = fromIntegral x1 `shiftL` 24
    upper  = fromIntegral x2 `shiftL` 16
    lower  = fromIntegral x3 `shiftL` 8
    bottom = fromIntegral x4
-- | Byte stream that reads straight from a 'Handle'.
--
-- The token operation always wraps the byte returned by 'getWord8' in 'Just';
-- it never produces 'Nothing' itself, so running out of input surfaces as
-- whatever 'getWord8' raises. The integral token is assembled from four
-- consecutive bytes with 'bytesToIntegral'. Positions are converted between
-- 'Word32' and the handle's 'Integer' offsets.
unshimmedIOStream :: Handle -> FileLock -> SeekableStream IO Word8
unshimmedIOStream handle lock = SeekableStream
  { __consumeToken = do
      byte <- getWord8 handle
      return (return $! byte)
  , __consumeIntegralToken = do
      b1 <- getWord8 handle
      b2 <- getWord8 handle
      b3 <- getWord8 handle
      b4 <- getWord8 handle
      return (bytesToIntegral b1 b2 b3 b4)
  , __seek = \offset -> hSeek handle AbsoluteSeek (toInteger offset)
  , __getPosition = do
      offset <- hTell handle
      return (fromInteger offset)
  , __seekAtEnd = hSeek handle SeekFromEnd 0
    -- An atomic modify that writes back the same value acts as an atomic read.
  , __isLockLive = atomicModifyIORef' (getLiveReference lock) $ \alive -> (alive, alive)
  }
-- | Pure byte stream over a strict 'B.ByteString'. The 'State' value is the
-- current position.
--
-- Consuming a token yields 'Nothing' whenever the position is at or beyond the
-- end of the string, so a seek past the end is harmless. The integral-token
-- operation is not implemented: it yields a value that raises an error when
-- evaluated. The lock is always reported as live.
makeByteStringStream :: B.ByteString -> SeekableStream(State Word32) Word8
makeByteStringStream b = SeekableStream
  { __consumeToken = do
      pos <- get
      let i = fromIntegral pos
      -- Use >= rather than ==: seeking can place the position past the end,
      -- and indexing there would fail.
      if i >= B.length b
        then return mzero
        else do
          put $! succ pos
          return $! return $! B.index b i
  , __consumeIntegralToken = return $ error "makeByteStringStream: unimplemented"
  , __seek = put
  , __getPosition = get
  , __seekAtEnd = put $! fromIntegral $ B.length b
  , __isLockLive = return True
  }
-- | First component of a triple.
_fst3 :: (a, b, c) -> a
_fst3 triple = case triple of
  (first, _, _) -> first
-- | Pure stream over a list of arbitrary tokens. The 'State' value is a triple
-- of the current position, the tokens already consumed (most recent first)
-- and the tokens still to come.
--
-- Seeking moves tokens between the two lists: forwards by shifting tokens from
-- the remaining list onto the consumed list, backwards by shifting them the
-- other way. The integral-token operation is not implemented: it yields a
-- value that raises an error when evaluated. The lock is always reported as
-- live.
makeGenericStream :: SeekableStream(State(Word32, [t], [t])) t
makeGenericStream = SeekableStream
  { __consumeToken = do
      (pos, before, after) <- get
      case after of
        x : rest -> do
          put $! (succ pos, x : before, rest)
          return $! return x
        [] -> return mzero
  , __consumeIntegralToken = return $ error "makeGenericStream: unimplemented"
  , __seek = \target -> do
      (pos, before, after) <- get
      -- Signed distance to travel; computed in Int so that a backward seek
      -- shows up as a negative number instead of wrapping around in Word32.
      let delta = fromIntegral target - fromIntegral pos :: Int
      if delta < 0
        then
          -- splitAt treats a negative count as zero, so the magnitude must be
          -- passed when moving tokens back.
          let (moved, kept) = splitAt (negate delta) before
           in put $! forced (target, kept, reverse moved ++ after)
        else
          let (moved, kept) = splitAt delta after
           in put $! forced (target, reverse moved ++ before, kept)
  , __getPosition = liftM _fst3 get
  , __seekAtEnd = modify (\(pos, before, after) ->
      forced (pos + fromIntegral (length after), reverse after ++ before, []))
  , __isLockLive = return True
  }
  where
    -- Evaluate each component of the state triple to weak head normal form.
    forced state = state `using` evalTuple3 rseq rseq rseq
-- | Token-consuming operation of a stream.
_consumeToken :: SeekableStream m c -> m (Maybe c)
_consumeToken stream = __consumeToken stream

-- | Integral-token-consuming operation of a stream.
_consumeIntegralToken :: SeekableStream m c -> m (Maybe Word32)
_consumeIntegralToken stream = __consumeIntegralToken stream

-- | Seek operation of a stream.
_seek :: SeekableStream m c -> Word32 -> m ()
_seek stream = __seek stream

-- | Position query of a stream.
_getPosition :: SeekableStream m c -> m Word32
_getPosition stream = __getPosition stream

-- | Seek-to-end operation of a stream.
_seekAtEnd :: SeekableStream m c -> m ()
_seekAtEnd stream = __seekAtEnd stream

-- | Lock-liveness query of a stream.
_isLockLive :: SeekableStream m c -> m Bool
_isLockLive stream = __isLockLive stream
-- | Current position of the stream held in the reader environment.
getPosition :: (Monad m) => ReaderT(SeekableStream m c) m Word32
getPosition = do
  stream <- ask
  lift (_getPosition stream)
-- | Move the stream held in the reader environment to the given position.
seek :: (Monad m) => Word32 -> ReaderT(SeekableStream m c) m ()
seek n = do
  stream <- ask
  lift (_seek stream n)
-- | Consume the next token of the stream held in the reader environment.
--
-- When the stream reports no token, the returned value is one that raises
-- @consumeToken: end of stream@ once it is evaluated; the action itself still
-- completes.
consumeToken :: (Monad m) => ReaderT(SeekableStream m c) m c
consumeToken = do
  stream <- ask
  next <- lift (_consumeToken stream)
  return (fromMaybe (error"consumeToken: end of stream") next)
-- | Consume the next integral token of the stream held in the reader
-- environment.
--
-- When the stream reports no token, the returned value is one that raises
-- @consumeIntegralToken: end of stream@ once it is evaluated; the action
-- itself still completes.
consumeIntegralToken :: (Monad m) => ReaderT(SeekableStream m c) m Word32
consumeIntegralToken = do
  stream <- ask
  next <- lift (_consumeIntegralToken stream)
  return (fromMaybe (error"consumeIntegralToken: end of stream") next)
-- | Move the stream held in the reader environment to its end.
seekAtEnd :: (Monad m) => ReaderT(SeekableStream m c) m ()
seekAtEnd = do
  stream <- ask
  lift (_seekAtEnd stream)
-- | Ask the stream held in the reader environment whether its lock is live.
isLockLive :: (Monad m) => ReaderT(SeekableStream m c) m Bool
isLockLive = do
  stream <- ask
  lift (_isLockLive stream)
-- | Seek relative to the current position: the offset is added to the value
-- of 'getPosition' and the sum is passed, strictly evaluated, to 'seek'. The
-- addition is carried out on 'Word32' values.
relSeek :: (Monad m) => Word32 -> ReaderT(SeekableStream m c) m ()
relSeek n = getPosition >>= \here -> seek $! here + n
-- | Return the next token without moving the stream.
--
-- The position is recorded before the token is consumed and restored
-- afterwards, so the stream ends up exactly where it started, whether or not
-- consuming advanced it.
peekStream :: (Monad m) => ReaderT(SeekableStream m c) m c
peekStream = do
  start <- getPosition
  x <- consumeToken
  -- Seek back to the saved position rather than stepping by a fixed amount.
  seek start
  return x
-- | Collect tokens from the stream, by repeatedly running its token-consuming
-- operation, until that operation yields 'Nothing'.
streamToList :: (Monad m) => SeekableStream m c -> m[c]
streamToList stream = unfoldM (_consumeToken stream)