{-# LANGUAGE Trustworthy #-} {-# LANGUAGE CPP #-} {-# LANGUAGE BangPatterns #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE MagicHash #-} {-| This module provides finite maps that only grow. It is based on a /concurrent skip list/ implementation of maps. This module is usually a more efficient alternative to `Data.LVar.PureMap`, and provides almost the same interface. However, it's always good to test multiple data structures if you have a performance-critical use case. -} module Data.LVar.SLMap ( -- * The type and its basic operations IMap, newEmptyMap, newMap, newFromList, insert, getKey, waitSize, waitValue, modify, -- * Quasi-deterministic operations freezeMap, traverseFrzn_, -- * Iteration and callbacks forEach, forEachHP, withCallbacksThenFreeze, -- * Higher-level derived operations copy, traverseMap, traverseMap_, -- * Alternate versions of derived ops that expose @HandlerPool@s they create traverseMapHP, traverseMapHP_, unionHP, -- * Debugging Helpers levelCounts ) where import Control.Exception (throw) import Control.Applicative import Data.Concurrent.SkipListMap as SLM import qualified Data.Map.Strict as M import qualified Data.LVar.IVar as IV import qualified Data.Foldable as F import Data.IORef (readIORef) import Data.UtilInternal (traverseWithKey_) import Data.List (intersperse) import Data.LVar.Generic import Data.LVar.Generic.Internal (unsafeCoerceLVar) import Control.Monad import Control.Monad.IO.Class import Control.LVish import Control.LVish.DeepFrz.Internal import Control.LVish.Internal as LI import Control.LVish.SchedIdempotent (newLV, putLV, putLV_, getLV, freezeLV) import qualified Control.LVish.SchedIdempotent as L import System.IO.Unsafe (unsafeDupablePerformIO) import GHC.Prim (unsafeCoerce#) import Prelude import Debug.Trace #ifdef GENERIC_PAR import qualified Control.Par.Class as PC import Control.Par.Class.Unsafe (internalLiftIO) import qualified Data.Splittable.Class as Sp import Data.Par.Splittable (pmapReduceWith_, mkMapReduce) #endif ------------------------------------------------------------------------------ -- IMaps implemented vis SkipListMap ------------------------------------------------------------------------------ -- | The map datatype itself. Like all other LVars, it has an @s@ parameter (think -- `STRef`) in addition to the @a@ parameter that describes the type of elements -- in the set. -- -- Performance note: this data structure reduces contention between parallel -- computations inserting into the map, but all /blocking/ computations are not as -- scalable. All continuations waiting for not-yet-present elements will currently -- share a single queue [2013.09.26]. data IMap k s v = Ord k => IMap {-# UNPACK #-} !(LVar s (SLM.SLMap k v) (k,v)) -- | Equality is physical equality, as with @IORef@s. instance Eq (IMap k s v) where IMap lv1 == IMap lv2 = state lv1 == state lv2 -- | An `IMap` can be treated as a generic container LVar. However, the polymorphic -- operations are less useful than the monomorphic ones exposed by this module. instance LVarData1 (IMap k) where -- | Get the exact contents of the map. Using this may cause your -- program to exhibit a limited form of nondeterminism: it will never -- return the wrong answer, but it may include synchronization bugs -- that can (nondeterministically) cause exceptions. freeze orig@(IMap (WrapLVar lv)) = WrapPar$ do freezeLV lv; return (unsafeCoerceLVar orig) -- | We can do better than the default here; this is /O(1)/: sortFrzn = AFoldable -- | This generic version has the disadvantage that it does not observe the KEY, -- only the value. addHandler mh (IMap (WrapLVar lv)) callb = WrapPar $ L.addHandler mh lv globalCB (\(_k,v) -> return$ Just$ unWrapPar$ callb v) where globalCB slm = unWrapPar $ SLM.foldlWithKey LI.liftIO (\() _k v -> forkHP mh $ callb v) () slm -- | The `IMap`s in this module also have the special property that they support an -- /O(1)/ freeze operation which immediately yields a `Foldable` container -- (`snapFreeze`). instance OrderedLVarData1 (IMap k) where snapFreeze is = unsafeCoerceLVar <$> freeze is -- `IMap` values can be returned as the result of a -- `runParThenFreeze`. Hence they need a `DeepFrz` instance. -- @DeepFrz@ is just a type-coercion. No bits flipped at runtime. instance DeepFrz a => DeepFrz (IMap k s a) where type FrzType (IMap k s a) = IMap k Frzn (FrzType a) frz = unsafeCoerceLVar -------------------------------------------------------------------------------- -- | The default number of skiplist levels defaultLevels :: Int defaultLevels = 8 -- | Create a fresh map with nothing in it. newEmptyMap :: Ord k => Par d s (IMap k s v) newEmptyMap = newEmptyMap_ defaultLevels -- | Create a fresh map with nothing in it, with the given number of skiplist -- levels. newEmptyMap_ :: Ord k => Int -> Par d s (IMap k s v) newEmptyMap_ n = fmap (IMap . WrapLVar) $ WrapPar $ newLV $ SLM.newSLMap n -- | Create a new map populated with initial elements. newMap :: Ord k => M.Map k v -> Par d s (IMap k s v) newMap mp = fmap (IMap . WrapLVar) $ WrapPar $ newLV $ do slm <- SLM.newSLMap defaultLevels traverseWithKey_ (\ k v -> do Added _ <- SLM.putIfAbsent slm k (return v) return () ) mp return slm -- | Create a new map drawing initial elements from an existing list. newFromList :: (Ord k, Eq v) => [(k,v)] -> Par d s (IMap k s v) newFromList ls = newFromList_ ls defaultLevels -- | Create a new map drawing initial elements from an existing list, with -- the given number of skip list levels. newFromList_ :: Ord k => [(k,v)] -> Int -> Par d s (IMap k s v) newFromList_ ls n = do m@(IMap lv) <- newEmptyMap_ n -- TODO: May want to consider parallelism here for sufficiently large inputs: forM_ ls $ \(k,v) -> LI.liftIO $ SLM.putIfAbsent (state lv) k $ return v return m -- | Register a per-element callback, then run an action in this context, and freeze -- when all (recursive) invocations of the callback are complete. Returns the final -- value of the provided action. withCallbacksThenFreeze :: forall k v b s . Eq b => IMap k s v -> (k -> v -> QPar s ()) -> QPar s b -> QPar s b withCallbacksThenFreeze (IMap lv) callback action = do hp <- newPool res <- IV.new let deltCB (k,v) = return$ Just$ unWrapPar$ callback k v initCB slm = do -- The implementation guarantees that all elements will be caught either here, -- or by the delta-callback: unWrapPar $ do SLM.foldlWithKey LI.liftIO (\() k v -> forkHP (Just hp) $ callback k v) () slm x <- action -- Any additional puts here trigger the callback. IV.put_ res x WrapPar $ L.addHandler (Just hp) (unWrapLVar lv) initCB deltCB -- We additionally have to quiesce here because we fork the inital set of -- callbacks on their own threads: quiesce hp IV.get res -- | Add an (asynchronous) callback that listens for all new key/value pairs -- added to the map, optionally tied to a handler pool. forEachHP :: Maybe HandlerPool -- ^ optional pool to enroll in -> IMap k s v -- ^ Map to listen to -> (k -> v -> Par d s ()) -- ^ callback -> Par d s () forEachHP mh (IMap (WrapLVar lv)) callb = WrapPar $ L.addHandler mh lv globalCB (\(k,v) -> return$ Just$ unWrapPar$ callb k v) where gcallb k v = do logDbgLn 5 " [SLMap] callback from global traversal " callb k v globalCB slm = do unWrapPar $ do logDbgLn 5 " [SLMap] Beginning fold to check for global-work" SLM.foldlWithKey LI.liftIO (\() k v -> forkHP mh $ gcallb k v) () slm -- | Add an (asynchronous) callback that listens for all new new key/value pairs added to -- the map. forEach :: IMap k s v -> (k -> v -> Par d s ()) -> Par d s () forEach = forEachHP Nothing -- | Put a single entry into the map. (WHNF) Strict in the key and value. insert :: (Ord k, Eq v) => k -> v -> IMap k s v -> Par d s () insert !key !elm (IMap (WrapLVar lv)) = WrapPar$ putLV lv putter where putter slm = do putRes <- SLM.putIfAbsent slm key $ return elm case putRes of Added _ -> return $ Just (key, elm) Found _ -> throw$ ConflictingPutExn$ "Multiple puts to one entry in an IMap!" -- | `IMap`s containing other LVars have some additional capabilities compared to -- those containing regular Haskell data. In particular, it is possible to modify -- existing entries (monotonically). Further, this `modify` function implicitly -- inserts a \"bottom\" element if there is no existing entry for the key. -- modify :: forall f a b d s key . (Ord key, LVarData1 f, Show key, Ord a) => IMap key s (f s a) -> key -- ^ The key to lookup. -> (Par d s (f s a)) -- ^ Create a new \"bottom\" element whenever an entry is not present. -> (f s a -> Par d s b) -- ^ The computation to apply on the right-hand side of the keyed entry. -> Par d s b modify (IMap (WrapLVar lv)) key newBottom fn = do act <- WrapPar $ putLV_ lv putter act where putter slm = do putRes <- unWrapPar $ SLM.putIfAbsent slm key newBottom case putRes of Added v -> return (Just (key,v), fn v) Found v -> return (Nothing, fn v) -- | Wait for the map to contain a specified key, and return the associated value. getKey :: Ord k => k -> IMap k s v -> Par d s v getKey !key (IMap (WrapLVar lv)) = WrapPar$ getLV lv globalThresh deltaThresh where globalThresh slm _frzn = SLM.find slm key deltaThresh (k,v) | k == key = return $ Just v | otherwise = return Nothing -- | Wait until the map contains a certain value (on any key). waitValue :: (Ord k, Eq v) => v -> IMap k s v -> Par d s () waitValue !val (IMap (WrapLVar lv)) = WrapPar$ getLV lv globalThresh deltaThresh where deltaThresh (_,v) | v == val = return$ Just () | otherwise = return Nothing globalThresh ref _frzn = do let slm = L.state lv let fn Nothing _k v | v == val = return $! Just () | otherwise = return $ Nothing fn just _ _ = return $! just -- This is inefficient. -- FIXME: no short-circuit for this fold: SLM.foldlWithKey id fn Nothing slm -- | Wait on the SIZE of the map, not its contents. waitSize :: Int -> IMap k s v -> Par d s () waitSize !sz (IMap (WrapLVar lv)) = WrapPar $ getLV lv globalThresh deltaThresh where globalThresh slm _ = do snapSize <- SLM.foldlWithKey id (\n _ _ -> return $ n+1) 0 slm case snapSize >= sz of True -> return (Just ()) False -> return (Nothing) -- Here's an example of a situation where we CANNOT TELL if a delta puts it over -- the threshold.a deltaThresh _ = globalThresh (L.state lv) False -- | Get the exact contents of the map. As with any -- quasi-deterministic operation, using `freezeMap` may cause your -- program to exhibit a limited form of nondeterminism: it will never -- return the wrong answer, but it may include synchronization bugs -- that can (nondeterministically) cause exceptions. -- -- This is an /O(1)/ operation that doesn't copy the in-memory representation of the -- IMap. freezeMap :: Ord k => IMap k s v -> QPar s (IMap k Frzn v) -- freezeMap (IMap (WrapLVar lv)) = return (IMap (WrapLVar lv)) -- OR we can just do this: freezeMap x@(IMap (WrapLVar lv)) = WrapPar $ do freezeLV lv -- For the final deepFreeze at the end of a runpar we can actually skip -- the freezeLV part.... return (unsafeCoerce# x) -- | Traverse a frozen map for side effect. This is useful (in comparison with more -- generic operations) because the function passed in may see the key as well as the -- value. traverseFrzn_ :: (Ord k) => (k -> a -> Par d s ()) -> IMap k Frzn a -> Par d s () traverseFrzn_ fn (IMap (WrapLVar lv)) = SLM.foldlWithKey LI.liftIO (\ () k v -> fn k v) () (L.state lv) -------------------------------------------------------------------------------- -- Higher level routines that could (mostly) be defined using the above interface. -------------------------------------------------------------------------------- -- | Establish a monotonic map between the input and output map Produce a new result -- based on each element, while leaving the keys the same. traverseMap :: (Ord k, Eq b) => (k -> a -> Par d s b) -> IMap k s a -> Par d s (IMap k s b) traverseMap f s = traverseMapHP Nothing f s -- | An imperative-style, in-place version of 'traverseMap' that takes the output map -- as an argument. traverseMap_ :: (Ord k, Eq b) => (k -> a -> Par d s b) -> IMap k s a -> IMap k s b -> Par d s () traverseMap_ f s o = traverseMapHP_ Nothing f s o -------------------------------------------------------------------------------- -- Alternate versions of functions that EXPOSE the HandlerPools -------------------------------------------------------------------------------- -- | Return a fresh map which will contain strictly more elements than the input. -- That is, things put in the former go in the latter, but not vice versa. copy :: (Ord k, Eq v) => IMap k s v -> Par d s (IMap k s v) copy = traverseMap (\ _ x -> return x) -- | Variant of `traverseMap` that optionally ties the handlers to a pool. traverseMapHP :: (Ord k, Eq b) => Maybe HandlerPool -> (k -> a -> Par d s b) -> IMap k s a -> Par d s (IMap k s b) traverseMapHP mh fn set = do os <- newEmptyMap traverseMapHP_ mh fn set os return os -- | Variant of `traverseMap_` that optionally ties the handlers to a pool. traverseMapHP_ :: (Ord k, Eq b) => Maybe HandlerPool -> (k -> a -> Par d s b) -> IMap k s a -> IMap k s b -> Par d s () traverseMapHP_ mh fn set os = do forEachHP mh set $ \ k x -> do x' <- fn k x insert k x' os -- | Return a new map which will (ultimately) contain everything in either input -- map. Conflicting entries will result in a multiple put exception. -- Optionally ties the handlers to a pool. unionHP :: (Ord k, Eq a) => Maybe HandlerPool -> IMap k s a -> IMap k s a -> Par d s (IMap k s a) unionHP mh m1 m2 = do os <- newEmptyMap forEachHP mh m1 (\ k v -> insert k v os) forEachHP mh m2 (\ k v -> insert k v os) return os levelCounts :: IMap k s a -> IO [Int] levelCounts (IMap (WrapLVar lv)) = let slm = L.state lv in SLM.counts slm -------------------------------------------------------------------------------- -- Operations on frozen Maps -------------------------------------------------------------------------------- -- As with all LVars, after freezing, map elements can be consumed. In -- the case of this `IMap` implementation, it need only be `Frzn`, not -- `Trvrsbl`. instance F.Foldable (IMap k Frzn) where -- Note: making these strict for now: foldr fn zer (IMap (WrapLVar lv)) = -- TODO: this isn't a fold RIGHT, it's a fold left. Need to fix that: unsafeDupablePerformIO $ SLM.foldlWithKey id (\ a _k v -> return (fn v a)) zer (L.state lv) -- Of course, the stronger `Trvrsbl` state is still fine for folding. instance F.Foldable (IMap k Trvrsbl) where foldr fn zer mp = F.foldr fn zer (castFrzn mp) #ifdef GENERIC_PAR #warning "Creating instances for generic programming with IMaps" instance PC.Generator (IMap k Frzn a) where type ElemOf (IMap k Frzn a) = (k,a) {-# INLINE fold #-} fold fn zer (IMap (WrapLVar lv)) = unsafeDupablePerformIO $ SLM.foldlWithKey id (\ a k v -> return $! fn a (k,v)) zer (L.state lv) {-# INLINE foldMP #-} -- | More efficient, not requiring unsafePerformIO or risk of duplication. foldMP fn zer (IMap (WrapLVar lv)) = SLM.foldlWithKey internalLiftIO (\ a k v -> fn a (k,v)) zer (L.state lv) instance Show k => PC.ParFoldable (IMap k Frzn a) where {-# INLINE pmapFold #-} -- Can't split directly but can slice and then split: pmapFold mfn rfn initAcc (IMap lv) = do let slm = state lv slc = SLM.toSlice slm -- Is it worth using unsafeDupablePerformIO here? Or is the granularity large -- enough that we might as well use unsafePerformIO? splitter s = -- Some unfortunate conversion between protocols: case unsafeDupablePerformIO (SLM.splitSlice s) of Nothing -> [s] Just (s1,s2) -> [s1,s2] -- Ideally we could liftIO into the Par monad here. seqfold fn zer (SLM.Slice slm st en) = do internalLiftIO $ putStrLn $ "[DBG] dropping to seqfold.., st/en: "++show (st,en) -- FIXME: Fold over only the range in the slice: SLM.foldlWithKey internalLiftIO (\ a k v -> fn a (k,v)) zer slm internalLiftIO $ putStrLn$ "[DBG] pmapFold on frzn IMap... calling mkMapReduce" mkMapReduce splitter seqfold PC.spawn_ slc mfn rfn initAcc -- UNSAFE! It is naughty if this instance escapes to the outside world, which it can... instance F.Foldable (SLMapSlice k) where #endif instance (Show k, Show a) => Show (IMap k Frzn a) where show (IMap (WrapLVar lv)) = "{IMap: " ++ (concat $ intersperse ", " $ unsafeDupablePerformIO $ SLM.foldlWithKey id (\ acc k v -> return$ show (k, v) : acc) [] (L.state lv) ) ++ "}" -- | For convenience only; the user could define this. instance (Show k, Show a) => Show (IMap k Trvrsbl a) where show lv = show (castFrzn lv) -------------------------------------------------------------------------------- -- #ifdef GENERIC_PAR -- Not exported yet: #if 0 instance PC.ParIMap (Par d s) where type PC.IMap (Par d s) k = IMap k s type PC.IMapContents (Par d s) k v = (Ord k, Eq v) PC.waitSize = waitSize PC.newEmptyMap = newEmptyMap PC.insert = insert PC.getKey = getKey #endif