module App.EventBus where
import Control.Applicative
import Control.Concurrent
import Control.Exception (mask, onException)
import Control.Monad
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB
import Data.List (foldl', foldl1')
import qualified Data.Map as Map
import Data.Maybe
import Data.Monoid
import qualified Data.Set as Set
import Data.Time.Clock
import Debug.Trace
import System.IO.Unsafe
-- | Left-biased chaining of comparison results: the left result wins unless
-- it is 'EQ', in which case the right result is returned.
(/>/) :: Ordering -> Ordering -> Ordering
primary />/ fallback = case primary of
  EQ -> fallback
  _  -> primary
-- | Right-biased chaining of comparison results: the right result wins
-- unless it is 'EQ', in which case the left result is returned.
(/</) :: Ordering -> Ordering -> Ordering
fallback /</ primary = case primary of
  EQ -> fallback
  _  -> primary
-- | Apply a projection to both operands before handing them to a binary
-- function; @g %=> f@ is @f@ applied to @g a@ and @g b@.
(%=>) :: (a -> b) -> (b -> b -> c) -> a -> a -> c
(project %=> combine) lhs rhs = combine (project lhs) (project rhs)
-- | Lift a binary function so that it first maps both arguments through
-- the given projection.
on :: (b -> b -> c) -> (a -> b) -> a -> a -> c
on combine project = \lhs rhs -> combine (project lhs) (project rhs)
-- | Build a function that first runs the right-hand action (discarding its
-- result) and then runs the left-hand function on the supplied argument.
(=<<^) :: Monad m => (a -> m b) -> m c -> a -> m b
(continue =<<^ prelude) input = prelude >> continue input
-- | How long an event remains on the bus.
--
-- The constructor order is significant for the derived 'Ord' instance.
data TimeSpan
  = Persistent      -- ^ Always treated as current by 'expire'.
  | Time DiffTime   -- ^ Remaining time; current while strictly positive.
  | Iterations Int  -- ^ Remaining bus iterations; current while strictly positive.
  deriving (Eq, Ord, Show)
-- | A 'Time' span lasting the given number of seconds.
seconds :: Integer -> TimeSpan
seconds n = Time (secondsToDiffTime n)
-- | A 'Time' span lasting the given number of minutes (60 seconds each).
minutes :: Integer -> TimeSpan
minutes n = Time (secondsToDiffTime (60 * n))
-- | A 'Time' span lasting the given number of hours (3600 seconds each).
hours :: Integer -> TimeSpan
hours n = Time (secondsToDiffTime (3600 * n))
-- | A 'Time' span lasting the given number of days (86400 seconds each).
days :: Integer -> TimeSpan
days n = Time (secondsToDiffTime (86400 * n))
-- | A span of exactly one bus iteration.
once :: TimeSpan
once = Iterations runs
  where
    runs = 1
-- | A single change to be applied to a bus.
data Diff a
  = Insertion (Event a)  -- ^ Add (or replace) the event in every index.
  | Deletion (Event a)   -- ^ Remove the event from every index.
-- | Renders a diff as a tuple of its kind and the event's group, source,
-- name and time span; the event payload is not shown.
instance Show (Diff a) where
  show diff = case diff of
      Insertion ev -> describe "Insertion" ev
      Deletion ev  -> describe "Deletion" ev
    where
      describe label ev = show (label, group ev, src ev, ename ev, timespan ev)
-- | A small universe of payload values that can be carried by an event.
--
-- The type parameter is the type of the user-defined payload held by
-- 'EOther' and 'EOtherL'. Constructor order is kept as-is because the
-- derived instances depend on it.
data EData a
  = EString String                 -- ^ A single string.
  | EByteString B.ByteString       -- ^ A strict byte string.
  | EByteStringL [B.ByteString]    -- ^ A list of strict byte strings.
  | ELByteString LB.ByteString     -- ^ A lazy byte string.
  | ELByteStringL [LB.ByteString]  -- ^ A list of lazy byte strings.
  | EChar Char                     -- ^ A single character.
  | EDouble Double                 -- ^ A single double.
  | EInt Int                       -- ^ A single int.
  | EBool Bool                     -- ^ A single boolean.
  | EStringL [String]              -- ^ A list of strings.
  | EDoubleL [Double]              -- ^ A list of doubles.
  | EIntL [Int]                    -- ^ A list of ints.
  | EBoolL [Bool]                  -- ^ A list of booleans.
  | EOther a                       -- ^ A user-defined value.
  | EAssoc (String, EData a)       -- ^ A single key paired with nested data.
  | EAssocL [(String, EData a)]    -- ^ An association list of nested data.
  | EOtherL [a]                    -- ^ A list of user-defined values.
  deriving (Eq, Show, Read)
