module Database.InfluxDB.Write
(
write
, writeBatch
, writeByteString
, WriteParams
, writeParams
, Types.server
, Types.database
, retentionPolicy
, Types.precision
, Types.manager
) where
import Control.Exception
import Control.Monad
import Data.Maybe
import Control.Lens
import qualified Data.Aeson as A
import qualified Data.Aeson.Types as A
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy as BL
import qualified Data.Text.Encoding as TE
import qualified Network.HTTP.Types as HT
import Database.InfluxDB.Line
import Database.InfluxDB.Types as Types
import Database.InfluxDB.JSON
import qualified Network.HTTP.Client.Compat as HC
data WriteParams = WriteParams
{ _server :: !Server
, _database :: !Database
, _retentionPolicy :: !(Maybe Key)
, _precision :: !(Precision 'WriteRequest)
, _authentication :: !(Maybe Credentials)
, _manager :: !(Either HC.ManagerSettings HC.Manager)
}
writeParams :: Database -> WriteParams
writeParams _database = WriteParams
{ _server = localServer
, _precision = Nanosecond
, _retentionPolicy = Nothing
, _authentication = Nothing
, _manager = Left HC.defaultManagerSettings
, ..
}
write
:: Timestamp time
=> WriteParams
-> Line time
-> IO ()
write p@WriteParams {_precision} =
writeByteString p . encodeLine (scaleTo _precision)
writeBatch
:: (Timestamp time, Traversable f)
=> WriteParams
-> f (Line time)
-> IO ()
writeBatch p@WriteParams {_precision} =
writeByteString p . encodeLines (scaleTo _precision)
writeByteString :: WriteParams -> BL.ByteString -> IO ()
writeByteString params payload = do
manager' <- either HC.newManager return $ _manager params
response <- HC.httpLbs request manager'
let body = HC.responseBody response
status = HC.responseStatus response
if BL.null body
then do
let message = B8.unpack $ HT.statusMessage status
when (HT.statusIsServerError status) $
throwIO $ ServerError message
when (HT.statusIsClientError status) $
throwIO $ BadRequest message request
else case A.eitherDecode' body of
Left message ->
throwIO $ IllformedJSON message body
Right val -> case A.parse parseErrorObject val of
A.Success _ ->
fail $ "BUG: impossible code path in Database.InfluxDB.Write.writeByteString"
A.Error message -> do
when (HT.statusIsServerError status) $
throwIO $ ServerError message
when (HT.statusIsClientError status) $
throwIO $ BadRequest message request
fail $ "BUG: " ++ message
++ " in Database.InfluxDB.Write.writeByteString"
where
request = (writeRequest params)
{ HC.requestBody = HC.RequestBodyLBS payload
}
writeRequest :: WriteParams -> HC.Request
writeRequest WriteParams {..} =
HC.setQueryString qs HC.defaultRequest
{ HC.host = TE.encodeUtf8 _host
, HC.port = fromIntegral _port
, HC.secure = _ssl
, HC.method = "POST"
, HC.path = "/write"
}
where
Server {..} = _server
qs = concat
[ [("db", Just $ TE.encodeUtf8 $ databaseName _database)]
, fromMaybe [] $ do
Key name <- _retentionPolicy
return [("rp", Just (TE.encodeUtf8 name))]
, fromMaybe [] $ do
Credentials { _user = u, _password = p } <- _authentication
return
[ ("u", Just (TE.encodeUtf8 u))
, ("p", Just (TE.encodeUtf8 p))
]
]
makeLensesWith (lensRules & generateSignatures .~ False) ''WriteParams
server :: Lens' WriteParams Server
instance HasServer WriteParams where
server = Database.InfluxDB.Write.server
database :: Lens' WriteParams Database
instance HasDatabase WriteParams where
database = Database.InfluxDB.Write.database
retentionPolicy :: Lens' WriteParams (Maybe Key)
precision :: Lens' WriteParams (Precision 'WriteRequest)
instance HasPrecision 'WriteRequest WriteParams where
precision = Database.InfluxDB.Write.precision
manager :: Lens' WriteParams (Either HC.ManagerSettings HC.Manager)
instance HasManager WriteParams where
manager = Database.InfluxDB.Write.manager
authentication :: Lens' WriteParams (Maybe Credentials)
instance HasCredentials WriteParams where
authentication = Database.InfluxDB.Write.authentication