{-# LANGUAGE MultiParamTypeClasses   #-}
{-# LANGUAGE FlexibleInstances       #-}
{-# LANGUAGE RecordWildCards         #-}
{-# LANGUAGE LambdaCase              #-}
{-# LANGUAGE DataKinds               #-}
{-# LANGUAGE GADTs                   #-}
-----------------------------------------------------------------------------
-- |
-- Module      :  Network.XMPP.Concurrent
-- Copyright   :  (c) pierre, 2007
-- License     :  BSD-style (see the file libraries/base/LICENSE)
-- Copyright   :  (c) riskbook, 2020
-- SPDX-License-Identifier:  BSD3
-- 
-- Maintainer  :  k.pierre.k@gmail.com
-- Stability   :  experimental
-- Portability :  portable
--
-- Concurrent actions over single IO channel
--
-----------------------------------------------------------------------------
module Network.XMPP.Concurrent
  ( Thread
  , XmppThreadT
  , runThreaded
  , readChanS
  , writeChanS
  , withNewThread
  , loop
  , waitFor
  ) where

import Control.Concurrent
import Control.Monad.State
import Control.Monad.Reader

import Network.XMPP.Stream
import Network.XMPP.Types
import Network.XMPP.Utils
import Network.XMPP.XML
import UnliftIO.Async          (Async, async)
import UnliftIO                (TChan, MonadUnliftIO, atomically, newTChan,
                                writeTChan, readTChan, dupTChan)

import System.IO

-- | The pair of channels every XMPP thread works with.
--
-- Threads never touch the underlying stream directly: they read parsed
-- input from 'tInCh' and enqueue outgoing stanzas on 'tOutCh'.
data Thread e = Thread
  { tInCh  :: TChan (Either XmppError (SomeStanza e))
    -- ^ Incoming side: every parse result (stanza or error) produced by the
    --   reader loop in 'runThreaded'. 'withNewThread' hands each child its
    --   own duplicate of this channel.
  , tOutCh :: TChan (SomeStanza ())
    -- ^ Outgoing side: stanzas queued by 'writeChanS', drained by the writer
    --   loop in 'runThreaded'. Shared (not duplicated) between threads.
  }

-- | Computation running inside an XMPP thread: a 'ReaderT' carrying the
--   thread's 'Thread' channels over the base monad @m@.
--
--   Mind the argument order: base monad @m@, then result type @a@, then the
--   type parameter @e@ that is passed on to 'Thread'.
type XmppThreadT m a e = ReaderT (Thread e) m a

-- | Inside a thread, \"sending\" an outgoing stanza does not write to the
--   stream itself: the stanza is wrapped in 'SomeStanza' and queued on the
--   thread's outgoing channel via 'writeChanS'.
instance MonadIO m => XmppSendable (ReaderT (Thread e) m) (Stanza t 'Outgoing ()) where
  xmppSend = writeChanS . SomeStanza

-- Two streams: input and output. Threads read from the input stream and
-- write to the output stream.

-- | Runs the given thread action on top of the current XMPP stream.
--
--   Two fresh channels are created and packed into a 'Thread'. Three
--   background tasks are then started in the base monad:
--
--   * the supplied @action@, run with that 'Thread';
--   * a writer loop that takes stanzas from the outgoing channel and sends
--     them, running on a snapshot of the current 'Stream' state
--     (via @runXmppMonad'@);
--   * 'connPersist' on the stream's handle.
--
--   The calling thread then becomes the reader loop: it parses input,
--   publishes every result (including errors) on the incoming channel, and
--   returns once it has published @Left StreamClosedError@ or
--   @Left RanOutOfInput@. Until then it blocks.
--
--   The 'Async' handles of the background tasks are discarded, so this
--   function neither waits for nor cancels them when it returns.
runThreaded
  :: (FromXML e, MonadIO m, MonadUnliftIO m)
  => XmppThreadT m () e
  -> XmppMonad m ()
runThreaded action = do
  (in', out')  <- atomically $ (,) <$> newTChan <*> newTChan
  -- Only 'handle' is used from the wildcard; 's' is the state snapshot the
  -- writer loop runs on.
  s@Stream{..} <- get
  lift $ do
    void $ async (runReaderT action (Thread in' out'))
    -- NOTE(review): the writer is started through two nested 'async' calls;
    -- the outer one only spawns the inner one and finishes. Kept as-is to
    -- preserve behaviour, but a single 'async' looks sufficient.
    void $ async (void . async . runXmppMonad' s $ loopWrite out')
    void $ async (connPersist handle)
  loopRead in'
 where
  -- Parse one item, publish it, and stop only on the two terminal errors.
  -- The terminal error itself is published before the loop ends.
  loopRead in' = do
    msg <- parseM
    atomically $ writeTChan in' msg
    case msg of
      Left StreamClosedError -> pure ()
      Left RanOutOfInput     -> pure ()
      _                      -> loopRead in'

  -- Take the next queued stanza and send it; never terminates on its own.
  -- Matching on the purpose singleton is what lets 'xmppSend' see the
  -- stanza as an outgoing one.
  loopWrite :: MonadIO m => TChan (SomeStanza e) -> XmppMonad m ()
  loopWrite out' = do
    atomically (readTChan out') >>= \case
      SomeStanza stnz@MkMessage  { mPurpose  = SOutgoing } -> xmppSend stnz
      SomeStanza stnz@MkPresence { pPurpose  = SOutgoing } -> xmppSend stnz
      SomeStanza stnz@MkIQ       { iqPurpose = SOutgoing } -> xmppSend stnz
      _ -> pure () -- Won't happen, but we gotta make compiler happy
    loopWrite out'

-- | Takes the next item from this thread's incoming channel ('tInCh'),
--   blocking until one is available. The item is either a parsed stanza or
--   the 'XmppError' the reader loop ran into.
readChanS :: MonadIO m => XmppThreadT m (Either XmppError (SomeStanza e)) e
readChanS = asks tInCh >>= atomically . readTChan

-- | Queues a stanza on this thread's outgoing channel ('tOutCh'). The
--   stanza is only enqueued here; actually sending it is left to whoever
--   drains that channel.
writeChanS :: MonadIO m => SomeStanza () -> XmppThreadT m () e
writeChanS a = do
  out <- asks tOutCh
  atomically (writeTChan out a)

-- | Runs the specified action in parallel and returns its 'Async' handle.
--
--   The child gets its own duplicate of the parent's incoming channel
--   (created with 'dupTChan'), so parent and child each read their own
--   copy of the input. The outgoing channel is shared with the parent.
withNewThread
  :: (MonadIO m, MonadUnliftIO m)
  => XmppThreadT m () e
  -> XmppThreadT m (Async ()) e
withNewThread a = do
  newin <- asks tInCh >>= atomically . dupTChan
  out   <- asks tOutCh
  lift . async $ runReaderT a (Thread newin out)

-- | Turns an action into an infinite loop: the action is run again as soon
--   as it finishes, and the result never returns normally.
loop :: MonadIO m => XmppThreadT m () e -> XmppThreadT m () e
loop a = go
 where
  go = a >> go

-- | Reads from the incoming channel (via 'readChanS') until an item
--   satisfies the predicate, and returns that item.
--
--   Items that fail the predicate are consumed and dropped. The predicate
--   is applied to errors (@Left@) as well as stanzas (@Right@).
waitFor
  :: MonadIO m
  => (Either XmppError (SomeStanza e) -> Bool)
  -> XmppThreadT m (Either XmppError (SomeStanza e)) e
waitFor f = do
  s <- readChanS
  if f s
    then pure s
    else waitFor f

-- | Endless loop that writes a single space to the handle, logs
--   @\"<space added>\"@ through 'debugIO', sleeps, and starts over.
--   Never returns.
--
--   NOTE(review): presumably a whitespace keep-alive for the connection;
--   the handle is not flushed here, so delivery timing depends on its
--   buffering mode — confirm against where the handle is set up.
connPersist :: MonadIO m => Handle -> m ()
connPersist h = do
  liftIO $ do
    hPutStr h " "
    debugIO "<space added>"
    threadDelay pauseMicros
  connPersist h
 where
  -- 30 seconds, expressed in the microseconds 'threadDelay' expects.
  pauseMicros :: Int
  pauseMicros = 30000000