{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
module Data.Morpheus.Types.Internal.Subscription
( connect,
disconnect,
connectionThread,
toOutStream,
runStreamWS,
runStreamHTTP,
Stream,
Scope (..),
Input (..),
WS,
HTTP,
acceptApolloRequest,
publish,
Store (..),
initDefaultStore,
publishEventWith,
GQLChannel (..),
ClientConnectionStore,
empty,
toList,
connectionSessionIds,
SessionID,
)
where
import Control.Concurrent
( modifyMVar_,
newMVar,
readMVar,
)
import Control.Exception (finally)
import Control.Monad (forever)
import Control.Monad.IO.Class (MonadIO (..))
import Control.Monad.IO.Unlift
( MonadUnliftIO,
withRunInIO,
)
import Data.Morpheus.Internal.Utils
( empty,
)
import Data.Morpheus.Types.Internal.Resolving
( GQLChannel (..),
)
import Data.Morpheus.Types.Internal.Subscription.Apollo
( acceptApolloRequest,
)
import Data.Morpheus.Types.Internal.Subscription.ClientConnectionStore
( ClientConnectionStore,
SessionID,
connectionSessionIds,
delete,
publish,
toList,
)
import Data.Morpheus.Types.Internal.Subscription.Stream
( HTTP,
Input (..),
Scope (..),
Stream,
WS,
runStreamHTTP,
runStreamWS,
toOutStream,
)
import Data.UUID.V4 (nextRandom)
connect :: MonadIO m => m (Input WS)
connect = Init <$> liftIO nextRandom
disconnect :: Scope WS e m -> Input WS -> m ()
disconnect ScopeWS {update} (Init clientID) = update (delete clientID)
data Store e m = Store
{ readStore :: m (ClientConnectionStore e m),
writeStore :: (ClientConnectionStore e m -> ClientConnectionStore e m) -> m ()
}
publishEventWith ::
( MonadIO m,
(Eq (StreamChannel event)),
(GQLChannel event)
) =>
Store event m ->
event ->
m ()
publishEventWith store event = readStore store >>= publish event
initDefaultStore ::
( MonadIO m,
MonadIO m2,
(Eq (StreamChannel event)),
(GQLChannel event)
) =>
m2 (Store event m)
initDefaultStore = do
store <- liftIO $ newMVar empty
pure
Store
{ readStore = liftIO $ readMVar store,
writeStore = \changes -> liftIO $ modifyMVar_ store (return . changes)
}
finallyM :: MonadUnliftIO m => m () -> m () -> m ()
finallyM loop end = withRunInIO $ \runIO -> finally (runIO loop) (runIO end)
connectionThread ::
( MonadUnliftIO m
) =>
(Input WS -> Stream WS e m) ->
Scope WS e m ->
m ()
connectionThread api scope = do
input <- connect
finallyM
(connectionLoop api scope input)
(disconnect scope input)
connectionLoop ::
Monad m =>
(Input WS -> Stream WS e m) ->
Scope WS e m ->
Input WS ->
m ()
connectionLoop api scope input =
forever
$ runStreamWS scope
$ api input