{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE NoImplicitPrelude #-}

module Data.Morpheus.Subscriptions.Stream
  ( toOutStream,
    runStreamWS,
    runStreamHTTP,
    ApiContext (..),
    Input (..),
    Output,
    API (..),
    PUB,
    SUB,
  )
where

import Control.Monad.Except (throwError)
import Data.ByteString.Lazy.Char8 (ByteString)
import Data.Morpheus.App.Internal.Resolving
  ( Channel,
    ResponseEvent (..),
    ResponseStream,
    Result (..),
    ResultT (..),
    runResultT,
  )
import Data.Morpheus.Subscriptions.Apollo
  ( ApolloAction (..),
    apolloFormat,
    toApolloResponse,
  )
import Data.Morpheus.Subscriptions.ClientConnectionStore
  ( ClientConnectionStore,
    SessionID (..),
    Updates (..),
    endSession,
    insertConnection,
    startSession,
  )
import Data.Morpheus.Subscriptions.Event (Event (..))
import Data.Morpheus.Types.IO
  ( GQLRequest (..),
    GQLResponse (..),
  )
import Data.Morpheus.Types.Internal.AST
  ( GQLError,
    VALID,
    Value (..),
  )
import Data.UUID (UUID)
import Relude hiding (ByteString)

-- | Tag distinguishing the two flavours handled by this module.
--
-- The constructors are used promoted (via @DataKinds@) as the index of
-- 'Input', 'ApiContext' and 'Output'.
data API
  = PUB
  | SUB

-- | Type-level synonym for the promoted 'SUB' constructor.
type SUB = 'SUB

-- | Type-level synonym for the promoted 'PUB' constructor.
type PUB = 'PUB

-- | Input accepted by 'toOutStream', indexed by the 'API' flavour it drives.
data Input (api :: API) where
  -- Opens the 'SUB' flavour; the 'UUID' is used as the client id.
  InitConnection :: UUID -> Input SUB
  -- Carries a single GraphQL request for the 'PUB' flavour.
  Request :: GQLRequest -> Input PUB

-- | Apply a single 'Updates' value to the connection store by handing the
-- wrapped store transformation to the context's 'updateStore' action.
run :: ApiContext SUB e m -> Updates e m -> m ()
run SubContext {updateStore} (Updates changes) = updateStore changes

-- | Capabilities the caller supplies for running a stream, indexed by the
-- 'API' flavour.
data ApiContext (api :: API) event (m :: Type -> Type) where
  PubContext ::
    { -- Action invoked for every event produced by a 'PUB' request.
      eventPublisher :: event -> m ()
    } ->
    ApiContext PUB event m
  SubContext ::
    { -- Action yielding the next incoming message.
      listener :: m ByteString,
      -- Action used to send a message back to the client.
      callback :: ByteString -> m (),
      -- Action that applies a transformation to the connection store.
      updateStore :: (ClientConnectionStore event m -> ClientConnectionStore event m) -> m ()
    } ->
    ApiContext SUB event m

-- | Result of 'toOutStream': a computation that still needs the matching
-- 'ApiContext' before it can run (see 'runStreamWS' and 'runStreamHTTP').
data
  Output
    (api :: API)
    (e :: Type)
    (m :: Type -> Type)
  where
  SubOutput ::
    { -- Yields either a payload for the client ('Left') or a list of
      -- store updates to apply ('Right').
      streamWS :: ApiContext SUB e m -> m (Either ByteString [Updates e m])
    } ->
    Output SUB e m
  PubOutput ::
    { -- Yields the response for a single request.
      streamHTTP :: ApiContext PUB e m -> m GQLResponse
    } ->
    Output PUB e m

-- | Convert a resolved response stream into a 'SUB' output for one session.
--
-- The context argument of the resulting 'streamWS' is ignored. On success
-- every collected 'ResponseEvent' is converted: a 'Subscribe' event becomes
-- a 'startSession' update for the given session, while a 'Publish' event
-- aborts the whole conversion with an error payload. A failed result is
-- turned into an error payload as well. Error payloads are rendered with
-- 'toApolloResponse' using the session's 'sid'.
handleResponseStream ::
  ( Monad m,
    Eq (Channel e),
    Hashable (Channel e)
  ) =>
  SessionID ->
  ResponseStream e m (Value VALID) ->
  Output SUB e m
handleResponseStream session (ResultT res) =
  SubOutput $ const $ unfoldR <$> res
  where
    -- Per-event conversion; 'traverse' below stops at the first 'Left'.
    execute Publish {} =
      apolloError
        ["websocket can only handle subscriptions, not mutations"]
    execute (Subscribe ch subRes) = Right $ startSession ch subRes session
    --------------------------
    -- The resolved value itself is discarded; only the events matter here.
    unfoldR Success {result = (events, _)} = traverse execute events
    unfoldR Failure {errors} = apolloError (toList errors)
    --------------------------
    apolloError :: [GQLError] -> Either ByteString a
    apolloError = Left . toApolloResponse (sid session) . Errors

-- | Decode one incoming message with 'apolloFormat' and translate the
-- resulting 'ApolloAction' into a 'SUB' output for the given client.
handleWSRequest ::
  ( Monad m,
    Functor m,
    Eq ch,
    Hashable ch
  ) =>
  ( GQLRequest ->
    ResponseStream (Event ch con) m (Value VALID)
  ) ->
  UUID ->
  ByteString ->
  Output SUB (Event ch con) m
handleWSRequest gqlApp clientId = handle . apolloFormat
  where
    -- handle :: Either ByteString ApolloAction -> Output SUB (Event ch con) m
    -- A 'Left' from 'apolloFormat' is passed through unchanged as the
    -- payload for the client (presumably a decoding error; confirm against
    -- 'apolloFormat').
    handle = either (liftWS . Left) handleAction
    --------------------------------------------------
    -- handleAction :: ApolloAction -> Output SUB (Event ch con) m
    -- Connection initialisation needs no store updates.
    handleAction ConnectionInit = liftWS $ Right []
    -- Starting a session resolves the request and registers subscriptions.
    handleAction (SessionStart sessionId request) =
      handleResponseStream (SessionID clientId sessionId) (gqlApp request)
    -- Stopping a session emits a single 'endSession' update.
    handleAction (SessionStop sessionId) =
      liftWS $
        Right [endSession (SessionID clientId sessionId)]

-- | Wrap an already-known result as a 'SUB' output that ignores its context
-- and performs no effects beyond 'pure'.
liftWS ::
  Applicative m =>
  Either ByteString [Updates e m] ->
  Output SUB e m
liftWS outcome = SubOutput (const (pure outcome))

-- | Run a 'SUB' output against its context.
--
-- A 'Left' payload is passed to the context's 'callback'; a 'Right' list of
-- updates is applied to the store one by one, in order, via 'run'.
runStreamWS ::
  (Monad m) =>
  ApiContext SUB e m ->
  Output SUB e m ->
  m ()
runStreamWS scope@SubContext {callback} SubOutput {streamWS} =
  streamWS scope
    >>= either callback (traverse_ (run scope))

-- | Run a 'PUB' output against its context and return the response.
runStreamHTTP ::
  (Monad m) =>
  ApiContext PUB e m ->
  Output PUB e m ->
  m GQLResponse
runStreamHTTP scope PubOutput {streamHTTP} =
  streamHTTP scope

-- | Build the 'Output' for an 'Input', given the application's resolver.
--
-- * 'InitConnection': reads one message from the context's 'listener',
--   processes it with 'handleWSRequest', and, when that yields updates,
--   prepends an update that inserts the client's connection (its id and
--   'callback') into the store. A 'Left' payload is returned unchanged.
--
-- * 'Request': resolves the request and defers to 'handleResponseHTTP'.
toOutStream ::
  ( Monad m,
    Eq ch,
    Hashable ch
  ) =>
  ( GQLRequest ->
    ResponseStream (Event ch con) m (Value VALID)
  ) ->
  Input api ->
  Output api (Event ch con) m
