{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}

-- | Read and write values of types that implement 'Binary.Binary'.
module Data.Binary.IO.Lifted
  ( -- * Reader
    ReaderError (..)

  , Reader (..)
  , newReader
  , newReaderWith
  , mapReader

    -- * Writer
  , Writer (..)
  , newWriter
  , newWriterWith
  , mapWriter

    -- * Pipe
  , newPipe

    -- * Duplex
  , Duplex (..)
  , newDuplex
  , newDuplexWith
  , mapDuplex

    -- * Classes
  , CanGet (..)
  , read
  , isEmpty

  , CanPut (..)
  , write
  )
where

import Prelude hiding (read)

import           Control.Arrow ((&&&))
import qualified Control.Concurrent.Classy as Concurrent
import           Control.Monad (join, unless)
import qualified Control.Monad.Catch as Catch
import           Control.Monad.IO.Class (MonadIO (liftIO))
import           Control.Monad.Trans.Class (MonadTrans (lift))
import           Control.Monad.Trans.Except (ExceptT, except, runExceptT)
import qualified Data.Binary as Binary
import qualified Data.Binary.Get as Get
import           Data.Binary.IO.Internal.AwaitNotify (newAwaitNotify, runAwait, runNotify)
import qualified Data.Binary.Put as Put
import           Data.ByteString (ByteString)
import qualified Data.ByteString as ByteString
import           Data.ByteString.Lazy (toStrict)
import           Data.IORef (atomicModifyIORef', mkWeakIORef, newIORef)
import qualified Deque.Strict as Deque
import           System.IO (Handle, hSetBinaryMode)
import           System.Mem.Weak (deRefWeak)

-- * Reader

-- | An error that can occur during reading
--
-- @since 0.4.0
data ReaderError = ReaderGetError -- ^ Error from the 'Binary.Get' operation
  { ReaderError -> ByteString
readerErrorRemaining :: !ByteString
  -- ^ Unconsumed part of the byte stream
  --
  -- @since 0.4.0

  , ReaderError -> ByteOffset
readerErrorOffset :: !Get.ByteOffset
  -- ^ Error location represented as an offset into the input
  --
  -- @since 0.4.0

  , ReaderError -> ByteString
readerErrorInput :: !ByteString
  -- ^ Input to the 'Binary.Get' operation
  --
  -- @since 0.4.0

  , ReaderError -> String
readerErrorMessage :: !String
  -- ^ Error message
  --
  -- @since 0.4.0
  }
  deriving stock Int -> ReaderError -> ShowS
[ReaderError] -> ShowS
ReaderError -> String
(Int -> ReaderError -> ShowS)
-> (ReaderError -> String)
-> ([ReaderError] -> ShowS)
-> Show ReaderError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ReaderError] -> ShowS
$cshowList :: [ReaderError] -> ShowS
show :: ReaderError -> String
$cshow :: ReaderError -> String
showsPrec :: Int -> ReaderError -> ShowS
$cshowsPrec :: Int -> ReaderError -> ShowS
Show
  deriving anyclass Show ReaderError
Typeable ReaderError
(Typeable ReaderError, Show ReaderError) =>
(ReaderError -> SomeException)
-> (SomeException -> Maybe ReaderError)
-> (ReaderError -> String)
-> Exception ReaderError
SomeException -> Maybe ReaderError
ReaderError -> String
ReaderError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
displayException :: ReaderError -> String
$cdisplayException :: ReaderError -> String
fromException :: SomeException -> Maybe ReaderError
$cfromException :: SomeException -> Maybe ReaderError
toException :: ReaderError -> SomeException
$ctoException :: ReaderError -> SomeException
$cp2Exception :: Show ReaderError
$cp1Exception :: Typeable ReaderError
Catch.Exception

newtype StationaryReader m = StationaryReader
  { StationaryReader m
-> forall a. Get a -> ExceptT ReaderError m (StationaryReader m, a)
runStationaryReader
      :: forall a
      .  Binary.Get a
      -> ExceptT ReaderError m (StationaryReader m, a)
  }

