module Data.Store.Streaming
(
Message (..)
, encodeMessage
, PeekMessage (..)
, peekMessage
, decodeMessage
, conduitEncode
, conduitDecode
) where
import Control.Exception (assert)
import Control.Monad (liftM)
import Control.Monad.IO.Class
import Control.Monad.Trans.Resource (MonadResource)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Internal as BS
import qualified Data.Conduit as C
import qualified Data.Conduit.List as C
import Data.Store
import Data.Store.Impl (Peek (..), Poke (..), tooManyBytes, getSize)
import Data.Word
import Foreign.Ptr
import qualified Foreign.Storable as Storable
import Prelude
import System.IO.ByteBuffer (ByteBuffer)
import qualified System.IO.ByteBuffer as BB
-- | Wrapper marking a value as one unit of the streaming protocol.
-- 'encodeMessage' serialises the wrapped value behind a length prefix, and
-- the decoding functions in this module hand their results back wrapped in
-- 'Message'. Use 'fromMessage' to get at the payload.
newtype Message a = Message { fromMessage :: a } deriving (Eq, Show)
-- | Type of the length prefix that 'encodeMessage' writes in front of each
-- payload and that 'peekSizeTag' reads back.
type SizeTag = Int
-- | Number of bytes occupied by a 'SizeTag', as reported by
-- 'Storable.sizeOf'.
tagLength :: Int
tagLength = Storable.sizeOf sample
  where
    -- Only the type of the argument matters to 'Storable.sizeOf'.
    sample = 0 :: SizeTag
-- | Serialise a 'Message' as a length prefix followed by the payload.
--
-- The prefix is the payload's 'getSize'; the output buffer is allocated with
-- exactly 'tagLength' plus that many bytes. After writing, an 'assert' checks
-- that the offset reported by 'runPoke' matches the allocated length.
encodeMessage :: Store a => Message a -> ByteString
encodeMessage (Message payload) = BS.unsafeCreate totalSize fill
  where
    payloadSize = getSize payload
    totalSize = tagLength + payloadSize

    -- Write the size tag first, then the payload, starting at offset 0.
    fill ptr = do
        (written, ()) <- runPoke (poke payloadSize >> poke payload) ptr 0
        assert (written == totalSize) (return ())
-- | Outcome of an attempt to read a message from a buffer.
data PeekMessage m a
    = -- | A complete message was decoded.
      Done (Message a)
    | -- | Not enough bytes were available. Feed the next chunk of input to
      -- the continuation to retry.
      NeedMoreInput (ByteString -> m (PeekMessage m a))
-- | Try to decode a value occupying exactly @n@ bytes at the front of the
-- buffer.
--
-- When 'BB.unsafeConsume' yields a pointer, the value is decoded from it with
-- 'decodeFromPtr' and returned as 'Done'. Otherwise the result is
-- 'NeedMoreInput'; its continuation copies the supplied chunk into the buffer
-- and retries with the same @n@.
--
-- @n@ may originate from a size tag read off the wire (see 'peekMessage'), so
-- a negative value is rejected with an 'IOError' instead of being passed on.
peekSized :: (MonadIO m, Store a) => ByteBuffer -> Int -> m (PeekMessage m a)
peekSized bb n
    -- A negative length cannot describe a payload.
    -- NOTE(review): how 'BB.unsafeConsume' reacts to a negative count is not
    -- visible from this module -- confirm; the guard keeps us from finding out.
    | n < 0 =
        liftIO . ioError . userError $
            "Data.Store.Streaming.peekSized: negative message size " ++ show n
    | otherwise = do
        consumed <- BB.unsafeConsume bb n
        case consumed of
            Right ptr -> liftM (Done . Message) $ decodeFromPtr ptr n
            Left _ -> return $ NeedMoreInput refill
  where
    -- Append the new chunk to the buffer, then try again from the same spot.
    refill bs = BB.copyByteString bb bs >> peekSized bb n
-- | Try to read the length prefix of the next message from the buffer.
--
-- This is 'peekSized' fixed to 'tagLength' bytes, so the result is either the
-- decoded 'SizeTag' or a request for more input.
peekSizeTag :: MonadIO m => ByteBuffer -> m (PeekMessage m SizeTag)
peekSizeTag = flip peekSized tagLength
-- | Try to read one length-prefixed message from the buffer.
--
-- The size tag is read first. Once it is available, the payload is read with
-- 'peekSized' using the decoded length. If the tag itself is still
-- incomplete, the result is 'NeedMoreInput' with a continuation that copies
-- the new chunk into the buffer and starts over.
peekMessage :: (MonadIO m, Store a) => ByteBuffer -> m (PeekMessage m a)
peekMessage bb = do
    tagResult <- peekSizeTag bb
    case tagResult of
        Done (Message payloadSize) -> peekSized bb payloadSize
        NeedMoreInput _ -> return (NeedMoreInput resume)
  where
    -- The continuation offered by 'peekSizeTag' is dropped on purpose: we
    -- refill the buffer ourselves and rerun the whole read.
    resume chunk = do
        BB.copyByteString bb chunk
        peekMessage bb
