-- | Client-side functions for sending protocol data units ('Pdu') to
-- processes addressed by an 'Endpoint'.
--
-- The module exports functions that take the target 'Endpoint' explicitly
-- ('cast', 'call', 'callWithTimeout') and functions that obtain the target
-- from an 'EndpointReader' effect ('castEndpointReader',
-- 'callEndpointReader', 'castSingleton', 'callSingleton').
module Control.Eff.Concurrent.Protocol.Client
(
-- * Requests to an explicitly given endpoint
cast
, call
, callWithTimeout
-- * Requests to the endpoint provided by an 'EndpointReader'
, castSingleton
, castEndpointReader
, callSingleton
, callEndpointReader
-- * The endpoint reader effect
, ServesProtocol
, EndpointReader
, askEndpoint
, runEndpointReader
)
where
import Control.Eff
import Control.Eff.Reader.Strict
import Control.Eff.Concurrent.Protocol
import Control.Eff.Concurrent.Protocol.Request
import Control.Eff.Concurrent.Process
import Control.Eff.Concurrent.Process.Timer
import Control.Eff.Log
import Data.Typeable ( Typeable )
import GHC.Stack
-- | Send an 'Asynchronous' request to the process behind an 'Endpoint'.
--
-- The given 'Pdu' of protocol @o@ is embedded into the endpoint's protocol
-- @o'@ with 'embedPdu', wrapped in 'Cast' and handed to 'sendMessage'.
-- This function itself does not receive any reply.
cast
  :: forall o' o r q
   . ( HasCallStack
     , SetMember Process (Process q) r
     , Member Interrupts r
     , IsPdu o' 'Asynchronous
     , IsPdu o 'Asynchronous
     , EmbedProtocol o' o
     )
  => Endpoint o'
  -> Pdu o 'Asynchronous
  -> Eff r ()
