{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications    #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
module Network.ZRE.Chan (
    zreChan
  , zreChan'
  , zreChanWith
  , mapToGroup
  , mapToGroup'
  ) where

import Control.Concurrent.STM (TChan)
import Data.Serialize (Serialize)
import Data.ZRE (Group, KnownGroup, knownToGroup)
import Network.ZRE (ZRE)

import qualified Control.Concurrent.Async.Lifted
import qualified Control.Concurrent.STM
import qualified Control.Monad
import qualified Control.Monad.IO.Class
import qualified Data.Serialize

import qualified Network.ZRE

-- | Map function to deserialized data type received from one group
-- and send it encoded to another group. Basically a typed proxy between
-- two groups.
mapToGroup :: forall fromGroup toGroup from to .
            ( Serialize from
            , Show from
            , Serialize to
            , KnownGroup fromGroup
            , KnownGroup toGroup
            )
          => (from -> to)  -- ^ Conversion function
          -> ZRE ()
mapToGroup :: (from -> to) -> ZRE ()
mapToGroup from -> to
fn = Group -> Group -> (from -> to) -> ZRE ()
forall from to.
(Show from, Serialize from, Serialize to) =>
Group -> Group -> (from -> to) -> ZRE ()
mapToGroup'
  (KnownGroup fromGroup => Group
forall (n :: Symbol). KnownGroup n => Group
knownToGroup @fromGroup)
  (KnownGroup toGroup => Group
forall (n :: Symbol). KnownGroup n => Group
knownToGroup @toGroup)
  from -> to
fn

-- | Like `mapToGroup` but with non-symbolic groups
mapToGroup' :: (Show from, Serialize from, Serialize to)
           => Group         -- ^ Group to listen to and decode its messages
           -> Group         -- ^ Group to send encoded messages to
           -> (from -> to)  -- ^ Conversion function
           -> ZRE ()
mapToGroup' :: Group -> Group -> (from -> to) -> ZRE ()
mapToGroup' Group
fromGroup Group
toGroup from -> to
fn = do
  Group -> ZRE ()
Network.ZRE.zjoin Group
fromGroup
  Group -> ZRE ()
Network.ZRE.zjoin Group
toGroup

  Group
-> (ByteString -> Either String from)
-> (Either String from -> ZRE ())
-> ZRE ()
forall decoded.
Group
-> (ByteString -> Either String decoded)
-> (Either String decoded -> ZRE ())
-> ZRE ()
Network.ZRE.zrecvShoutsDecode Group
fromGroup ByteString -> Either String from
forall a. Serialize a => ByteString -> Either String a
Data.Serialize.decode
    ((Either String from -> ZRE ()) -> ZRE ())
-> (Either String from -> ZRE ()) -> ZRE ()
forall a b. (a -> b) -> a -> b
$ \(Either String from
mdec :: Either String from) -> do
      case Either String from
mdec of
        Left String
e -> do
          String -> ZRE ()
forall a. String -> ZRE a
Network.ZRE.zfail
            (String -> ZRE ()) -> String -> ZRE ()
forall a b. (a -> b) -> a -> b
$ String
"Unable to decode message from "
            String -> String -> String
forall a. [a] -> [a] -> [a]
++ Group -> String
forall a. Show a => a -> String
show Group
fromGroup String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" error was: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
e
        Right from
dec -> do
          Group -> ByteString -> ZRE ()
Network.ZRE.zshout Group
toGroup (ByteString -> ZRE ()) -> ByteString -> ZRE ()
forall a b. (a -> b) -> a -> b
$ to -> ByteString
forall a. Serialize a => a -> ByteString
Data.Serialize.encode (to -> ByteString) -> to -> ByteString
forall a b. (a -> b) -> a -> b
$ from -> to
fn from
dec

-- | Typed ZRE channel using two groups
--
-- * @input -> outputGroup@ for transfering encoded data
-- * @inputGroup -> output@ for receiving decoded data
--
-- Unexpected data on channel will result in error.
zreChan :: forall input output inputGroup outputGroup .
        ( Serialize input
        , Serialize output
        , KnownGroup inputGroup
        , KnownGroup outputGroup
        )
        => IO ( TChan input
              , TChan output)
zreChan :: IO (TChan input, TChan output)
zreChan = Group -> Group -> IO (TChan input, TChan output)
forall input output.
(Serialize input, Serialize output) =>
Group -> Group -> IO (TChan input, TChan output)
zreChan'
  (KnownGroup outputGroup => Group
forall (n :: Symbol). KnownGroup n => Group
knownToGroup @outputGroup)
  (KnownGroup inputGroup => Group
forall (n :: Symbol). KnownGroup n => Group
knownToGroup @inputGroup)

-- | Like `zreChan` but with non-symbolic groups
zreChan' :: (Serialize input, Serialize output)
         => Group
         -> Group
         -> IO ( TChan input
               , TChan output)
zreChan' :: Group -> Group -> IO (TChan input, TChan output)
zreChan' = (ZRE () -> IO ())
-> Group -> Group -> IO (TChan input, TChan output)
forall input output.
(Serialize input, Serialize output) =>
(ZRE () -> IO ())
-> Group -> Group -> IO (TChan input, TChan output)
zreChanWith ZRE () -> IO ()
forall a. ZRE a -> IO ()
Network.ZRE.runZre

-- | Principled version accepting runner function
zreChanWith :: (Serialize input, Serialize output)
            => (ZRE () -> IO ())
            -> Group
            -> Group
            -> IO ( TChan input
                  , TChan output)
zreChanWith :: (ZRE () -> IO ())
-> Group -> Group -> IO (TChan input, TChan output)
zreChanWith ZRE () -> IO ()
runner Group
outputGroup Group
inputGroup = do
  TChan input
chanInput  <- IO (TChan input)
forall a. IO (TChan a)
Control.Concurrent.STM.newTChanIO
  TChan output
chanOutput <- IO (TChan output)
forall a. IO (TChan a)
Control.Concurrent.STM.newTChanIO

  Async ()
_ <- IO () -> IO (Async (StM IO ()))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
Control.Concurrent.Async.Lifted.async (IO () -> IO (Async (StM IO ())))
-> IO () -> IO (Async (StM IO ()))
forall a b. (a -> b) -> a -> b
$ ZRE () -> IO ()
runner (ZRE () -> IO ()) -> ZRE () -> IO ()
forall a b. (a -> b) -> a -> b
$ do

    -- joining the outputGroup is not strictly needed for
    -- shouts to pass thru, for indication only
    Group -> ZRE ()
Network.ZRE.zjoin Group
outputGroup

    -- shout input to outputGroup
    ZRE (Async Any) -> ZRE ()
forall (f :: * -> *) a. Functor f => f a -> f ()
Control.Monad.void
      (ZRE (Async Any) -> ZRE ()) -> ZRE (Async Any) -> ZRE ()
forall a b. (a -> b) -> a -> b
$ ZRE Any -> ZRE (Async (StM ZRE Any))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
Control.Concurrent.Async.Lifted.async
      (ZRE Any -> ZRE (Async (StM ZRE Any)))
-> ZRE Any -> ZRE (Async (StM ZRE Any))
forall a b. (a -> b) -> a -> b
$ ZRE () -> ZRE Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
Control.Monad.forever
      (ZRE () -> ZRE Any) -> ZRE () -> ZRE Any
forall a b. (a -> b) -> a -> b
$ do
        input
out <-
            IO input -> ZRE input
forall (m :: * -> *) a. MonadIO m => IO a -> m a
Control.Monad.IO.Class.liftIO
          (IO input -> ZRE input) -> IO input -> ZRE input
forall a b. (a -> b) -> a -> b
$ STM input -> IO input
forall a. STM a -> IO a
Control.Concurrent.STM.atomically
          (STM input -> IO input) -> STM input -> IO input
forall a b. (a -> b) -> a -> b
$ TChan input -> STM input
forall a. TChan a -> STM a
Control.Concurrent.STM.readTChan TChan input
chanInput

        Group -> ByteString -> ZRE ()
Network.ZRE.zshout Group
outputGroup
          (ByteString -> ZRE ()) -> ByteString -> ZRE ()
forall a b. (a -> b) -> a -> b
$ input -> ByteString
forall a. Serialize a => a -> ByteString
Data.Serialize.encode input
out

    -- receive on inputGroup and forward to output
    Group -> ZRE ()
Network.ZRE.zjoin Group
inputGroup
    Group
-> (ByteString -> Either String output)
-> (Either String output -> ZRE ())
-> ZRE ()
forall decoded.
Group
-> (ByteString -> Either String decoded)
-> (Either String decoded -> ZRE ())
-> ZRE ()
Network.ZRE.zrecvShoutsDecode Group
inputGroup ByteString -> Either String output
forall a. Serialize a => ByteString -> Either String a
Data.Serialize.decode
      ((Either String output -> ZRE ()) -> ZRE ())
-> (Either String output -> ZRE ()) -> ZRE ()
forall a b. (a -> b) -> a -> b
$ (String -> ZRE ())
-> (output -> ZRE ()) -> Either String output -> ZRE ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either
        (\String
e -> String -> ZRE ()
forall a. String -> ZRE a
Network.ZRE.zfail
          (String -> ZRE ()) -> String -> ZRE ()
forall a b. (a -> b) -> a -> b
$ String
"zreChan: Unable to decode message from input "
          String -> String -> String
forall a. [a] -> [a] -> [a]
++ Group -> String
forall a. Show a => a -> String
show Group
inputGroup
          String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" error was: "
          String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
e
        )
        ( IO () -> ZRE ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
Control.Monad.IO.Class.liftIO
        (IO () -> ZRE ()) -> (output -> IO ()) -> output -> ZRE ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall a. STM a -> IO a
Control.Concurrent.STM.atomically
        (STM () -> IO ()) -> (output -> STM ()) -> output -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TChan output -> output -> STM ()
forall a. TChan a -> a -> STM ()
Control.Concurrent.STM.writeTChan TChan output
chanOutput
        )

  (TChan input, TChan output) -> IO (TChan input, TChan output)
forall (m :: * -> *) a. Monad m => a -> m a
return (TChan input
chanInput, TChan output
chanOutput)