newStationaryReaderWith
  :: forall m
  .  Concurrent.MonadConc m
  => m ByteString
  -> m (StationaryReader m)
newStationaryReaderWith :: m ByteString -> m (StationaryReader m)
newStationaryReaderWith getChunk :: m ByteString
getChunk = do
  IORef m ByteString
inputRef <- ByteString -> m (IORef m ByteString)
forall (m :: * -> *) a. MonadConc m => a -> m (IORef m a)
Concurrent.newIORef ByteString
ByteString.empty

  let
    make :: StationaryReader m
make = (forall a. Get a -> ExceptT ReaderError m (StationaryReader m, a))
-> StationaryReader m
forall (m :: * -> *).
(forall a. Get a -> ExceptT ReaderError m (StationaryReader m, a))
-> StationaryReader m
StationaryReader ((forall a. Get a -> ExceptT ReaderError m (StationaryReader m, a))
 -> StationaryReader m)
-> (forall a.
    Get a -> ExceptT ReaderError m (StationaryReader m, a))
-> StationaryReader m
forall a b. (a -> b) -> a -> b
$ \get :: Get a
get -> do
      ByteString
input <- m ByteString -> ExceptT ReaderError m ByteString
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m ByteString -> ExceptT ReaderError m ByteString)
-> m ByteString -> ExceptT ReaderError m ByteString
forall a b. (a -> b) -> a -> b
$ IORef m ByteString -> m ByteString
forall (m :: * -> *) a. MonadConc m => IORef m a -> m a
Concurrent.readIORef IORef m ByteString
inputRef
      Decoder a -> ExceptT ReaderError m (StationaryReader m, a)
forall a.
Decoder a -> ExceptT ReaderError m (StationaryReader m, a)
loop (Decoder a -> ExceptT ReaderError m (StationaryReader m, a))
-> Decoder a -> ExceptT ReaderError m (StationaryReader m, a)
forall a b. (a -> b) -> a -> b
$ Decoder a -> ByteString -> Decoder a
forall a. Decoder a -> ByteString -> Decoder a
Get.pushChunk (Get a -> Decoder a
forall a. Get a -> Decoder a
Get.runGetIncremental Get a
get) ByteString
input

    loop :: Get.Decoder a -> ExceptT ReaderError m (StationaryReader m, a)
    loop :: Decoder a -> ExceptT ReaderError m (StationaryReader m, a)
loop = \case
      Get.Fail remainingBody :: ByteString
remainingBody offset :: ByteOffset
offset errorMessage :: String
errorMessage -> do
        ByteString
input <- m ByteString -> ExceptT ReaderError m ByteString
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m ByteString -> ExceptT ReaderError m ByteString)
-> m ByteString -> ExceptT ReaderError m ByteString
forall a b. (a -> b) -> a -> b
$ IORef m ByteString -> m ByteString
forall (m :: * -> *) a. MonadConc m => IORef m a -> m a
Concurrent.readIORef IORef m ByteString
inputRef
        Either ReaderError (StationaryReader m, a)
-> ExceptT ReaderError m (StationaryReader m, a)
forall (m :: * -> *) e a. Monad m => Either e a -> ExceptT e m a
except (Either ReaderError (StationaryReader m, a)
 -> ExceptT ReaderError m (StationaryReader m, a))
-> Either ReaderError (StationaryReader m, a)
-> ExceptT ReaderError m (StationaryReader m, a)
forall a b. (a -> b) -> a -> b
$ ReaderError -> Either ReaderError (StationaryReader m, a)
forall a b. a -> Either a b
Left $WReaderGetError :: ByteString -> ByteOffset -> ByteString -> String -> ReaderError
ReaderGetError
          { readerErrorRemaining :: ByteString
readerErrorRemaining = ByteString
remainingBody
          , readerErrorOffset :: ByteOffset
readerErrorOffset = ByteOffset
offset
          , readerErrorInput :: ByteString
readerErrorInput = ByteString
input
          , readerErrorMessage :: String
readerErrorMessage = String
errorMessage
          }

      Get.Done remainingBody :: ByteString
