{-# LANGUAGE OverloadedStrings #-}

module Hasql.Streams (
  CursorStreamFold,
  cursorStreamQuery,
) where

-- hasql
import Hasql.Statement

-- hasql-transaction-io
import Hasql.CursorTransactionIO as CursorTransactionIO

-- | A consumer that turns a stream of @a@ values into an @r@.
--
-- The consumer receives two things, both running in @CursorTransactionIO s@:
--
-- 1. a step function that, given the current state, yields either the next
--    element together with the following state, or 'Nothing';
-- 2. an action that produces a state to start from.
--
-- The state type is universally quantified, so the consumer has to work for
-- any choice of state.
type CursorStreamFold s a r =
  forall state.
  (state -> CursorTransactionIO s (Maybe (a, state))) ->
  CursorTransactionIO s state ->
  r

-- | Run a 'Statement' using a cursor inside a 'CursorTransactionIO' to produce
-- a stream of @a@s, which are consumed by a 'CursorStreamFold'. This function
-- can produce any type of stream without depending on any particular stream
-- library.
--
-- The fold is handed a step function and an initial-state action:
--
-- * the initial-state action declares a cursor for the statement with the
--   given parameters and starts with an empty buffer of rows;
-- * the step function emits buffered rows one at a time and fetches another
--   batch through the cursor whenever the buffer is empty. An empty fetched
--   batch ends the stream.
cursorStreamQuery :: forall params s a r.
  Statement params [a] -> params -> CursorStreamFold s a r -> r
cursorStreamQuery stmt params foldStream = foldStream step init
  where
    -- Declare the cursor and pair it with an empty buffer; nothing is fetched
    -- until the first call to 'step'.
    init :: CursorTransactionIO s (StreamState s a)
    init = do
      cursor <- declareCursorFor params stmt
      pure (cursor, [])

    -- Produce the next element, refilling the buffer from the cursor when the
    -- current batch has been used up.
    step :: StreamState s a -> CursorTransactionIO s (Maybe (a, StreamState s a))
    step (cursor, batch) = case batch of
      -- Rows are still buffered: emit the head, keep the tail for later.
      (a:batch') -> pure $ Just (a, (cursor, batch'))
      -- Buffer exhausted: fetch the next batch through the cursor.
      [] -> do
        batch' <- fetchWithCursor cursor
        if Prelude.null batch'
          -- An empty batch signals the end of the stream.
          then pure Nothing
          -- Otherwise retry with the freshly fetched (non-empty) batch.
          else step (cursor, batch')

-- | State threaded through the stream produced by 'cursorStreamQuery': the
-- cursor the rows are fetched through, paired with the rows of the current
-- batch that have not been emitted yet.
type StreamState s a =
  ( Cursor s [a]
  , [a]
  )