{-| Module : PostgresWebsockets.Broadcast Description : Build a Hasql.Notifications based producer 'Multiplexer'. Uses Broadcast module adding database as a source producer. This module provides a function to produce a 'Multiplexer' from a Hasql 'Connection'. The producer issues a LISTEN command upon Open commands and UNLISTEN upon Close. -} module PostgresWebsockets.HasqlBroadcast ( newHasqlBroadcaster , newHasqlBroadcasterOrError -- re-export , acquire , relayMessages , relayMessagesForever ) where import Protolude hiding (putErrLn) import Hasql.Connection import Hasql.Notifications import Data.Aeson (decode, Value(..)) import Data.HashMap.Lazy (lookupDefault) import Data.Either.Combinators (mapBoth) import Data.Function (id) import Control.Retry (RetryStatus(..), retrying, capDelay, exponentialBackoff) import PostgresWebsockets.Broadcast {- | Returns a multiplexer from a connection URI, keeps trying to connect in case there is any error. This function also spawns a thread that keeps relaying the messages from the database to the multiplexer's listeners -} newHasqlBroadcaster :: IO () -> Text -> Int -> ByteString -> IO Multiplexer newHasqlBroadcaster onConnectionFailure ch maxRetries = newHasqlBroadcasterForConnection . tryUntilConnected maxRetries where newHasqlBroadcasterForConnection = newHasqlBroadcasterForChannel onConnectionFailure ch {- | Returns a multiplexer from a connection URI or an error message on the left case This function also spawns a thread that keeps relaying the messages from the database to the multiplexer's listeners -} newHasqlBroadcasterOrError :: IO () -> Text -> ByteString -> IO (Either ByteString Multiplexer) newHasqlBroadcasterOrError onConnectionFailure ch = acquire >=> (sequence . mapBoth show (newHasqlBroadcasterForConnection . return)) where newHasqlBroadcasterForConnection = newHasqlBroadcasterForChannel onConnectionFailure ch tryUntilConnected :: Int -> ByteString -> IO Connection tryUntilConnected maxRetries = fmap (either (panic "Failure on connection retry") id) . retryConnection where retryConnection conStr = retrying retryPolicy shouldRetry (const $ acquire conStr) maxDelayInMicroseconds = 32000000 firstDelayInMicroseconds = 1000000 retryPolicy = capDelay maxDelayInMicroseconds $ exponentialBackoff firstDelayInMicroseconds shouldRetry :: RetryStatus -> Either ConnectionError Connection -> IO Bool shouldRetry RetryStatus{..} con = case con of Left err -> do putErrLn $ "Error connecting notification listener to database: " <> show err pure $ rsIterNumber < maxRetries - 1 _ -> return False {- | Returns a multiplexer from a channel and an IO Connection, listen for different database notifications on the provided channel using the connection produced. This function also spawns a thread that keeps relaying the messages from the database to the multiplexer's listeners To listen on channels *chat* @ import Protolude import PostgresWebsockets.HasqlBroadcast import PostgresWebsockets.Broadcast import Hasql.Connection main = do conOrError <- H.acquire "postgres://localhost/test_database" let con = either (panic . show) id conOrError :: Connection multi <- newHasqlBroadcaster con onMessage multi "chat" (\ch -> forever $ fmap print (atomically $ readTChan ch) @ -} newHasqlBroadcasterForChannel :: IO () -> Text -> IO Connection -> IO Multiplexer newHasqlBroadcasterForChannel onConnectionFailure ch getCon = do multi <- newMultiplexer openProducer $ const onConnectionFailure void $ relayMessagesForever multi return multi where toMsg :: ByteString -> ByteString -> Message toMsg c m = case decode (toS m) of Just v -> Message (channelDef c v) m Nothing -> Message c m lookupStringDef :: Text -> ByteString -> Value -> ByteString lookupStringDef key d (Object obj) = case lookupDefault (String $ toS d) key obj of String s -> toS s _ -> d lookupStringDef _ d _ = d channelDef = lookupStringDef "channel" openProducer msgQ = do con <- getCon listen con $ toPgIdentifier ch waitForNotifications (\c m-> atomically $ writeTQueue msgQ $ toMsg c m) con putErrLn :: Text -> IO () putErrLn = hPutStrLn stderr