module Streamly.Internal.Network.Inet.TCP
(
acceptOnAddr
, acceptOnAddrWith
, acceptOnPort
, acceptOnPortLocal
, acceptorOnAddr
, acceptorOnAddrWith
, acceptorOnPort
, acceptorOnPortWith
, acceptorOnPortLocal
, connect
, withConnectionM
, usingConnection
, reader
, withConnection
, read
, write
, writeWithBufferOf
, putBytes
, putBytesWithBufferOf
, writeChunks
, putChunks
, pipeBytes
)
where
#include "inline.hs"
import Control.Exception (onException)
import Control.Monad.Catch (MonadCatch, MonadMask, bracket)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Word (Word8)
import Network.Socket
(Socket, PortNumber, SocketOption(..), Family(..), SockAddr(..),
SocketType(..), defaultProtocol, maxListenQueue, tupleToHostAddress,
socket)
import Prelude hiding (read)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Control.ForkLifted (fork)
import Streamly.Internal.Data.Array.Type (Array(..), writeNUnsafe)
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Data.Stream (Stream)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..))
import Streamly.Internal.Data.Unboxed (Unbox)
import Streamly.Internal.Data.Unfold.Type (Unfold(..))
import Streamly.Internal.Network.Socket (SockSpec(..), accept, acceptor)
import Streamly.Internal.System.IO (defaultChunkSize)
import qualified Control.Monad.Catch as MC
import qualified Network.Socket as Net
import qualified Streamly.Data.Array as A
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Data.Unfold as UF
import qualified Streamly.Internal.Data.Array.Type as A
import qualified Streamly.Internal.Data.Unfold as UF (first, bracketIO)
import qualified Streamly.Internal.Data.Fold.Type as FL (Step(..), reduce)
import qualified Streamly.Internal.Data.Stream.StreamD as S
import qualified Streamly.Internal.Data.Stream.Exception.Lifted as S
import qualified Streamly.Internal.Network.Socket as ISK
{-# INLINE acceptorOnAddrWith #-}
acceptorOnAddrWith
:: MonadIO m
=> [(SocketOption, Int)]
-> Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Socket
acceptorOnAddrWith :: forall (m :: * -> *).
MonadIO m =>
[(SocketOption, Int)]
-> Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Socket
acceptorOnAddrWith [(SocketOption, Int)]
opts = forall a c (m :: * -> *) b.
(a -> c) -> Unfold m c b -> Unfold m a b
UF.lmap ((Word8, Word8, Word8, Word8), PortNumber)
-> (Int, SockSpec, SockAddr)
f forall (m :: * -> *).
MonadIO m =>
Unfold m (Int, SockSpec, SockAddr) Socket
acceptor
where
f :: ((Word8, Word8, Word8, Word8), PortNumber)
-> (Int, SockSpec, SockAddr)
f ((Word8, Word8, Word8, Word8)
addr, PortNumber
port) =
(Int
maxListenQueue
, SockSpec
{ sockFamily :: Family
sockFamily = Family
AF_INET
, sockType :: SocketType
sockType = SocketType
Stream
, sockProto :: ProtocolNumber
sockProto = ProtocolNumber
defaultProtocol
, sockOpts :: [(SocketOption, Int)]
sockOpts = [(SocketOption, Int)]
opts
}
, PortNumber -> HostAddress -> SockAddr
SockAddrInet PortNumber
port ((Word8, Word8, Word8, Word8) -> HostAddress
tupleToHostAddress (Word8, Word8, Word8, Word8)
addr)
)
{-# INLINE acceptorOnAddr #-}
acceptorOnAddr
:: MonadIO m
=> Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Socket
acceptorOnAddr :: forall (m :: * -> *).
MonadIO m =>
Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Socket
acceptorOnAddr = forall (m :: * -> *).
MonadIO m =>
[(SocketOption, Int)]
-> Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Socket
acceptorOnAddrWith []
{-# INLINE acceptorOnPortWith #-}
acceptorOnPortWith :: MonadIO m
=> [(SocketOption, Int)]
-> Unfold m PortNumber Socket
acceptorOnPortWith :: forall (m :: * -> *).
MonadIO m =>
[(SocketOption, Int)] -> Unfold m PortNumber Socket
acceptorOnPortWith [(SocketOption, Int)]
opts = forall a (m :: * -> *) b c. a -> Unfold m (a, b) c -> Unfold m b c
UF.first (Word8
0,Word8
0,Word8
0,Word8
0) (forall (m :: * -> *).
MonadIO m =>
[(SocketOption, Int)]
-> Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Socket
acceptorOnAddrWith [(SocketOption, Int)]
opts)
{-# INLINE acceptorOnPort #-}
acceptorOnPort :: MonadIO m => Unfold m PortNumber Socket
acceptorOnPort :: forall (m :: * -> *). MonadIO m => Unfold m PortNumber Socket
acceptorOnPort = forall a (m :: * -> *) b c. a -> Unfold m (a, b) c -> Unfold m b c
UF.first (Word8
0,Word8
0,Word8
0,Word8
0) forall (m :: * -> *).
MonadIO m =>
Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Socket
acceptorOnAddr
{-# INLINE acceptorOnPortLocal #-}
acceptorOnPortLocal :: MonadIO m => Unfold m PortNumber Socket
acceptorOnPortLocal :: forall (m :: * -> *). MonadIO m => Unfold m PortNumber Socket
acceptorOnPortLocal = forall a (m :: * -> *) b c. a -> Unfold m (a, b) c -> Unfold m b c
UF.first (Word8
127,Word8
0,Word8
0,Word8
1) forall (m :: * -> *).
MonadIO m =>
Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Socket
acceptorOnAddr
{-# INLINE acceptOnAddrWith #-}
acceptOnAddrWith
:: MonadIO m
=> [(SocketOption, Int)]
-> (Word8, Word8, Word8, Word8)
-> PortNumber
-> Stream m Socket
acceptOnAddrWith :: forall (m :: * -> *).
MonadIO m =>
[(SocketOption, Int)]
-> (Word8, Word8, Word8, Word8) -> PortNumber -> Stream m Socket
acceptOnAddrWith [(SocketOption, Int)]
opts (Word8, Word8, Word8, Word8)
addr PortNumber
port =
forall (m :: * -> *).
MonadIO m =>
Int -> SockSpec -> SockAddr -> Stream m Socket
accept Int
maxListenQueue SockSpec
{ sockFamily :: Family
sockFamily = Family
AF_INET
, sockType :: SocketType
sockType = SocketType
Stream
, sockProto :: ProtocolNumber
sockProto = ProtocolNumber
defaultProtocol
, sockOpts :: [(SocketOption, Int)]
sockOpts = [(SocketOption, Int)]
opts
}
(PortNumber -> HostAddress -> SockAddr
SockAddrInet PortNumber
port ((Word8, Word8, Word8, Word8) -> HostAddress
tupleToHostAddress (Word8, Word8, Word8, Word8)
addr))
{-# INLINE acceptOnAddr #-}
acceptOnAddr
:: MonadIO m
=> (Word8, Word8, Word8, Word8)
-> PortNumber
-> Stream m Socket
acceptOnAddr :: forall (m :: * -> *).
MonadIO m =>
(Word8, Word8, Word8, Word8) -> PortNumber -> Stream m Socket
acceptOnAddr = forall (m :: * -> *).
MonadIO m =>
[(SocketOption, Int)]
-> (Word8, Word8, Word8, Word8) -> PortNumber -> Stream m Socket
acceptOnAddrWith []
{-# INLINE acceptOnPort #-}
acceptOnPort :: MonadIO m => PortNumber -> Stream m Socket
acceptOnPort :: forall (m :: * -> *). MonadIO m => PortNumber -> Stream m Socket
acceptOnPort = forall (m :: * -> *).
MonadIO m =>
(Word8, Word8, Word8, Word8) -> PortNumber -> Stream m Socket
acceptOnAddr (Word8
0,Word8
0,Word8
0,Word8
0)
{-# INLINE acceptOnPortLocal #-}
acceptOnPortLocal :: MonadIO m => PortNumber -> Stream m Socket
acceptOnPortLocal :: forall (m :: * -> *). MonadIO m => PortNumber -> Stream m Socket
acceptOnPortLocal = forall (m :: * -> *).
MonadIO m =>
(Word8, Word8, Word8, Word8) -> PortNumber -> Stream m Socket
acceptOnAddr (Word8
127,Word8
0,Word8
0,Word8
1)
connect :: (Word8, Word8, Word8, Word8) -> PortNumber -> IO Socket
connect :: (Word8, Word8, Word8, Word8) -> PortNumber -> IO Socket
connect (Word8, Word8, Word8, Word8)
addr PortNumber
port = do
Socket
sock <- Family -> SocketType -> ProtocolNumber -> IO Socket
socket Family
AF_INET SocketType
Stream ProtocolNumber
defaultProtocol
Socket -> SockAddr -> IO ()
Net.connect Socket
sock (PortNumber -> HostAddress -> SockAddr
SockAddrInet PortNumber
port ((Word8, Word8, Word8, Word8) -> HostAddress
Net.tupleToHostAddress (Word8, Word8, Word8, Word8)
addr))
forall a b. IO a -> IO b -> IO a
`onException` Socket -> IO ()
Net.close Socket
sock
forall (m :: * -> *) a. Monad m => a -> m a
return Socket
sock
{-# INLINABLE withConnectionM #-}
withConnectionM :: (MonadMask m, MonadIO m)
=> (Word8, Word8, Word8, Word8) -> PortNumber -> (Socket -> m ()) -> m ()
withConnectionM :: forall (m :: * -> *).
(MonadMask m, MonadIO m) =>
(Word8, Word8, Word8, Word8)
-> PortNumber -> (Socket -> m ()) -> m ()
withConnectionM (Word8, Word8, Word8, Word8)
addr PortNumber
port =
forall (m :: * -> *) a c b.
MonadMask m =>
m a -> (a -> m c) -> (a -> m b) -> m b
bracket (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ (Word8, Word8, Word8, Word8) -> PortNumber -> IO Socket
connect (Word8, Word8, Word8, Word8)
addr PortNumber
port) (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. Socket -> IO ()
Net.close)
{-# INLINE usingConnection #-}
usingConnection :: (MonadCatch m, MonadAsync m)
=> Unfold m Socket a
-> Unfold m ((Word8, Word8, Word8, Word8), PortNumber) a
usingConnection :: forall (m :: * -> *) a.
(MonadCatch m, MonadAsync m) =>
Unfold m Socket a
-> Unfold m ((Word8, Word8, Word8, Word8), PortNumber) a
usingConnection = forall (m :: * -> *) a c d b.
(MonadIO m, MonadCatch m) =>
(a -> IO c) -> (c -> IO d) -> Unfold m c b -> Unfold m a b
UF.bracketIO (\((Word8, Word8, Word8, Word8)
addr, PortNumber
port) -> (Word8, Word8, Word8, Word8) -> PortNumber -> IO Socket
connect (Word8, Word8, Word8, Word8)
addr PortNumber
port) Socket -> IO ()
Net.close
{-# INLINE withConnection #-}
withConnection :: (MonadCatch m, MonadAsync m)
=> (Word8, Word8, Word8, Word8)
-> PortNumber
-> (Socket -> Stream m a)
-> Stream m a
withConnection :: forall (m :: * -> *) a.
(MonadCatch m, MonadAsync m) =>
(Word8, Word8, Word8, Word8)
-> PortNumber -> (Socket -> Stream m a) -> Stream m a
withConnection (Word8, Word8, Word8, Word8)
addr PortNumber
port = forall (m :: * -> *) b c a.
(MonadIO m, MonadCatch m) =>
IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
S.bracketIO ((Word8, Word8, Word8, Word8) -> PortNumber -> IO Socket
connect (Word8, Word8, Word8, Word8)
addr PortNumber
port) Socket -> IO ()
Net.close
{-# INLINE reader #-}
reader :: (MonadCatch m, MonadAsync m)
=> Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Word8
reader :: forall (m :: * -> *).
(MonadCatch m, MonadAsync m) =>
Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Word8
reader = forall (m :: * -> *) b c a.
Monad m =>
Unfold m b c -> Unfold m a b -> Unfold m a c
UF.many forall (m :: * -> *) a. (Monad m, Unbox a) => Unfold m (Array a) a
A.reader (forall (m :: * -> *) a.
(MonadCatch m, MonadAsync m) =>
Unfold m Socket a
-> Unfold m ((Word8, Word8, Word8, Word8), PortNumber) a
usingConnection forall (m :: * -> *). MonadIO m => Unfold m Socket (Array Word8)
ISK.chunkReader)
{-# INLINE concatChunks #-}
concatChunks :: (Monad m, Unbox a) => Stream m (Array a) -> Stream m a
concatChunks :: forall (m :: * -> *) a.
(Monad m, Unbox a) =>
Stream m (Array a) -> Stream m a
concatChunks = forall (m :: * -> *) a b.
Monad m =>
Unfold m a b -> Stream m a -> Stream m b
S.unfoldMany forall (m :: * -> *) a. (Monad m, Unbox a) => Unfold m (Array a) a
A.reader
{-# INLINE read #-}
read :: (MonadCatch m, MonadAsync m)
=> (Word8, Word8, Word8, Word8) -> PortNumber -> Stream m Word8
read :: forall (m :: * -> *).
(MonadCatch m, MonadAsync m) =>
(Word8, Word8, Word8, Word8) -> PortNumber -> Stream m Word8
read (Word8, Word8, Word8, Word8)
addr PortNumber
port = forall (m :: * -> *) a.
(Monad m, Unbox a) =>
Stream m (Array a) -> Stream m a
concatChunks forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadCatch m, MonadAsync m) =>
(Word8, Word8, Word8, Word8)
-> PortNumber -> (Socket -> Stream m a) -> Stream m a
withConnection (Word8, Word8, Word8, Word8)
addr PortNumber
port forall (m :: * -> *). MonadIO m => Socket -> Stream m (Array Word8)
ISK.readChunks
{-# INLINE putChunks #-}
putChunks
:: (MonadCatch m, MonadAsync m)
=> (Word8, Word8, Word8, Word8)
-> PortNumber
-> Stream m (Array Word8)
-> m ()
putChunks :: forall (m :: * -> *).
(MonadCatch m, MonadAsync m) =>
(Word8, Word8, Word8, Word8)
-> PortNumber -> Stream m (Array Word8) -> m ()
putChunks (Word8, Word8, Word8, Word8)
addr PortNumber
port Stream m (Array Word8)
xs =
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
S.fold forall (m :: * -> *) a. Monad m => Fold m a ()
FL.drain
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadCatch m, MonadAsync m) =>
(Word8, Word8, Word8, Word8)
-> PortNumber -> (Socket -> Stream m a) -> Stream m a
withConnection (Word8, Word8, Word8, Word8)
addr PortNumber
port (\Socket
sk -> forall (m :: * -> *) a. Applicative m => m a -> Stream m a
S.fromEffect forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Socket -> Stream m (Array a) -> m ()
ISK.putChunks Socket
sk Stream m (Array Word8)
xs)
{-# INLINE writeChunks #-}
writeChunks
:: (MonadIO m, MonadCatch m)
=> (Word8, Word8, Word8, Word8)
-> PortNumber
-> Fold m (Array Word8) ()
writeChunks :: forall (m :: * -> *).
(MonadIO m, MonadCatch m) =>
(Word8, Word8, Word8, Word8)
-> PortNumber -> Fold m (Array Word8) ()
writeChunks (Word8, Word8, Word8, Word8)
addr PortNumber
port = forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> Fold m a b
Fold forall {m :: * -> *} {a} {b} {b}.
(MonadCatch m, MonadIO m) =>
Tuple' (Fold m a b) Socket
-> a -> m (Step (Tuple' (Fold m a b) Socket) b)
step forall {b}. m (Step (Tuple' (Fold m (Array Word8) ()) Socket) b)
initial forall {m :: * -> *} {a} {b}.
MonadIO m =>
Tuple' (Fold m a b) Socket -> m b
extract
where
initial :: m (Step (Tuple' (Fold m (Array Word8) ()) Socket) b)
initial = do
Socket
skt <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO ((Word8, Word8, Word8, Word8) -> PortNumber -> IO Socket
connect (Word8, Word8, Word8, Word8)
addr PortNumber
port)
Fold m (Array Word8) ()
fld <- forall (m :: * -> *) a b. Monad m => Fold m a b -> m (Fold m a b)
FL.reduce (forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Socket -> Fold m (Array a) ()
ISK.writeChunks Socket
skt)
forall (m :: * -> *) a b. MonadCatch m => m a -> m b -> m a
`MC.onException` forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Socket -> IO ()
Net.close Socket
skt)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
FL.Partial (forall a b. a -> b -> Tuple' a b
Tuple' Fold m (Array Word8) ()
fld Socket
skt)
step :: Tuple' (Fold m a b) Socket
-> a -> m (Step (Tuple' (Fold m a b) Socket) b)
step (Tuple' Fold m a b
fld Socket
skt) a
x = do
Fold m a b
r <- forall (m :: * -> *) a b.
Monad m =>
a -> Fold m a b -> m (Fold m a b)
FL.addOne a
x Fold m a b
fld forall (m :: * -> *) a b. MonadCatch m => m a -> m b -> m a
`MC.onException` forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Socket -> IO ()
Net.close Socket
skt)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
FL.Partial (forall a b. a -> b -> Tuple' a b
Tuple' Fold m a b
r Socket
skt)
extract :: Tuple' (Fold m a b) Socket -> m b
extract (Tuple' (Fold s -> a -> m (Step s b)
_ m (Step s b)
initial1 s -> m b
extract1) Socket
skt) = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Socket -> IO ()
Net.close Socket
skt
Step s b
res <- m (Step s b)
initial1
case Step s b
res of
FL.Partial s
fs -> s -> m b
extract1 s
fs
FL.Done b
fb -> forall (m :: * -> *) a. Monad m => a -> m a
return b
fb
{-# INLINE putBytesWithBufferOf #-}
putBytesWithBufferOf
:: (MonadCatch m, MonadAsync m)
=> Int
-> (Word8, Word8, Word8, Word8)
-> PortNumber
-> Stream m Word8
-> m ()
putBytesWithBufferOf :: forall (m :: * -> *).
(MonadCatch m, MonadAsync m) =>
Int
-> (Word8, Word8, Word8, Word8)
-> PortNumber
-> Stream m Word8
-> m ()
putBytesWithBufferOf Int
n (Word8, Word8, Word8, Word8)
addr PortNumber
port Stream m Word8
m = forall (m :: * -> *).
(MonadCatch m, MonadAsync m) =>
(Word8, Word8, Word8, Word8)
-> PortNumber -> Stream m (Array Word8) -> m ()
putChunks (Word8, Word8, Word8, Word8)
addr PortNumber
port forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Int -> Stream m a -> Stream m (Array a)
A.chunksOf Int
n Stream m Word8
m
{-# INLINE writeWithBufferOf #-}
writeWithBufferOf
:: (MonadIO m, MonadCatch m)
=> Int
-> (Word8, Word8, Word8, Word8)
-> PortNumber
-> Fold m Word8 ()
writeWithBufferOf :: forall (m :: * -> *).
(MonadIO m, MonadCatch m) =>
Int
-> (Word8, Word8, Word8, Word8) -> PortNumber -> Fold m Word8 ()
writeWithBufferOf Int
n (Word8, Word8, Word8, Word8)
addr PortNumber
port =
forall (m :: * -> *) a b c.
Monad m =>
Int -> Fold m a b -> Fold m b c -> Fold m a c
FL.groupsOf Int
n (forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Int -> Fold m a (Array a)
writeNUnsafe Int
n) (forall (m :: * -> *).
(MonadIO m, MonadCatch m) =>
(Word8, Word8, Word8, Word8)
-> PortNumber -> Fold m (Array Word8) ()
writeChunks (Word8, Word8, Word8, Word8)
addr PortNumber
port)
{-# INLINE putBytes #-}
putBytes :: (MonadCatch m, MonadAsync m)
=> (Word8, Word8, Word8, Word8) -> PortNumber -> Stream m Word8 -> m ()
putBytes :: forall (m :: * -> *).
(MonadCatch m, MonadAsync m) =>
(Word8, Word8, Word8, Word8)
-> PortNumber -> Stream m Word8 -> m ()
putBytes = forall (m :: * -> *).
(MonadCatch m, MonadAsync m) =>
Int
-> (Word8, Word8, Word8, Word8)
-> PortNumber
-> Stream m Word8
-> m ()
putBytesWithBufferOf Int
defaultChunkSize
{-# INLINE write #-}
write :: (MonadIO m, MonadCatch m)
=> (Word8, Word8, Word8, Word8) -> PortNumber -> Fold m Word8 ()
write :: forall (m :: * -> *).
(MonadIO m, MonadCatch m) =>
(Word8, Word8, Word8, Word8) -> PortNumber -> Fold m Word8 ()
write = forall (m :: * -> *).
(MonadIO m, MonadCatch m) =>
Int
-> (Word8, Word8, Word8, Word8) -> PortNumber -> Fold m Word8 ()
writeWithBufferOf Int
defaultChunkSize
{-# INLINE withInputConnect #-}
withInputConnect
:: (MonadCatch m, MonadAsync m)
=> (Word8, Word8, Word8, Word8)
-> PortNumber
-> Stream m Word8
-> (Socket -> Stream m a)
-> Stream m a
withInputConnect :: forall (m :: * -> *) a.
(MonadCatch m, MonadAsync m) =>
(Word8, Word8, Word8, Word8)
-> PortNumber
-> Stream m Word8
-> (Socket -> Stream m a)
-> Stream m a
withInputConnect (Word8, Word8, Word8, Word8)
addr PortNumber
port Stream m Word8
input Socket -> Stream m a
f = forall (m :: * -> *) b c a.
(MonadAsync m, MonadCatch m) =>
m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
S.bracket m (Socket, ThreadId)
pre forall {m :: * -> *} {b}. MonadIO m => (Socket, b) -> m ()
post forall {b}. (Socket, b) -> Stream m a
handler
where
pre :: m (Socket, ThreadId)
pre = do
Socket
sk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ (Word8, Word8, Word8, Word8) -> PortNumber -> IO Socket
connect (Word8, Word8, Word8, Word8)
addr PortNumber
port
ThreadId
tid <- forall (m :: * -> *). MonadRunInIO m => m () -> m ThreadId
fork (forall (m :: * -> *). MonadIO m => Socket -> Stream m Word8 -> m ()
ISK.putBytes Socket
sk Stream m Word8
input)
forall (m :: * -> *) a. Monad m => a -> m a
return (Socket
sk, ThreadId
tid)
handler :: (Socket, b) -> Stream m a
handler (Socket
sk, b
_) = Socket -> Stream m a
f Socket
sk
post :: (Socket, b) -> m ()
post (Socket
sk, b
_) = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Socket -> IO ()
Net.close Socket
sk
{-# INLINE pipeBytes #-}
pipeBytes
:: (MonadAsync m, MonadCatch m)
=> (Word8, Word8, Word8, Word8)
-> PortNumber
-> Stream m Word8
-> Stream m Word8
pipeBytes :: forall (m :: * -> *).
(MonadAsync m, MonadCatch m) =>
(Word8, Word8, Word8, Word8)
-> PortNumber -> Stream m Word8 -> Stream m Word8
pipeBytes (Word8, Word8, Word8, Word8)
addr PortNumber
port Stream m Word8
input = forall (m :: * -> *) a.
(MonadCatch m, MonadAsync m) =>
(Word8, Word8, Word8, Word8)
-> PortNumber
-> Stream m Word8
-> (Socket -> Stream m a)
-> Stream m a
withInputConnect (Word8, Word8, Word8, Word8)
addr PortNumber
port Stream m Word8
input forall (m :: * -> *). MonadIO m => Socket -> Stream m Word8
ISK.read