-- Partial projections out of 'EData'. Each one succeeds only on the
-- constructor named in its identifier and fails with a pattern-match error
-- on any other constructor.

-- | Extract the payload of an 'EString'.
fromEString :: EData a -> String
fromEString (EString payload) = payload

-- | Extract the payload of an 'EByteString'.
fromEByteString :: EData a -> B.ByteString
fromEByteString (EByteString payload) = payload

-- | Extract the payload of an 'EByteStringL'.
fromEByteStringL :: EData a -> [B.ByteString]
fromEByteStringL (EByteStringL payload) = payload

-- | Extract the payload of an 'ELByteString'.
fromELByteString :: EData a -> LB.ByteString
fromELByteString (ELByteString payload) = payload

-- | Extract the payload of an 'ELByteStringL'.
fromELByteStringL :: EData a -> [LB.ByteString]
fromELByteStringL (ELByteStringL payload) = payload

-- | Extract the payload of an 'EChar'.
fromEChar :: EData a -> Char
fromEChar (EChar payload) = payload

-- | Extract the payload of an 'EDouble'.
fromEDouble :: EData a -> Double
fromEDouble (EDouble payload) = payload

-- | Extract the payload of an 'EInt'.
fromEInt :: EData a -> Int
fromEInt (EInt payload) = payload

-- | Extract the payload of an 'EBool'.
fromEBool :: EData a -> Bool
fromEBool (EBool payload) = payload

-- | Extract the payload of an 'EStringL'.
fromEStringL :: EData a -> [String]
fromEStringL (EStringL payload) = payload

-- | Extract the payload of an 'EDoubleL'.
fromEDoubleL :: EData a -> [Double]
fromEDoubleL (EDoubleL payload) = payload

-- | Extract the payload of an 'EIntL'.
fromEIntL :: EData a -> [Int]
fromEIntL (EIntL payload) = payload

-- | Extract the payload of an 'EBoolL'.
fromEBoolL :: EData a -> [Bool]
fromEBoolL (EBoolL payload) = payload

-- | Extract the payload of an 'EOther'.
fromEOther :: EData a -> a
fromEOther (EOther payload) = payload

-- | Extract the key/value pair of an 'EAssoc'.
fromEAssoc :: EData a -> (String, EData a)
fromEAssoc (EAssoc payload) = payload

-- | Extract the association list of an 'EAssocL'.
fromEAssocL :: EData a -> [(String, EData a)]
fromEAssocL (EAssocL payload) = payload

-- | Extract the payload of an 'EOtherL'.
fromEOtherL :: EData a -> [a]
fromEOtherL (EOtherL payload) = payload
-- | Render event data for display without dumping bulky payloads.
--
-- The first argument is an optional character limit: when it is @Just k@,
-- textual renderings of strings, numbers, booleans and their lists are cut
-- to at most @k@ characters. Byte strings and user-defined payloads are
-- replaced by a fixed placeholder; a single character is always shown in
-- full. Association values are rendered recursively with the same limit.
--
-- Every constructor of 'EData' is handled, including the lazy byte string
-- variants, so the function is total.
safeShow :: Maybe Int -> EData a -> String
safeShow n edata = case edata of
    EString s       -> clip s
    EStringL s      -> clip (show s)
    EByteString _   -> "ByteString data"
    EByteStringL _  -> "ByteString list data"
    ELByteString _  -> "Lazy ByteString data"
    ELByteStringL _ -> "Lazy ByteString list data"
    EChar c         -> [c]
    EDouble x       -> clip (show x)
    EDoubleL x      -> clip (show x)
    EInt x          -> clip (show x)
    EIntL x         -> clip (show x)
    EBool x         -> clip (show x)
    EBoolL x        -> clip (show x)
    EAssoc (x, y)   -> x ++ " -> " ++ safeShow n y
    EAssocL xs      -> concatMap showPair xs
    EOther _        -> "Other data"
    EOtherL _       -> "Other data list"
  where
    -- Truncate only when a limit was supplied.
    clip = maybe id take n
    showPair (key, value) = "(" ++ key ++ " -> " ++ safeShow n value ++ ")\n"