-- | Decode one message from the buffer, pulling further chunks from the
-- supplied action whenever the buffer runs short.
--
-- The size tag is decoded with 'decodeSizeTag'; when that yields 'Nothing'
-- the result is 'Nothing', otherwise the payload is decoded with
-- 'decodeSized' using the tag as its length.
decodeMessage :: (MonadIO m, Store a)
              => ByteBuffer -> m (Maybe ByteString) -> m (Maybe (Message a))
decodeMessage bb getBs =
    decodeSizeTag bb getBs >>= maybe (return Nothing) (decodeSized bb getBs)
-- | Read the next size tag, requesting more input until it is complete.
--
-- Returns 'Nothing' when the input action is exhausted while the buffer holds
-- no bytes at all. If input ends with some bytes left over, the situation is
-- handed to 'tooManyBytes' (presumably raising a decoding error -- confirm in
-- "Data.Store.Impl").
decodeSizeTag :: MonadIO m
              => ByteBuffer
              -> m (Maybe ByteString)
              -> m (Maybe SizeTag)
decodeSizeTag bb getBs = do
    peeked <- peekSizeTag bb
    case peeked of
        Done (Message size) -> return (Just size)
        NeedMoreInput _ -> getBs >>= maybe atEndOfInput feedChunk
  where
    -- More input arrived: buffer it and try the tag again.
    feedChunk chunk = do
        BB.copyByteString bb chunk
        decodeSizeTag bb getBs

    -- No more input: an empty buffer means there is simply nothing left to
    -- decode, while leftover bytes mean the tag is incomplete.
    atEndOfInput = do
        leftover <- BB.availableBytes bb
        if leftover == 0
            then return Nothing
            else liftIO $ tooManyBytes tagLength leftover "Data.Store.Message.SizeTag"
-- | Decode a payload of exactly @n@ bytes, requesting more input until it is
-- complete.
--
-- Unlike 'decodeSizeTag', running out of input here is never acceptable: the
-- number of bytes still buffered is passed to 'tooManyBytes' together with
-- the required @n@ (presumably raising a decoding error -- confirm in
-- "Data.Store.Impl").
decodeSized :: (MonadIO m, Store a)
            => ByteBuffer
            -> m (Maybe ByteString)
            -> Int
            -> m (Maybe (Message a))
decodeSized bb getBs n = do
    peeked <- peekSized bb n
    case peeked of
        Done message -> return (Just message)
        NeedMoreInput _ -> do
            nextChunk <- getBs
            case nextChunk of
                Just chunk -> do
                    BB.copyByteString bb chunk
                    decodeSized bb getBs n
                Nothing -> do
                    available <- BB.availableBytes bb
                    liftIO $ tooManyBytes n available "Data.Store.Message.Message"
-- | Decode a value from the @n@ bytes starting at @ptr@.
--
-- 'peek' is run with @ptr@ as the start and @ptr + n@ as the end of the
-- readable region. Decoding must stop exactly at that end: a payload that
-- decodes to a different number of bytes than announced means the size tag
-- and the data disagree, and is reported as an 'IOError' rather than silently
-- accepted.
decodeFromPtr :: (MonadIO m, Store a) => Ptr Word8 -> Int -> m a
decodeFromPtr ptr n = liftIO $ do
    -- NOTE(review): relies on the first component of 'runPeek''s result being
    -- the position at which decoding stopped -- confirm against
    -- "Data.Store.Impl".
    (stop, value) <- runPeek peek end ptr
    if stop == end
        then return value
        else ioError . userError $
            "Data.Store.Streaming.decodeFromPtr: expected to consume "
                ++ show n ++ " bytes, but consumed "
                ++ show (stop `minusPtr` ptr)
  where
    end = ptr `plusPtr` n
-- | Conduit that turns each incoming 'Message' into its encoded 'ByteString'
-- by applying 'encodeMessage'.
conduitEncode :: (Monad m, Store a) => C.Conduit (Message a) m ByteString
conduitEncode = C.map encodeMessage
-- | Conduit that decodes a stream of 'ByteString' chunks into 'Message's.
--
-- The argument is passed to 'BB.new' when the working buffer is created. The
-- buffer is released with 'BB.free' through 'C.bracketP'. Messages are
-- yielded one at a time until 'decodeMessage' reports 'Nothing'.
conduitDecode :: (MonadIO m, MonadResource m, Store a)
              => Maybe Int
              -> C.Conduit ByteString m (Message a)
conduitDecode bufSize = C.bracketP (BB.new bufSize) BB.free pump
  where
    -- Decode the next message, using upstream as the source of extra input.
    pump buffer =
        decodeMessage buffer C.await
            >>= maybe (return ()) (emitAndContinue buffer)

    emitAndContinue buffer message = C.yield message >> pump buffer