remainingBody _ value :: a
value -> ExceptT ReaderError m (StationaryReader m, a)
-> ExceptT ReaderError m (StationaryReader m, a)
forall (m :: * -> *) a. MonadMask m => m a -> m a
Catch.mask_ (ExceptT ReaderError m (StationaryReader m, a)
 -> ExceptT ReaderError m (StationaryReader m, a))
-> ExceptT ReaderError m (StationaryReader m, a)
-> ExceptT ReaderError m (StationaryReader m, a)
forall a b. (a -> b) -> a -> b
$ do
        m () -> ExceptT ReaderError m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> ExceptT ReaderError m ())
-> m () -> ExceptT ReaderError m ()
forall a b. (a -> b) -> a -> b
$ IORef m ByteString -> ByteString -> m ()
forall (m :: * -> *) a. MonadConc m => IORef m a -> a -> m ()
Concurrent.writeIORef IORef m ByteString
inputRef ByteString
remainingBody
        (StationaryReader m, a)
-> ExceptT ReaderError m (StationaryReader m, a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (StationaryReader m
make, a
value)

      Get.Partial continue :: Maybe ByteString -> Decoder a
continue -> do
        Maybe ByteString
chunk <- m (Maybe ByteString) -> ExceptT ReaderError m (Maybe ByteString)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m (Maybe ByteString) -> ExceptT ReaderError m (Maybe ByteString))
-> m (Maybe ByteString) -> ExceptT ReaderError m (Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ ((forall a. m a -> m a) -> m (Maybe ByteString))
-> m (Maybe ByteString)
forall (m :: * -> *) b.
MonadMask m =>
((forall a. m a -> m a) -> m b) -> m b
Catch.mask (((forall a. m a -> m a) -> m (Maybe ByteString))
 -> m (Maybe ByteString))
-> ((forall a. m a -> m a) -> m (Maybe ByteString))
-> m (Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ \restore :: forall a. m a -> m a
restore -> do
          ByteString
chunk <- m ByteString -> m ByteString
forall a. m a -> m a
restore m ByteString
getChunk
          if ByteString -> Bool
ByteString.null ByteString
chunk then
            Maybe ByteString -> m (Maybe ByteString)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ByteString
forall a. Maybe a
Nothing
          else
            IORef m ByteString
-> (ByteString -> (ByteString, Maybe ByteString))
-> m (Maybe ByteString)
forall (m :: * -> *) a b.
MonadConc m =>
IORef m a -> (a -> (a, b)) -> m b
Concurrent.atomicModifyIORef' IORef m ByteString
inputRef ((ByteString -> (ByteString, Maybe ByteString))
 -> m (Maybe ByteString))
-> (ByteString -> (ByteString, Maybe ByteString))
-> m (Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ (ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
chunk) (ByteString -> ByteString)
-> (ByteString -> Maybe ByteString)
-> ByteString
-> (ByteString, Maybe ByteString)
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& Maybe ByteString -> ByteString -> Maybe ByteString
forall a b. a -> b -> a
const (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
chunk)

        Decoder a -> ExceptT ReaderError m (StationaryReader m, a)
forall a.
Decoder a -> ExceptT ReaderError m (StationaryReader m, a)
loop (Decoder a -> ExceptT ReaderError m (StationaryReader m, a))
-> Decoder a -> ExceptT ReaderError m (StationaryReader m, a)
forall a b. (a -> b) -> a -> b
$ Maybe ByteString -> Decoder a
continue Maybe ByteString
chunk

  StationaryReader m -> m (StationaryReader m)
forall (f :: * -> *) a. Applicative f => a -> f a
pure StationaryReader m
make

-- | @since 0.4.0
newtype Reader m = Reader
  { Reader m -> forall a. Get a -> m a
runReader :: forall a. Binary.Get a -> m a }

-- | Transform the underlying functor.
--
-- @since 0.4.0
mapReader :: (forall a. m a -> n a) -> Reader m -> Reader n
mapReader :: (forall a. m a -> n a) -> Reader m -> Reader n
mapReader f :: forall a. m a -> n a
f (Reader run :: forall a. Get a -> m a
run) = (forall a. Get a -> n a) -> Reader n
forall (m :: * -> *). (forall a. Get a -> m a) -> Reader m
Reader (m a -> n a
forall a. m a -> n a
f (m a -> n a) -> (Get a -> m a) -> Get a -> n a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Get a -> m a
forall a. Get a -> m a
run)

-- | Create a new 'Reader' using an action that provides the chunks.
--
-- The chunk producers indicates the end of the stream by returning an empty
-- 'ByteString.ByteString'.
--
-- Reading using the 'Reader' may throw 'ReaderError'.
--
-- The internal position of the 'Reader' is not advanced when it throws an exception during reading.
-- This has the consequence that if you're trying to read with the same faulty 'Binary.Get'
-- operation multiple times, you will always receive an exception.
--
-- The 'Reader' is safe to use concurrently.
--
-- @since 0.4.0
newReaderWith
  :: Concurrent.MonadConc m
  => m ByteString -- ^ Chunk provider
  -> m (Reader m)
newReaderWith :: m ByteString -> m (Reader m)
newReaderWith getChunk :: m ByteString
getChunk = do
  StationaryReader m
posReader <- m ByteString -> m (StationaryReader m)
forall (m :: * -> *).
MonadConc m =>
m ByteString -> m (StationaryReader m)
newStationaryReaderWith m ByteString
getChunk
  MVar m (StationaryReader m)
mvar <- StationaryReader m -> m (MVar m (StationaryReader m))
forall (m :: * -> *) a. MonadConc m => a -> m (MVar m a)
Concurrent.newMVar StationaryReader m
posReader
  Reader m -> m (Reader m)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Reader m -> m (Reader m)) -> Reader m -> m (Reader m)
forall a b. (a -> b) -> a -> b
$ (forall a. Get a -> m a) -> Reader m
forall (m :: * -> *). (forall a. Get a -> m a) -> Reader m
Reader ((forall a. Get a -> m a) -> Reader m)
-> (forall a. Get a -> m a) -> Reader m
forall a b. (a -> b) -> a -> b
$ \get :: Get a
get ->
    MVar m (StationaryReader m)
-> (StationaryReader m -> m (StationaryReader m, a)) -> m a
forall (m :: * -> *) a b.
MonadConc m =>
MVar m a -> (a -> m (a, b)) -> m b
Concurrent.modifyMVar MVar m (StationaryReader m)
mvar ((StationaryReader m -> m (StationaryReader m, a)) -> m a)
-> (StationaryReader m -> m (StationaryReader m, a)) -> m a
forall a b. (a -> b) -> a -> b
$ \posReader :: StationaryReader m
posReader -> do
      Either ReaderError (StationaryReader m, a)
result <- ExceptT ReaderError m (StationaryReader m, a)
-> m (Either ReaderError (StationaryReader m, a))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT ReaderError m (StationaryReader m, a)
 -> m (Either ReaderError (StationaryReader m, a)))
-> ExceptT ReaderError m (StationaryReader m, a)
-> m (Either ReaderError (StationaryReader m, a))
forall a b. (a -> b) -> a -> b
$ StationaryReader m
-> Get a -> ExceptT ReaderError m (StationaryReader m, a)
forall (m :: * -> *).
StationaryReader m
-> forall a. Get a -> ExceptT ReaderError m (StationaryReader m, a)
runStationaryReader StationaryReader m
posReader Get a
get
      (ReaderError -> m (StationaryReader m, a))
-> ((StationaryReader m, a) -> m (StationaryReader m, a))
-> Either ReaderError (StationaryReader m, a)
-> m (StationaryReader m, a)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either ReaderError -> m (StationaryReader m, a)
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
Catch.throwM (StationaryReader m, a) -> m (StationaryReader m, a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Either ReaderError (StationaryReader m, a)
result

-- | Create a new reader.
--
-- Inherits properties from 'newReaderWith'.
--
-- Other threads reading from the 'Handle' will interfere with read operations of the 'Reader'.
-- However, the 'Reader' itself is thread-safe and can be utilized concurrently.
--
-- The given 'Handle' will be swiched to binary mode via 'hSetBinaryMode'.
--
-- @since 0.4.0
newReader
  :: (Concurrent.MonadConc m, MonadIO m)
  => Handle -- ^ Handle to read from
  -> m (Reader m)
newReader :: Handle -> m (Reader m)
newReader handle :: Handle
handle = do
  IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Handle -> Bool -> IO ()
hSetBinaryMode Handle
handle Bool
True
  m ByteString -> m (Reader m)
forall (m :: * -> *). MonadConc m => m ByteString -> m (Reader m)
newReaderWith (m ByteString -> m (Reader m)) -> m ByteString -> m (Reader m)
forall a b. (a -> b) -> a -> b
$ IO ByteString -> m ByteString
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ByteString -> m ByteString) -> IO ByteString -> m ByteString
forall a b. (a -> b) -> a -> b
$ Handle -> Int -> IO ByteString
ByteString.hGetSome Handle
handle 4096

-- | @r@ can execute 'Binary.Get' operations in @m@
--
-- @since 0.4.0
class CanGet r m where
  runGet :: r -> Binary.Get a -> m a

instance CanGet (Reader m) m where
  runGet :: Reader m -> Get a -> m a
runGet = Reader m -> Get a -> m a
forall (m :: * -> *). Reader m -> forall a. Get a -> m a
runReader

instance CanGet (Duplex m) m where
  runGet :: Duplex m -> Get a -> m a
runGet = Reader m -> Get a -> m a
forall r (m :: * -> *) a. CanGet r m => r -> Get a -> m a
runGet (Reader m -> Get a -> m a)
-> (Duplex m -> Reader m) -> Duplex m -> Get a -> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Duplex m -> Reader m
forall (m :: * -> *). Duplex m -> Reader m
duplexReader

-- | Read something from @r@. Inherits properties from 'runGet'.
--
-- @since 0.4.0
read
  :: (CanGet r m, Binary.Binary a)
  => r -- ^ Source to read from
  -> m a
read :: r -> m a
read source :: r
source = r -> Get a -> m a
forall r (m :: * -> *) a. CanGet r m => r -> Get a -> m a
runGet r
source Get a
forall t. Binary t => Get t
Binary.get

-- | Check if there is no more input to consume. This function may block. All properties of 'runGet'
-- apply to this function as well.
--
-- @since 0.4.0
isEmpty
  :: CanGet r m
  => r -- ^ Source to check for stream depletion
  -> m Bool
isEmpty :: r -> m Bool
isEmpty source :: r
source = r -> Get Bool -> m Bool
forall r (m :: * -> *) a. CanGet r m => r -> Get a -> m a
runGet r
source Get Bool
Get.isEmpty

-- * Writer

-- | @since 0.4.0
newtype Writer m = Writer
  { Writer m -> Put -> m ()
runWriter :: Binary.Put -> m () }

-- | Transform the underlying functor.
--
-- @since 0.4.0
mapWriter :: (m () -> n ()) -> Writer m -> Writer n
mapWriter :: (m () -> n ()) -> Writer m -> Writer n
mapWriter f :: m () -> n ()
f (Writer write :: Put -> m ()
write) = (Put -> n ()) -> Writer n
forall (m :: * -> *). (Put -> m ()) -> Writer m
Writer (m () -> n ()
f (m () -> n ()) -> (Put -> m ()) -> Put -> n ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Put -> m ()
write)

-- | Create a writer using a function that handles the output chunks.
--
-- @since 0.4.0
newWriterWith
  :: (ByteString -> m ()) -- ^ Chunk writer
  -> Writer m
newWriterWith :: (ByteString -> m ()) -> Writer m
newWriterWith write :: ByteString -> m ()
write = (Put -> m ()) -> Writer m
forall (m :: * -> *). (Put -> m ()) -> Writer m
Writer (ByteString -> m ()
write (ByteString -> m ()) -> (Put -> ByteString) -> Put -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
toStrict (ByteString -> ByteString)
-> (Put -> ByteString) -> Put -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Put -> ByteString
Put.runPut)

-- | Create a writer.
--
-- Other threads writing to the same 'Handle' do not interfere with the resulting 'Writer'. The
-- 'Writer' may be used concurrently.
--
-- @since 0.4.0
newWriter
  :: MonadIO m
  => Handle-- ^ Write target
  -> Writer m
newWriter :: Handle -> Writer m
newWriter handle :: Handle
handle = (ByteString -> m ()) -> Writer m
forall (m :: * -> *). (ByteString -> m ()) -> Writer m
newWriterWith (IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (ByteString -> IO ()) -> ByteString -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Handle -> ByteString -> IO ()
ByteString.hPut Handle
handle)

-- | @w@ can execute 'Binary.Put' operations in @m@
--
-- @since 0.4.0
class CanPut w m where
  runPut :: w -> Binary.Put -> m ()

instance CanPut (Writer m) m where
  runPut :: Writer m -> Put -> m ()
runPut = Writer m -> Put -> m ()
forall (m :: * -> *). Writer m -> Put -> m ()
runWriter

instance CanPut (Duplex m) m where
  runPut :: Duplex m -> Put -> m ()
runPut = Writer m -> Put -> m ()
forall w (m :: * -> *). CanPut w m => w -> Put -> m ()
runPut (Writer m -> Put -> m ())
-> (Duplex m -> Writer m) -> Duplex m -> Put -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Duplex m -> Writer m
forall (m :: * -> *). Duplex m -> Writer m
duplexWriter

instance MonadIO m => CanPut Handle m where
  runPut :: Handle -> Put -> m ()
runPut handle :: Handle
handle = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (Put -> IO ()) -> Put -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Handle -> ByteString -> IO ()
ByteString.hPut Handle
handle (ByteString -> IO ()) -> (Put -> ByteString) -> Put -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
toStrict (ByteString -> ByteString)
-> (Put -> ByteString) -> Put -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Put -> ByteString
Put.runPut

-- | Write something to @w@.
--
-- @since 0.4.0
write
  :: (CanPut w m, Binary.Binary a)
  => w -- ^ Write target
  -> a -- ^ Value to write
  -> m ()
write :: w -> a -> m ()
write sink :: w
sink value :: a
value = w -> Put -> m ()
forall w (m :: * -> *). CanPut w m => w -> Put -> m ()
runPut w
sink (Put -> m ()) -> Put -> m ()
forall a b. (a -> b) -> a -> b
$ a -> Put
forall t. Binary t => t -> Put
Binary.put a
value

-- * Pipe

-- | Create a connected pair of 'Reader' and 'Writer'.
--
-- The 'Reader' will automatically end the stream if the 'Writer' goes out of scope.
--
-- @since 0.4.0
newPipe :: (Concurrent.MonadConc m, MonadIO m) => m (Reader m, Writer m)
newPipe :: m (Reader m, Writer m)
newPipe = do
  IORef (Deque ByteString)
chan <- IO (IORef (Deque ByteString)) -> m (IORef (Deque ByteString))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef (Deque ByteString)) -> m (IORef (Deque ByteString)))
-> IO (IORef (Deque ByteString)) -> m (IORef (Deque ByteString))
forall a b. (a -> b) -> a -> b
$ Deque ByteString -> IO (IORef (Deque ByteString))
forall a. a -> IO (IORef a)
newIORef Deque ByteString
forall a. Monoid a => a
mempty
  Weak (IORef (Deque ByteString))
