-- | Client-side helpers for sending protocol data units ('Pdu') to an
-- 'Endpoint'.
--
-- The module exports three groups of functions:
--
-- * 'cast', 'call' and 'callWithTimeout' take the target 'Endpoint' as an
--   explicit argument.
-- * 'castEndpointReader' and 'callEndpointReader' obtain the target
--   'Endpoint' from an 'EndpointReader' effect (see 'askEndpoint' and
--   'runEndpointReader').
-- * 'castSingleton' and 'callSingleton' additionally embed a message of an
--   inner protocol into the protocol of the endpoint held by the reader.
module Control.Eff.Concurrent.Protocol.Client
(
cast
, call
, callWithTimeout
, castSingleton
, castEndpointReader
, callSingleton
, callEndpointReader
, HasEndpointReader
, EndpointReader
, askEndpoint
, runEndpointReader
)
where
import Control.Eff
import Control.Eff.Reader.Strict
import Control.Eff.Concurrent.Protocol
import Control.Eff.Concurrent.Protocol.Wrapper
import Control.Eff.Concurrent.Process
import Control.Eff.Concurrent.Process.Timer
import Control.Eff.Log
import Data.Typeable ( Typeable )
import Data.Text (pack)
import GHC.Stack
-- | Send an 'Asynchronous' 'Pdu' to an 'Endpoint' without waiting for any
-- answer.
--
-- The message of type @Pdu protocol 'Asynchronous@ is first embedded into the
-- @destination@ protocol with 'embedPdu', then wrapped in 'Cast' and handed to
-- 'sendMessage' addressed to the process id stored in the 'Endpoint'.
cast
  :: forall destination protocol r q
   . ( HasCallStack
     , HasProcesses r q
     , HasPdu destination
     , HasPdu protocol
     , Tangible (Pdu destination 'Asynchronous)
     , Embeds destination protocol
     )
  => Endpoint destination
  -> Pdu protocol 'Asynchronous
  -> Eff r ()
cast (Endpoint destinationPid) pdu = sendMessage destinationPid wrapped
 where
  -- The request as the destination process expects it: the embedded PDU
  -- inside the 'Cast' constructor.
  wrapped = Cast (embedPdu @destination pdu)
-- | Send a 'Synchronous' 'Pdu' to an 'Endpoint' and wait for the matching
-- 'Reply'.
--
-- Steps performed:
--
-- 1. A fresh reference ('makeReference') and the caller's own process id
--    ('self') are combined into a 'RequestOrigin'.
-- 2. The request is embedded into the @destination@ protocol ('embedPdu'),
--    wrapped in 'Call' together with that origin, and sent with 'sendMessage'.
-- 3. 'receiveWithMonitor' is used to wait for a 'Reply' whose origin is equal
--    to the one created in step 1; replies with a different origin are not
--    selected.
--
-- If 'receiveWithMonitor' yields a 'Left' (presumably because the monitored
-- server process went down - confirm against 'receiveWithMonitor'), the
-- caller is interrupted with 'becauseProcessIsDown'.
call
  :: forall result destination protocol r q
   . ( HasProcesses r q
     , TangiblePdu destination ( 'Synchronous result)
     , TangiblePdu protocol ( 'Synchronous result)
     , Tangible result
     , Embeds destination protocol
     , HasCallStack
     )
  => Endpoint destination
  -> Pdu protocol ( 'Synchronous result)
  -> Eff r result
call (Endpoint serverPid) req = do
  replyRef  <- makeReference
  callerPid <- self
  let origin = RequestOrigin @destination @result callerPid replyRef
      -- '$!' forces the embedded PDU (to WHNF) when the message is built.
      outgoing = Call origin $! embedPdu @destination req

      -- Accept only the reply that carries exactly our request origin.
      matchReply :: Reply destination result -> Maybe result
      matchReply (Reply replyOrigin payload)
        | origin == replyOrigin = Just payload
        | otherwise             = Nothing

      replySelector :: MessageSelector result
      replySelector = selectMessageWith matchReply
  sendMessage serverPid outgoing
  outcome <- receiveWithMonitor serverPid replySelector
  case outcome of
    Left down   -> interrupt (becauseProcessIsDown down)
    Right reply -> return reply
-- | Like a synchronous request/reply exchange with an 'Endpoint', but give up
-- after the given 'Timeout'.
--
-- The request is embedded into the @destination@ protocol, tagged with a
-- 'RequestOrigin' built from the caller's process id ('self') and a fresh
-- reference ('makeReference'), wrapped in 'Call' and sent via 'sendMessage'.
-- The reply is awaited with 'receiveSelectedWithMonitorAfterWithTitle', using
-- a timer title assembled from the endpoint, the request origin and the
-- timeout.
--
-- Outcomes, as distinguished by the result of that receive function:
--
-- * @Right reply@: the reply with the matching origin is returned.
-- * @Left (Left down)@: a warning is logged and the caller is interrupted
--   with 'becauseProcessIsDown'.
-- * @Left (Right timerRef)@: a warning is logged and the caller is
--   interrupted with 'TimeoutInterrupt' carrying the same message.
callWithTimeout
  :: forall result destination protocol r q
   . ( HasProcesses r q
     , TangiblePdu destination ( 'Synchronous result)
     , TangiblePdu protocol ( 'Synchronous result)
     , Tangible result
     , Member Logs r
     , HasCallStack
     , Embeds destination protocol
     )
  => Endpoint destination
  -> Pdu protocol ( 'Synchronous result)
  -> Timeout
  -> Eff r result
callWithTimeout server@(Endpoint serverPid) req timeOut = do
  callerPid <- self
  replyRef  <- makeReference
  let origin   = RequestOrigin @destination @result callerPid replyRef
      -- '$!' forces the embedded PDU (to WHNF) when the message is built.
      outgoing = Call origin $! embedPdu @destination req
  sendMessage serverPid outgoing
  let -- Accept only the reply that carries exactly our request origin.
      matchReply :: Reply destination result -> Maybe result
      matchReply (Reply replyOrigin payload)
        | origin == replyOrigin = Just payload
        | otherwise             = Nothing

      timerTitle =
        MkProcessTitle
          (mconcat
            [ "call-timer-"
            , pack (show server)
            , "-"
            , pack (show origin)
            , "-"
            , pack (show timeOut)
            ]
          )
  outcome <- receiveSelectedWithMonitorAfterWithTitle
    serverPid
    (selectMessageWith matchReply)
    timeOut
    timerTitle
  case outcome of
    Right reply -> return reply
    Left (Left down) -> do
      logWarning'
        (concat
          ["call to dead server: ", show server, " from ", show callerPid]
        )
      interrupt (becauseProcessIsDown down)
    Left (Right timerRef) -> do
      let msg = concat
            [ "call timed out after "
            , show timeOut
            , " to server: "
            , show server
            , " from "
            , show callerPid
            , " "
            , show timerRef
            ]
      logWarning' msg
      interrupt (TimeoutInterrupt msg)
-- | Constraint synonym: the protocol index @o@ is 'Typeable' and the effect
-- list @r@ contains an 'EndpointReader' for @o@.
type HasEndpointReader o r = (Typeable o, Member (EndpointReader o) r)
-- | A 'Reader' effect whose environment is the 'Endpoint' of protocol @o@.
type EndpointReader o = Reader (Endpoint o)
-- | Discharge an 'EndpointReader' effect by supplying the 'Endpoint' that
-- 'askEndpoint' should return inside the given action.
--
-- This is 'runReader' specialised to 'EndpointReader'.
runEndpointReader
  :: HasCallStack => Endpoint o -> Eff (EndpointReader o ': r) a -> Eff r a
runEndpointReader endpoint action = runReader endpoint action
-- | Fetch the 'Endpoint' held by the 'EndpointReader' effect.
--
-- This is 'ask' specialised to 'EndpointReader'.
askEndpoint :: Member (EndpointReader o) e => Eff e (Endpoint o)
askEndpoint = ask
-- | Variant of 'call' that takes the target 'Endpoint' from the
-- 'EndpointReader' effect instead of an explicit argument.
--
-- The endpoint is obtained with 'askEndpoint' and the request is passed to
-- 'call' unchanged (source and destination protocol are both @o@).
callEndpointReader
  :: forall reply o r q
   . ( HasEndpointReader o r
     , HasCallStack
     , Tangible reply
     , TangiblePdu o ( 'Synchronous reply)
     , HasProcesses r q
     , Embeds o o
     )
  => Pdu o ( 'Synchronous reply)
  -> Eff r reply
callEndpointReader method =
  askEndpoint @o >>= \server -> call @reply @o @o server method
-- | Variant of 'cast' that takes the target 'Endpoint' from the
-- 'EndpointReader' effect instead of an explicit argument.
--
-- The endpoint is obtained with 'askEndpoint' and the message is passed to
-- 'cast' unchanged (source and destination protocol are both @o@).
castEndpointReader
  :: forall o r q
   . ( HasEndpointReader o r
     , HasProcesses r q
     , Tangible (Pdu o 'Asynchronous)
     , HasCallStack
     , HasPdu o
     , Embeds o o
     )
  => Pdu o 'Asynchronous
  -> Eff r ()
castEndpointReader method =
  askEndpoint @o >>= \server -> cast @o @o server method
-- | Send a 'Synchronous' request of an @inner@ protocol to the endpoint of the
-- @outer@ protocol held by the 'EndpointReader' effect, and return the reply.
--
-- The request is converted with @'embedPdu' \@outer \@inner@ and then
-- dispatched through 'callEndpointReader'. The call stack is frozen with
-- 'withFrozenCallStack' before delegating.
callSingleton
  :: forall outer inner reply q e
   . ( HasCallStack
     , Member (EndpointReader outer) e
     , Embeds outer inner
     , Embeds outer outer
     , HasProcesses e q
     , TangiblePdu outer ('Synchronous reply)
     , TangiblePdu inner ('Synchronous reply)
     , Tangible reply
     )
  => Pdu inner ('Synchronous reply)
  -> Eff e reply
callSingleton innerPdu =
  withFrozenCallStack (callEndpointReader (embedPdu @outer @inner innerPdu))
-- | Send an 'Asynchronous' message of an @inner@ protocol to the endpoint of
-- the @outer@ protocol held by the 'EndpointReader' effect.
--
-- The message is converted with @'embedPdu' \@outer \@inner@ and then
-- dispatched through 'castEndpointReader'. The call stack is frozen with
-- 'withFrozenCallStack' before delegating.
castSingleton
  :: forall outer inner q e
   . ( HasCallStack
     , Member (EndpointReader outer) e
     , Tangible (Pdu outer 'Asynchronous)
     , HasProcesses e q
     , HasPdu outer
     , HasPdu inner
     , Embeds outer inner
     , Embeds outer outer
     )
  => Pdu inner 'Asynchronous
  -> Eff e ()
castSingleton innerPdu =
  withFrozenCallStack (castEndpointReader (embedPdu @outer @inner innerPdu))