{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Data.Binary.IO.Lifted
(
ReaderError (..)
, Reader (..)
, newReader
, newReaderWith
, mapReader
, Writer (..)
, newWriter
, newWriterWith
, mapWriter
, newPipe
, Duplex (..)
, newDuplex
, newDuplexWith
, mapDuplex
, CanGet (..)
, read
, isEmpty
, CanPut (..)
, write
)
where
import Prelude hiding (read)
import Control.Arrow ((&&&))
import qualified Control.Concurrent.Classy as Concurrent
import Control.Monad (join, unless)
import qualified Control.Monad.Catch as Catch
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Trans.Class (MonadTrans (lift))
import Control.Monad.Trans.Except (ExceptT, except, runExceptT)
import qualified Data.Binary as Binary
import qualified Data.Binary.Get as Get
import Data.Binary.IO.Internal.AwaitNotify (newAwaitNotify, runAwait, runNotify)
import qualified Data.Binary.Put as Put
import Data.ByteString (ByteString)
import qualified Data.ByteString as ByteString
import Data.ByteString.Lazy (toStrict)
import Data.IORef (atomicModifyIORef', mkWeakIORef, newIORef)
import qualified Deque.Strict as Deque
import System.IO (Handle, hSetBinaryMode)
import System.Mem.Weak (deRefWeak)
-- | Failure produced when a 'Binary.Get' operation cannot be completed by a reader.
data ReaderError = ReaderGetError
  { readerErrorRemaining :: !ByteString
    -- ^ Unconsumed input handed back by the failed decoder
  , readerErrorOffset :: !Get.ByteOffset
    -- ^ Offset reported by the decoder together with the failure
  , readerErrorInput :: !ByteString
    -- ^ Contents of the reader's input buffer at the time of the failure
  , readerErrorMessage :: !String
    -- ^ Error message reported by the decoder
  }
  deriving stock Show
  deriving anyclass Catch.Exception
-- | Reader that reports failures through 'ExceptT' and returns, next to the decoded value, the
-- reader to use for the following read.
newtype StationaryReader m = StationaryReader
  { runStationaryReader
      :: forall a
      .  Binary.Get a
      -> ExceptT ReaderError m (StationaryReader m, a)
  }
-- | Build a 'StationaryReader' that pulls its input from @getChunk@.
--
-- An empty chunk returned by @getChunk@ is passed to the decoder as end of input. Every
-- non-empty chunk is appended to an internal buffer. A successful read replaces that buffer with
-- the decoder's leftover input; a failed read does not touch it, so the next read is fed the same
-- buffered input again.
newStationaryReaderWith
  :: forall m
  .  Concurrent.MonadConc m
  => m ByteString
  -> m (StationaryReader m)
newStationaryReaderWith getChunk = do
  bufferRef <- Concurrent.newIORef ByteString.empty

  let
    reader :: StationaryReader m
    reader = StationaryReader start

    -- Every read begins by feeding the buffered input to a fresh incremental decoder.
    start :: Binary.Get a -> ExceptT ReaderError m (StationaryReader m, a)
    start get = do
      buffered <- lift (Concurrent.readIORef bufferRef)
      drive (Get.pushChunk (Get.runGetIncremental get) buffered)

    -- Step the decoder until it either fails or finishes.
    drive :: Get.Decoder a -> ExceptT ReaderError m (StationaryReader m, a)
    drive = \case
      Get.Fail leftover offset message -> do
        -- The buffer is only read here, never written: a failure keeps the input in place.
        buffered <- lift (Concurrent.readIORef bufferRef)
        except $ Left ReaderGetError
          { readerErrorRemaining = leftover
          , readerErrorOffset = offset
          , readerErrorInput = buffered
          , readerErrorMessage = message
          }

      Get.Done leftover _ value -> Catch.mask_ $ do
        -- Only what the decoder did not consume stays buffered for the next read.
        lift (Concurrent.writeIORef bufferRef leftover)
        pure (reader, value)

      Get.Partial continue -> do
        -- Only 'getChunk' runs with the outer masking state restored; recording the received
        -- chunk in the buffer happens while asynchronous exceptions are masked.
        received <- lift $ Catch.mask $ \restore -> do
          chunk <- restore getChunk
          if ByteString.null chunk
            then pure Nothing
            else
              Concurrent.atomicModifyIORef' bufferRef $ \buffered ->
                (buffered <> chunk, Just chunk)

        drive (continue received)

  pure reader
-- | Wrapper around a function that runs any 'Binary.Get' operation in @m@.
newtype Reader m = Reader
  { runReader :: forall a. Binary.Get a -> m a
    -- ^ Run a 'Binary.Get' operation using this reader
  }
-- | Change the monad a 'Reader' runs in by applying the given transformation to the result of
-- every read.
mapReader :: (forall a. m a -> n a) -> Reader m -> Reader n
mapReader f reader = Reader (\get -> f (runReader reader get))
-- | Create a 'Reader' that obtains its input chunks from the given action.
--
-- The current 'StationaryReader' is kept in an 'Concurrent.MVar' and every read runs inside
-- 'Concurrent.modifyMVar'. A 'ReaderError' reported by the underlying reader is thrown with
-- 'Catch.throwM'.
newReaderWith
  :: Concurrent.MonadConc m
  => m ByteString
  -> m (Reader m)
newReaderWith getChunk = do
  stateVar <- newStationaryReaderWith getChunk >>= Concurrent.newMVar

  pure $ Reader $ \get ->
    Concurrent.modifyMVar stateVar $ \current ->
      -- On success the pair already has the shape 'modifyMVar' expects: (next reader, result).
      runExceptT (runStationaryReader current get) >>= either Catch.throwM pure
-- | Create a 'Reader' on top of a 'Handle'.
--
-- The handle is switched to binary mode first. Input is then fetched with
-- 'ByteString.hGetSome', asking for at most 4096 bytes per chunk.
newReader
  :: (Concurrent.MonadConc m, MonadIO m)
  => Handle
  -> m (Reader m)
newReader handle = do
  liftIO (hSetBinaryMode handle True)
  newReaderWith (liftIO (ByteString.hGetSome handle chunkSize))
  where
    -- Upper bound on the number of bytes requested from the handle at once
    chunkSize :: Int
    chunkSize = 4096
-- | Values of type @r@ can be used to run 'Binary.Get' operations in @m@.
class CanGet r m where
  -- | Run a 'Binary.Get' operation against the given source.
  runGet :: r -> Binary.Get a -> m a
-- | A 'Reader' runs 'Binary.Get' operations through 'runReader'.
instance CanGet (Reader m) m where
  -- Written with explicit arguments on purpose: 'runReader' has a @forall@ to the right of an
  -- arrow, and GHC >= 9.0 (simplified subsumption) rejects the point-free @runGet = runReader@.
  runGet reader get = runReader reader get
-- | A 'Duplex' reads through its 'duplexReader'.
instance CanGet (Duplex m) m where
  runGet duplex = runGet (duplexReader duplex)
-- | Read a value from the source using the 'Binary.get' decoder of its 'Binary.Binary' instance.
read
  :: (CanGet r m, Binary.Binary a)
  => r
  -> m a
read = flip runGet Binary.get
-- | Run 'Get.isEmpty' against the source and return its verdict.
isEmpty
  :: CanGet r m
  => r
  -> m Bool
isEmpty = flip runGet Get.isEmpty
-- | Wrapper around a function that runs a 'Binary.Put' operation in @m@.
newtype Writer m = Writer
  { runWriter :: Binary.Put -> m ()
    -- ^ Run a 'Binary.Put' operation using this writer
  }
-- | Change the monad a 'Writer' runs in by applying the given transformation to every write.
mapWriter :: (m () -> n ()) -> Writer m -> Writer n
mapWriter f writer = Writer (\put -> f (runWriter writer put))
-- | Create a 'Writer' from a function that consumes chunks.
--
-- Each 'Binary.Put' operation is rendered with 'Put.runPut', converted to a single strict
-- 'ByteString' and passed to the given function.
newWriterWith
  :: (ByteString -> m ())
  -> Writer m
newWriterWith write = Writer (\put -> write (toStrict (Put.runPut put)))
-- | Create a 'Writer' that sends every chunk to the given 'Handle' with 'ByteString.hPut'.
newWriter
  :: MonadIO m
  => Handle
  -> Writer m
newWriter handle = newWriterWith (\chunk -> liftIO (ByteString.hPut handle chunk))
-- | Values of type @w@ can be used to run 'Binary.Put' operations in @m@.
class CanPut w m where
  -- | Run a 'Binary.Put' operation against the given sink.
  runPut :: w -> Binary.Put -> m ()
-- | A 'Writer' runs 'Binary.Put' operations through 'runWriter'.
instance CanPut (Writer m) m where
  runPut writer put = runWriter writer put
-- | A 'Duplex' writes through its 'duplexWriter'.
instance CanPut (Duplex m) m where
  runPut duplex = runPut (duplexWriter duplex)
-- | A 'Handle' can be written to directly: the 'Binary.Put' operation is rendered with
-- 'Put.runPut', converted to a strict 'ByteString' and sent with 'ByteString.hPut'.
instance MonadIO m => CanPut Handle m where
  runPut handle put = liftIO (ByteString.hPut handle (toStrict (Put.runPut put)))
-- | Write a value to the sink using the 'Binary.put' encoder of its 'Binary.Binary' instance.
write
  :: (CanPut w m, Binary.Binary a)
  => w
  -> a
  -> m ()
write sink = runPut sink . Binary.put
-- | Create a connected 'Reader' and 'Writer' backed by an in-memory queue of chunks.
--
-- Chunks written through the 'Writer' are appended to the back of the queue and handed to the
-- 'Reader' from the front. Empty chunks are never enqueued. The read side reaches the queue
-- through a weak reference only; once that reference can no longer be dereferenced, reads
-- receive an empty chunk, which the reader passes to the decoder as end of input.
newPipe :: (Concurrent.MonadConc m, MonadIO m) => m (Reader m, Writer m)
newPipe = do
  queueRef <- liftIO (newIORef mempty)
  -- The finalizer does nothing; the weak pointer only serves to detect that the queue is gone.
  weakQueueRef <- liftIO (mkWeakIORef queueRef (pure ()))
  (await, notify) <- liftIO newAwaitNotify

  let
    -- Must not mention 'queueRef' directly: only the write side keeps the queue alive.
    readChunk :: IO ByteString
    readChunk =
      deRefWeak weakQueueRef >>= \case
        Nothing -> pure ByteString.empty
        Just liveRef ->
          -- The atomic update returns the follow-up action: either yield the dequeued chunk, or
          -- wait for a notification and try again from the top.
          join $ atomicModifyIORef' liveRef $ \queue ->
            case Deque.uncons queue of
              Just (chunk, rest) -> (rest, pure chunk)
              Nothing -> (queue, runAwait await >> readChunk)

    writeChunk :: ByteString -> IO ()
    writeChunk chunk =
      -- An empty chunk would be indistinguishable from end of input on the read side.
      unless (ByteString.null chunk) $ do
        atomicModifyIORef' queueRef $ \queue -> (Deque.snoc chunk queue, ())
        runNotify notify

  reader <- newReaderWith (liftIO readChunk)
  pure (reader, newWriterWith (liftIO . writeChunk))
-- | Pair of a 'Writer' and a 'Reader'.
data Duplex m = Duplex
  { duplexWriter :: Writer m
    -- ^ Write side
  , duplexReader :: Reader m
    -- ^ Read side
  }
-- | Change the monad a 'Duplex' runs in by transforming both its 'Writer' and its 'Reader'.
mapDuplex :: (forall a. m a -> n a) -> Duplex m -> Duplex n
mapDuplex f duplex = Duplex
  { duplexWriter = mapWriter f (duplexWriter duplex)
  , duplexReader = mapReader f (duplexReader duplex)
  }
-- | Create a 'Duplex' that reads from and writes to the same 'Handle', combining 'newReader' and
-- 'newWriter'.
newDuplex
  :: (Concurrent.MonadConc m, MonadIO m)
  => Handle
  -> m (Duplex m)
newDuplex handle = do
  reader <- newReader handle
  pure Duplex
    { duplexWriter = newWriter handle
    , duplexReader = reader
    }
-- | Create a 'Duplex' from an action that supplies input chunks and a function that consumes
-- output chunks, combining 'newReaderWith' and 'newWriterWith'.
newDuplexWith
  :: Concurrent.MonadConc m
  => m ByteString
  -> (ByteString -> m ())
  -> m (Duplex m)
newDuplexWith getChunk writeChunk = do
  reader <- newReaderWith getChunk
  pure Duplex
    { duplexWriter = newWriterWith writeChunk
    , duplexReader = reader
    }