-- | An event carried on the bus.
--
-- Field order is significant: events are also built positionally.
data Event a = Event
  { ename     :: String    -- ^ Name of the event.
  , group     :: String    -- ^ Group the event belongs to.
  , timespan  :: TimeSpan  -- ^ How much longer the event stays on the bus.
  , eventdata :: a         -- ^ Payload.
  , src       :: String    -- ^ Source that emitted the event.
  , time      :: UTCTime   -- ^ Timestamp stored with the event.
  }
-- | Events are ordered by source, then group, then name. The payload, time
-- span and timestamp do not take part in the ordering.
instance Ord (Event a) where
  compare l r = compare (sortKey l) (sortKey r)
    where
      sortKey ev = (src ev, group ev, ename ev)
-- | Two events are equal when their name, group and source all match. The
-- payload, time span and timestamp are ignored.
instance Eq (Event a) where
  x == y = identity x == identity y
    where
      identity ev = (ename ev, group ev, src ev)
-- | The event bus: the same events indexed four different ways.
data Bus a = Bus
  { nameMap           :: Map.Map String (Set.Set (Event a))
    -- ^ Events keyed by event name.
  , srcMap            :: Map.Map String (Set.Set (Event a))
    -- ^ Events keyed by source.
  , groupMap          :: Map.Map String (Set.Set (Event a))
    -- ^ Events keyed by group.
  , fullyQualifiedMap :: Map.Map (String, String, String) (Event a)
    -- ^ Events keyed by the (group, source, name) triple.
  }
-- | Currently a no-op hook: the first argument is ignored and the second
-- is returned unchanged.
instrument :: label -> behaviour -> behaviour
instrument _ behave = behave
-- | Shows a bus as the concatenation of 'showQName' over every event in the
-- fully qualified index, in key order.
instance Show (Bus a) where
  show b = concatMap showQName (Map.elems (fullyQualifiedMap b))
-- | Render an event's group, source, name and time span as a shown tuple.
showQName :: Event a -> String
showQName ev = show summary
  where
    summary = (group ev, src ev, ename ev, timespan ev)
-- | All events with the given name, or the empty set when there are none.
eventsByName :: String -> Bus a -> Set.Set (Event a)
eventsByName n b = Map.findWithDefault Set.empty n (nameMap b)
-- | All events from the given source, or the empty set when there are none.
eventsBySource :: String -> Bus a -> Set.Set (Event a)
eventsBySource s b = Map.findWithDefault Set.empty s (srcMap b)
-- | All events in the given group, or the empty set when there are none.
eventsByGroup :: String -> Bus a -> Set.Set (Event a)
eventsByGroup g b = Map.findWithDefault Set.empty g (groupMap b)
-- | Look up the single event identified by group, source and name.
eventByQName :: String -> String -> String -> Bus a -> Maybe (Event a)
eventByQName g s n b = Map.lookup (g, s, n) (fullyQualifiedMap b)
-- | Select the events matching every criterion that is supplied.
--
-- The three optional arguments are the group, the source and the name.
-- A 'Nothing' places no constraint on that dimension. When all three are
-- given the fully qualified index is consulted directly; when none are
-- given the result is empty.
--
-- A criterion that is supplied but matches no events yields an empty
-- result. An empty lookup is never treated as "no constraint", so asking
-- for a source that has no events cannot return events from other sources.
eventsFor :: Maybe String -> Maybe String -> Maybe String -> Bus a -> Set.Set (Event a)
eventsFor (Just g) (Just s) (Just n) b =
  maybe Set.empty Set.singleton (eventByQName g s n b)
eventsFor g s n b =
  case catMaybes [byGroup, bySource, byName] of
    []       -> Set.empty
    matching -> foldl1' Set.intersection matching
  where
    -- Each lookup is present only when its criterion was supplied, which
    -- keeps "not asked for" distinct from "asked for, nothing found".
    byGroup  = (\key -> eventsByGroup key b) <$> g
    bySource = (\key -> eventsBySource key b) <$> s
    byName   = (\key -> eventsByName key b) <$> n
-- | Select events where each of group, source and name is constrained
-- either by an exact key ('Left') or by a predicate on the event ('Right').
--
-- Exact keys narrow the candidates through the bus indices; predicates are
-- then applied as filters. With three predicates every event in the fully
-- qualified index is a candidate.
filteredEventsFor
  :: Either String (Event a -> Bool)
  -> Either String (Event a -> Bool)
  -> Either String (Event a -> Bool)
  -> Bus a
  -> [Event a]
