module Data.CQRS.Internal.Repository
    ( Repository(..)
    , Settings(..)
    , defaultSettings
    , enumerateEventStore
    , enumerateAndStreamEvents
    , newRepository
    , withEventStoreBackend
    ) where

-- External imports
import           Control.DeepSeq (NFData)
import           Control.Monad (forever)
import           Control.Monad.Trans.Class (lift)
import           Data.Conduit (ResourceT, Source, addCleanup, bracketP, yield)
import           Data.Conduit.Pool (Pool, withResource, takeResource, mrReuse, mrValue)

-- Internal imports
import           Data.CQRS.Internal.EventBus
import qualified Data.CQRS.Internal.EventStore as ES
import           Data.CQRS.Internal.EventStore (EventStore(..))
import           Data.CQRS.EventStore.Backend
import           Data.CQRS.PersistedEvent
import           Data.CQRS.Serializable

-- | Repository settings
data Settings = Settings
    { settingsSnapshotFrequency :: Maybe Int
    }

-- | Default repository settings.
defaultSettings :: Settings
defaultSettings = Settings
    { settingsSnapshotFrequency = Nothing
    }

-- | Repository consisting of an event store and an event bus.
data Repository e b = Repository
    { repositoryEventStoreBackendPool :: Pool b  -- ^ Event store backend pool
    , repositoryEventBus :: EventBus e           -- ^ Event bus to use
    , repositorySettings :: Settings             -- ^ Repository settings
    }

-- | Create a repository from a pool of event store backends.
newRepository :: (EventStoreBackend b, Show e, NFData e) => Settings -> Pool b -> IO (Repository e b)
newRepository settings eventStoreBackendPool = do
  eventBus <- newEventBus
  return $ Repository { repositoryEventStoreBackendPool = eventStoreBackendPool
                      , repositoryEventBus = eventBus
                      , repositorySettings = settings
                      }

-- | Use event store backed from repository to perform an action.
withEventStoreBackend :: (EventStoreBackend b) => Repository e b -> (b -> IO a) -> IO a
withEventStoreBackend (Repository p _ _) action =
    withResource p (\eventStoreBackend -> action eventStoreBackend)

-- | Enumerate all events which satisfy certain criteria from event
-- store associated with repository.
enumerateEventStore :: (Serializable e, EventStoreBackend b) => Repository e b -> Source (ResourceT IO) [PersistedEvent e]
enumerateEventStore (Repository p _ _) = do
  eventStoreBackend <- lift $ takeResource p
  addCleanup (mrReuse eventStoreBackend) $ ES.enumerateEventStore (EventStore $ mrValue eventStoreBackend)

-- State for the streaming thread.
data StreamingState e = Buffering [PersistedEvent e]
                      | EnumerationComplete [PersistedEvent e]
                      | Streaming
                        deriving (Show)

-- | Enumerate all events which satisfy criteria and stream
-- all new events from repository. All events which arrive
-- while enumerating are buffered until enumeration completes.
enumerateAndStreamEvents :: (Show e, Serializable e, EventStoreBackend b) => Repository e b -> Source (ResourceT IO) [PersistedEvent e]
enumerateAndStreamEvents repository@(Repository _ eventBus _) = do
  bracketP (subscribeToEventBus eventBus)
           (unsubscribeFromEventBus eventBus)
           (\subscription -> do
              enumerateEventStore repository
              subscriptionSource subscription)
  where
    subscriptionSource subscription = forever $ do
      e <- lift $ lift $ readEventFromSubscription subscription
      yield e