module Network.Riak.Montage.Main where import System.IO (hSetBuffering, BufferMode(..), stdout, stderr) import Control.Monad (forever, void) import Control.Concurrent (forkIO, threadDelay) import qualified Data.Text as T import qualified Data.ByteString.Lazy.Char8 as B import Network.StatsWeb (initStats, addCounter, incCounter, runStats, Stats) import Network.Riak.Montage.Protocol import Network.Riak.Montage.Process (newEmptyConcurrentState, generateRequest) import Network.Riak.Montage.Types import Network.Riak.Montage.Util -- | Stats broadcast to localhost, port 3344 montageStats :: [T.Text] montageStats = [ "pulse" , "requests" , "requests.slow" , "requests.many.siblings" , "requests.big" ] sleepForever :: IO a sleepForever = forever $ threadDelay (1000000 * 3600) simpleCallback :: LogCallback simpleCallback logType _ val = logError $ (B.unpack logType) ++ " " ++ show val -- | Proxy configuration. Configurable fields: proxyPort, logger, statsPrefix. Non-configurable fields: generator. cfg :: (MontageRiakValue a) => Config a cfg = Config { proxyPort = 7078 , logger = simpleCallback , statsPrefix = "montage" , generator = generateRequest } -- | Start the resolution proxy, where you define resolutions for your data @a@, and create one or more Riak connection pools @p@. runDaemon :: (MontageRiakValue a, Poolable p) => Config a -> p -> IO () runDaemon cfg' pools = do hSetBuffering stdout LineBuffering hSetBuffering stderr LineBuffering stats <- initStats (statsPrefix cfg') mapM_ (addCounter stats) montageStats state <- newEmptyConcurrentState let chooser' = chooser pools let logging = logger cfg' let runOn = "tcp://*:" ++ show (proxyPort cfg') void $ forkIO $ loggedSupervise logging "network-zeromq" $ serveMontageZmq (generator cfg') runOn state logging chooser' stats void $ forkIO $ loggedSupervise logging "timekeeper" $ timeKeeper stats void $ forkIO $ runStats stats 3334 sleepForever timeKeeper :: Stats -> IO a timeKeeper stats = forever $ do logError "TICK" incCounter "pulse" stats threadDelay (5 * 1000000)