module Hasql.Core.Dispatcher where

import Hasql.Prelude
import Hasql.Core.Model
import qualified Hasql.Core.Socket as A
import qualified ByteString.StrictBuilder as B
import qualified BinaryParser as D
import qualified Hasql.Core.Request as C
import qualified Hasql.Core.UnauthenticatedSession as G
import qualified Hasql.Core.Loops.Serializer as H
import qualified Hasql.Core.Loops.Receiver as I
import qualified Hasql.Core.Loops.Sender as J
import qualified Hasql.Core.Loops.Interpreter as K


data Dispatcher =
  Dispatcher
    !ThreadId !ThreadId !ThreadId !ThreadId
    !(TQueue ByteString) !(TQueue H.Message) !(TQueue Response) !(TQueue K.ResultProcessor) !(TMVar Text)

start :: A.Socket -> (Notification -> IO ()) -> IO Dispatcher
start socket sendNotification =
  do
    outgoingBytesQueue <- newTQueueIO
    serializerMessageQueue <- newTQueueIO
    responseQueue <- newTQueueIO
    resultProcessorQueue <- newTQueueIO
    transportErrorVar <- newEmptyTMVarIO
    interpreterTid <-
      forkIO (K.loop
        (atomically (readTQueue responseQueue))
        (atomically (tryReadTQueue resultProcessorQueue))
        (sendNotification))
    serializerTid <-
      forkIO (H.loop
        (atomically (readTQueue serializerMessageQueue))
        (atomically . writeTQueue outgoingBytesQueue))
    senderTid <-
      forkIO (J.loop socket
        (atomically (readTQueue outgoingBytesQueue))
        (atomically . putTMVar transportErrorVar))
    receiverTid <-
      forkIO (I.loop socket
        (atomically . writeTQueue responseQueue)
        (atomically . putTMVar transportErrorVar))
    return (Dispatcher interpreterTid serializerTid senderTid receiverTid
      outgoingBytesQueue serializerMessageQueue responseQueue resultProcessorQueue transportErrorVar)

performRequest :: Dispatcher -> C.Request result -> IO (Either Error result)
performRequest (Dispatcher _ _ _ _ _ serializerMessageQueue _ resultProcessorQueue transportErrorVar) (C.Request builder ir) =
  do
    resultVar <- newEmptyTMVarIO
    atomically $ do
      writeTQueue resultProcessorQueue (K.ResultProcessor ir (atomically . putTMVar resultVar))
      writeTQueue serializerMessageQueue (H.SerializeMessage builder)
      writeTQueue serializerMessageQueue (H.FlushMessage)
    atomically (fmap (Left . TransportError) (readTMVar transportErrorVar) <|> takeTMVar resultVar)

stop :: Dispatcher -> IO ()
stop (Dispatcher interpreterTid serializerTid senderTid receiverTid _ _ _ _ _) =
  do
    killThread interpreterTid
    killThread serializerTid
    killThread senderTid
    killThread receiverTid

interact :: Dispatcher -> G.Session result -> IO (Either Error result)
interact dispatcher (G.Session free) =
  runExceptT $ iterM interpretFreeRequest free
  where
    interpretFreeRequest request =
      join (ExceptT (performRequest dispatcher request))