module Network.Xmpp.Stream where
import Control.Applicative ((<$>), (<*>))
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
import qualified Control.Exception as Ex
import Control.Exception.Base
import qualified Control.Exception.Lifted as ExL
import Control.Monad
import Control.Monad.Error
import Control.Monad.IO.Class
import Control.Monad.Reader
import Control.Monad.State.Strict
import Control.Monad.Trans.Class
import qualified Data.ByteString as BS
import Data.ByteString.Base64
import Data.ByteString.Char8 as BSC8
import Data.Conduit
import Data.Conduit.Binary as CB
import qualified Data.Conduit.Internal as DCI
import qualified Data.Conduit.List as CL
import Data.Maybe (fromJust, isJust, isNothing)
import Data.Text (Text)
import qualified Data.Text as Text
import Data.Void (Void)
import Data.XML.Pickle
import Data.XML.Types
import qualified GHC.IO.Exception as GIE
import Network
import Network.Xmpp.Marshal
import Network.Xmpp.Types
import System.IO
import System.IO.Error (tryIOError)
import System.Log.Logger
import Text.XML.Stream.Parse as XP
import Text.XML.Unresolved(InvalidEventStream(..))
import Control.Monad.Trans.Resource as R
import Network.Xmpp.Utilities
import Network.DNS hiding (encode, lookup)
import Data.Ord
import Data.Maybe
import Data.List
import Data.IP
import System.Random
import qualified Network.Socket as NS
readMaybe_ :: (Read a) => String -> Maybe a
readMaybe_ string = case reads string of
[(a, "")] -> Just a
_ -> Nothing
mbl :: Maybe [a] -> [a]
mbl (Just l) = l
mbl Nothing = []
lmb :: [t] -> Maybe [t]
lmb [] = Nothing
lmb x = Just x
streamUnpickleElem :: PU [Node] a
-> Element
-> StreamSink a
streamUnpickleElem p x = do
case unpickleElem p x of
Left l -> do
liftIO $ warningM "Pontarius.Xmpp" $ "streamUnpickleElem: Unpickle error: " ++ ppUnpickleError l
throwError $ XmppOtherFailure
Right r -> return r
type StreamSink a = ErrorT XmppFailure (Pipe Event Event Void () IO) a
throwOutJunk :: Monad m => Sink Event m ()
throwOutJunk = do
next <- CL.peek
case next of
Nothing -> return ()
Just (EventBeginElement _ _) -> return ()
_ -> CL.drop 1 >> throwOutJunk
openElementFromEvents :: StreamSink Element
openElementFromEvents = do
lift throwOutJunk
hd <- lift CL.head
case hd of
Just (EventBeginElement name attrs) -> return $ Element name attrs []
_ -> do
liftIO $ warningM "Pontarius.Xmpp" "openElementFromEvents: Stream ended."
throwError XmppOtherFailure
startStream :: StateT StreamState IO (Either XmppFailure ())
startStream = runErrorT $ do
lift $ lift $ debugM "Pontarius.Xmpp" "Starting stream..."
state <- lift $ get
let expectedTo = case ( streamConnectionState state
, toJid $ streamConfiguration state) of
(Plain, (Just (jid, True))) -> Just jid
(Secured, (Just (jid, _))) -> Just jid
(Plain, Nothing) -> Nothing
(Secured, Nothing) -> Nothing
case streamAddress state of
Nothing -> do
lift $ lift $ errorM "Pontarius.XMPP" "Server sent no hostname."
throwError XmppOtherFailure
Just address -> lift $ do
pushXmlDecl
pushOpenElement $
pickleElem xpStream ( "1.0"
, expectedTo
, Just (Jid Nothing address Nothing)
, Nothing
, preferredLang $ streamConfiguration state
)
response <- ErrorT $ runEventsSink $ runErrorT $ streamS expectedTo
case response of
Left e -> throwError e
Right (Right (ver, from, to, id, lt, features))
| (Text.unpack ver) /= "1.0" ->
closeStreamWithError StreamUnsupportedVersion Nothing
"Unknown version"
| lt == Nothing ->
closeStreamWithError StreamInvalidXml Nothing
"Stream has no language tag"
| isJust from && (from /= Just (Jid Nothing (fromJust $ streamAddress state) Nothing)) ->
closeStreamWithError StreamInvalidFrom Nothing
"Stream from is invalid"
| to /= expectedTo ->
closeStreamWithError StreamUndefinedCondition (Just $ Element "invalid-to" [] [])
"Stream to invalid"
| otherwise -> do
modify (\s -> s{ streamFeatures = features
, streamLang = lt
, streamId = id
, streamFrom = from
} )
return ()
Right (Left (Element name attrs children))
| (nameLocalName name /= "stream") ->
closeStreamWithError StreamInvalidXml Nothing
"Root element is not stream"
| (nameNamespace name /= Just "http://etherx.jabber.org/streams") ->
closeStreamWithError StreamInvalidNamespace Nothing
"Wrong root element name space"
| (isJust $ namePrefix name) && (fromJust (namePrefix name) /= "stream") ->
closeStreamWithError StreamBadNamespacePrefix Nothing
"Root name prefix set and not stream"
| otherwise -> ErrorT $ checkchildren (flattenAttrs attrs)
where
closeStreamWithError :: StreamErrorCondition -> Maybe Element -> String
-> ErrorT XmppFailure (StateT StreamState IO) ()
closeStreamWithError sec el msg = do
lift . pushElement . pickleElem xpStreamError
$ StreamErrorInfo sec Nothing el
lift $ closeStreams'
lift $ lift $ errorM "Pontarius.XMPP" $ "closeStreamWithError: " ++ msg
throwError XmppOtherFailure
checkchildren children =
let to' = lookup "to" children
ver' = lookup "version" children
xl = lookup xmlLang children
in case () of () | Just (Nothing :: Maybe Jid) == (safeRead <$> to') ->
runErrorT $ closeStreamWithError
StreamBadNamespacePrefix Nothing
"stream to not a valid JID"
| Nothing == ver' ->
runErrorT $ closeStreamWithError
StreamUnsupportedVersion Nothing
"stream no version"
| Just (Nothing :: Maybe LangTag) == (safeRead <$> xl) ->
runErrorT $ closeStreamWithError
StreamInvalidXml Nothing
"stream no language tag"
| otherwise ->
runErrorT $ closeStreamWithError
StreamBadFormat Nothing
""
safeRead x = case reads $ Text.unpack x of
[] -> Nothing
[(y,_),_] -> Just y
flattenAttrs :: [(Name, [Content])] -> [(Name, Text.Text)]
flattenAttrs attrs = Prelude.map (\(name, content) ->
( name
, Text.concat $ Prelude.map uncontentify content)
)
attrs
where
uncontentify (ContentText t) = t
uncontentify _ = ""
restartStream :: StateT StreamState IO (Either XmppFailure ())
restartStream = do
lift $ debugM "Pontarius.XMPP" "Restarting stream..."
raw <- gets (streamReceive . streamHandle)
let newSource = DCI.ResumableSource (loopRead raw $= XP.parseBytes def)
(return ())
modify (\s -> s{streamEventSource = newSource })
startStream
where
loopRead read = do
bs <- liftIO (read 4096)
if BS.null bs
then return ()
else yield bs >> loopRead read
streamS :: Maybe Jid -> StreamSink (Either Element ( Text
, Maybe Jid
, Maybe Jid
, Maybe Text
, Maybe LangTag
, StreamFeatures ))
streamS expectedTo = do
header <- xmppStreamHeader
case header of
Right (version, from, to, id, langTag) -> do
features <- xmppStreamFeatures
return $ Right (version, from, to, id, langTag, features)
Left el -> return $ Left el
where
xmppStreamHeader :: StreamSink (Either Element (Text, Maybe Jid, Maybe Jid, Maybe Text.Text, Maybe LangTag))
xmppStreamHeader = do
lift throwOutJunk
el <- openElementFromEvents
case unpickleElem xpStream el of
Left _ -> return $ Left el
Right r -> return $ Right r
xmppStreamFeatures :: StreamSink StreamFeatures
xmppStreamFeatures = do
e <- lift $ elements =$ CL.head
case e of
Nothing -> do
lift $ lift $ errorM "Pontarius.XMPP" "streamS: Stream ended."
throwError XmppOtherFailure
Just r -> streamUnpickleElem xpStreamFeatures r
openStream :: HostName -> StreamConfiguration -> IO (Either XmppFailure (Stream))
openStream realm config = runErrorT $ do
lift $ debugM "Pontarius.XMPP" "Opening stream..."
stream' <- createStream realm config
result <- liftIO $ withStream startStream stream'
return stream'
closeStreams :: Stream -> IO (Either XmppFailure [Element])
closeStreams = withStream closeStreams'
closeStreams' = do
lift $ debugM "Pontarius.XMPP" "Closing stream..."
send <- gets (streamSend . streamHandle)
cc <- gets (streamClose . streamHandle)
liftIO $ send "</stream:stream>"
void $ liftIO $ forkIO $ do
threadDelay 3000000
(Ex.try cc) :: IO (Either Ex.SomeException ())
return ()
collectElems []
where
collectElems :: [Element] -> StateT StreamState IO (Either XmppFailure [Element])
collectElems es = do
result <- pullElement
case result of
Left StreamEndFailure -> return $ Right es
Left e -> return $ Left $ StreamCloseError (es, e)
Right e -> collectElems (e:es)
wrapIOException :: IO a -> StateT StreamState IO (Either XmppFailure a)
wrapIOException action = do
r <- liftIO $ tryIOError action
case r of
Right b -> return $ Right b
Left e -> do
lift $ warningM "Pontarius.XMPP" $ "wrapIOException: Exception wrapped: " ++ (show e)
return $ Left $ XmppIOException e
pushElement :: Element -> StateT StreamState IO (Either XmppFailure Bool)
pushElement x = do
send <- gets (streamSend . streamHandle)
wrapIOException $ send $ renderElement x
pushStanza :: Stanza -> Stream -> IO (Either XmppFailure Bool)
pushStanza s = withStream' . pushElement $ pickleElem xpStanza s
pushXmlDecl :: StateT StreamState IO (Either XmppFailure Bool)
pushXmlDecl = do
con <- gets streamHandle
wrapIOException $ (streamSend con) "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>"
pushOpenElement :: Element -> StateT StreamState IO (Either XmppFailure Bool)
pushOpenElement e = do
sink <- gets (streamSend . streamHandle)
wrapIOException $ sink $ renderOpenElement e
runEventsSink :: Sink Event IO b -> StateT StreamState IO (Either XmppFailure b)
runEventsSink snk = do
src <- gets streamEventSource
(src', r) <- lift $ src $$++ snk
modify (\s -> s{streamEventSource = src'})
return $ Right r
pullElement :: StateT StreamState IO (Either XmppFailure Element)
pullElement = do
ExL.catches (do
e <- runEventsSink (elements =$ await)
case e of
Left f -> return $ Left f
Right Nothing -> do
lift $ errorM "Pontarius.XMPP" "pullElement: No element."
return . Left $ XmppOtherFailure
Right (Just r) -> return $ Right r
)
[ ExL.Handler (\StreamEnd -> return $ Left StreamEndFailure)
, ExL.Handler (\(InvalidXmppXml s)
-> do
lift $ errorM "Pontarius.XMPP" $ "pullElement: Invalid XML: " ++ (show s)
return . Left $ XmppOtherFailure)
, ExL.Handler $ \(e :: InvalidEventStream)
-> do
lift $ errorM "Pontarius.XMPP" $ "pullElement: Invalid event stream: " ++ (show e)
return . Left $ XmppOtherFailure
]
pullUnpickle :: PU [Node] a -> StateT StreamState IO (Either XmppFailure a)
pullUnpickle p = do
elem <- pullElement
case elem of
Left e -> return $ Left e
Right elem' -> do
let res = unpickleElem p elem'
case res of
Left e -> do
lift $ errorM "Pontarius.XMPP" $ "pullUnpickle: Unpickle failed: " ++ (ppUnpickleError e)
return . Left $ XmppOtherFailure
Right r -> return $ Right r
pullStanza :: Stream -> IO (Either XmppFailure Stanza)
pullStanza = withStream $ do
res <- pullUnpickle xpStreamStanza
case res of
Left e -> return $ Left e
Right (Left e) -> return $ Left $ StreamErrorFailure e
Right (Right r) -> return $ Right r
catchPush :: IO () -> IO Bool
catchPush p = ExL.catch
(p >> return True)
(\e -> case GIE.ioe_type e of
GIE.ResourceVanished -> return False
GIE.IllegalOperation -> return False
_ -> ExL.throwIO e
)
xmppNoStream :: StreamState
xmppNoStream = StreamState {
streamConnectionState = Closed
, streamHandle = StreamHandle { streamSend = \_ -> return False
, streamReceive = \_ -> do
errorM "Pontarius.XMPP" "xmppNoStream: No stream on receive."
ExL.throwIO $
XmppOtherFailure
, streamFlush = return ()
, streamClose = return ()
}
, streamEventSource = DCI.ResumableSource zeroSource (return ())
, streamFeatures = StreamFeatures Nothing [] []
, streamAddress = Nothing
, streamFrom = Nothing
, streamId = Nothing
, streamLang = Nothing
, streamJid = Nothing
, streamConfiguration = def
}
where
zeroSource :: Source IO output
zeroSource = liftIO $ do
errorM "Pontarius.XMPP" "zeroSource utilized."
ExL.throwIO XmppOtherFailure
createStream :: HostName -> StreamConfiguration -> ErrorT XmppFailure IO (Stream)
createStream realm config = do
result <- connect realm config
case result of
Just h -> ErrorT $ do
debugM "Pontarius.Xmpp" "Acquired handle."
debugM "Pontarius.Xmpp" "Setting NoBuffering mode on handle."
hSetBuffering h NoBuffering
let eSource = DCI.ResumableSource
((sourceHandle h $= logConduit) $= XP.parseBytes def)
(return ())
let hand = StreamHandle { streamSend = \d -> catchPush $ BS.hPut h d
, streamReceive = \n -> BS.hGetSome h n
, streamFlush = hFlush h
, streamClose = hClose h
}
let stream = StreamState
{ streamConnectionState = Plain
, streamHandle = hand
, streamEventSource = eSource
, streamFeatures = StreamFeatures Nothing [] []
, streamAddress = Just $ Text.pack realm
, streamFrom = Nothing
, streamId = Nothing
, streamLang = Nothing
, streamJid = Nothing
, streamConfiguration = config
}
stream' <- mkStream stream
return $ Right stream'
Nothing -> do
lift $ debugM "Pontarius.Xmpp" "Did not acquire handle."
throwError TcpConnectionFailure
where
logConduit :: Conduit ByteString IO ByteString
logConduit = CL.mapM $ \d -> do
debugM "Pontarius.Xmpp" $ "Received TCP data: " ++ (BSC8.unpack d) ++
"."
return d
connect :: HostName -> StreamConfiguration -> ErrorT XmppFailure IO (Maybe Handle)
connect realm config = do
case socketDetails config of
Just socketDetails' -> lift $ do
debugM "Pontarius.Xmpp" "Connecting to configured SockAddr address..."
connectTcp $ Left socketDetails'
Nothing -> do
case (readMaybe_ realm :: Maybe IPv6, readMaybe_ realm :: Maybe IPv4, hostname (Text.pack realm) :: Maybe Hostname) of
(Just ipv6, Nothing, _) -> lift $ connectTcp $ Right [(show ipv6, 5222)]
(Nothing, Just ipv4, _) -> lift $ connectTcp $ Right [(show ipv4, 5222)]
(Nothing, Nothing, Just (Hostname realm')) -> do
resolvSeed <- lift $ makeResolvSeed (resolvConf config)
lift $ debugM "Pontarius.Xmpp" "Performing SRV lookup..."
srvRecords <- srvLookup realm' resolvSeed
case srvRecords of
Nothing -> do
lift $ debugM "Pontarius.Xmpp" "No SRV records, using fallback process..."
lift $ resolvAndConnectTcp resolvSeed (BSC8.pack $ realm) 5222
Just srvRecords' -> do
lift $ debugM "Pontarius.Xmpp" "SRV records found, performing A/AAAA lookups..."
lift $ resolvSrvsAndConnectTcp resolvSeed srvRecords'
(Nothing, Nothing, Nothing) -> do
lift $ errorM "Pontarius.Xmpp" "The hostname could not be validated."
throwError XmppIllegalTcpDetails
connectTcp :: Either (NS.Socket, NS.SockAddr) [(HostName, Int)] -> IO (Maybe Handle)
connectTcp (Right []) = return Nothing
connectTcp (Right ((address, port):remainder)) = do
result <- try $ (do
debugM "Pontarius.Xmpp" $ "Connecting to " ++ (address) ++ " on port " ++
(show port) ++ "."
connectTo address (PortNumber $ fromIntegral port)) :: IO (Either IOException Handle)
case result of
Right handle -> do
debugM "Pontarius.Xmpp" "Successfully connected to HostName."
return $ Just handle
Left _ -> do
debugM "Pontarius.Xmpp" "Connection to HostName could not be established."
connectTcp $ Right remainder
connectTcp (Left (sock, sockAddr)) = do
result <- try $ (do
NS.connect sock sockAddr
NS.socketToHandle sock ReadWriteMode) :: IO (Either IOException Handle)
case result of
Right handle -> do
debugM "Pontarius.Xmpp" "Successfully connected to SockAddr."
return $ Just handle
Left _ -> do
debugM "Pontarius.Xmpp" "Connection to SockAddr could not be established."
return Nothing
resolvAndConnectTcp :: ResolvSeed -> Domain -> Int -> IO (Maybe Handle)
resolvAndConnectTcp resolvSeed domain port = do
aaaaResults <- (try $ rethrowErrorCall $ withResolver resolvSeed $
\resolver -> lookupAAAA resolver domain) :: IO (Either IOException (Maybe [IPv6]))
handle <- case aaaaResults of
Right Nothing -> return Nothing
Right (Just ipv6s) -> connectTcp $ Right $ Data.List.map (\ipv6 -> (show ipv6, port)) ipv6s
Left e -> return Nothing
case handle of
Nothing -> do
aResults <- (try $ rethrowErrorCall $ withResolver resolvSeed $
\resolver -> lookupA resolver domain) :: IO (Either IOException (Maybe [IPv4]))
handle' <- case aResults of
Right Nothing -> return Nothing
Right (Just ipv4s) -> connectTcp $ Right $ Data.List.map (\ipv4 -> (show ipv4, port)) ipv4s
case handle' of
Nothing -> return Nothing
Just handle'' -> return $ Just handle''
Just handle' -> return $ Just handle'
resolvSrvsAndConnectTcp :: ResolvSeed -> [(Domain, Int)] -> IO (Maybe Handle)
resolvSrvsAndConnectTcp _ [] = return Nothing
resolvSrvsAndConnectTcp resolvSeed ((domain, port):remaining) = do
result <- resolvAndConnectTcp resolvSeed domain port
case result of
Just handle -> return $ Just handle
Nothing -> resolvSrvsAndConnectTcp resolvSeed remaining
rethrowErrorCall :: IO a -> IO a
rethrowErrorCall action = do
result <- try action
case result of
Right result' -> return result'
Left (ErrorCall e) -> ioError $ userError $ "rethrowErrorCall: " ++ e
Left e -> throwIO e
srvLookup :: Text -> ResolvSeed -> ErrorT XmppFailure IO (Maybe [(Domain, Int)])
srvLookup realm resolvSeed = ErrorT $ do
result <- try $ rethrowErrorCall $ withResolver resolvSeed $ \resolver -> do
srvResult <- lookupSRV resolver $ BSC8.pack $ "_xmpp-client._tcp." ++ (Text.unpack realm) ++ "."
case srvResult of
Just srvResult -> do
debugM "Pontarius.Xmpp" $ "SRV result: " ++ (show srvResult)
srvResult' <- orderSrvResult srvResult
return $ Just $ Prelude.map (\(_, _, port, domain) -> (domain, port)) srvResult'
Just [(_, _, _, ".")] -> do
debugM "Pontarius.Xmpp" $ "\".\" SRV result returned."
return $ Just []
Nothing -> do
debugM "Pontarius.Xmpp" "No SRV result returned."
return Nothing
case result of
Right result' -> return $ Right result'
Left e -> return $ Left $ XmppIOException e
where
orderSrvResult :: [(Int, Int, Int, Domain)] -> IO [(Int, Int, Int, Domain)]
orderSrvResult srvResult = do
let srvResult' = sortBy (comparing (\(priority, _, _, _) -> priority)) srvResult
let srvResult'' = Data.List.groupBy (\(priority, _, _, _) (priority', _, _, _) -> priority == priority') srvResult' :: [[(Int, Int, Int, Domain)]]
let srvResult''' = Data.List.map (\sublist -> let (a, b) = partition (\(_, weight, _, _) -> weight == 0) sublist in Data.List.concat [a, b]) srvResult''
srvResult'''' <- mapM orderSublist srvResult'''
return $ Data.List.concat srvResult''''
where
orderSublist :: [(Int, Int, Int, Domain)] -> IO [(Int, Int, Int, Domain)]
orderSublist [] = return []
orderSublist sublist = do
let (total, sublist') = Data.List.mapAccumL (\total (priority, weight, port, domain) -> (total + weight, (priority, weight, port, domain, total + weight))) 0 sublist
randomNumber <- randomRIO (0, total)
let (beginning, ((priority, weight, port, domain, _):end)) = Data.List.break (\(_, _, _, _, running) -> randomNumber <= running) sublist'
let sublist'' = Data.List.map (\(priority, weight, port, domain, _) -> (priority, weight, port, domain)) (Data.List.concat [beginning, end])
tail <- orderSublist sublist''
return $ ((priority, weight, port, domain):tail)
killStream :: Stream -> IO (Either XmppFailure ())
killStream = withStream $ do
cc <- gets (streamClose . streamHandle)
err <- wrapIOException cc
put xmppNoStream
return err
pushIQ :: StanzaID
-> Maybe Jid
-> IQRequestType
-> Maybe LangTag
-> Element
-> Stream
-> IO (Either XmppFailure (Either IQError IQResult))
pushIQ iqID to tp lang body stream = do
pushStanza (IQRequestS $ IQRequest iqID Nothing to lang tp body) stream
res <- pullStanza stream
case res of
Left e -> return $ Left e
Right (IQErrorS e) -> return $ Right $ Left e
Right (IQResultS r) -> do
unless
(iqID == iqResultID r) $ liftIO $ do
errorM "Pontarius.XMPP" $ "pushIQ: ID mismatch (" ++ (show iqID) ++ " /= " ++ (show $ iqResultID r) ++ ")."
ExL.throwIO XmppOtherFailure
return $ Right $ Right r
_ -> do
errorM "Pontarius.XMPP" $ "pushIQ: Unexpected stanza type."
return . Left $ XmppOtherFailure
debugConduit :: Pipe l ByteString ByteString u IO b
debugConduit = forever $ do
s' <- await
case s' of
Just s -> do
liftIO $ debugM "Pontarius.XMPP" $ "debugConduit: In: " ++ (show s)
yield s
Nothing -> return ()
elements :: R.MonadThrow m => Conduit Event m Element
elements = do
x <- await
case x of
Just (EventBeginElement n as) -> do
goE n as >>= yield
elements
Just (EventEndElement streamName) -> lift $ R.monadThrow StreamEnd
Nothing -> return ()
_ -> lift $ R.monadThrow $ InvalidXmppXml $ "not an element: " ++ show x
where
many' f =
go id
where
go front = do
x <- f
case x of
Left x -> return $ (x, front [])
Right y -> go (front . (:) y)
goE n as = do
(y, ns) <- many' goN
if y == Just (EventEndElement n)
then return $ Element n as $ compressNodes ns
else lift $ R.monadThrow $ InvalidXmppXml $
"Missing close tag: " ++ show n
goN = do
x <- await
case x of
Just (EventBeginElement n as) -> (Right . NodeElement) <$> goE n as
Just (EventInstruction i) -> return $ Right $ NodeInstruction i
Just (EventContent c) -> return $ Right $ NodeContent c
Just (EventComment t) -> return $ Right $ NodeComment t
Just (EventCDATA t) -> return $ Right $ NodeContent $ ContentText t
_ -> return $ Left x
compressNodes :: [Node] -> [Node]
compressNodes [] = []
compressNodes [x] = [x]
compressNodes (NodeContent (ContentText x) : NodeContent (ContentText y) : z) =
compressNodes $ NodeContent (ContentText $ x `Text.append` y) : z
compressNodes (x:xs) = x : compressNodes xs
streamName :: Name
streamName = (Name "stream" (Just "http://etherx.jabber.org/streams") (Just "stream"))
withStream :: StateT StreamState IO (Either XmppFailure c) -> Stream -> IO (Either XmppFailure c)
withStream action (Stream stream) = bracketOnError
(atomically $ takeTMVar stream )
(atomically . putTMVar stream)
(\s -> do
(r, s') <- runStateT action s
atomically $ putTMVar stream s'
return r
)
withStream' :: StateT StreamState IO (Either XmppFailure b) -> Stream -> IO (Either XmppFailure b)
withStream' action (Stream stream) = do
stream_ <- atomically $ readTMVar stream
(r, _) <- runStateT action stream_
return r
mkStream :: StreamState -> IO (Stream)
mkStream con = Stream `fmap` (atomically $ newTMVar con)