bolty
Native Haskell driver for Neo4j using the BOLT protocol (versions 4.4 through 5.4).
Features
- Connection pooling with configurable idle timeout, health checks, and validation strategies
- Cluster routing — automatic server discovery and read/write splitting for Neo4j causal clusters
- Sessions with bookmark-based causal consistency across transactions
- Transactions with automatic retry on transient errors (exponential backoff)
- Type-safe record decoding — composable
Decode / RowDecoder combinators, or derive via FromBolt
- All Neo4j types — nodes, relationships, paths, temporal (date, time, datetime, duration), spatial (point2D, point3D)
- Query metadata — server timing, query statistics, notifications, EXPLAIN/PROFILE plans
- TLS support via
crypton-connection
- Multi-auth — basic, bearer token, Kerberos, custom schemes, plus LOGON/LOGOFF for Bolt 5.1+
Quick start
import qualified Database.Bolty as Bolt
import Data.Default (def)
main :: IO ()
main = do
let cfg = def{ Bolt.scheme = Bolt.Basic "neo4j" "password", Bolt.use_tls = False }
case Bolt.validateConfig cfg of
Failure errs -> mapM_ putStrLn errs
Success vc -> do
conn <- Bolt.connect vc
result <- Bolt.runBolt conn $
Bolt.queryWith (Bolt.field "greeting" Bolt.text) "RETURN 'hello' AS greeting" mempty
print result -- Right (Vector ["hello"])
Bolt.close conn
Configuration
Use Data.Default.def for sensible defaults and override what you need. The config must be validated before use:
import Data.Default (def)
cfg = def
{ host = "db.example.com"
, port = 7687
, scheme = Basic "neo4j" "s3cret"
, use_tls = True
, timeout = 5000 -- ms
}
case validateConfig cfg of
Failure errs -> error (show errs)
Success vc -> connect vc >>= ...
Default config: 127.0.0.1:7687, no auth, TLS on, 10s timeout, BOLT 5.4 down to 4.4.
Authentication schemes
None -- no auth
Basic "user" "pass" -- username/password
Bearer "jwt-token" -- SSO / JWT token
Kerberos "base64-ticket" -- Kerberos
Custom "scheme" credentials -- custom auth provider
The BoltM monad
Queries run in BoltM, a thin ReaderT Connection IO wrapper:
runBolt :: Connection -> BoltM a -> IO a
Running queries
-- Auto-decode via FromBolt (returns Either DecodeError (Vector a))
query :: FromBolt a => Text -> HashMap Text Ps -> BoltM (Either DecodeError (Vector a))
-- Explicit decoder
queryWith :: RowDecoder a -> Text -> HashMap Text Ps -> BoltM (Either DecodeError (Vector a))
-- Raw result set (field names + records)
queryResult :: Text -> HashMap Text Ps -> BoltM ResultSet
-- Side-effects only, discard results
execute :: Text -> HashMap Text Ps -> BoltM ()
Pass mempty for no parameters.
Parameters
import Data.PackStream.Ps (Ps(..))
import qualified Data.HashMap.Lazy as H
queryWith decoder "MATCH (p:Person) WHERE p.age > $minAge RETURN p.name AS name"
(H.singleton "minAge" (PsInteger 21))
Record decoding
bolty provides composable, type-safe decoders. A Decode a extracts a single value from a Bolt cell; a RowDecoder a maps column names to decoders for a full row.
Primitive decoders
bool :: Decode Bool
int :: Decode Int -- may lose precision from Int64
int64 :: Decode Int64
float :: Decode Double
text :: Decode Text
bytes :: Decode ByteString
Combining with RowDecoder
data Person = Person { pName :: Text, pAge :: Int64 }
personDecoder :: RowDecoder Person
personDecoder = Person
<$> field "name" text
<*> field "age" int64
result <- runBolt conn $
queryWith personDecoder "MATCH (p:Person) RETURN p.name AS name, p.age AS age" mempty
Node property decoders
When a query returns full nodes, decode properties from within:
data Person = Person { name :: Text, age :: Int64 }
personDecoder :: RowDecoder Person
personDecoder = do
n <- field "p" node
pure $ Person
<$> nodeProperty "name" text n
<*> nodeProperty "age" int64 n
Other decoders
nullable :: Decode a -> Decode (Maybe a) -- NULL-safe
list :: Decode a -> Decode (Vector a) -- list values
dict :: Decode (HashMap Text Bolt) -- raw dictionary
node :: Decode Node -- graph node
relationship :: Decode Relationship -- graph relationship
path :: Decode Path -- graph path
uuid :: Decode UUID -- UUID from string
utcTime :: Decode UTCTime -- DateTime → UTCTime
day :: Decode Day -- Date → Day
timeOfDay :: Decode TimeOfDay -- LocalTime → TimeOfDay
aesonValue :: Decode Aeson.Value -- Bolt → aeson Value
Result sets
For multi-pass decoding (e.g. denormalized OPTIONAL MATCH results):
rs <- runBolt conn $ queryResult "MATCH (p:Person) RETURN p.name AS name, p.age AS age" mempty
-- Decode all rows
people <- either throwIO pure $ decodeResultSet personDecoder rs
-- Decode just the first row
first <- either throwIO pure $ decodeHead personDecoder rs
-- Group by a key field (consecutive grouping)
groups <- either throwIO pure $ groupByField (field "dept" (nullable text)) rs
Transactions
Basic transactions
withTransaction conn $ \txConn -> do
runBolt txConn $ execute "CREATE (p:Person {name: 'Alice'})" mempty
runBolt txConn $ execute "CREATE (p:Person {name: 'Bob'})" mempty
-- auto-commits on success, rolls back on exception
Retry on transient errors
withRetryTransaction defaultRetryConfig conn $ \txConn ->
runBolt txConn $ execute "CREATE (p:Person {name: 'Alice'})" mempty
-- retries up to 5 times with exponential backoff on transient Neo4j errors
RetryConfig controls maxRetries (default 5), initialDelay (200ms), and maxDelay (5s).
Connection pooling
pool <- createPool validatedConfig defaultPoolConfig
-- defaultPoolConfig: 10 max connections, 60s idle timeout, PingIfIdle 30s
withConnection pool $ \conn ->
runBolt conn $ query @Person "MATCH (p:Person) RETURN p" mempty
-- Convenience: pool + retry transaction in one call
withTransaction' pool $ \conn ->
runBolt conn $ execute "CREATE (n:Test)" mempty
destroyPool pool
Validation strategies
Control how connections are health-checked on checkout:
AlwaysPing -- send RESET before every use (safest)
PingIfIdle 30 -- only ping if idle > 30 seconds (default, good balance)
NeverPing -- skip health check (fastest, use in trusted environments)
Cluster routing
For Neo4j causal clusters with multiple servers:
let cfg = def{ scheme = Basic "neo4j" "pass", routing = EnableRouting Nothing }
routingPool <- createRoutingPool validatedConfig defaultRoutingPoolConfig
-- Writes go to a writer server, reads to a reader
withRoutingTransaction routingPool WriteAccess $ \conn ->
runBolt conn $ execute "CREATE (n:Test)" mempty
withRoutingTransaction routingPool ReadAccess $ \conn ->
runBolt conn $ queryWith decoder "MATCH (n) RETURN n" mempty
destroyRoutingPool routingPool
The routing pool automatically discovers servers via the ROUTE message, caches routing tables with TTL, and retries on different servers when a routing error occurs.
Sessions
Sessions track bookmarks for causal consistency across transactions:
session <- createSession pool defaultSessionConfig
-- Each transaction's bookmark is automatically passed to the next
writeTransaction session $ \conn ->
runBolt conn $ execute "CREATE (p:Person {name: 'Alice'})" mempty
readTransaction session $ \conn ->
runBolt conn $ queryWith decoder "MATCH (p:Person) RETURN p" mempty
-- ↑ guaranteed to see Alice because of bookmark chaining
bookmarks <- getLastBookmarks session
(result, meta) <- runBolt conn $ queryMetaWith decoder cypher params
-- meta :: QueryMeta contains:
-- parsedNotifications :: [Notification] -- warnings, deprecations
-- parsedStats :: Maybe QueryStats -- nodes/rels created/deleted
-- parsedPlan :: Maybe PlanNode -- EXPLAIN plan
-- parsedProfile :: Maybe ProfileNode -- PROFILE with execution stats
-- bookmark, db, tFirst, tLast -- timing and metadata
-- EXPLAIN a query without executing it
plan <- runBolt conn $ queryExplain "MATCH (n) RETURN n" mempty
-- PROFILE a query with actual execution statistics
(rows, profile) <- runBolt conn $ queryProfile "MATCH (n) RETURN n" mempty
Query logging
let cfg = def{ queryLogger = Just $ \ql meta -> do
putStrLn $ "Query: " <> show (qlCypher ql)
putStrLn $ "Rows: " <> show (qlRowCount ql)
putStrLn $ "Time: " <> show (qlClientTime ql) <> "ns"
}
Error handling
-- Check if an error is transient (safe to retry)
isTransient :: Error -> Bool
-- Check if an error is a routing error (server unreachable, etc.)
isRoutingError :: Error -> Bool
Bolt value types
Every cell in a query result is a Bolt value:
| Neo4j type |
Bolt constructor |
Haskell type inside |
| null |
BoltNull |
— |
| boolean |
BoltBoolean |
Bool |
| integer |
BoltInteger |
PSInteger |
| float |
BoltFloat |
Double |
| bytes |
BoltBytes |
ByteString |
| string |
BoltString |
Text |
| list |
BoltList |
Vector Bolt |
| map |
BoltDictionary |
HashMap Text Bolt |
| node |
BoltNode |
Node (id, labels, properties) |
| relationship |
BoltRelationship |
Relationship (id, start, end, type, properties) |
| path |
BoltPath |
Path (nodes, rels, indices) |
| date |
BoltDate |
Date (days since epoch) |
| time |
BoltTime |
Time (nanos, tz offset) |
| local time |
BoltLocalTime |
LocalTime (nanos since midnight) |
| datetime |
BoltDateTime |
DateTime (seconds, nanos) |
| datetime (zoned) |
BoltDateTimeZoneId |
DateTimeZoneId (seconds, nanos, tz name) |
| local datetime |
BoltLocalDateTime |
LocalDateTime (seconds, nanos) |
| duration |
BoltDuration |
Duration (months, days, seconds, nanos) |
| point (2d) |
BoltPoint2D |
Point2D (srid, x, y) |
| point (3d) |
BoltPoint3D |
Point3D (srid, x, y, z) |
Module structure
Public API — import these:
| Module |
Purpose |
Database.Bolty |
Main entry point, re-exports everything |
Database.Bolty.Decode |
Record decoders (Decode, RowDecoder, FromBolt) |
Database.Bolty.Pool |
Connection pooling |
Database.Bolty.Routing |
Cluster routing |
Database.Bolty.Session |
Sessions with bookmark management |
Database.Bolty.ResultSet |
Multi-pass result set decoding |
Database.Bolty.Logging |
Query log types |
Database.Bolty.Notification |
Server notifications |
Database.Bolty.Plan |
EXPLAIN/PROFILE plan types |
Database.Bolty.Stats |
Query statistics types |
Database.Bolty.Record |
Record type alias |
All other modules under Database.Bolty.Connection.*, Database.Bolty.Message.*, and Database.Bolty.Value.* are internal — exposed for bolty-streamly but not part of the stable API.
Supported GHC versions
9.6.7, 9.8.4, 9.10.3, 9.12.3
License
Apache-2.0