{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}

module AWS.Transcribe.Client (runClient) where

import AWS.Credentials (Credentials)
import AWS.Transcribe.Channel (Channel (MkChannel))
import AWS.Transcribe.EventStream (hEventType, hValueString, mkStreamingMessage, payload)
import qualified AWS.Transcribe.PreSignedUrl as PS
import AWS.Transcribe.Settings
import AWS.Transcribe.StreamingResponse (StreamingError (..), StreamingResponse (EndOfStream, Error, Event))
import Control.Concurrent.Async (Async, cancel, poll, withAsync)
import Control.Concurrent.STM (TQueue, atomically, readTQueue, writeTQueue)
import Control.Lens ((^.))
import qualified Data.Aeson as AE
import Data.Binary (decode, decodeOrFail, encode)
import qualified Data.ByteString.Lazy as BL
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Time (getCurrentTime)
import qualified Network.WebSockets as WS
import qualified Wuss as WS

{- | Start a @WebSocket@ client.
 The client reads `BL.ByteString` payloads from the write queue of the
 channel, encodes each one as an @Event Stream@ message and transmits it to
 the AWS Transcribe endpoint. Every response received from the endpoint is
 written to the read queue of the channel.

 An empty `BL.ByteString` is still transmitted, after which the sending loop
 stops. Error events received from @AWS Transcribe@ are reported on the
 channel as `Error` responses, and a @Close@ control message is reported as
 `EndOfStream` and ends the client.
-}
runClient ::
    -- | AWS Credentials
    Credentials ->
    -- | Session settings
    Settings ->
    -- | Transcribe channel
    Channel ->
    IO ()
runClient creds settings (MkChannel wQ rQ) = do
    putStrLn "start client"
    -- The request path is built from the credentials, the settings and the
    -- current time, so the timestamp is taken right before connecting.
    now <- getCurrentTime
    let host = T.unpack $ PS.host $ region settings
        path = T.unpack $ T.decodeUtf8 $ PS.path creds settings now
    WS.runSecureClient host 8443 path $ \conn ->
        -- The sender runs in its own async; `withAsync` cancels it as soon as
        -- the receive loop returns.
        withAsync (send conn wQ) $ \handle ->
            receive conn rQ handle

-- | Forward payloads from the write queue to the @WebSocket@ connection.
--
-- Each payload taken from the queue is wrapped with 'mkStreamingMessage',
-- serialised with 'encode' and sent as a binary frame. The loop blocks on the
-- queue between payloads and stops after an empty payload has been sent.
send :: WS.Connection -> TQueue BL.ByteString -> IO ()
send conn wQ = go
  where
    go :: IO ()
    go = do
        pl <- atomically $ readTQueue wQ
        WS.sendBinaryData conn $ encode $ mkStreamingMessage $ BL.toStrict pl
        -- Sending an empty message will shut down the socket, so the empty
        -- payload is transmitted first and only then does the loop end.
        if BL.null pl
            then pure ()
            else go

-- | Receive messages from the @WebSocket@ connection and publish them on the
-- response queue.
--
-- Every iteration is guarded by 'withRunningSend', so the loop ends once the
-- send thread has failed with an exception. Binary data messages are decoded
-- and classified into a 'StreamingResponse'; a @Close@ control message cancels
-- the send thread, publishes 'EndOfStream' and ends the loop; every other
-- message is ignored.
receive :: WS.Connection -> TQueue StreamingResponse -> Async () -> IO ()
receive conn rQ handle = go
  where
    go :: IO ()
    go = withRunningSend rQ handle $ do
        message <- WS.receive conn
        case message of
            WS.DataMessage _ _ _ (WS.Binary msg) -> do
                atomically $ writeTQueue rQ (toResponse msg)
                go
            -- On close, clean up the send async action and stop receiving
            WS.ControlMessage (WS.Close _ _) -> do
                cancel handle
                atomically $ writeTQueue rQ EndOfStream
            -- Ignore other messages
            _ -> go

    -- Decode one binary frame and classify it by its event-type header.
    -- `decodeOrFail` is used instead of the partial `decode` so that a
    -- malformed frame becomes an `Error` response rather than an exception
    -- hidden in a lazy thunk on the queue.
    toResponse :: BL.ByteString -> StreamingResponse
    toResponse msg = case decodeOrFail msg of
        -- NOTE(review): `TranscriptEventError` is reused here because it is the
        -- only visible constructor carrying just a `String` — confirm.
        Left (_, _, err) ->
            Error $ TranscriptEventError $ "Malformed event stream message: " <> err
        Right (_, _, decoded) ->
            case decoded ^. hEventType . hValueString of
                "BadRequestException" -> Error BadRequestException
                "InternalFailureException" -> Error InternalFailureException
                "LimitExceededException" -> Error LimitExceededException
                "UnrecognizedClientException" -> Error UnrecognizedClientException
                "TranscriptEvent" ->
                    -- The payload of a transcript event is JSON.
                    case AE.eitherDecodeStrict $ decoded ^. payload of
                        Left err -> Error $ TranscriptEventError err
                        Right trans -> Event trans
                hOther ->
                    Error $ OtherStreamingError decoded $ "Unknown header: " <> T.unpack hOther

-- Perform an action while the async send has not raised an exception. If an
-- exception has been raised in the send thread, an `EndOfStream` message is
-- written to the response queue and the action is not run.
withRunningSend :: TQueue StreamingResponse -> Async () -> IO () -> IO ()
withRunningSend rQ handle action = do
    -- `poll` does not block: `Nothing` means the send thread is still running.
    status <- poll handle
    case status of
        Nothing -> action
        -- If the send thread returned normally, we still run the receive
        -- action to catch any in-flight messages
        Just (Right _) -> action
        Just (Left _) -> atomically (writeTQueue rQ EndOfStream)