filteredEventsFor gsel ssel nsel b = case (gsel, ssel, nsel) of
    (Left g, Right sfilter, Right nfilter) ->
      keep [sfilter, nfilter] (Set.toList (eventsByGroup g b))
    (Right gfilter, Left s, Right nfilter) ->
      keep [nfilter, gfilter] (Set.toList (eventsBySource s b))
    (Right gfilter, Right sfilter, Left n) ->
      keep [sfilter, gfilter] (Set.toList (eventsByName n b))
    (Left g, Left s, Left n) ->
      maybeToList (eventByQName g s n b)
    (Right gfilter, Right sfilter, Right nfilter) ->
      keep [nfilter, sfilter, gfilter] (Map.elems (fullyQualifiedMap b))
    (Right gfilter, Left s, Left n) ->
      keep [gfilter] (Set.toList (eventsFor Nothing (Just s) (Just n) b))
    (Left g, Right sfilter, Left n) ->
      keep [sfilter] (Set.toList (eventsFor (Just g) Nothing (Just n) b))
    (Left g, Left s, Right nfilter) ->
      keep [nfilter] (Set.toList (eventsFor (Just g) (Just s) Nothing b))
  where
    -- Keep the events accepted by every predicate, tested in list order.
    keep predicates = filter (\ev -> all (\accepts -> accepts ev) predicates)
-- | The smallest element of a set. Partial: calling it on an empty set
-- raises the error from 'head'.
topEvent :: Set.Set a -> a
topEvent events = head (Set.toList events)
-- | Buses combine by merging all four indices; 'emptyBus' is the identity.
--
-- The set-valued indices are merged per key with 'Set.union', so events
-- from both buses that share a name, source or group are all kept. A plain
-- 'Map.union' would drop the right-hand set for any shared key and leave
-- the indices disagreeing with 'fullyQualifiedMap'. When both buses hold an
-- event with the same (group, source, name), the left one wins in every
-- index.
instance Monoid (Bus a) where
  mempty = emptyBus
  mappend (Bus n0 s0 g0 f0) (Bus n1 s1 g1 f1) =
      Bus (mergeIndex n0 n1) (mergeIndex s0 s1) (mergeIndex g0 g1) (Map.union f0 f1)
    where
      mergeIndex l r = Map.unionWith Set.union l r
-- | A bus holding no events: every index is empty.
emptyBus :: Bus a
emptyBus = Bus
  { nameMap           = Map.empty
  , srcMap            = Map.empty
  , groupMap          = Map.empty
  , fullyQualifiedMap = Map.empty
  }
-- | Insert an event into all four indices of the bus. An existing event
-- with the same name, group and source is replaced by the new one.
addEvent :: Event a -> Bus a -> Bus a
addEvent edata b = b
    { nameMap           = index (ename edata) (nameMap b)
    , srcMap            = index (src edata) (srcMap b)
    , groupMap          = index (group edata) (groupMap b)
    , fullyQualifiedMap = Map.insert qualified edata (fullyQualifiedMap b)
    }
  where
    qualified = (group edata, src edata, ename edata)
    -- Left-biased union: the new event takes precedence over an equal one.
    index key = Map.insertWith Set.union key (Set.singleton edata)
-- | An IO action that is handed the shared bus variable. 'bus' runs each
-- widget once with the bus 'MVar' before entering its loop.
type Widget a = MVar (Bus a) -> IO ()
-- | An IO action yielding a bus together with an 'MVar' that holds (or will
-- hold) the list of diffs to apply to it; see 'future' and 'applyDiff'.
type Future a = IO (Bus a, MVar [Diff a])
-- | Pair a bus with the eventual result of a diff-producing action.
--
-- The action is started on a new thread and its result is put into a fresh
-- 'MVar'; the bus and that 'MVar' are returned immediately.
future :: Bus a -> IO [Diff a] -> Future a
future b thunk = do
  pending <- newEmptyMVar
  _ <- forkIO (putMVar pending =<< thunk)
  return (b, pending)
-- | Take the value out of an 'MVar', blocking until one is available.
immediate :: MVar a -> IO a
immediate pending = takeMVar pending
-- | A behaviour maps the current bus to a 'Future' carrying the diffs it
-- wants applied.
type Behaviour a = Bus a -> Future a
-- | Behaviours combine sequentially with '>~>'; 'passthrough' is the
-- identity element.
instance Monoid (Behaviour a) where
  mempty = passthrough
  mappend first second = first >~> second
-- | The behaviour that leaves the bus untouched: it produces no diffs.
passthrough :: Behaviour a
passthrough unchanged = future unchanged noDiffs
  where
    noDiffs = return []
