{-# LANGUAGE CPP, MultiWayIf, OverloadedStrings #-}

-- | A not-very-smart default implementation of transmitting
-- compact regions over sockets.
module Data.Compact.Socket where

import Control.Applicative
import Control.Arrow (first)
import Control.DeepSeq (NFData)
import Control.Monad
import Data.Bits
import qualified Data.ByteString as B
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Lazy as LB
import qualified Data.ByteString.Unsafe as B
import Data.Char (isSpace)
import Data.Compact as C
import Data.IORef
import Data.Maybe (fromMaybe)
import Data.Monoid
import Data.Word
import Foreign.C.Types
import Foreign.Marshal.Utils (copyBytes)
import Foreign.Ptr (Ptr, castPtr, plusPtr, ptrToWordPtr, wordPtrToPtr)
import Network.Socket.ByteString as B
import qualified Network.Socket.ByteString.Lazy as LB
import Network.Socket.Internal
import Network.Socket
import System.Directory (doesFileExist, getTemporaryDirectory)
-- The operator in this import list had been stripped, leaving an invalid
-- @()@ entry; '</>' is the path-joining operator of "System.FilePath".
import System.FilePath (splitExtension, (</>))
import System.IO (hGetContents)
import System.IO.Unsafe (unsafeInterleaveIO)

--------------------------------------------------------------------
-- * Sending/receiving length prefixed ByteStrings (lazy and strict).

-- | Call @send@ with a length-prefixed version of @bs@.
--
-- The prefix is the length of @bs@ in bytes, encoded as a big-endian
-- 64-bit word, immediately followed by @bs@ itself.
sendLenPfx :: (LB.ByteString -> IO ()) -> LB.ByteString -> IO ()
sendLenPfx send bs =
  send $ BB.toLazyByteString $
    BB.word64BE (fromIntegral $ LB.length bs) <> BB.lazyByteString bs

-- | Send a length-prefixed bytestring over a 'Socket'.
sendLenPfxSock :: Socket -> LB.ByteString -> IO ()
sendLenPfxSock = sendLenPfx . LB.sendAll

-- | Try to read a length-prefixed message. The second component of the
-- result is any extra data that was read past the end of the message.
recvLenPfx :: IO LB.ByteString -> IO (LB.ByteString, LB.ByteString)
recvLenPfx recv = pfxLoop ""
  where
    -- Collect input until the 8-byte big-endian length prefix is complete,
    -- then hand the decoded length and the remaining bytes to 'loop'.
    -- If @recv@ yields an empty chunk before the prefix is complete, the
    -- result is a pair of empty strings.
    pfxLoop :: LB.ByteString -> IO (LB.ByteString, LB.ByteString)
    pfxLoop bs
      | LB.length bs >= 8 = do
          let ([w1, w2, w3, w4, w5, w6, w7, w8], rest) =
                first LB.unpack $ LB.splitAt 8 bs
              pfx = mkWord64 w1 w2 w3 w4 w5 w6 w7 w8
          loop pfx rest
      | otherwise = do
          bs' <- recv
          -- A single @recv@ may deliver fewer than 8 bytes, so the bytes
          -- read so far must be kept and extended rather than replaced.
          if LB.null bs'
            then return ("", "")
            else pfxLoop (bs <> bs')

    -- Collect input until at least @pfx@ payload bytes are available, then
    -- split the payload from whatever was read beyond it. Calls 'error' if
    -- @recv@ yields an empty chunk before the payload is complete.
    loop :: Word64 -> LB.ByteString -> IO (LB.ByteString, LB.ByteString)
    loop pfx bs
      | pfx == have = return (bs, "")
      | pfx < have = return (LB.splitAt (fromIntegral pfx) bs)
      | otherwise = do
          bytes <- recv
          if LB.null bytes
            then error "Socket is closed before reading the whole message"
            else loop pfx (bs <> bytes)
      where
        have :: Word64
        have = fromIntegral (LB.length bs)

-- | 'recvLenPfx' specialised to a 'Socket', requesting at most 8192 bytes
-- per read.
recvLenPfxSock :: Socket -> IO (LB.ByteString, LB.ByteString)
recvLenPfxSock = recvLenPfx . flip LB.recv 8192

-- | 'recv' messages until the socket is closed. Returns lazy 'ByteString'.
recvAll :: Socket -> IO LB.ByteString
recvAll sock = do
  bs <- LB.recv sock 8192
  if LB.null bs
    then return bs
    else (bs <>) <$> recvAll sock

-- | Like 'recvAll', but every read after the first is wrapped in
-- 'unsafeInterleaveIO', so it is performed only when the corresponding
-- part of the result is demanded.
recvAllInterleave :: Socket -> IO LB.ByteString
recvAllInterleave sock = do
  bs <- LB.recv sock 8192
  if LB.null bs
    then return bs
    else (bs <>) <$> unsafeInterleaveIO (recvAllInterleave sock)

-- | Combine four bytes, most significant first, into a 'Word32'.
mkWord32 :: Word8 -> Word8 -> Word8 -> Word8 -> Word32
mkWord32 w1 w2 w3 w4 =
    fromIntegral w1 `shiftL` 24
  + fromIntegral w2 `shiftL` 16
  + fromIntegral w3 `shiftL` 8
  + fromIntegral w4
{-# INLINE mkWord32 #-}

-- | Combine eight bytes, most significant first, into a 'Word64'.
mkWord64
  :: Word8 -> Word8 -> Word8 -> Word8
  -> Word8 -> Word8 -> Word8 -> Word8
  -> Word64
mkWord64 w1 w2 w3 w4 w5 w6 w7 w8 =
    fromIntegral (mkWord32 w1 w2 w3 w4) `shiftL` 32
  + fromIntegral (mkWord32 w5 w6 w7 w8)
{-# INLINE mkWord64 #-}

----------------------------------------
-- * Compact <-> ByteString conversions.

compactToBS :: NFData a => C.Compact a -> IO LB.ByteString
compactToBS c =
  C.withCompactPtrs c $ \(C.SerializedCompact blocks root) -> do
    -- Wrap every block of the region in a strict ByteString.
    -- NOTE(review): 'B.unsafePackCStringLen' does not copy, so these
    -- ByteStrings alias the region's memory -- confirm that the region
    -- outlives every use of the result.
    payloads <- mapM wrapBlock blocks
    let rootField   = BB.int64BE (fromIntegral (ptrToWordPtr root))
        blockFields = zipWith encodeBlock blocks payloads
    -- Layout: root pointer, then one (ptr, length, bytes) triplet per block.
    return (BB.toLazyByteString (mconcat (rootField : blockFields)))
  where
    wrapBlock :: (Ptr b, Word) -> IO B.ByteString
    wrapBlock (addr, size) =
      B.unsafePackCStringLen (castPtr addr, fromIntegral size)

    encodeBlock :: (Ptr b, Word) -> B.ByteString -> BB.Builder
    encodeBlock (addr, size) payload =
      mconcat
        [ BB.word64BE (fromIntegral (ptrToWordPtr addr))
        , BB.word64BE (fromIntegral size)
        , BB.byteString payload
        ]

-- | Rebuild a compact region from bytes laid out as an 8-byte root
-- pointer followed by (pointer, length, bytes) triplets, which is the
-- layout 'compactToBS' produces. The result is an 'error' value if the
-- import function returns 'Nothing'.
bsToCompact :: B.ByteString -> IO (C.Compact a)
bsToCompact bs = do
  let (rootBytes, body) = B.splitAt 8 bs
      rootPtr           = wordPtrToPtr (fromIntegral (mkW64 (B.unpack rootBytes)))
      (pws, bss)        = unzip [ ((p, w), b) | (p, w, b) <- parseBlocks body ]
  fromMaybe (error "Can't generate Compact from bytes.") <$>
#ifdef CNF_DONT_USE_TRUSTED
#warning "compiling with compactImportByteStrings"
    C.compactImportByteStrings (C.SerializedCompact pws rootPtr) bss
#else
#warning "compiling with compactImportByteStringsTrusted"
    C.compactImportByteStringsTrusted (C.SerializedCompact pws rootPtr) bss
#endif
  where
    -- Decode consecutive (pointer, length, bytes) triplets until the input
    -- is exhausted.
    parseBlocks :: B.ByteString -> [(Ptr a, Word, B.ByteString)]
    parseBlocks bbs
      | B.null bbs = []
      | otherwise =
          let (addrBytes, afterAddr) = B.splitAt 8 bbs
              (sizeBytes, afterSize) = B.splitAt 8 afterAddr
              addr                   = wordPtrToPtr (fromIntegral (mkW64 (B.unpack addrBytes)))
              size                   = fromIntegral (mkW64 (B.unpack sizeBytes)) :: Word
              (payload, remaining)   = B.splitAt (fromIntegral size) afterSize
          in (addr, size, payload) : parseBlocks remaining

--------------------------------------------------------------------
-- * Utilities for sending and receiving Compacts without converting
-- Compact blocks to ByteStrings first.

-- | Send a compact region over the network via 'Socket'.
sendCompact :: NFData a => Socket -> Compact a -> IO ()
sendCompact sock@(MkSocket fd _ _ _ _) c =
  C.withCompactPtrs c $ \(C.SerializedCompact blocks root) -> do
    -- The header (root pointer plus every block's address and size) goes
    -- out as an ordinary length-prefixed message.
    sendLenPfxSock sock (describe root blocks)
    -- The block contents follow, written straight from their addresses
    -- with the C @send@ call, without building ByteStrings.
    mapM_ shipBlock blocks
  where
    shipBlock :: (Ptr b, Word) -> IO CInt
    shipBlock (addr, size) = pushAll addr (fromIntegral size)

    -- Keep calling @send@ until all @remaining@ bytes starting at @buf@
    -- have been accepted; returns the total number of bytes sent.
    pushAll :: Ptr b -> CSize -> IO CInt
    pushAll buf remaining = do
      n <- throwSocketErrorWaitWrite sock "sendCompact" (c_send fd buf remaining 0)
      let written = fromIntegral n :: CSize
      if written < remaining
        then (n +) <$> pushAll (buf `plusPtr` fromIntegral n) (remaining - written)
        else return n

    -- Root pointer followed by one (ptr, length) pair per block.
    describe :: Ptr a -> [(Ptr b, Word)] -> LB.ByteString
    describe rootPtr blockList =
      BB.toLazyByteString $
        BB.int64BE (fromIntegral (ptrToWordPtr rootPtr))
          <> mconcat (map blockHeader blockList)

    blockHeader :: (Ptr a, Word) -> BB.Builder
    blockHeader (addr, size) =
      BB.word64BE (fromIntegral (ptrToWordPtr addr))
        <> BB.word64BE (fromIntegral size)

-- | Receive a compact region being sent on 'Socket'.
--
-- Reads the length-prefixed header first, then fills each block the
-- import function asks for, using bytes that were read past the header
-- before reading from the socket again.
recvCompact :: Socket -> IO (Compact a)
recvCompact sock = do
  (layout, surplus) <- readLayout
  leftover <- newIORef (LB.toStrict surplus)
  imported <-
    C.compactImportTrusted layout $ \dst size ->
      fill leftover (fromIntegral size) (castPtr dst)
  return (fromMaybe (error "Can't generate Compact from bytes.") imported)
  where
    -- Receive and decode the header message; also returns whatever was
    -- read beyond the end of that message.
    readLayout :: IO (C.SerializedCompact a, LB.ByteString)
    readLayout = do
      (msg, surplus) <- recvLenPfxSock sock
      let (rootBytes, blockBytes) = B.splitAt 8 (LB.toStrict msg)
          rootPtr = wordPtrToPtr (fromIntegral (mkW64 (B.unpack rootBytes)))
      return (C.SerializedCompact (parseBlocks blockBytes) rootPtr, surplus)

    -- Write exactly @want@ bytes to @dst@, draining the buffered leftover
    -- bytes before reading from the socket.
    fill :: IORef B.ByteString -> Int -> Ptr Word8 -> IO ()
    fill _ 0 _ = return ()
    fill leftover want dst = do
      buffered <- readIORef leftover
      let have = B.length buffered
      case () of
        _
          | have == 0 -> do
              -- Nothing buffered: read directly into the destination.
              got <- recvInner sock want dst
              when (got == 0) $ error "socket closed"
              when (got /= want) $
                fill leftover (want - got) (dst `plusPtr` got)
          | have > want -> do
              -- More buffered than needed: take a prefix, keep the rest.
              let (now, later) = B.splitAt want buffered
              writeIORef leftover later
              copyInto now dst
          | otherwise -> do
              -- Use up the whole buffer, then continue with the remainder.
              copyInto buffered dst
              writeIORef leftover ""
              fill leftover (want - have) (dst `plusPtr` have)

    -- Copy the contents of a strict ByteString to the given address.
    copyInto :: B.ByteString -> Ptr Word8 -> IO ()
    copyInto bytes dst =
      B.unsafeUseAsCStringLen bytes $ \(src, n) ->
        copyBytes dst (castPtr src) n
    {-# INLINE copyInto #-}

    -- Decode consecutive (pointer, length) pairs until the input is
    -- exhausted.
    parseBlocks :: B.ByteString -> [(Ptr a, Word)]
    parseBlocks bbs
      | B.null bbs = []
      | otherwise =
          let (addrBytes, afterAddr) = B.splitAt 8 bbs
              (sizeBytes, remaining) = B.splitAt 8 afterAddr
              addr = wordPtrToPtr (fromIntegral (mkW64 (B.unpack addrBytes)))
              size = fromIntegral (mkW64 (B.unpack sizeBytes))
          in (addr, size) : parseBlocks remaining

-- | Sum of the sizes of all blocks of a compact region.
compactLen :: NFData a => Compact a -> IO Word
compactLen c =
  C.withCompactPtrs c $ \(SerializedCompact blocks _) ->
    return (sum [size | (_, size) <- blocks])

---------
-- * Misc

-- | Combine exactly eight bytes, most significant first, into a 'Word64'.
-- Calls 'error' for a list of any other length.
mkW64 :: [Word8] -> Word64
mkW64 ws
  | length ws == 8 = sum (zipWith place ws [56, 48 .. 0])
  | otherwise =
      error $ "mkW64 is called with " ++ show (length ws) ++ " elements."
  where
    place :: Word8 -> Int -> Word64
    place w bits = fromIntegral w `shiftL` bits
{-# INLINE mkW64 #-}

-- | Binding to the C @send@ function.
foreign import ccall unsafe "send"
  c_send :: CInt -> Ptr a -> CSize -> CInt -> IO CInt

-- | Binding to the C @recv@ function.
foreign import ccall unsafe "recv"
  c_recv :: CInt -> Ptr CChar -> CSize -> CInt -> IO CInt

-- | Receive up to @nbytes@ bytes from the socket into the buffer at @ptr@
-- via 'c_recv' (flags 0), wrapped in 'throwSocketErrorWaitRead'. Returns
-- the count reported by 'c_recv'.
recvInner :: Socket -> Int -> Ptr Word8 -> IO Int
recvInner sock@(MkSocket fd _ _ _ _) nbytes ptr = do
  received <-
    throwSocketErrorWaitRead sock "recv" $
      c_recv fd (castPtr ptr) (fromIntegral nbytes) 0
  return (fromIntegral received)