weakChan <- IO (Weak (IORef (Deque ByteString)))
-> m (Weak (IORef (Deque ByteString)))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Weak (IORef (Deque ByteString)))
 -> m (Weak (IORef (Deque ByteString))))
-> IO (Weak (IORef (Deque ByteString)))
-> m (Weak (IORef (Deque ByteString)))
forall a b. (a -> b) -> a -> b
$ IORef (Deque ByteString)
-> IO () -> IO (Weak (IORef (Deque ByteString)))
forall a. IORef a -> IO () -> IO (Weak (IORef a))
mkWeakIORef IORef (Deque ByteString)
chan (IO () -> IO (Weak (IORef (Deque ByteString))))
-> IO () -> IO (Weak (IORef (Deque ByteString)))
forall a b. (a -> b) -> a -> b
$ () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
  (await :: Await
await, notify :: Notify
notify) <- IO (Await, Notify) -> m (Await, Notify)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Await, Notify)
newAwaitNotify

  let
    read :: IO ByteString
read = do
      Maybe (IORef (Deque ByteString))
mbChan <- Weak (IORef (Deque ByteString))
-> IO (Maybe (IORef (Deque ByteString)))
forall v. Weak v -> IO (Maybe v)
deRefWeak Weak (IORef (Deque ByteString))
weakChan
      case Maybe (IORef (Deque ByteString))
