module Math.HiddenMarkovModel.Private where
import qualified Math.HiddenMarkovModel.Distribution as Distr
import qualified Math.HiddenMarkovModel.CSV as HMMCSV
import Math.HiddenMarkovModel.Distribution (State(State))
import qualified Numeric.LinearAlgebra.Algorithms as Algo
import qualified Numeric.LinearAlgebra.Util as LinAlg
import qualified Numeric.Container as NC
import qualified Data.Packed.Development as Dev
import qualified Data.Packed.Matrix as Matrix
import qualified Data.Packed.Vector as Vector
import Numeric.Container ((<>))
import Data.Packed.Matrix (Matrix)
import Data.Packed.Vector (Vector)
import qualified Data.NonEmpty.Class as NonEmptyC
import qualified Data.NonEmpty as NonEmpty
import qualified Data.Semigroup as Sg
import qualified Data.List as List
import Data.Traversable (Traversable, mapAccumL)
import Data.Tuple.HT (mapPair, mapFst, mapSnd, swap)
data T distr prob =
Cons {
initial :: Vector prob,
transition :: Matrix prob,
distribution :: distr
}
deriving (Show, Read)
emission ::
(Distr.EmissionProb distr,
Distr.Probability distr ~ prob, Distr.Emission distr ~ emission) =>
T distr prob -> emission -> Vector prob
emission = Distr.emissionProb . distribution
forward ::
(Distr.EmissionProb distr,
Distr.Probability distr ~ prob, Distr.Emission distr ~ emission,
Traversable f) =>
T distr prob -> NonEmpty.T f emission -> prob
forward hmm = NC.sumElements . NonEmpty.last . alpha hmm
alpha ::
(Distr.EmissionProb distr,
Distr.Probability distr ~ prob, Distr.Emission distr ~ emission,
Traversable f) =>
T distr prob ->
NonEmpty.T f emission -> NonEmpty.T f (Vector prob)
alpha hmm (NonEmpty.Cons x xs) =
NonEmpty.scanl
(\alphai xi -> NC.mul (emission hmm xi) (transition hmm <> alphai))
(NC.mul (emission hmm x) (initial hmm))
xs
backward ::
(Distr.EmissionProb distr,
Distr.Probability distr ~ prob, Distr.Emission distr ~ emission,
Traversable f) =>
T distr prob -> NonEmpty.T f emission -> prob
backward hmm (NonEmpty.Cons x xs) =
NC.sumElements $
NC.mul (initial hmm) $
NC.mul (emission hmm x) $
NonEmpty.head $ beta hmm xs
beta ::
(Distr.EmissionProb distr,
Distr.Probability distr ~ prob, Distr.Emission distr ~ emission,
Traversable f) =>
T distr prob ->
f emission -> NonEmpty.T f (Vector prob)
beta hmm =
NonEmpty.scanr
(\xi betai -> NC.mul (emission hmm xi) betai <> transition hmm)
(NC.constant 1 (NC.dim $ initial hmm))
alphaBeta ::
(Distr.EmissionProb distr,
Distr.Probability distr ~ prob, Distr.Emission distr ~ emission,
Traversable f) =>
T distr prob ->
NonEmpty.T f emission ->
(prob, NonEmpty.T f (Vector prob), NonEmpty.T f (Vector prob))
alphaBeta hmm xs =
let alphas = alpha hmm xs
betas = beta hmm $ NonEmpty.tail xs
recipLikelihood = recip $ NC.sumElements $ NonEmpty.last alphas
in (recipLikelihood, alphas, betas)
xiFromAlphaBeta ::
(Distr.EmissionProb distr,
Distr.Probability distr ~ prob, Distr.Emission distr ~ emission) =>
T distr prob -> prob ->
NonEmpty.T [] emission ->
NonEmpty.T [] (Vector prob) ->
NonEmpty.T [] (Vector prob) ->
[Matrix prob]
xiFromAlphaBeta hmm recipLikelihood xs alphas betas =
zipWith3
(\x alpha0 beta1 ->
NC.scale recipLikelihood $
NC.mul
(NC.outer (NC.mul (emission hmm x) beta1) alpha0)
(transition hmm))
(NonEmpty.tail xs)
(NonEmpty.init alphas)
(NonEmpty.tail betas)
zetaFromXi ::
(Distr.Probability distr ~ prob, Num prob, NC.Product prob) =>
T distr prob -> [Matrix prob] -> [Vector prob]
zetaFromXi hmm xis =
map (NC.constant 1 (Matrix.rows $ transition hmm) <>) xis
zetaFromAlphaBeta ::
(NC.Container Vector prob) =>
prob ->
NonEmpty.T [] (Vector prob) ->
NonEmpty.T [] (Vector prob) ->
NonEmpty.T [] (Vector prob)
zetaFromAlphaBeta recipLikelihood alphas betas =
fmap (NC.scale recipLikelihood) $
NonEmptyC.zipWith NC.mul alphas betas
reveal ::
(Distr.EmissionProb distr,
Distr.Probability distr ~ prob, Distr.Emission distr ~ emission,
Traversable f) =>
T distr prob -> NonEmpty.T f emission -> NonEmpty.T f State
reveal hmm (NonEmpty.Cons x xs) =
fmap State $
uncurry (NonEmpty.scanr Dev.at') $
mapFst NC.maxIndex $
mapAccumL
(\alphai xi ->
swap $ mapSnd (NC.mul (emission hmm xi)) $
matrixMaxMul (transition hmm) alphai)
(NC.mul (emission hmm x) (initial hmm)) xs
matrixMaxMul ::
(NC.Container Vector a) =>
Matrix a -> Vector a -> (Vector Int, Vector a)
matrixMaxMul m v =
mapPair (Vector.fromList, Vector.fromList) $ unzip $
map ((\x -> (NC.maxIndex x, NC.maxElement x)) . NC.mul v) $
Matrix.toRows m
data Trained distr prob =
Trained {
trainedInitial :: Vector prob,
trainedTransition :: Matrix prob,
trainedDistribution :: distr
}
deriving (Show, Read)
sumTransitions ::
(NC.Container Vector e, Num e) =>
T distr t -> [Matrix e] -> Matrix e
sumTransitions hmm =
List.foldl' NC.add (NC.konst 0 $ LinAlg.size $ transition hmm)
trainUnsupervised ::
(Distr.Estimate tdistr, Distr.Distribution tdistr ~ distr,
Distr.Probability distr ~ prob, Distr.Emission distr ~ emission) =>
T distr prob -> NonEmpty.T [] emission -> Trained tdistr prob
trainUnsupervised hmm xs =
let (recipLikelihood, alphas, betas) = alphaBeta hmm xs
zetas = zetaFromAlphaBeta recipLikelihood alphas betas
in Trained {
trainedInitial = NonEmpty.head zetas,
trainedTransition =
sumTransitions hmm $
xiFromAlphaBeta hmm recipLikelihood xs alphas betas,
trainedDistribution =
Distr.accumulateEmissions $ map (zip (NonEmpty.flatten xs)) $
List.transpose $ map Vector.toList $ NonEmpty.flatten zetas
}
mergeTrained ::
(Distr.Estimate tdistr, Distr.Distribution tdistr ~ distr,
Distr.Probability distr ~ prob, Distr.Emission distr ~ emission) =>
Trained tdistr prob -> Trained tdistr prob -> Trained tdistr prob
mergeTrained hmm0 hmm1 =
Trained {
trainedInitial = NC.add (trainedInitial hmm0) (trainedInitial hmm1),
trainedTransition =
NC.add (trainedTransition hmm0) (trainedTransition hmm1),
trainedDistribution =
Distr.combine
(trainedDistribution hmm0) (trainedDistribution hmm1)
}
instance
(Distr.Estimate tdistr, Distr.Distribution tdistr ~ distr,
Distr.Probability distr ~ prob) =>
Sg.Semigroup (Trained tdistr prob) where
(<>) = mergeTrained
toCells ::
(Distr.CSV distr, Algo.Field prob, Show prob) =>
T distr prob -> [[String]]
toCells hmm =
(HMMCSV.cellsFromVector $ initial hmm) :
(HMMCSV.cellsFromMatrix $ transition hmm) ++
[] :
(Distr.toCells $ distribution hmm)
parseCSV ::
(Distr.CSV distr, Algo.Field prob, Read prob) =>
HMMCSV.CSVParser (T distr prob)
parseCSV = do
v <- HMMCSV.parseNonEmptyVectorCells
m <- HMMCSV.parseSquareMatrixCells $ Vector.dim v
HMMCSV.skipEmptyRow
distr <- Distr.parseCells $ Vector.dim v
return $ Cons {
initial = v,
transition = m,
distribution = distr
}