{-# LANGUAGE FlexibleContexts #-} -------------------------------------------------------------------------------- -- | -- Module : EventSource.Store.Iterator -- Copyright : (C) 2018 Yorick Laupa -- License : (see the file LICENSE) -- Maintainer: Yorick Laupa -- Stability : experimental -- Portability: non-portable -- -------------------------------------------------------------------------------- module EventSource.Store.Internal.Iterator where -------------------------------------------------------------------------------- import Data.Int (Int32) -------------------------------------------------------------------------------- import Control.Concurrent.Async.Lifted (Async, wait) import Control.Monad.Base (MonadBase, liftBase) import Data.IORef.Lifted (IORef, atomicModifyIORef') -------------------------------------------------------------------------------- import EventSource.Types -------------------------------------------------------------------------------- -- | Represents batch information needed to read a stream. data Batch' a = Batch' { batchFrom :: !a , batchSize :: !Int32 } -------------------------------------------------------------------------------- -- | Starts a 'Batch' from a given point. The batch size is set to default, -- which is 500. startFrom :: a -> Batch' a startFrom from = Batch' from 500 -------------------------------------------------------------------------------- data IteratorOverState a = IteratorOverAvailable (Slice' a) | IteratorOverClosed -------------------------------------------------------------------------------- data IteratorOverAction a = IteratorOverEvent SavedEvent | IteratorOverNextBatch a | IteratorOverEndOfStream -------------------------------------------------------------------------------- iterateOver :: MonadBase IO m => IORef (IteratorOverState a) -> (Batch' a -> m (Async (ReadStatus (Slice' a)))) -> m (Maybe SavedEvent) iterateOver ref puller = go where go = do action <- atomicModifyIORef' ref $ \st -> case st of IteratorOverAvailable slice -> case sliceEvents slice of e:es -> let nextSlice = slice { sliceEvents = es } nxtSt = IteratorOverAvailable nextSlice in (nxtSt, IteratorOverEvent e) [] | sliceEndOfStream slice -> (IteratorOverClosed, IteratorOverEndOfStream) | otherwise -> let resp = IteratorOverNextBatch $ sliceNext slice in (st, resp) IteratorOverClosed -> (st, IteratorOverEndOfStream) case action of IteratorOverEvent e -> return $ Just e IteratorOverEndOfStream -> return Nothing IteratorOverNextBatch num -> do w <- puller (startFrom num) res <- liftBase $ wait w case res of ReadFailure _ -> do atomicModifyIORef' ref $ \_ -> (IteratorOverClosed, ()) return Nothing ReadSuccess slice -> do let nxtSt = IteratorOverAvailable slice atomicModifyIORef' ref $ \_ -> (nxtSt, ()) go