mbChan of
        Nothing -> ByteString -> IO ByteString
forall (f :: * -> *) a. Applicative f => a -> f a
pure ByteString
ByteString.empty
        Just chan :: IORef (Deque ByteString)
chan -> IO (IO ByteString) -> IO ByteString
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO ByteString) -> IO ByteString)
-> IO (IO ByteString) -> IO ByteString
forall a b. (a -> b) -> a -> b
$
          IORef (Deque ByteString)
-> (Deque ByteString -> (Deque ByteString, IO ByteString))
-> IO (IO ByteString)
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (Deque ByteString)
chan ((Deque ByteString -> (Deque ByteString, IO ByteString))
 -> IO (IO ByteString))
-> (Deque ByteString -> (Deque ByteString, IO ByteString))
-> IO (IO ByteString)
forall a b. (a -> b) -> a -> b
$ \queue :: Deque ByteString
queue ->
            case Deque ByteString -> Maybe (ByteString, Deque ByteString)
forall a. Deque a -> Maybe (a, Deque a)
Deque.uncons Deque ByteString
queue of
              Just (elem :: ByteString
elem, queue :: Deque ByteString
queue) -> (Deque ByteString
queue, ByteString -> IO ByteString
forall (f :: * -> *) a. Applicative f => a -> f a
pure ByteString
elem)
              Nothing -> (Deque ByteString
queue, Await -> IO Bool
runAwait Await
await IO Bool -> IO ByteString -> IO ByteString
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ByteString
read)

    write :: ByteString -> IO ()
