module PostgRESTWS.HasqlBroadcast
( newHasqlBroadcaster
, newHasqlBroadcasterOrError
, acquire
, relayMessages
, relayMessagesForever
) where
import Protolude
import Hasql.Connection
import Data.Either.Combinators (mapBoth)
import System.IO (hPutStrLn)
import Data.Function (id)
import Control.Retry (RetryStatus, retrying, capDelay, exponentialBackoff)
import PostgRESTWS.Database
import PostgRESTWS.Broadcast
newHasqlBroadcaster :: ByteString -> IO Multiplexer
newHasqlBroadcaster = newHasqlBroadcasterForConnection . tryUntilConnected
newHasqlBroadcasterOrError :: ByteString -> IO (Either ByteString Multiplexer)
newHasqlBroadcasterOrError =
acquire >=> (sequence . mapBoth show (newHasqlBroadcasterForConnection . return))
tryUntilConnected :: ByteString -> IO Connection
tryUntilConnected =
fmap (either (panic "Failure on connection retry") id) . retryConnection
where
retryConnection conStr = retrying retryPolicy shouldRetry (const $ acquire conStr)
maxDelayInMicroseconds = 32000000
firstDelayInMicroseconds = 1000000
retryPolicy = capDelay maxDelayInMicroseconds $ exponentialBackoff firstDelayInMicroseconds
shouldRetry :: RetryStatus -> Either ConnectionError Connection -> IO Bool
shouldRetry _ con =
case con of
Left err ->
hPutStrLn stderr ("Error connecting notification listener to database: " <> show err)
>> return True
_ -> return False
newHasqlBroadcasterForConnection :: IO Connection -> IO Multiplexer
newHasqlBroadcasterForConnection getCon = do
multi <-
newMultiplexer (\cmds msgs-> do
con <- getCon
waitForNotifications
(\c m-> atomically $ writeTQueue msgs $ Message c m)
con
forever $ do
cmd <- atomically $ readTQueue cmds
case cmd of
Open ch -> listen con ch
Close ch -> unlisten con ch
) (\_ -> hPutStrLn stderr "Broadcaster is dead")
void $ relayMessagesForever multi
return multi