-- | Sequential composition, right to left: run the right-hand behaviour,
-- apply its diffs to the bus, then run the left-hand behaviour on the
-- updated bus.
(<~<) :: Behaviour a -> Behaviour a -> Behaviour a
(behaviour1 <~< behaviour0) m = do
  pending <- behaviour0 m
  updated <- applyDiff pending
  behaviour1 updated
-- | Sequential composition, left to right: run the left-hand behaviour,
-- apply its diffs to the bus, then run the right-hand behaviour on the
-- updated bus.
(>~>) :: Behaviour a -> Behaviour a -> Behaviour a
(behaviour0 >~> behaviour1) m = do
  pending <- behaviour0 m
  updated <- applyDiff pending
  behaviour1 updated
-- | Side-by-side composition: both behaviours are run against the same
-- input bus and their diffs are concatenated, left-hand diffs first.
(|~|) :: Behaviour a -> Behaviour a -> Behaviour a
(behaviour0 |~| behaviour1) m = future m combined
  where
    combined = do
      -- Start both behaviours before waiting on either result.
      (_, pending0) <- behaviour0 m
      (_, pending1) <- behaviour1 m
      diffs0 <- takeMVar pending0
      diffs1 <- takeMVar pending1
      return (diffs0 ++ diffs1)
-- | Word alias for '>~>'.
behind :: Behaviour a -> Behaviour a -> Behaviour a
behind first second = first >~> second

-- | Word alias for '|~|'.
beside :: Behaviour a -> Behaviour a -> Behaviour a
beside left right = left |~| right

