{-# LANGUAGE CPP #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeFamilyDependencies #-} {-# LANGUAGE TypeOperators #-} {-# LANGUAGE UndecidableInstances #-} -- | -- Module : Network.Ethereum.Contract.Event.MultiFilter -- Copyright : FOAM team 2018 -- License : BSD3 -- -- Maintainer : mail@akru.me -- Stability : experimental -- Portability : unportable -- -- Parallel multiple event filters. -- module Network.Ethereum.Contract.Event.MultiFilter ( -- * The @MultiFilter@ type MultiFilter(..) , minStartBlock , minEndBlock , modifyMultiFilter -- * With geth filters , multiEvent , multiEventMany -- * Without geth filters , multiEventNoFilter , multiEventManyNoFilter -- * Re-exports , Handlers , Handler(..) , Rec(..) ) where import Control.Concurrent (threadDelay) import Control.Monad (forM, void, when) import Control.Monad.IO.Class (MonadIO (..)) import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Reader (ReaderT (..)) import Data.List (sortOn) import Data.Machine (MachineT, asParts, autoM, await, construct, final, repeatedly, runT, unfoldPlan, (~>)) import Data.Machine.Plan (PlanT, stop, yield) import Data.Maybe (catMaybes, fromJust, listToMaybe) import Data.Tagged (Tagged (..)) import Data.Vinyl (Rec ((:&), RNil), RecApplicative) import Data.Vinyl.CoRec (CoRec (..), Field, FoldRec, Handler (H), Handlers, coRecToRec, firstField, match, onField) import Data.Vinyl.Functor (Compose (..), Identity (..)) #if MIN_VERSION_vinyl(0,10,0) import Data.Vinyl (RPureConstrained) #else import Data.Proxy (Proxy (..)) import Data.Vinyl.TypeLevel (AllAllSat) #endif import Data.Solidity.Event (DecodeEvent (..)) import qualified Network.Ethereum.Api.Eth as Eth import Network.Ethereum.Api.Types (Change (..), DefaultBlock (..), Filter (..), Quantity) import Network.JsonRpc.TinyClient (JsonRpc(..)) import Network.Ethereum.Contract.Event.Common -------------------------------------------------------------------------------- -- | MultiFilters -------------------------------------------------------------------------------- data MultiFilter (es :: [*]) where NilFilters :: MultiFilter '[] (:?) :: Filter e -> MultiFilter es -> MultiFilter (e ': es) infixr 5 :? minEndBlock :: MultiFilter es -> DefaultBlock minEndBlock NilFilters = Pending minEndBlock (Filter _ _ e _ :? fs) = e `min` minEndBlock fs minStartBlock :: MultiFilter es -> DefaultBlock minStartBlock NilFilters = Pending minStartBlock (Filter _ s _ _ :? fs) = s `min` minStartBlock fs modifyMultiFilter :: (forall e. Filter e -> Filter e) -> MultiFilter es -> MultiFilter es modifyMultiFilter _ NilFilters = NilFilters modifyMultiFilter h (f :? fs) = h f :? modifyMultiFilter h fs multiEvent :: ( PollFilters es , QueryAllLogs es , MapHandlers m es (WithChange es) #if MIN_VERSION_vinyl(0,10,0) , RPureConstrained HasLogIndex (WithChange es) #else , AllAllSat '[HasLogIndex] (WithChange es) #endif , RecApplicative (WithChange es) , JsonRpc m ) => MultiFilter es -> Handlers es (ReaderT Change m EventAction) -> m () multiEvent fltrs = multiEventMany fltrs 0 data MultiFilterStreamState es = MultiFilterStreamState { mfssCurrentBlock :: Quantity , mfssInitialMultiFilter :: MultiFilter es , mfssWindowSize :: Integer } multiEventMany :: ( PollFilters es , QueryAllLogs es , MapHandlers m es (WithChange es) #if MIN_VERSION_vinyl(0,10,0) , RPureConstrained HasLogIndex (WithChange es) #else , AllAllSat '[HasLogIndex] (WithChange es) #endif , RecApplicative (WithChange es) , JsonRpc m ) => MultiFilter es -> Integer -> Handlers es (ReaderT Change m EventAction) -> m () multiEventMany fltrs window handlers = do start <- mkBlockNumber $ minStartBlock fltrs let initState = MultiFilterStreamState { mfssCurrentBlock = start , mfssInitialMultiFilter = fltrs , mfssWindowSize = window } mLastProcessedFilterState <- reduceMultiEventStream (playMultiLogs initState) handlers case mLastProcessedFilterState of Nothing -> startPolling (modifyMultiFilter (\f -> f {filterFromBlock = BlockWithNumber start}) fltrs) Just (act, lastBlock) -> do end <- mkBlockNumber . minEndBlock $ fltrs when (act /= TerminateEvent && lastBlock < end) $ let pollingFromBlock = lastBlock + 1 in startPolling (modifyMultiFilter (\f -> f {filterFromBlock = BlockWithNumber pollingFromBlock}) fltrs) where startPolling fltrs' = do fIds <- openMultiFilter fltrs' let pollTo = minEndBlock fltrs' void $ reduceMultiEventStream (pollMultiFilter fIds pollTo) handlers multiFilterStream :: JsonRpc m => MultiFilterStreamState es -> MachineT m k (MultiFilter es) multiFilterStream initialPlan = do unfoldPlan initialPlan $ \s -> do end <- lift . mkBlockNumber . minEndBlock . mfssInitialMultiFilter $ initialPlan filterPlan end s where filterPlan :: JsonRpc m => Quantity -> MultiFilterStreamState es -> PlanT k (MultiFilter es) m (MultiFilterStreamState es) filterPlan end initialState@MultiFilterStreamState{..} = do if mfssCurrentBlock > end then stop else do let to' = min end $ mfssCurrentBlock + fromInteger mfssWindowSize h :: forall e. Filter e -> Filter e h f = f { filterFromBlock = BlockWithNumber mfssCurrentBlock , filterToBlock = BlockWithNumber to' } yield (modifyMultiFilter h mfssInitialMultiFilter) filterPlan end initialState { mfssCurrentBlock = to' + 1 } weakenCoRec :: ( RecApplicative ts , FoldRec (t ': ts) (t ': ts) ) => Field ts -> Field (t ': ts) weakenCoRec = fromJust . firstField . (Compose Nothing :&) . coRecToRec type family WithChange (es :: [*]) = (es' :: [*]) | es' -> es where WithChange '[] = '[] WithChange (e : es) = FilterChange e : WithChange es class QueryAllLogs (es :: [*]) where queryAllLogs :: JsonRpc m => MultiFilter es -> m [Field (WithChange es)] instance QueryAllLogs '[] where queryAllLogs NilFilters = pure [] instance forall e i ni es. ( DecodeEvent i ni e , QueryAllLogs es , RecApplicative (WithChange es) , FoldRec (FilterChange e : WithChange es) (WithChange es) ) => QueryAllLogs (e:es) where queryAllLogs (f :? fs) = do changes <- Eth.getLogs f filterChanges <- liftIO . mkFilterChanges @_ @_ @e $ changes filterChanges' <- queryAllLogs fs pure $ map (CoRec . Identity) filterChanges <> map weakenCoRec filterChanges' class HasLogIndex a where getLogIndex :: a -> Maybe (Quantity, Quantity) instance HasLogIndex (FilterChange e) where getLogIndex FilterChange{..} = (,) <$> changeBlockNumber filterChangeRawChange <*> changeLogIndex filterChangeRawChange sortChanges #if MIN_VERSION_vinyl(0,10,0) :: ( RPureConstrained HasLogIndex es #else :: ( AllAllSat '[HasLogIndex] es #endif , RecApplicative es ) => [Field es] -> [Field es] sortChanges changes = #if MIN_VERSION_vinyl(0,10,0) let sorterProj change = onField @HasLogIndex getLogIndex change #else let sorterProj change = onField (Proxy @'[HasLogIndex]) getLogIndex change #endif in sortOn sorterProj changes class MapHandlers m es es' where mapHandlers :: Handlers es (ReaderT Change m EventAction) -> Handlers es' (m (Maybe (EventAction, Quantity))) instance Monad m => MapHandlers m '[] '[] where mapHandlers RNil = RNil instance ( Monad m , MapHandlers m es es' ) => MapHandlers m (e : es) (FilterChange e : es') where mapHandlers (H f :& fs) = let f' FilterChange{..} = do act <- runReaderT (f filterChangeEvent) filterChangeRawChange return ((,) act <$> changeBlockNumber filterChangeRawChange) in H f' :& mapHandlers fs reduceMultiEventStream :: ( Monad m , MapHandlers m es (WithChange es) ) => MachineT m k [Field (WithChange es)] -> Handlers es (ReaderT Change m EventAction) -> m (Maybe (EventAction, Quantity)) reduceMultiEventStream filterChanges handlers = fmap listToMaybe . runT $ filterChanges ~> autoM (processChanges handlers) ~> asParts ~> runWhile (\(act, _) -> act /= TerminateEvent) ~> final where runWhile p = repeatedly $ do v <- await if p v then yield v else yield v >> stop processChanges handlers' changes = fmap catMaybes $ forM changes $ \fc -> match fc (mapHandlers handlers') -- | 'playLogs' streams the 'filterStream' and calls eth_getLogs on these 'Filter' objects. playMultiLogs :: forall es k m. ( QueryAllLogs es #if MIN_VERSION_vinyl(0,10,0) , RPureConstrained HasLogIndex (WithChange es) #else , AllAllSat '[HasLogIndex] (WithChange es) #endif , RecApplicative (WithChange es) , JsonRpc m ) => MultiFilterStreamState es -> MachineT m k [Field (WithChange es)] playMultiLogs s = fmap sortChanges $ multiFilterStream s ~> autoM queryAllLogs data TaggedFilterIds (es :: [*]) where TaggedFilterNil :: TaggedFilterIds '[] TaggedFilterCons :: Tagged e Quantity -> TaggedFilterIds es -> TaggedFilterIds (e : es) class PollFilters (es :: [*]) where openMultiFilter :: JsonRpc m => MultiFilter es -> m (TaggedFilterIds es) checkMultiFilter :: JsonRpc m => TaggedFilterIds es -> m [Field (WithChange es)] closeMultiFilter :: JsonRpc m => TaggedFilterIds es -> m () instance PollFilters '[] where openMultiFilter _ = pure TaggedFilterNil checkMultiFilter _ = pure [] closeMultiFilter _ = pure () instance forall e i ni es. ( DecodeEvent i ni e , PollFilters es , RecApplicative (WithChange es) , FoldRec (FilterChange e : WithChange es) (WithChange es) ) => PollFilters (e:es) where openMultiFilter (f :? fs) = do fId <- Eth.newFilter f fsIds <- openMultiFilter fs pure $ TaggedFilterCons (Tagged fId) fsIds checkMultiFilter (TaggedFilterCons (Tagged fId) fsIds) = do changes <- Eth.getFilterChanges fId filterChanges <- liftIO . mkFilterChanges @_ @_ @e $ changes filterChanges' <- checkMultiFilter @es fsIds pure $ map (CoRec . Identity) filterChanges <> map weakenCoRec filterChanges' closeMultiFilter (TaggedFilterCons (Tagged fId) fsIds) = do _ <- Eth.uninstallFilter fId closeMultiFilter fsIds -- | Polls a filter from the given filterId until the target toBlock is reached. pollMultiFilter :: ( PollFilters es , RecApplicative (WithChange es) #if MIN_VERSION_vinyl(0,10,0) , RPureConstrained HasLogIndex (WithChange es) #else , AllAllSat '[HasLogIndex] (WithChange es) #endif , JsonRpc m ) => TaggedFilterIds es -> DefaultBlock -> MachineT m k [Field (WithChange es)] pollMultiFilter is = construct . pollPlan is where pollPlan (fIds :: TaggedFilterIds es) end = do bn <- lift $ Eth.blockNumber if BlockWithNumber bn > end then do _ <- lift $ closeMultiFilter fIds stop else do liftIO $ threadDelay 1000000 changes <- lift $ sortChanges <$> checkMultiFilter fIds yield changes pollPlan fIds end -------------------------------------------------------------------------------- multiEventNoFilter :: ( QueryAllLogs es , MapHandlers m es (WithChange es) #if MIN_VERSION_vinyl(0,10,0) , RPureConstrained HasLogIndex (WithChange es) #else , AllAllSat '[HasLogIndex] (WithChange es) #endif , RecApplicative (WithChange es) , JsonRpc m ) => MultiFilter es -> Handlers es (ReaderT Change m EventAction) -> m () multiEventNoFilter fltrs = multiEventManyNoFilter fltrs 0 multiEventManyNoFilter :: ( QueryAllLogs es , MapHandlers m es (WithChange es) #if MIN_VERSION_vinyl(0,10,0) , RPureConstrained HasLogIndex (WithChange es) #else , AllAllSat '[HasLogIndex] (WithChange es) #endif , RecApplicative (WithChange es) , JsonRpc m ) => MultiFilter es -> Integer -> Handlers es (ReaderT Change m EventAction) -> m () multiEventManyNoFilter fltrs window handlers = do start <- mkBlockNumber $ minStartBlock fltrs let initState = MultiFilterStreamState { mfssCurrentBlock = start , mfssInitialMultiFilter = fltrs , mfssWindowSize = window } mLastProcessedFilterState <- reduceMultiEventStream (playMultiLogs initState) handlers case mLastProcessedFilterState of Nothing -> let pollingFilterState = MultiFilterStreamState { mfssCurrentBlock = start , mfssInitialMultiFilter = fltrs , mfssWindowSize = 0 } in void $ reduceMultiEventStream (playNewMultiLogs pollingFilterState) handlers Just (act, lastBlock) -> do end <- mkBlockNumber . minEndBlock $ fltrs when (act /= TerminateEvent && lastBlock < end) $ let pollingFilterState = MultiFilterStreamState { mfssCurrentBlock = lastBlock + 1 , mfssInitialMultiFilter = fltrs , mfssWindowSize = 0 } in void $ reduceMultiEventStream (playNewMultiLogs pollingFilterState) handlers newMultiFilterStream :: JsonRpc m => MultiFilterStreamState es -> MachineT m k (MultiFilter es) newMultiFilterStream initialPlan = do unfoldPlan initialPlan $ \s -> do let end = minEndBlock . mfssInitialMultiFilter $ initialPlan filterPlan end s where filterPlan :: JsonRpc m => DefaultBlock -> MultiFilterStreamState es -> PlanT k (MultiFilter es) m (MultiFilterStreamState es) filterPlan end initialState@MultiFilterStreamState{..} = do if BlockWithNumber mfssCurrentBlock > end then stop else do newestBlockNumber <- lift . pollTillBlockProgress $ mfssCurrentBlock let h :: forall e. Filter e -> Filter e h f = f { filterFromBlock = BlockWithNumber mfssCurrentBlock , filterToBlock = BlockWithNumber newestBlockNumber } yield (modifyMultiFilter h mfssInitialMultiFilter) filterPlan end initialState { mfssCurrentBlock = newestBlockNumber + 1 } playNewMultiLogs :: forall es k m. ( QueryAllLogs es #if MIN_VERSION_vinyl(0,10,0) , RPureConstrained HasLogIndex (WithChange es) #else , AllAllSat '[HasLogIndex] (WithChange es) #endif , RecApplicative (WithChange es) , JsonRpc m ) => MultiFilterStreamState es -> MachineT m k [Field (WithChange es)] playNewMultiLogs s = fmap sortChanges $ newMultiFilterStream s ~> autoM queryAllLogs