write msg :: ByteString
msg =
      Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (ByteString -> Bool
ByteString.null ByteString
msg) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        IORef (Deque ByteString)
-> (Deque ByteString -> (Deque ByteString, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (Deque ByteString)
chan ((Deque ByteString -> (Deque ByteString, ())) -> IO ())
-> (Deque ByteString -> (Deque ByteString, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \queue :: Deque ByteString
queue ->
          (ByteString -> Deque ByteString -> Deque ByteString
forall a. a -> Deque a -> Deque a
Deque.snoc ByteString
msg Deque ByteString
queue, ())
        Notify -> IO ()
runNotify Notify
notify

  Reader m
reader <- m ByteString -> m (Reader m)
forall (m :: * -> *). MonadConc m => m ByteString -> m (Reader m)
newReaderWith (IO ByteString -> m ByteString
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ByteString
read)
  let writer :: Writer m
writer = (ByteString -> m ()) -> Writer m
forall (m :: * -> *). (ByteString -> m ()) -> Writer m
newWriterWith (IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (ByteString -> IO ()) -> ByteString -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> IO ()
write)

  (Reader m, Writer m) -> m (Reader m, Writer m)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Reader m
reader, Writer m
writer)

-- * Duplex

-- | Pair of 'Reader' and 'Writer'
--
-- @since 0.4.0
data Duplex m = Duplex
  { Duplex m -> Writer m
duplexWriter :: Writer m
  , Duplex m -> Reader m
duplexReader :: Reader m
  }

-- | Transform the underlying functor.
--
-- @since 0.4.0
mapDuplex :: (forall a. m a -> n a) -> Duplex m -> Duplex n
mapDuplex :: (forall a. m a -> n a) -> Duplex m -> Duplex n
mapDuplex f :: forall a. m a -> n a
f (Duplex w :: Writer m
w r :: Reader m
r) = Writer n -> Reader n -> Duplex n
forall (m :: * -> *). Writer m -> Reader m -> Duplex m
Duplex ((m () -> n ()) -> Writer m -> Writer n
forall (m :: * -> *) (n :: * -> *).
(m () -> n ()) -> Writer m -> Writer n
mapWriter m () -> n ()
forall a. m a -> n a
f Writer m
w) ((forall a. m a -> n a) -> Reader m -> Reader n
forall (m :: * -> *) (n :: * -> *).
(forall a. m a -> n a) -> Reader m -> Reader n
mapReader forall a. m a -> n a
f Reader m
r)

-- | Create a new duplex. The 'Duplex' inherits all the properties of 'Reader' and 'Writer' when
-- created with 'newReader' and 'newWriter'.
--
-- @since 0.4.0
newDuplex
  :: (Concurrent.MonadConc m, MonadIO m)
   => Handle -- ^ Handle to read from and write to
   -> m (Duplex m)
newDuplex :: Handle -> m (Duplex m)
newDuplex handle :: Handle
handle = Writer m -> Reader m -> Duplex m
forall (m :: * -> *). Writer m -> Reader m -> Duplex m
Duplex (Handle -> Writer m
forall (m :: * -> *). MonadIO m => Handle -> Writer m
newWriter Handle
handle) (Reader m -> Duplex m) -> m (Reader m) -> m (Duplex m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Handle -> m (Reader m)
forall (m :: * -> *).
(MonadConc m, MonadIO m) =>
Handle -> m (Reader m)
newReader Handle
handle

-- | Combines 'newReaderWith' and 'newWriterWith'.
--
-- @since 0.4.0
newDuplexWith
  :: Concurrent.MonadConc m
  => m ByteString -- ^ Input chunk producer for 'Reader'
  -> (ByteString -> m ()) -- ^ Chunk writer for 'Writer'
  -> m (Duplex m)
newDuplexWith :: m ByteString -> (ByteString -> m ()) -> m (Duplex m)
newDuplexWith getChunk :: m ByteString
getChunk writeChunk :: ByteString -> m ()
writeChunk = Writer m -> Reader m -> Duplex m
forall (m :: * -> *). Writer m -> Reader m -> Duplex m
Duplex ((ByteString -> m ()) -> Writer m
forall (m :: * -> *). (ByteString -> m ()) -> Writer m
newWriterWith ByteString -> m ()
writeChunk) (Reader m -> Duplex m) -> m (Reader m) -> m (Duplex m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m ByteString -> m (Reader m)
forall (m :: * -> *). MonadConc m => m ByteString -> m (Reader m)
newReaderWith m ByteString
getChunk