Source code for ormm.markov.main

from quantecon import MarkovChain
import numpy as np


[docs]def analyze_ctmc(states, rate_matrix, t=None, d=None, n=None, init=None): """ Perform Markov Analysis of continuous time discrete state markov chain (CTMC) process. Parameters ---------- states : array-like Vector-like of length M containing the values associated with the states, which must be homogeneous in type. If None, the values default to integers 0 through M-1. rate_matrix : array-like matrix of size MxM detailing the stationary probabilities of moving from one state to another. t : int Integer for the end time period for the transient probability analysis. If this is given, then either d or n must be given, and init must be given as well. d : float Float for the small delta (amount of time per step) for numerically solving the transient probabilities. Either this or n (number of steps) must be given (one can be inferred from the other and 't'). n : int Integer of the number of steps to take for numerically solving the transient probabilities. Either this or 'd' must be given. If both are given and they do not coincide with t (d*n = t), an error will be raised. init : array-like Vector-like of length n containing the initial state values for the transient probability analyis. This must be given if 't', 'd', or 'n' is given. Returns ------- analysis : dict Dictionary with results of markov analysis Raises ------ ValueError If some but not all of the required transient probability analysis arguments are given, or if t, d, and n are given, but their values are invalid (t = n * d). """ num_states = len(states) if init is not None: # transient solutions # have to use numerical approaches, no closed form in general # DTMC approximation (not embedded here) if (d is not None) and (n is None) and (t is not None): n = int(t / d) elif (d is None) and (n is not None) and (t is not None): d = t / n elif (d is not None) and (n is not None) and (t is None): t = n * d elif (d is not None) and (n is not None) and (t is not None): true_t = n * d if round(true_t) != t: raise ValueError("Given values for arguments 't', 'd', and" " 'n' are not compatible. t must be equal" " to n * d.") else: raise ValueError("Transient analysis requires two of the three " "arguments: 't' (end time), 'd' (delta), " "'n' (num steps).") if init is None: raise ValueError("Argument 't' was provided without 'init'." " Transient analysis requires 'init' for" " the initial state vector.") if ((d is not None) or (n is not None) or (t is not None)) and (init is None): raise ValueError("Argument 'init' was not provided, but other" " Transient analysis arguments were." " 'init' is required for Transient analysis.") # transition_rates_i is sum of transition rates out of state i transition_rates = rate_matrix.sum(axis=1) # sum across cols if t is not None: # P is state-transition matrix determined from rate matrix & d P = np.array([[1 - d * transition_rates[col] if row == col else d * rate_matrix[row, col] for col in states] for row in states]) # prob that system in state i at time t: q_i(t) # q(t) = [q_0(t), q_1(t), ..., q_(m-1)(t)] # Note sum(q_t) == 1 # transient sol. approx. at time t = n*d with DTMC # by solving following equation: # eq = q(n*d) == q(0)P^(n) # or q(n*d + d) == q(n*d)P q_step = np.matmul(init, np.linalg.matrix_power(P, n)) # Steady State probabilities # limit of q(t) as t goes to infinity # generator matrix gen_matrix = np.array([[-transition_rates[row] if row == col else rate_matrix[row, col] for col in states] for row in states]) # augmented generator matrix - replace first col with 1s gen_matrix[:, 0] = np.ones(num_states) # Solve linear equations for steady state probs unit_vector = np.zeros(num_states) unit_vector[0] = 1 steady_state = np.matmul(unit_vector.T, np.linalg.inv(gen_matrix)) analysis = {'transition_rates': transition_rates, 'P': P, 'init': init, 'transient': q_step, 'generator_matrix': gen_matrix, 'steady_state': steady_state} return analysis
[docs]def analyze_dtmc(P, states=None, sim_kwargs=None, trans_kwargs=None, cost_kwargs=None): """ Perform Markov Analysis of discrete time discrete state markov chain (DTMC) process. Parameters ---------- P : array-like The transition matrix. Must be of shape n x n. states : array-like Array_like of length n containing the values associated with the states, which must be homogeneous in type. If None, the values default to integers 0 through n-1. sim_kwargs : dict Dictionary of key word arguments to be passed to the simulation of the markov process. If None, then no simulation will be performed. These include ts_length (length of each simulation) and init (Initial state values). trans_kwargs : dict Dictionary of options for the transient probability analysis (tsa). If None is passed instead of dict, no tsa will be done. ts_length is the number of time periods to analyze, while init is the initial state probability vector. cost_kwargs : dict Dictionary of cost parameters for cost analysis. If None, then no cost analysis will be performed. These include state (vector of costs of being in each state), transition (matrix of costs of transitioning from one state to another), and num (number of these processes - total cost multiplied by this, default 1). Returns ------- analysis : dict Dictionary with results of markov analysis Raises ------ ValueError If sim_kwargs, trans_kwargs, or cost_kwargs is given, but their required arguments are not passed. These are described in the Notes section. Notes ----- The required arguments if the kwargs are passed are: * sim_kwargs: `ts_length` is required, the length of the sim. * trans_kwargs: `ts_length` is required, the number of periods to analyze * cost_kwargs: `state` and `transition` are required, which are the costs of being in any state and the costs of transitioning from one state to another. """ analysis = {} markov = MarkovChain(P, states) analysis["cdfs"] = markov.cdfs steady_state = markov.stationary_distributions[0] analysis["steady_state"] = {"output": steady_state} if sim_kwargs: if "ts_length" not in sim_kwargs: raise ValueError(("Required argument `ts_length` in sim_kwargs! " "None was given.")) if "init" not in sim_kwargs: sim_kwargs["init"] = None analysis["sim"] = {"kwargs": sim_kwargs, "output": markov.simulate(**sim_kwargs)} if trans_kwargs: if "ts_length" not in trans_kwargs: raise ValueError(("Required argument `ts_length` in trans_kwargs! " "None was given.")) if "init" not in trans_kwargs: trans_kwargs["init"] = None trans_probs = _transient_probs(P, states, **trans_kwargs) analysis["transient"] = {"kwargs": trans_kwargs, "output": trans_probs} if cost_kwargs: if "state" not in cost_kwargs: raise ValueError(("Required argument `state` in trans_kwargs! " "None was given.")) if "transition" not in cost_kwargs: raise ValueError(("Required argument `transition` in trans_kwargs!" " None was given.")) if "num" not in cost_kwargs: cost_kwargs["num"] = 1 # Cost of steady state cost_vector, cost_total = _cost_analysis(P, steady_state, **cost_kwargs) # Cost of transient analysis analysis["steady_state"]["cost"] = \ {"kwargs": cost_kwargs, "total": cost_total, "vector": cost_vector} if trans_kwargs: cost_vector, cost_total = _cost_analysis(P, trans_probs, **cost_kwargs) analysis["transient"]["cost"] = {"kwargs": cost_kwargs, "total": cost_total, "vector": cost_vector} return analysis
def _cost_analysis(P, probs, state, transition, num): """ Cost analysis for markov process probs could be transient or steady state """ # Get cost vector - element wise mult, sum across cols cost_vector = state + np.sum(transition * P, 1) # Calculate expected costs exp_cost = np.matmul(probs, cost_vector) total_cost = np.sum(exp_cost) * num return exp_cost, total_cost def _transient_probs(P, states, ts_length, init=None): """ Calculate transient probabilities q(n): probability dist at time n q(n) = q(n-1) * P """ if init is None: init = [np.random.choice(states, size=P.shape[0])] q = np.array([init]) # q_n = q_(n-1) * P for _ in range(1, ts_length + 1): q = np.vstack((q, np.matmul(q[-1], P))) return q