cast (Endpoint destPid) asyncPdu = sendMessage destPid castRequest
 where
  -- The request as it is put on the wire: the PDU lifted into the
  -- destination protocol and tagged as a cast.
  castRequest = Cast (embedPdu @o' asyncPdu)
-- | Send a 'Synchronous' request to the process behind an 'Endpoint' and
-- wait for the matching reply.
--
-- Steps, in order:
--
-- 1. A fresh reference is obtained via 'makeReference' and the caller's own
--    process id via 'self'; together they form the 'RequestOrigin'.
-- 2. The 'Pdu' is embedded into the endpoint's protocol with 'embedPdu'
--    (forced with '$!'), wrapped in 'Call' and sent with 'sendMessage'.
-- 3. 'receiveWithMonitor' is used to select a 'Reply' whose origin equals
--    the one created in step 1.
--
-- A @Right@ result is returned to the caller; a @Left@ result is converted
-- with 'becauseProcessIsDown' and raised via 'interrupt'.
call
  :: forall result protocol' protocol r q
   . ( SetMember Process (Process q) r
     , Member Interrupts r
     , TangiblePdu protocol' ( 'Synchronous result)
     , TangiblePdu protocol ( 'Synchronous result)
     , EmbedProtocol protocol' protocol
     , Tangible result
     , HasCallStack
     )
  => Endpoint protocol'
  -> Pdu protocol ( 'Synchronous result)
  -> Eff r result
call (Endpoint pidInternal) req = do
  callRef <- makeReference
  fromPid <- self
  let origin = RequestOrigin @protocol' @result fromPid callRef
      -- Accept only replies that answer exactly this request.
      matchingReply :: Reply protocol' result -> Maybe result
      matchingReply (Reply replyOrigin replyValue)
        | origin == replyOrigin = Just replyValue
        | otherwise             = Nothing
  sendMessage pidInternal (Call origin $! embedPdu @protocol' req)
  outcome <- receiveWithMonitor pidInternal (selectMessageWith matchingReply)
  case outcome of
    Left  procDown   -> interrupt (becauseProcessIsDown procDown)
    Right replyValue -> return replyValue
-- | Variant of 'call' that takes an additional 'Timeout'.
--
-- The request is built and sent exactly as in 'call' (here 'self' is
-- queried before 'makeReference'). The reply is awaited with
-- 'receiveSelectedWithMonitorAfter', whose result is handled as follows:
--
-- * @Right reply@: the reply is returned.
-- * @Left (Left down)@: a warning is logged with 'logWarning'' and the
--   value is raised via 'interrupt' after 'becauseProcessIsDown'.
-- * @Left (Right timerRef)@: a warning is logged and a 'TimeoutInterrupt'
--   carrying the same message is raised via 'interrupt'.
callWithTimeout
  :: forall result protocol' protocol r q
   . ( SetMember Process (Process q) r
     , Member Interrupts r
     , TangiblePdu protocol' ( 'Synchronous result)
     , TangiblePdu protocol ( 'Synchronous result)
     , EmbedProtocol protocol' protocol
     , Tangible result
     , Member Logs r
     , Lifted IO q
     , Lifted IO r
     , HasCallStack
     )
  => Endpoint protocol'
  -> Pdu protocol ( 'Synchronous result)
  -> Timeout
  -> Eff r result
callWithTimeout serverP@(Endpoint pidInternal) req timeOut = do
  fromPid <- self
  callRef <- makeReference
  let origin = RequestOrigin @protocol' @result fromPid callRef
      -- Accept only replies that answer exactly this request.
      matchingReply :: Reply protocol' result -> Maybe result
      matchingReply (Reply replyOrigin replyValue)
        | origin == replyOrigin = Just replyValue
        | otherwise             = Nothing
  sendMessage pidInternal (Call origin $! embedPdu @protocol' req)
  outcome <- receiveSelectedWithMonitorAfter
    pidInternal
    (selectMessageWith matchingReply)
    timeOut
  case outcome of
    Right replyValue      -> return replyValue
    Left  (Left procDown) -> do
      logWarning'
        ("call to dead server: " ++ show serverP ++ " from " ++ show fromPid)
      interrupt (becauseProcessIsDown procDown)
    Left (Right timerRef) -> do
      -- The same text is used for the log entry and the interrupt.
      let msg = concat
            [ "call timed out after "
            , show timeOut
            , " to server: "
            , show serverP
            , " from "
            , show fromPid
            , " "
            , show timerRef
            ]
      logWarning' msg
      interrupt (TimeoutInterrupt msg)
-- | Constraint synonym bundling what the endpoint-reader based functions of
-- this module need: protocol @o@ is 'Typeable', the effect list @r@
-- contains the 'Process' effect (over @q@) and an 'EndpointReader' for @o@.
type ServesProtocol o r q
  = ( Typeable o
    , SetMember Process (Process q) r
    , Member (EndpointReader o) r
    )
-- | The reader effect that provides an 'Endpoint' for protocol @o@; an
-- alias for 'Reader' specialised to @'Endpoint' o@.
type EndpointReader o = Reader (Endpoint o)
-- | Handle an 'EndpointReader' effect by supplying the given 'Endpoint' to
-- the action; implemented directly in terms of 'runReader'.
runEndpointReader
  :: HasCallStack => Endpoint o -> Eff (EndpointReader o ': r) a -> Eff r a
runEndpointReader endpoint action = runReader endpoint action
-- | Fetch the 'Endpoint' for protocol @o@ from the 'EndpointReader' effect
-- in scope. This is 'ask' at the 'EndpointReader' type.
askEndpoint :: Member (EndpointReader o) e => Eff e (Endpoint o)
askEndpoint = ask
-- | Like 'call', but the target 'Endpoint' is not passed as an argument;
-- it is obtained from the 'EndpointReader' effect via 'askEndpoint'.
callEndpointReader
  :: forall reply o r q
   . ( ServesProtocol o r q
     , HasCallStack
     , Tangible reply
     , TangiblePdu o ( 'Synchronous reply)
     , Member Interrupts r
     )
  => Pdu o ( 'Synchronous reply)
  -> Eff r reply
callEndpointReader method =
  askEndpoint @o >>= \endpoint -> call @reply @o @o endpoint method
-- | Like 'cast', but the target 'Endpoint' is not passed as an argument;
-- it is obtained from the 'EndpointReader' effect via 'askEndpoint'.
castEndpointReader
  :: forall o r q
   . ( ServesProtocol o r q
     , HasCallStack
     , Member Interrupts r
     , IsPdu o 'Asynchronous
     )
  => Pdu o 'Asynchronous
  -> Eff r ()
castEndpointReader method =
  askEndpoint @o >>= \endpoint -> cast @o @o endpoint method
-- | Send a 'Synchronous' request of an embedded protocol @inner@ to the
-- endpoint of the protocol @outer@ provided by the 'EndpointReader'.
--
-- The 'Pdu' is converted with 'embedPdu' and passed to
-- 'callEndpointReader'; the call stack is frozen with
-- 'withFrozenCallStack' around that call.
callSingleton
  :: forall outer inner reply q e
   . ( HasCallStack
     , EmbedProtocol outer inner
     , Member (EndpointReader outer) e
     , SetMember Process (Process q) e
     , Member Interrupts e
     , TangiblePdu outer ('Synchronous reply)
     , TangiblePdu inner ('Synchronous reply)
     , Tangible reply
     )
  => Pdu inner ('Synchronous reply)
  -> Eff e reply
callSingleton innerPdu =
  withFrozenCallStack (callEndpointReader (embedPdu @outer @inner innerPdu))
-- | Send an 'Asynchronous' request of an embedded protocol @inner@ to the
-- endpoint of the protocol @outer@ provided by the 'EndpointReader'.
--
-- The 'Pdu' is converted with 'embedPdu' and passed to
-- 'castEndpointReader'; the call stack is frozen with
-- 'withFrozenCallStack' around that call.
castSingleton
  :: forall outer inner q e
   . ( HasCallStack
     , EmbedProtocol outer inner
     , Member (EndpointReader outer) e
     , SetMember Process (Process q) e
     , Member Interrupts e
     , IsPdu outer 'Asynchronous
     , IsPdu inner 'Asynchronous
     )
  => Pdu inner 'Asynchronous
  -> Eff e ()
castSingleton innerPdu =
  withFrozenCallStack (castEndpointReader (embedPdu @outer @inner innerPdu))