module Control.Eff.Concurrent.Api.Server
(
serve
, spawnServer
, spawnServerWithEffects
, ApiHandler(..)
, castCallback
, callCallback
, terminateCallback
, apiHandler
, apiHandlerForever
, castHandler
, castHandlerForever
, callHandler
, callHandlerForever
, castAndCallHandler
, castAndCallHandlerForever
, ApiServerCmd(..)
, unhandledCallError
, unhandledCastError
, defaultTermination
, Servable(..)
, ServerCallback(..)
, requestHandlerSelector
, terminationHandler
)
where
import Control.Eff
import Control.Eff.Concurrent.Api
import Control.Eff.Concurrent.Api.Internal
import Control.Eff.Concurrent.Process
import Control.Eff.Exception
import Control.Eff.Log
import Control.Lens
import Data.Proxy
import Data.Typeable ( Typeable
, typeRep
)
import Data.Dynamic
import Control.Applicative
import Data.Kind
import GHC.Stack
import Data.Maybe
import GHC.Generics
import Control.DeepSeq
import Data.Default
data ApiHandler api eff where
ApiHandler ::
{
_castCallback
:: Maybe (Api api 'Asynchronous -> Eff eff ApiServerCmd)
, _callCallback
:: forall reply . Maybe (Api api ('Synchronous reply) -> (reply -> Eff eff ()) -> Eff eff ApiServerCmd)
, _terminateCallback
:: Maybe (ExitReason 'Recoverable -> Eff eff ())
} -> ApiHandler api eff
instance Default (ApiHandler api eff) where
def = ApiHandler { _castCallback = def
, _callCallback = def
, _terminateCallback = def
}
apiHandler
:: (Api api 'Asynchronous -> Eff e ApiServerCmd)
-> ( forall r
. Api api ( 'Synchronous r)
-> (r -> Eff e ())
-> Eff e ApiServerCmd
)
-> (ExitReason 'Recoverable -> Eff e ())
-> ApiHandler api e
apiHandler c d e = ApiHandler
{ _castCallback = Just c
, _callCallback = Just d
, _terminateCallback = Just e
}
apiHandlerForever
:: (Api api 'Asynchronous -> Eff e ())
-> (forall r . Api api ( 'Synchronous r) -> (r -> Eff e ()) -> Eff e ())
-> (ExitReason 'Recoverable -> Eff e ())
-> ApiHandler api e
apiHandlerForever c d = apiHandler
(\someCast -> c someCast >> return HandleNextRequest)
(\someCall k -> d someCall k >> return HandleNextRequest)
castHandler
:: (Api api 'Asynchronous -> Eff eff ApiServerCmd) -> ApiHandler api eff
castHandler c = def { _castCallback = Just c }
castHandlerForever
:: (Api api 'Asynchronous -> Eff eff ()) -> ApiHandler api eff
castHandlerForever c =
castHandler (\someCast -> c someCast >> return HandleNextRequest)
callHandler
:: ( forall r
. Api api ( 'Synchronous r)
-> (r -> Eff e ())
-> Eff e ApiServerCmd
)
-> ApiHandler api e
callHandler c = def { _callCallback = Just c }
callHandlerForever
:: (forall r . Api api ( 'Synchronous r) -> (r -> Eff e ()) -> Eff e ())
-> ApiHandler api e
callHandlerForever d =
callHandler (\someCall k -> d someCall k >> return HandleNextRequest)
castAndCallHandler
:: (Api api 'Asynchronous -> Eff e ApiServerCmd)
-> ( forall r
. Api api ( 'Synchronous r)
-> (r -> Eff e ())
-> Eff e ApiServerCmd
)
-> ApiHandler api e
castAndCallHandler c d = def { _castCallback = Just c, _callCallback = Just d }
castAndCallHandlerForever
:: (Api api 'Asynchronous -> Eff e ())
-> (forall r . Api api ( 'Synchronous r) -> (r -> Eff e ()) -> Eff e ())
-> ApiHandler api e
castAndCallHandlerForever c d = castAndCallHandler
(\someCast -> c someCast >> return HandleNextRequest)
(\someCall k -> d someCall k >> return HandleNextRequest)
data ApiServerCmd where
HandleNextRequest :: ApiServerCmd
StopApiServer :: ExitReason 'Recoverable -> ApiServerCmd
deriving (Show, Typeable, Generic)
instance NFData ApiServerCmd
makeLenses ''ApiHandler
data ServerCallback eff =
ServerCallback { _requestHandlerSelector :: MessageSelector (Eff eff ApiServerCmd)
, _terminationHandler :: ExitReason 'Recoverable -> Eff eff ()
}
makeLenses ''ServerCallback
instance Semigroup (ServerCallback eff) where
l <> r = l & requestHandlerSelector .~
selectDynamicMessageLazy (\x ->
runMessageSelector (view requestHandlerSelector l) x <|>
runMessageSelector (view requestHandlerSelector r) x)
& terminationHandler .~
(\reason ->
do (l^.terminationHandler) reason
(r^.terminationHandler) reason)
instance Monoid (ServerCallback eff) where
mappend = (<>)
mempty = ServerCallback
{ _requestHandlerSelector = selectDynamicMessageLazy (const Nothing)
, _terminationHandler = const (return ())
}
class Servable a where
type ServerEff a :: [Type -> Type]
type ServerPids a
toServerPids :: proxy a -> ProcessId -> ServerPids a
toServerCallback
:: (Member Interrupts (ServerEff a), SetMember Process (Process effScheduler) (ServerEff a))
=> SchedulerProxy effScheduler -> a -> ServerCallback (ServerEff a)
instance Servable (ServerCallback eff) where
type ServerEff (ServerCallback eff) = eff
type ServerPids (ServerCallback eff) = ProcessId
toServerCallback = const id
toServerPids = const id
instance Typeable a => Servable (ApiHandler a eff) where
type ServerEff (ApiHandler a eff) = eff
type ServerPids (ApiHandler a eff) = Server a
toServerCallback = apiHandlerServerCallback
toServerPids _ = asServer
instance (ServerEff a ~ ServerEff b, Servable a, Servable b) => Servable (a, b) where
type ServerPids (a, b) = (ServerPids a, ServerPids b)
type ServerEff (a, b) = ServerEff a
toServerCallback px (a, b) = toServerCallback px a <> toServerCallback px b
toServerPids _ pid =
( toServerPids (Proxy :: Proxy a) pid
, toServerPids (Proxy :: Proxy b) pid
)
serve
:: forall a effScheduler
. ( Servable a
, SetMember Process (Process effScheduler) (ServerEff a)
, Member Interrupts (ServerEff a)
, HasCallStack
)
=> SchedulerProxy effScheduler
-> a
-> Eff (ServerEff a) ()
serve px a =
let serverCb = toServerCallback px a
stopServer reason = do
(serverCb ^. terminationHandler) reason
return (Just ())
in receiveSelectedLoop px (serverCb ^. requestHandlerSelector) $ \case
Left reason -> stopServer reason
Right handleIt -> handleIt >>= \case
HandleNextRequest -> return Nothing
StopApiServer reason -> stopServer reason
spawnServer
:: forall a effScheduler eff
. ( Servable a
, ServerEff a ~ (InterruptableProcess effScheduler)
, SetMember Process (Process effScheduler) eff
, Member Interrupts eff
, HasCallStack
)
=> SchedulerProxy effScheduler
-> a
-> Eff eff (ServerPids a)
spawnServer px a = spawnServerWithEffects px a id
spawnServerWithEffects
:: forall a effScheduler eff
. ( Servable a
, SetMember Process (Process effScheduler) (ServerEff a)
, SetMember Process (Process effScheduler) eff
, Member Interrupts eff
, Member Interrupts (ServerEff a)
, HasCallStack
)
=> SchedulerProxy effScheduler
-> a
-> ( Eff (ServerEff a) ()
-> Eff (InterruptableProcess effScheduler) ()
)
-> Eff eff (ServerPids a)
spawnServerWithEffects px a handleEff = do
pid <- spawn (handleEff (serve px a))
return (toServerPids (Proxy @a) pid)
apiHandlerServerCallback
:: forall eff effScheduler api
. ( HasCallStack
, Typeable api
, SetMember Process (Process effScheduler) eff
, Member Interrupts eff
)
=> SchedulerProxy effScheduler
-> ApiHandler api eff
-> ServerCallback eff
apiHandlerServerCallback px handlers = mempty
{ _requestHandlerSelector = selectHandlerMethod px handlers
, _terminationHandler = fromMaybe (const (return ()))
(_terminateCallback handlers)
}
selectHandlerMethod
:: forall eff effScheduler api
. ( HasCallStack
, Typeable api
, SetMember Process (Process effScheduler) eff
, Member Interrupts eff
)
=> SchedulerProxy effScheduler
-> ApiHandler api eff
-> MessageSelector (Eff eff ApiServerCmd)
selectHandlerMethod px handlers =
selectDynamicMessageLazy (fmap (applyHandlerMethod px handlers) . fromDynamic)
applyHandlerMethod
:: forall eff effScheduler api
. ( Typeable api
, SetMember Process (Process effScheduler) eff
, Member Interrupts eff
, HasCallStack
)
=> SchedulerProxy effScheduler
-> ApiHandler api eff
-> Request api
-> Eff eff ApiServerCmd
applyHandlerMethod px handlers (Cast request) =
fromMaybe (unhandledCastError px) (_castCallback handlers) request
applyHandlerMethod px handlers (Call callRef fromPid request) = fromMaybe
(unhandledCallError px)
(_callCallback handlers)
request
sendReply
where
sendReply :: Typeable reply => reply -> Eff eff ()
sendReply reply =
sendMessage px fromPid (Response (Proxy @api) callRef $! reply)
unhandledCallError
:: forall p x r q
. ( Typeable p
, HasCallStack
, SetMember Process (Process q) r
, Member Interrupts r
)
=> SchedulerProxy q
-> Api p ( 'Synchronous x)
-> (x -> Eff r ())
-> Eff r ApiServerCmd
unhandledCallError _px _api _ = throwError
(ProcessError ("unhandled call on api: " ++ show (typeRep (Proxy @p))))
unhandledCastError
:: forall p r q
. ( Typeable p
, HasCallStack
, SetMember Process (Process q) r
, Member Interrupts r
)
=> SchedulerProxy q
-> Api p 'Asynchronous
-> Eff r ApiServerCmd
unhandledCastError _px _api = throwError
(ProcessError ("unhandled cast on api: " ++ show (typeRep (Proxy @p))))
defaultTermination
:: forall q r
. ( HasCallStack
, SetMember Process (Process q) r
, Member (Logs LogMessage) r
)
=> SchedulerProxy q
-> ExitReason 'Recoverable
-> Eff r ()
defaultTermination _px r = logNotice ("server process terminating " ++ show r)