-- | Word alias for '<~<'.
infrontof :: Behaviour a -> Behaviour a -> Behaviour a
infrontof later earlier = later <~< earlier
-- | Wait for the pending diffs and fold them, in order, into the bus.
--
-- An 'Insertion' adds the event to every index, replacing an equal event
-- if one is present. A 'Deletion' removes the event from every index and
-- drops any index entry whose set becomes empty.
applyDiff :: (Bus a, MVar [Diff a]) -> IO (Bus a)
applyDiff (m, ds) = do
    diffs <- immediate ds
    return (foldl' step m diffs)
  where
    step b diff = case diff of
      Insertion ev ->
        let add key = Map.insertWith replacing key (Set.singleton ev)
        in b { nameMap           = add (ename ev) (nameMap b)
             , srcMap            = add (src ev) (srcMap b)
             , groupMap          = add (group ev) (groupMap b)
             , fullyQualifiedMap = Map.insert (qualified ev) ev (fullyQualifiedMap b)
             }
      Deletion ev ->
        let remove key = Map.update (without ev) key
        in b { nameMap           = remove (ename ev) (nameMap b)
             , srcMap            = remove (src ev) (srcMap b)
             , groupMap          = remove (group ev) (groupMap b)
             , fullyQualifiedMap = Map.delete (qualified ev) (fullyQualifiedMap b)
             }

    qualified ev = (group ev, src ev, ename ev)

    -- Remove one event from a set; 'Nothing' tells 'Map.update' to drop
    -- the key once the set is empty.
    without ev members =
      let remaining = Set.delete ev members
      in if Set.null remaining then Nothing else Just remaining

    -- Union in which the incoming elements displace equal existing ones.
    replacing incoming existing =
      Set.union (Set.difference existing incoming) incoming
-- | Run the event bus forever.
--
-- A fresh empty bus is created, every widget is run once with the shared
-- bus variable, and then the loop alternates between the supplied IO action
-- and one 'busIteration' of the behaviour. This function does not return.
bus :: [Widget a] -> IO b -> Behaviour a -> IO ()
bus widgets widgetThunk behaviour = do
  evBus <- newMVar emptyBus
  mapM_ (\widget -> widget evBus) widgets
  forever $ do
    _ <- widgetThunk
    busIteration evBus behaviour
-- | Run one step of the bus, if the bus is currently available.
--
-- The bus is taken without blocking; when another thread holds it the step
-- is skipped. Otherwise the behaviour is run, its diffs are applied, time
-- spans are decremented, expired events are removed and the resulting bus
-- is put back.
--
-- If any of that work throws, the bus that was taken is put back unchanged
-- before the exception propagates. Without this the variable would stay
-- empty for good and every thread waiting on it would block.
busIteration :: MVar (Bus a) -> Behaviour a -> IO ()
busIteration b behaviour = mask $ \restore -> do
    v <- tryTakeMVar b
    case v of
      Nothing -> return ()
      Just m -> do
        -- Same shape as 'modifyMVar_': do the work with exceptions
        -- unmasked, and restore the old value if it fails.
        bus'' <- restore (step m) `onException` putMVar b m
        putMVar b bus''
  where
    step m = do
      diffs <- behaviour m
      bus' <- applyDiff diffs
      expire <$> decrementTimeSpan bus'
-- | Build an 'Insertion' diff for a new event stamped with the current
-- time. Arguments are group, source, name, time span and payload.
produce :: String -> String -> String -> TimeSpan -> a -> IO (Diff a)
produce grp source nm timetolive edata = do
  now <- getCurrentTime
  return . Insertion $ Event
    { ename     = nm
    , group     = grp
    , timespan  = timetolive
    , eventdata = edata
    , src       = source
    , time      = now
    }
-- | Add a new event, stamped with the current time, directly to the bus
-- held in the given variable. Arguments are group, source, name, time span,
-- payload and the bus variable.
produce' :: String -> String -> String -> TimeSpan -> a -> MVar (Bus a) -> IO ()
produce' grp source nm timetolive edata b = do
  now <- getCurrentTime
  let ev = Event nm grp timetolive edata source now
  modifyMVar_ b (return . addEvent ev)
-- | Consume every event with the given name as one set.
--
-- When the name is present in the name index, the handler receives the
-- whole set and the resulting diffs are the deletions of those events
-- followed by whatever the handler returns. When the name is absent the
-- handler is not called and no diffs are produced.
consumeNamedEventsCollectivelyWith :: Bus a -> String -> (Set.Set (Event a) -> IO [Diff a]) -> Future a
consumeNamedEventsCollectivelyWith em nm f =
  case Map.lookup nm (nameMap em) of
    Nothing  -> future em (return [])
    Just evs -> future em $ do
      produced <- f evs
      return (map Deletion (Set.toList evs) ++ produced)
-- | Behaviour that deletes every event with the given name.
consumeNamedEvents :: String -> Behaviour a
consumeNamedEvents nm b = future b (return deletions)
  where
    matching  = Map.findWithDefault Set.empty nm (nameMap b)
    deletions = [Deletion ev | ev <- Set.toList matching]
-- | Behaviour that deletes every event in the given group.
consumeEventGroup :: String -> Behaviour a
consumeEventGroup g b = future b (return deletions)
  where
    matching  = Map.findWithDefault Set.empty g (groupMap b)
    deletions = [Deletion ev | ev <- Set.toList matching]
-- | Behaviour that deletes every event emitted by the given source.
consumeEventsFromSource :: String -> Behaviour a
consumeEventsFromSource s b = future b (return deletions)
  where
    matching  = Map.findWithDefault Set.empty s (srcMap b)
    deletions = [Deletion ev | ev <- Set.toList matching]
-- | Behaviour that deletes the single event identified by group, source
-- and name, if it is on the bus.
consumeFullyQualifiedEvent :: String -> String -> String -> Behaviour a
consumeFullyQualifiedEvent g s n b = future b (return deletions)
  where
    found     = Map.lookup (g, s, n) (fullyQualifiedMap b)
    deletions = map Deletion (maybeToList found)
-- | Diffs that re-insert the event with its payload transformed by the
-- given function.
modifyEventData :: Event a -> (a -> a) -> [Diff a]
modifyEventData ev f = [Insertion updated]
  where
    updated = ev { eventdata = f (eventdata ev) }
-- | Diffs that replace an event by its transformed version.
--
-- When the transformed event still compares equal to the original (same
-- name, group and source) a single insertion suffices; otherwise the
-- original is deleted before the new event is inserted.
modifyEvent :: Event a -> (Event a -> Event a) -> [Diff a]
modifyEvent ev f
  | ev == ev' = [Insertion ev']
  | otherwise = [Deletion ev, Insertion ev']
  where
    ev' = f ev
-- | Consume every event with the given name, one at a time.
--
-- The handler is run on each matching event in set order. The resulting
-- diffs are the deletions of all matching events followed by the handlers'
-- diffs in the same order.
consumeNamedEventsWith :: Bus a -> String -> (Event a -> IO [Diff a]) -> Future a
consumeNamedEventsWith b n f = future b $ do
    produced <- mapM f matching
    return (map Deletion matching ++ concat produced)
  where
    matching = Set.toList (Map.findWithDefault Set.empty n (nameMap b))
-- | Consume every event in the given group as one set.
--
-- When the group is present in the group index, the handler receives the
-- whole set and the resulting diffs are the deletions of those events
-- followed by whatever the handler returns. When the group is absent the
-- handler is not called and no diffs are produced.
consumeEventGroupCollectivelyWith :: Bus a -> String -> (Set.Set (Event a) -> IO [Diff a]) -> Future a
consumeEventGroupCollectivelyWith em gp f =
  case Map.lookup gp (groupMap em) of
    Nothing  -> future em (return [])
    Just evs -> future em $ do
      produced <- f evs
      return (map Deletion (Set.toList evs) ++ produced)
-- | Consume every event in the given group, one at a time.
--
-- The handler is run on each matching event in set order. The resulting
-- diffs are the deletions of all matching events followed by the handlers'
-- diffs in the same order.
consumeEventGroupWith :: Bus a -> String -> (Event a -> IO [Diff a]) -> Future a
consumeEventGroupWith b n f = future b $ do
    produced <- mapM f matching
    return (map Deletion matching ++ concat produced)
  where
    matching = Set.toList (Map.findWithDefault Set.empty n (groupMap b))
-- | Consume every event from the given source as one set.
--
-- When the source is present in the source index, the handler receives the
-- whole set and the resulting diffs are the deletions of those events
-- followed by whatever the handler returns. When the source is absent the
-- handler is not called and no diffs are produced.
consumeEventsFromSourceCollectivelyWith :: Bus a -> String -> (Set.Set (Event a) -> IO [Diff a]) -> Future a
consumeEventsFromSourceCollectivelyWith em source f =
  case Map.lookup source (srcMap em) of
    Nothing  -> future em (return [])
    Just evs -> future em $ do
      produced <- f evs
      return (map Deletion (Set.toList evs) ++ produced)
-- | Consume every event from the given source, one at a time.
--
-- The handler is run on each matching event in set order. The resulting
-- diffs are the deletions of all matching events followed by the handlers'
-- diffs in the same order.
consumeEventsFromSourceWith :: Bus a -> String -> (Event a -> IO [Diff a]) -> Future a
consumeEventsFromSourceWith b n f = future b $ do
    produced <- mapM f matching
    return (map Deletion matching ++ concat produced)
  where
    matching = Set.toList (Map.findWithDefault Set.empty n (srcMap b))
-- | Consume the single event identified by group, source and name.
--
-- When the event is on the bus, the resulting diffs are its deletion
-- followed by whatever the handler returns for it. When it is not, the
-- handler is not called and no diffs are produced.
consumeFullyQualifiedEventWith :: Bus a -> String -> String -> String -> (Event a -> IO [Diff a]) -> Future a
consumeFullyQualifiedEventWith em grp source nm f =
  case Map.lookup (grp, source, nm) (fullyQualifiedMap em) of
    Nothing -> future em (return [])
    Just ev -> future em $ do
      produced <- f ev
      return (Deletion ev : produced)
-- | Hand the set of events with the given name to the handler without
-- removing them. When the name is absent the handler is not called and no
-- diffs are produced.
pollNamedEventsCollectivelyWith :: Bus a -> String -> (Set.Set (Event a) -> IO [Diff a]) -> Future a
pollNamedEventsCollectivelyWith b nm f =
  case Map.lookup nm (nameMap b) of
    Nothing  -> future b (return [])
    Just evs -> future b (f evs)
-- | Run the handler on each event with the given name, in set order,
-- without removing the events; the handlers' diffs are concatenated.
pollNamedEventsWith :: Bus a -> String -> (Event a -> IO [Diff a]) -> Future a
pollNamedEventsWith b nm f = future b $ do
    produced <- mapM f matching
    return (concat produced)
  where
    matching = Set.toList (Map.findWithDefault Set.empty nm (nameMap b))
-- | Hand the set of events in the given group to the handler without
-- removing them. When the group is absent the handler is not called and no
-- diffs are produced.
pollEventGroupCollectivelyWith :: Bus a -> String -> (Set.Set (Event a) -> IO [Diff a]) -> Future a
pollEventGroupCollectivelyWith b nm f =
  case Map.lookup nm (groupMap b) of
    Nothing  -> future b (return [])
    Just evs -> future b (f evs)
-- | Run the handler on each event in the given group, in set order,
-- without removing the events; the handlers' diffs are concatenated.
pollEventGroupWith :: Bus a -> String -> (Event a -> IO [Diff a]) -> Future a
pollEventGroupWith b nm f = future b $ do
    produced <- mapM f matching
    return (concat produced)
  where
    matching = Set.toList (Map.findWithDefault Set.empty nm (groupMap b))
-- | Hand the set of events from the given source to the handler without
-- removing them. When the source is absent the handler is not called and
-- no diffs are produced.
pollEventsFromSourceCollectivelyWith :: Bus a -> String -> (Set.Set (Event a) -> IO [Diff a]) -> Future a
pollEventsFromSourceCollectivelyWith b nm f =
  case Map.lookup nm (srcMap b) of
    Nothing  -> future b (return [])
    Just evs -> future b (f evs)
-- | Run the handler on each event from the given source, in set order,
-- without removing the events; the handlers' diffs are concatenated.
pollEventsFromSourceWith :: Bus a -> String -> (Event a -> IO [Diff a]) -> Future a
pollEventsFromSourceWith b nm f = future b $ do
    produced <- mapM f matching
    return (concat produced)
  where
    matching = Set.toList (Map.findWithDefault Set.empty nm (srcMap b))
-- | Hand the event identified by group, source and name to the handler
-- without removing it. When the event is absent the handler is not called
-- and no diffs are produced.
pollFullyQualifiedEventWith :: Bus a -> String -> String -> String -> (Event a -> IO [Diff a]) -> Future a
pollFullyQualifiedEventWith b gp source nm f =
  case Map.lookup (gp, source, nm) (fullyQualifiedMap b) of
    Nothing -> future b (return [])
    Just ev -> future b (f ev)
-- | Run the handler on every event on the bus, in fully-qualified key
-- order, without removing any; the handlers' diffs are concatenated.
pollAllEventsWith :: Bus a -> (Event a -> IO [Diff a]) -> Future a
pollAllEventsWith b f = future b $ do
    produced <- mapM f everything
    return (concat produced)
  where
    everything = Map.elems (fullyQualifiedMap b)
-- | Hand the set of all events on the bus to the handler without removing
-- any of them.
pollAllEventsCollectivelyWith :: Bus a -> (Set.Set (Event a) -> IO [Diff a]) -> Future a
pollAllEventsCollectivelyWith b f = future b (f everything)
  where
    everything = Set.fromList (Map.elems (fullyQualifiedMap b))
-- | A set containing exactly the given element.
singleton :: Ord a => a -> Set.Set a
singleton a = Set.insert a Set.empty
-- | Advance the time span of every event in every index of the bus.
--
-- 'Persistent' spans are left alone, 'Iterations' spans lose one iteration,
-- and 'Time' spans are recomputed against the current time. The clock is
-- read once per call, in IO, so all events in a pass see the same instant
-- and the reading cannot be cached as a constant by the compiler.
decrementTimeSpan :: Bus a -> IO (Bus a)
decrementTimeSpan b = do
    now <- getCurrentTime
    let decrement e = e { timespan = decTimeSpan now e (timespan e) }
        decrements  = Set.map decrement
    return (b { nameMap           = Map.map decrements (nameMap b)
              , srcMap            = Map.map decrements (srcMap b)
              , groupMap          = Map.map decrements (groupMap b)
              , fullyQualifiedMap = Map.map decrement (fullyQualifiedMap b)
              })
  where
    decTimeSpan _ _ Persistent = Persistent
    -- NOTE(review): the new span is (event time + current span) - now, and
    -- the current span has already been reduced by earlier passes while the
    -- event time never advances. Elapsed time therefore looks like it is
    -- subtracted again on every pass -- confirm whether 'time' should be
    -- moved forward here or the original span kept separately.
    decTimeSpan now e (Time x) =
      Time . realToFrac $ diffUTCTime (addUTCTime (realToFrac x) (time e)) now
    decTimeSpan _ _ (Iterations x) = Iterations (x - 1)
-- | Drop every event whose time span has run out.
--
-- An event survives when its span is 'Persistent' or when its remaining
-- time or iteration count is strictly positive. Index entries left with an
-- empty set are removed.
expire :: Bus a -> Bus a
expire b = Bus
    { nameMap           = prune (nameMap b)
    , srcMap            = prune (srcMap b)
    , groupMap          = prune (groupMap b)
    , fullyQualifiedMap = Map.filter alive (fullyQualifiedMap b)
    }
  where
    alive ev = case timespan ev of
      Persistent   -> True
      Time x       -> x > 0
      Iterations x -> x > 0

    -- Filter each set, then discard keys whose set became empty.
    prune index = Map.filter (not . Set.null) (Map.map (Set.filter alive) index)
-- | Run a monadic action and wrap its result in a one-element list.
listM :: Monad m => m a -> m [a]
listM action = liftM (\result -> [result]) action