{-# LANGUAGE DataKinds #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
module Network.ZRE.Chan (
zreChan
, zreChan'
, zreChanWith
, mapToGroup
, mapToGroup'
) where
import Control.Concurrent.STM (TChan)
import Data.Serialize (Serialize)
import Data.ZRE (Group, KnownGroup, knownToGroup)
import Network.ZRE (ZRE)
import qualified Control.Concurrent.Async.Lifted
import qualified Control.Concurrent.STM
import qualified Control.Monad
import qualified Control.Monad.IO.Class
import qualified Data.Serialize
import qualified Network.ZRE
mapToGroup :: forall fromGroup toGroup from to .
( Serialize from
, Show from
, Serialize to
, KnownGroup fromGroup
, KnownGroup toGroup
)
=> (from -> to)
-> ZRE ()
mapToGroup :: (from -> to) -> ZRE ()
mapToGroup from -> to
fn = Group -> Group -> (from -> to) -> ZRE ()
forall from to.
(Show from, Serialize from, Serialize to) =>
Group -> Group -> (from -> to) -> ZRE ()
mapToGroup'
(KnownGroup fromGroup => Group
forall (n :: Symbol). KnownGroup n => Group
knownToGroup @fromGroup)
(KnownGroup toGroup => Group
forall (n :: Symbol). KnownGroup n => Group
knownToGroup @toGroup)
from -> to
fn
mapToGroup' :: (Show from, Serialize from, Serialize to)
=> Group
-> Group
-> (from -> to)
-> ZRE ()
mapToGroup' :: Group -> Group -> (from -> to) -> ZRE ()
mapToGroup' Group
fromGroup Group
toGroup from -> to
fn = do
Group -> ZRE ()
Network.ZRE.zjoin Group
fromGroup
Group -> ZRE ()
Network.ZRE.zjoin Group
toGroup
Group
-> (ByteString -> Either String from)
-> (Either String from -> ZRE ())
-> ZRE ()
forall decoded.
Group
-> (ByteString -> Either String decoded)
-> (Either String decoded -> ZRE ())
-> ZRE ()
Network.ZRE.zrecvShoutsDecode Group
fromGroup ByteString -> Either String from
forall a. Serialize a => ByteString -> Either String a
Data.Serialize.decode
((Either String from -> ZRE ()) -> ZRE ())
-> (Either String from -> ZRE ()) -> ZRE ()
forall a b. (a -> b) -> a -> b
$ \(Either String from
mdec :: Either String from) -> do
case Either String from
mdec of
Left String
e -> do
String -> ZRE ()
forall a. String -> ZRE a
Network.ZRE.zfail
(String -> ZRE ()) -> String -> ZRE ()
forall a b. (a -> b) -> a -> b
$ String
"Unable to decode message from "
String -> String -> String
forall a. [a] -> [a] -> [a]
++ Group -> String
forall a. Show a => a -> String
show Group
fromGroup String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" error was: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
e
Right from
dec -> do
Group -> ByteString -> ZRE ()
Network.ZRE.zshout Group
toGroup (ByteString -> ZRE ()) -> ByteString -> ZRE ()
forall a b. (a -> b) -> a -> b
$ to -> ByteString
forall a. Serialize a => a -> ByteString
Data.Serialize.encode (to -> ByteString) -> to -> ByteString
forall a b. (a -> b) -> a -> b
$ from -> to
fn from
dec
zreChan :: forall input output inputGroup outputGroup .
( Serialize input
, Serialize output
, KnownGroup inputGroup
, KnownGroup outputGroup
)
=> IO ( TChan input
, TChan output)
zreChan :: IO (TChan input, TChan output)
zreChan = Group -> Group -> IO (TChan input, TChan output)
forall input output.
(Serialize input, Serialize output) =>
Group -> Group -> IO (TChan input, TChan output)
zreChan'
(KnownGroup outputGroup => Group
forall (n :: Symbol). KnownGroup n => Group
knownToGroup @outputGroup)
(KnownGroup inputGroup => Group
forall (n :: Symbol). KnownGroup n => Group
knownToGroup @inputGroup)
zreChan' :: (Serialize input, Serialize output)
=> Group
-> Group
-> IO ( TChan input
, TChan output)
zreChan' :: Group -> Group -> IO (TChan input, TChan output)
zreChan' = (ZRE () -> IO ())
-> Group -> Group -> IO (TChan input, TChan output)
forall input output.
(Serialize input, Serialize output) =>
(ZRE () -> IO ())
-> Group -> Group -> IO (TChan input, TChan output)
zreChanWith ZRE () -> IO ()
forall a. ZRE a -> IO ()
Network.ZRE.runZre
zreChanWith :: (Serialize input, Serialize output)
=> (ZRE () -> IO ())
-> Group
-> Group
-> IO ( TChan input
, TChan output)
zreChanWith :: (ZRE () -> IO ())
-> Group -> Group -> IO (TChan input, TChan output)
zreChanWith ZRE () -> IO ()
runner Group
outputGroup Group
inputGroup = do
TChan input
chanInput <- IO (TChan input)
forall a. IO (TChan a)
Control.Concurrent.STM.newTChanIO
TChan output
chanOutput <- IO (TChan output)
forall a. IO (TChan a)
Control.Concurrent.STM.newTChanIO
Async ()
_ <- IO () -> IO (Async (StM IO ()))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
Control.Concurrent.Async.Lifted.async (IO () -> IO (Async (StM IO ())))
-> IO () -> IO (Async (StM IO ()))
forall a b. (a -> b) -> a -> b
$ ZRE () -> IO ()
runner (ZRE () -> IO ()) -> ZRE () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Group -> ZRE ()
Network.ZRE.zjoin Group
outputGroup
ZRE (Async Any) -> ZRE ()
forall (f :: * -> *) a. Functor f => f a -> f ()
Control.Monad.void
(ZRE (Async Any) -> ZRE ()) -> ZRE (Async Any) -> ZRE ()
forall a b. (a -> b) -> a -> b
$ ZRE Any -> ZRE (Async (StM ZRE Any))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
Control.Concurrent.Async.Lifted.async
(ZRE Any -> ZRE (Async (StM ZRE Any)))
-> ZRE Any -> ZRE (Async (StM ZRE Any))
forall a b. (a -> b) -> a -> b
$ ZRE () -> ZRE Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
Control.Monad.forever
(ZRE () -> ZRE Any) -> ZRE () -> ZRE Any
forall a b. (a -> b) -> a -> b
$ do
input
out <-
IO input -> ZRE input
forall (m :: * -> *) a. MonadIO m => IO a -> m a
Control.Monad.IO.Class.liftIO
(IO input -> ZRE input) -> IO input -> ZRE input
forall a b. (a -> b) -> a -> b
$ STM input -> IO input
forall a. STM a -> IO a
Control.Concurrent.STM.atomically
(STM input -> IO input) -> STM input -> IO input
forall a b. (a -> b) -> a -> b
$ TChan input -> STM input
forall a. TChan a -> STM a
Control.Concurrent.STM.readTChan TChan input
chanInput
Group -> ByteString -> ZRE ()
Network.ZRE.zshout Group
outputGroup
(ByteString -> ZRE ()) -> ByteString -> ZRE ()
forall a b. (a -> b) -> a -> b
$ input -> ByteString
forall a. Serialize a => a -> ByteString
Data.Serialize.encode input
out
Group -> ZRE ()
Network.ZRE.zjoin Group
inputGroup
Group
-> (ByteString -> Either String output)
-> (Either String output -> ZRE ())
-> ZRE ()
forall decoded.
Group
-> (ByteString -> Either String decoded)
-> (Either String decoded -> ZRE ())
-> ZRE ()
Network.ZRE.zrecvShoutsDecode Group
inputGroup ByteString -> Either String output
forall a. Serialize a => ByteString -> Either String a
Data.Serialize.decode
((Either String output -> ZRE ()) -> ZRE ())
-> (Either String output -> ZRE ()) -> ZRE ()
forall a b. (a -> b) -> a -> b
$ (String -> ZRE ())
-> (output -> ZRE ()) -> Either String output -> ZRE ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either
(\String
e -> String -> ZRE ()
forall a. String -> ZRE a
Network.ZRE.zfail
(String -> ZRE ()) -> String -> ZRE ()
forall a b. (a -> b) -> a -> b
$ String
"zreChan: Unable to decode message from input "
String -> String -> String
forall a. [a] -> [a] -> [a]
++ Group -> String
forall a. Show a => a -> String
show Group
inputGroup
String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" error was: "
String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
e
)
( IO () -> ZRE ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
Control.Monad.IO.Class.liftIO
(IO () -> ZRE ()) -> (output -> IO ()) -> output -> ZRE ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall a. STM a -> IO a
Control.Concurrent.STM.atomically
(STM () -> IO ()) -> (output -> STM ()) -> output -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TChan output -> output -> STM ()
forall a. TChan a -> a -> STM ()
Control.Concurrent.STM.writeTChan TChan output
chanOutput
)
(TChan input, TChan output) -> IO (TChan input, TChan output)
forall (m :: * -> *) a. Monad m => a -> m a
return (TChan input
chanInput, TChan output
chanOutput)