{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}
module AWS.Transcribe.Client (runClient) where
import AWS.Credentials (Credentials)
import AWS.Transcribe.Channel (Channel (MkChannel))
import AWS.Transcribe.EventStream (hEventType, hValueString, mkStreamingMessage, payload)
import qualified AWS.Transcribe.PreSignedUrl as PS
import AWS.Transcribe.Settings
import AWS.Transcribe.StreamingResponse (StreamingError (..), StreamingResponse (EndOfStream, Error, Event))
import Control.Concurrent.Async (Async, cancel, poll, withAsync)
import Control.Concurrent.STM (TQueue, atomically, readTQueue, writeTQueue)
import Control.Lens ((^.))
import qualified Data.Aeson as AE
import Data.Binary (decode, encode)
import qualified Data.ByteString.Lazy as BL
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Time (getCurrentTime)
import qualified Network.WebSockets as WS
import qualified Wuss as WS
-- | Connect to the streaming endpoint and pump audio and responses over it.
--
-- Prints @start client@, reads the current time, and then calls
-- 'WS.runSecureClient' on port 8443. The host comes from 'PS.host' applied to
-- the region in @settings@; the path comes from 'PS.path' applied to the
-- credentials, the settings and the time just read.
--
-- Once connected, 'send' runs in an 'Async' draining the channel's first
-- queue while 'receive' runs in the calling thread filling the second one.
runClient ::
  Credentials ->
  Settings ->
  Channel ->
  IO ()
runClient creds settings (MkChannel wQ rQ) = do
  putStrLn "start client"
  now <- getCurrentTime
  let endpointHost = T.unpack (PS.host (region settings))
      requestPath = T.unpack (T.decodeUtf8 (PS.path creds settings now))
  WS.runSecureClient endpointHost 8443 requestPath session
  where
    -- 'withAsync' hands the sender's handle to 'receive' as its last argument.
    session conn = withAsync (send conn wQ) (receive conn rQ)
-- | Forward chunks from the write queue to the connection.
--
-- Each chunk taken from @wQ@ is made strict, wrapped with
-- 'mkStreamingMessage', serialised with 'encode' and sent as binary data.
-- An empty chunk is still sent, and the loop stops right after sending it.
send :: WS.Connection -> TQueue BL.ByteString -> IO ()
send conn wQ = loop
  where
    loop :: IO ()
    loop = do
      chunk <- atomically (readTQueue wQ)
      WS.sendBinaryData conn (encode (mkStreamingMessage (BL.toStrict chunk)))
      continueAfter chunk

    -- The empty chunk has already been transmitted when this check runs.
    continueAfter chunk
      | BL.null chunk = pure ()
      | otherwise = loop
-- | Read messages from the connection and publish them on the response queue.
--
-- Every iteration runs under 'withRunningSend'. A binary data message is
-- decoded with 'decode', turned into a 'StreamingResponse' and written to
-- @rQ@, after which the loop continues. A close control message cancels the
-- sender ('handle'), writes 'EndOfStream' and ends the loop. Any other
-- message is skipped.
receive :: WS.Connection -> TQueue StreamingResponse -> Async () -> IO ()
receive conn rQ handle = loop
  where
    loop :: IO ()
    loop = withRunningSend rQ handle $ do
      incoming <- WS.receive conn
      case incoming of
        WS.DataMessage _ _ _ (WS.Binary raw) -> do
          atomically (writeTQueue rQ (classify (decode raw)))
          loop
        WS.ControlMessage (WS.Close _ _) -> do
          cancel handle
          atomically (writeTQueue rQ EndOfStream)
        _ -> loop

    -- Choose the response from the string value of the event-type header.
    -- Only "TranscriptEvent" has its payload parsed as JSON; an unrecognised
    -- header value is reported together with the decoded message.
    classify frame =
      case frame ^. hEventType . hValueString of
        "BadRequestException" -> Error BadRequestException
        "InternalFailureException" -> Error InternalFailureException
        "LimitExceededException" -> Error LimitExceededException
        "UnrecognizedClientException" -> Error UnrecognizedClientException
        "TranscriptEvent" ->
          either
            (Error . TranscriptEventError)
            Event
            (AE.eitherDecodeStrict (frame ^. payload))
        unknown ->
          Error (OtherStreamingError frame ("Unknown header: " <> T.unpack unknown))
-- | Run @action@ unless the sender has already failed.
--
-- The sender's 'Async' is polled once, without blocking. If it finished with
-- an exception, 'EndOfStream' is written to @rQ@ and @action@ is not run.
-- If it is still running or finished normally, @action@ runs.
withRunningSend :: TQueue StreamingResponse -> Async () -> IO () -> IO ()
withRunningSend rQ handle action = poll handle >>= dispatch
  where
    -- The sender's exception is discarded; the consumer only sees 'EndOfStream'.
    dispatch (Just (Left _)) = atomically (writeTQueue rQ EndOfStream)
    dispatch _ = action