module Nettle.FRPControl.NetInfo
(
SwitchTable,
SwitchRecord(..),
PortTable,
PortRecord(..),
HasDataPathID(..),
NetInfo,
portTable,
switchTable,
NetworkMonitorPolicy(..),
defaultMonitorPolicy,
networkInfoRequester,
switchInfo,
HostDirectionMap,
hostDirectionsSF,
hostDirectionsChangeSF,
HostLocationMap,
hostLocationSF,
withPortStats,
portRatesMapSF,
nAveragePortRateMap
) where
import Nettle.OpenFlow.Messages hiding (Features)
import Nettle.OpenFlow.Switch hiding (SwitchFeatures(..))
import qualified Nettle.OpenFlow.Switch as M
import qualified Nettle.OpenFlow.Port as P
import Nettle.OpenFlow.Port hiding (Port,portID)
import Nettle.OpenFlow.Action
import Nettle.OpenFlow.Packet
import Nettle.OpenFlow.Statistics hiding (StatsReply(..))
import Nettle.Ethernet.EthernetAddress
import Nettle.IPv4.IPPacket
import Nettle.FRPControl.NettleSF
import Nettle.Discovery.Topology
import Data.List as List
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Monoid
type SwitchTable = [ SwitchRecord ]
data SwitchRecord
= SwitchRecord
{ switchID :: SwitchID
, packetBufferSize :: Integer
, numberFlowTables :: Integer
, capabilities :: [SwitchCapability]
, supportedActions :: [ActionType]
} deriving (Show,Eq,Ord)
type PortTable = [ PortRecord ]
data PortRecord =
PortRecord { portSwitch :: SwitchID
, portID :: PortID
, portAddr :: EthernetAddress
, spanningTreeState :: SpanningTreePortState
, isPortDown :: Bool
, isLinkDown :: Bool
, isUsedForFlooding :: Bool
} deriving (Show, Eq)
class HasDataPathID a where
dPID :: a -> SwitchID
instance HasDataPathID SwitchID where
dPID = id
instance HasDataPathID PortRecord where
dPID = portSwitch
newtype NetInfo = NetInfo (Map SwitchID M.SwitchFeatures) deriving Show
switchTable :: NetInfo -> SwitchTable
switchTable (NetInfo smap) =
[ SwitchRecord { switchID = id,
packetBufferSize = packetBufferSize,
numberFlowTables = numberFlowTables,
capabilities = capabilities,
supportedActions = supportedActions
}
| (id, M.SwitchFeatures {..}) <- Map.assocs smap ]
portTable :: NetInfo -> PortTable
portTable (NetInfo smap)
= concat $
Map.elems $
Map.map f smap
where f sfr = [ PortRecord { portSwitch = M.switchID sfr,
portID = portNumber,
portAddr = portAddr,
spanningTreeState = portState,
isPortDown = PortDown `elem` portConfig,
isLinkDown = linkDown,
isUsedForFlooding = not (NoFlooding `elem` portConfig)
}
| P.Port portNumber portName portAddr portConfig linkDown portState _ _ _ _ <- M.ports sfr,
portNumber <= maxNumberPorts
]
type HostDirectionMap = Map (SwitchID, EthernetAddress) PortID
hostDirectionsSF :: SF (Event (SwitchID, SwitchMessage)) HostDirectionMap
hostDirectionsSF = hostDirectionsChangeSF >>> arr (liftE snd) >>> hold Map.empty
hostDirectionsChangeSF :: SF (Event (SwitchID, SwitchMessage)) (Event (HostDirectionMap, HostDirectionMap))
hostDirectionsChangeSF = arr packetInE >>> accumFilter learn Map.empty
where learn dict (sid, pktRecord) =
case packetInFrame pktRecord of
Left msg -> (dict, Nothing)
Right frame ->
let src = sourceAddress frame
port' = receivedOnPort pktRecord
dict' = Map.insert (sid,src) port' dict
in case Map.lookup (sid, src) dict of
Nothing -> (dict', Just (dict, dict'))
Just port -> if port == port'
then (dict, Nothing)
else (dict', Just (dict,dict'))
type HostLocationMap = Map EthernetAddress (SwitchID, PortID)
hostLocationSF :: SF (Event (SwitchID, SwitchMessage), Topology) HostLocationMap
hostLocationSF = proc (i, topology) -> do
hold Map.empty <<< accumBy learn Map.empty -< packetInE i `attach` topology
where learn hlMap ((dpid, pktInfo), topology) =
if (dpid, inPort) `portInTopology` topology
then hlMap
else case packetInFrame pktInfo of
Left str -> hlMap
Right frame -> Map.insert (sourceAddress frame) (dpid, inPort) hlMap
where inPort = receivedOnPort pktInfo
portInTopology :: Port -> Topology -> Bool
portInTopology port = not . Set.null . Set.filter (port `Set.member`)
data NetworkMonitorPolicy
= NetworkMonitorPolicy { switchFeaturesRefreshPeriod :: Time
, portStatisticsRefreshPeriod :: Time
} deriving (Show,Eq)
defaultMonitorPolicy :: NetworkMonitorPolicy
defaultMonitorPolicy =
NetworkMonitorPolicy { switchFeaturesRefreshPeriod = 10
, portStatisticsRefreshPeriod = 5
}
networkInfoRequester :: NetworkMonitorPolicy ->
SF (Event (SwitchID, SwitchMessage)) (Event SwitchCommand)
networkInfoRequester policy =
proc i -> do
dpids <- activeSwitches -< i
cmdE <- switchFeatureMonitor (switchFeaturesRefreshPeriod policy) -< (i, dpids)
cmdE' <- requestPortFlows (portStatisticsRefreshPeriod policy) -< (i, dpids)
returnA -< mergeBy (<+>) cmdE cmdE'
switchFeatureMonitor :: Time -> SF (i, Set SwitchID) (Event SwitchCommand)
switchFeatureMonitor refreshPeriod =
proc (evt,dpids) -> do
timeOut <- repeatedly refreshPeriod () -< ()
returnA -< tag timeOut (mconcat [ requestFeatures swid | swid <- Set.toList dpids ])
activeSwitches :: SF (Event (SwitchID, SwitchMessage)) (Set SwitchID)
activeSwitches = proc i -> do
hold Set.empty <<< accum Set.empty -< (liftE (\(dpid,_) -> Set.insert dpid) (arrivalE i) `lMerge`
liftE (Set.delete . fst) (departureE i)
)
switchInfo :: SF (Event (SwitchID, SwitchMessage)) NetInfo
switchInfo = proc i -> do
let update = liftE (\(sw, sfr) -> Map.insert sw sfr) (arrivalE i) `lMerge`
liftE (\(sw, _) -> Map.delete sw) (departureE i) `lMerge`
liftE (\(sw, sfr) -> Map.insert sw sfr) (featureUpdateE i)
arr NetInfo <<< hold Map.empty <<< accum Map.empty -< update
requestPortFlows :: Time -> SF (Event (SwitchID, SwitchMessage), Set SwitchID) (Event SwitchCommand)
#if OPENFLOW_VERSION==151 || OPENFLOW_VERSION==152
requestPortFlows refreshPeriod =
proc (i, dpids) -> do
clock <- repeatedly refreshPeriod () -< ()
let periodicQuery = tag clock ( mconcat [requestStats dpid PortStatsRequest | dpid <- Set.toList dpids])
let joinQuery = liftE (\(dpid,_) -> requestStats dpid PortStatsRequest) (arrivalE i)
writeToSwitch -< mergeBy (<+>) joinQuery periodicQuery
#endif
#if OPENFLOW_VERSION==1
requestPortFlows refreshPeriod =
proc (i, dpids) -> do
clock <- repeatedly refreshPeriod () -< ()
let periodicQuery = tag clock ( mconcat [requestStats dpid (PortStatsRequest AllPorts) | dpid <- Set.toList dpids])
let joinQuery = liftE (\(dpid,_) -> requestStats dpid (PortStatsRequest AllPorts)) (arrivalE i)
returnA -< mergeBy (<+>) joinQuery periodicQuery
#endif
withPortStats :: SF (Event PortStats) a -> SF (Event (SwitchID, SwitchMessage)) (Map (SwitchID, PortID) a)
withPortStats sf = proc i -> do
let inserts = liftE (\(swid,sfr) -> Map.union (newMap swid sfr)) (arrivalE i)
let deletes = liftE (\(swid,_) -> Map.filterWithKey (\(swid',_) _ -> swid /= swid')) (departureE i)
rpSwitchB Map.empty -< (i, inserts `lMerge` deletes)
where newMap swid sfr = Map.fromList [ ((swid, pid), statsForPort swid pid >>> sf)
| p <- M.ports sfr, let pid = P.portID p, pid <= maxNumberPorts ]
statsForPort :: SwitchID -> PortID -> SF (Event (SwitchID, SwitchMessage)) (Event PortStats)
statsForPort dpid pid =
proc i -> do
returnA -< mapFilterE f (portStatReplyE i)
where f (swid, ports)
| swid /= dpid = Nothing
| swid == dpid = lookup pid ports
portRatesMapSF :: SF (Event (SwitchID, SwitchMessage)) (Map (SwitchID, PortID) PortStats)
portRatesMapSF = withPortStats (oneStepDifferenceSF >>> hold nullPortStats)
averageRateMap :: Int -> SF (Event (SwitchID, SwitchMessage)) (Map (SwitchID, PortID) PortStats)
averageRateMap n = withPortStats (averageRateN n >>> hold nullPortStats)
averageRateN :: Int -> SF (Event PortStats) (Event PortStats)
averageRateN n =
proc aEvent -> do
t <- time -< ()
accumFilter f [] -< aEvent `attach` t
where f xs (a,t) = (xs', y)
where y = if length xs' == n
then Just (liftIntoPortStats1 (/dt) vdiff)
else Nothing
xs' = (a,t) : take (n1) xs
vdiff = liftIntoPortStats2 () vmax vmin
(vmax,tmax) = head xs'
(vmin,tmin) = last xs'
dt = tmax tmin
nEvents :: Int -> SF (Event a) (Event [(Time,a)])
nEvents n = proc e -> do
t <- time -< ()
accum [] -< liftE (\a -> take n . ((t,a):)) e
nAverage :: Int -> SF (Event PortStats) (Event PortStats)
nAverage n = proc e -> do
arr (mapFilterE average) <<< nEvents n -< e
where average [] = Nothing
average ((t1,v1):tvs)
| null tvs = Nothing
| otherwise = let (tn,vn) = last tvs
dt = t1 tn
vdiff = liftIntoPortStats2 () v1 vn
slope = liftIntoPortStats1 (/dt) vdiff
in Just slope
nAveragePortRateMap :: Int -> SF (Event (SwitchID, SwitchMessage)) (Map (SwitchID, PortID) PortStats)
nAveragePortRateMap n = withPortStats (nAverage n >>> hold zeroPortStats)
smoothPortRateMap ::SF (Event (SwitchID, SwitchMessage)) (Map (SwitchID, PortID) PortStats)
smoothPortRateMap = withPortStats portRatesSmooth
portRatesSmooth :: SF (Event PortStats) PortStats
portRatesSmooth = oneStepDifferenceSF >>> oneStepDifferenceSF >>> hold zeroPortStats >>> integralPortStats
integralPortStats :: SF PortStats PortStats
integralPortStats = proc v -> do
receivedPackets' <- integral -< maybe 0 id (portStatsReceivedPackets v)
sentPackets' <- integral -< maybe 0 id (portStatsSentPackets v)
receivedBytes' <- integral -< maybe 0 id (portStatsReceivedBytes v)
sentBytes' <- integral -< maybe 0 id (portStatsSentBytes v)
receiverDropped' <- integral -< maybe 0 id (portStatsReceiverDropped v)
senderDropped' <- integral -< maybe 0 id (portStatsSenderDropped v)
receiveErrors' <- integral -< maybe 0 id (portStatsReceiveErrors v)
transmitErrors' <- integral -< maybe 0 id (portStatsTransmitError v)
receivedFrameErrors' <- integral -< maybe 0 id (portStatsReceivedFrameErrors v)
receiverOverrunError' <- integral -< maybe 0 id (portStatsReceiverOverrunError v)
receiverCRCError' <- integral -< maybe 0 id (portStatsReceiverCRCError v)
collisions' <- integral -< maybe 0 id (portStatsCollisions v)
let v' = PortStats { portStatsReceivedPackets = Just receivedPackets',
portStatsSentPackets = Just sentPackets',
portStatsReceivedBytes = Just receivedBytes',
portStatsSentBytes = Just sentBytes',
portStatsReceiverDropped = Just receiverDropped',
portStatsSenderDropped = Just senderDropped',
portStatsReceiveErrors = Just receiveErrors',
portStatsTransmitError = Just transmitErrors',
portStatsReceivedFrameErrors = Just receivedFrameErrors',
portStatsReceiverOverrunError = Just receiverOverrunError',
portStatsReceiverCRCError = Just receiverCRCError',
portStatsCollisions = Just collisions'
}
returnA -< v'
oneStepDifferenceSF :: SF (Event PortStats) (Event PortStats)
oneStepDifferenceSF =
proc statEvent -> do
statPair <- consecutiveEvents -< statEvent
returnA -< liftE slope statPair
where slope ((pr1, t1), (pr2, t2)) =
liftIntoPortStats2 (\a1 a2 -> (a2 a1) / (t2 t1)) pr1 pr2
consecutiveEvents :: SF (Event a) (Event ((a,Time),(a,Time)))
consecutiveEvents = proc aEvent -> do
t <- time -< ()
accumFilter f Nothing -< aEvent `attach` t
where f ma' (a,t) = (Just (a,t), mb)
where mb = case ma' of
Just (a',t') -> Just ((a',t'),(a,t))
Nothing -> Nothing