toOutStream app (InitConnection clientId) =
  SubOutput handle
  where
    handle ws@SubContext {listener, callback} = do
      message <- listener
      -- Run the per-message output with the same context we were given.
      outcome <- streamWS (handleWSRequest app clientId message) ws
      -- 'fmap' over 'Either' only touches the 'Right' (updates) case.
      pure $ (Updates (insertConnection clientId callback) :) <$> outcome
toOutStream app (Request req) =
  PubOutput $ handleResponseHTTP (app req)

-- | Resolve a response stream for the 'PUB' flavour.
--
-- Every 'Publish' event is unwrapped to its payload; a 'Subscribe' event
-- fails the resolution with an error. On success all collected payloads are
-- passed to the context's 'eventPublisher' (in order) and the resolved value
-- is returned as 'Data'; on failure the errors are returned as 'Errors' and
-- nothing is published.
handleResponseHTTP ::
  ( Monad m
  ) =>
  ResponseStream e m (Value VALID) ->
  ApiContext PUB e m ->
  m GQLResponse
handleResponseHTTP res PubContext {eventPublisher} =
  runResultT (handleRes res execute) >>= runResult
  where
    runResult Success {result = (events, value)} =
      traverse_ eventPublisher events $> Data value
    runResult Failure {errors} = pure $ Errors $ toList errors
    -- Conversion applied to each response event by 'handleRes'.
    execute (Publish event) = pure event
    execute Subscribe {} = throwError "http server can't handle subscription"

-- | Run a response stream and re-pack its outcome with 'unfoldRes', applying
-- @execute@ to each collected 'ResponseEvent'.
handleRes ::
  (Monad m) =>
  ResponseStream e m a ->
  (ResponseEvent e m -> ResultT e' m e') ->
  ResultT e' m a
handleRes res execute =
  ResultT $ runResultT res >>= runResultT . unfoldRes execute

-- | Convert the events carried by a finished 'Result'.
--
-- For a 'Success', @execute@ is traversed over the events in order and the
-- converted events are packed into a new 'Success' together with the
-- original value and warnings. A 'Failure' is re-wrapped with the same
-- errors and @execute@ is never called.
unfoldRes ::
  (Monad m) =>
  (e -> ResultT e' m e') ->
  Result GQLError ([e], a) ->
  ResultT e' m a
unfoldRes execute Success {result = (events, value), warnings} =
  traverse execute events >>= packResultT
  where
    packResultT events' =
      ResultT $ pure $ Success {result = (events', value), warnings}
unfoldRes _ Failure {errors} = ResultT $ pure $ Failure {errors}