{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RecordWildCards #-} module Database.InfluxDB.Writer ( Config(..), Handle , createHandle, newHandle , Value(..), Tags, Fields , writePoint, writePoint' ) where import Data.Int import Data.Monoid import Data.Pool import Data.Text (Text) import qualified Data.Text as T import qualified Data.Text.Encoding as T import Data.Map (Map) import qualified Data.Map as M import Control.Monad import Control.Concurrent import Control.Concurrent.STM import System.Clock import Network.HTTP.Client import Network.HTTP.Client.TLS (tlsManagerSettings) import Prelude data Config = Config { cURL :: !String , cDB :: !String } data Handle = Handle { hPool :: Pool (TQueue (Maybe Point)) -- ^ A pool of threads which consume messages from 'TQueue's and write -- them in batches to InfluxDB. } createHandle :: Config -> IO (Either () Handle) createHandle c = do manager <- newManager tlsManagerSettings newHandle c manager newHandle :: Config -> Manager -> IO (Either () Handle) newHandle c manager = do req <- mkRequestTemplate <$> parseUrl (cURL c) pool <- createPool (allocateResource req) releaseResource 1 600 3 return $ Right $ Handle pool where mkRequestTemplate req = req { method = "POST" , path = "/write" , queryString = "?db=" <> T.encodeUtf8 (T.pack (cDB c)) } dequeuePoints :: [Point] -> TQueue (Maybe Point) -> Int -> STM ([Point], Bool) dequeuePoints acc queue n | n <= 0 = return $ (reverse acc, False) | otherwise = do mbPoint <- tryReadTQueue queue case mbPoint of Nothing -> return $ (reverse acc, False) Just Nothing -> return $ (reverse acc, True) Just (Just p) -> dequeuePoints (p:acc) queue (n - 1) dequeueBatch :: [Point] -> TQueue (Maybe Point) -> TimeSpec -> Int -> IO ([Point], Bool) dequeueBatch acc queue start n = do mbNextBatch <- atomically $ dequeuePoints [] queue n case mbNextBatch of (points, True) -> return (points, True) (points, False) -> do -- We can collect more points if we still have space AND time -- left. Otherwise return what we have. now <- getTime Monotonic if length points < n && timeSpecAsNanoSecs (diffTimeSpec start now) < (10 * 1000000000) then dequeueBatch (acc ++ points) queue start (n - length points) else return $ (acc ++ points, False) flushQueue :: Request -> TQueue (Maybe Point) -> IO () flushQueue req queue = do -- Dequeue the next batch of points. This operation will block until -- 20 points are available or a timeout is reached, whichever comes -- first. start <- getTime Monotonic (points, isLast) <- dequeueBatch [] queue start 20 let sleep = threadDelay $ 5 * 1000000 flush = flushPoints manager req points continue = flushQueue req queue -- Deciding what to do next is a bit tricky. case (length points, isLast) of (0, True) -> return () (0, False) -> sleep >> continue (_, True) -> flush (_, False) -> flush >> continue allocateResource :: Request -> IO (TQueue (Maybe Point)) allocateResource req = do queue <- newTQueueIO -- Fork off a thread which periodically flushes the queue. void $ forkIO $ flushQueue req queue return queue releaseResource queue = atomically $ writeTQueue queue Nothing -- | A 'Value' is either an integer, a floating point number, a boolean or -- string. data Value = I !Int64 | F !Double | B !Bool | S !Text deriving (Show, Eq) type Tags = Map Text Text type Fields = Map Text Value data Point = Point { pMeasurement :: !Text , pTags :: !Tags , pFields :: !Fields , pTimestamp :: !(Maybe Int64) } deriving (Show, Eq) -- | Write a point to the database. Generates a timestamp from the local clock. writePoint :: Handle -> Text -> Tags -> Fields -> IO () writePoint h measurement tags fields = do timestamp <- fromIntegral . timeSpecAsNanoSecs <$> getTime Realtime writePoint' h measurement tags fields timestamp -- | Same as 'writePoint' but allows the caller to supply the timestamp. writePoint' :: Handle -> Text -> Tags -> Fields -> Int64 -> IO () writePoint' h measurement tags fields timestamp = withResource (hPool h) $ \queue -> atomically $ writeTQueue queue $ Just $ Point measurement tags fields (Just timestamp) flushPoints :: Manager -> Request -> [Point] -> IO () flushPoints manager requestTemplate points = do void $ httpLbs req manager where renderValue :: Value -> Text renderValue (I x) = (T.pack $ show x) <> "i" renderValue (F x) = (T.pack $ show x) renderValue (B x) = (T.pack $ if x then "true" else "false") renderValue (S x) = "\"" <> x <> "\"" tagsList :: Tags -> [Text] tagsList tags = map (\(k,v) -> k <> "=" <> v) $ M.toList tags fieldsList fields = map (\(k,v) -> k <> "=" <> renderValue v) $ M.toList fields line :: Point -> Text line Point{..} = mconcat $ [ T.intercalate "," ([pMeasurement] ++ (tagsList pTags)) , " " , T.intercalate "," (fieldsList pFields) ] ++ maybe [] (\x -> [" ", T.pack $ show x]) pTimestamp body = T.encodeUtf8 $ T.intercalate "\n" $ map line points req = requestTemplate { requestBody = RequestBodyBS body }