Duarte, Hu, and Young (2019) JFE Code
PIN Code
Note: This code is provided as-is, and this write-up is for illustrative purposes. Since the publication of the paper we have received numerous requests for the code in different languages, so I decided to revisit the code, update it for Python 3, and make it available for those who are interested in learning how the estimation works.
This code runs on the WRDS Cloud; it prepares the data and performs the estimation for the models of information asymmetry in Duarte, Hu, and Young (2019) JFE. Unlike in the paper, the data here are built from the WRDS Intraday Indicators, but otherwise the variable construction and filtering are very similar.
Prepare Data
This SAS code constructs the yearly stock-day files necessary to estimate the structural models. To save time, I use several SAS macros that can be found here.
It requires access to CRSP (for market cap), COMPUSTAT (for book values), and TAQ—specifically the intraday indicators to get daily order imbalance, volume, and intraday and overnight returns.
The final file will be out.taqdfx_all6.
/* this first piece merges CRSP/COMPUSTAT */
%INCLUDE "~/git/sas/CC_LINK.sas";
%CC_LINK(dsetin=comp.funda, dsetout=compx, datevar=datadate, keep_vars=at lt);

data crspm6;
    set crsp.msf;
    where month(date)=6;
    ME6=abs(prc*shrout);
    keep permno date ME6;

data crspm;
    set crsp.msf;
    ME=abs(prc*shrout);
    datadate=date;
    keep permno datadate date ME;
run;

/* MERGE_ASOF merges the most recent observation in dataset B into dataset A */
%INCLUDE "~/git/sas/MERGE_ASOF.sas";
%MERGE_ASOF(a=crspm, b=crspm6, merged=crspm2, datevar=date, num_vars=ME6);
%MERGE_ASOF(a=crspm2, b=compx, merged=crspm3, datevar=datadate, num_vars=BE ME_COMP at lt gp);

data crspm3;
    set crspm3;
    BM = BE/ME6;
    bm_log = log(BM);
    me_log = log(ME);
run;

proc print data=crspm3(obs=25) width=min;
    where permno=11850 and year(date) between 1993 and 2018;
    var permno date me: bm:;
run;

/* This macro creates yearly stock-day files pulling from both master files
   and then WRDS IID for the second-level TAQ data */
%MACRO TAQ_OWR_GPIN(yyyy=2004);

data work.mastm_&yyyy.;
    set %if &yyyy > 1993 %then %do; taq.mast_%SYSEVALF(&yyyy.-1): %end;
        taq.mast_&yyyy.:
        taq.mast_%SYSEVALF(&yyyy.+1):;
    SYM_ROOT=scan(SYMBOL, 1, ' ');
    SYM_SUFFIX=scan(SYMBOL, 2, ' ');
    DATE=coalesce(FDATE,DATEF);
    format date yymmdd10.;
run;

proc sort data=work.mastm_&yyyy. NODUPKEY;
    by SYMBOL DATE;
run;

proc sql;
    create table work.mastm_crsp_&yyyy. as
    select a.date, sym_root, sym_suffix, symbol,
           substr(coalesce(b.ncusip, b.cusip),1,8) as cusip8,
           a.permno, a.permco, shrcd, exchcd,
           a.prc, a.ret, a.retx, a.shrout, a.vol,
           c.divamt, c.distcd, coalesce(e.SP500,0) as SP500
    from crsp.dsf a
    left join crsp.dsenames b
        on a.permno = b.permno
        and a.date between b.namedt and coalesce(b.nameendt, today())
    left join crsp.dsedist c
        on a.permno = c.permno and a.date = c.paydt
    left join (select distinct cusip, sym_root, sym_suffix, symbol,
                      min(date) as mindt, max(date) as maxdt
               from work.mastm_&yyyy.
               group by cusip, sym_root, sym_suffix, symbol) d
        on substr(d.cusip,1,8) = substr(coalesce(b.ncusip, b.cusip),1,8)
        and a.date ge d.mindt and a.date le coalesce(d.maxdt,today())
    left join (select *, 1 as SP500 from crsp.dsp500list) e
        on a.permno = e.permno and a.date between e.start and e.ending
    where year(a.date) = &yyyy. and symbol is not null
    order by a.date, sym_root, sym_suffix;
quit;

proc sort data=work.mastm_crsp_&yyyy. nodupkey;
    by date sym_root sym_suffix;
run;

proc sort data=taq.wrds_iid_&yyyy. out=work.wrds_iid_&yyyy.;
    by date symbol;
run;

data work.taqdf_&yyyy.;
    length date 8;
    merge work.wrds_iid_&yyyy.(keep=date symbol buynumtrades_lri sellnumtrades_lri
                                    FPrice OPrice CPrc: ret_mkt_t vwap_m
                                    SumVolume_m SumVolume_b SumVolume_a)
          work.mastm_crsp_&yyyy.;
    by date symbol;
    /* make names consistent with TAQMSEC */
    CCPrc = abs(coalesce(prc,cprc,cprc2));
    mid_after_open = coalesce((oprice+fprice)/2,oprice,fprice);
    y_e = divide(buynumtrades_lri-sellnumtrades_lri,buynumtrades_lri+sellnumtrades_lri);
    symbol_15=symbol;
    rename buynumtrades_lri = n_buys
           sellnumtrades_lri = n_sells
           vwap_m = vw_price_m
           ret_mkt_t = ret_mkt_m
           SumVolume_m = total_vol_m
           SumVolume_b = total_vol_b
           SumVolume_a = total_vol_a;
    label CCPrc='Closing Price (CRSP or TAQ)' y_e='Order Imbalance (%)';
run;

proc sort data=work.taqdf_&yyyy. out=taqdf_&yyyy.x nodupkey;
    by permno date;
    where permno > .Z and shrcd in (10,11) and exchcd in (1,2,3,4);
run;

%MEND;

/* This macro creates yearly stock-day files pulling from both master files
   and then WRDS IID for the millisecond-level TAQ data */
%MACRO TAQM_OWR_GPIN(yyyy=2014);

%let sysyear= %sysfunc(year("&sysdate"d));

data work.mast1_&yyyy.;
    length date 8 sym_root $6 sym_suffix $10 symbol_15 $15;
    set taqmsec.mastm_%SYSEVALF(&yyyy.-1):
        taqmsec.mastm_&yyyy.:
        %if %SYSEVALF(&yyyy.+1) <= &sysyear. %then %do; taqmsec.mastm_%SYSEVALF(&yyyy.+1): %end;;
    SYM_ROOT=scan(SYMBOL_15, 1, ' ');
    SYM_SUFFIX=scan(SYMBOL_15, 2, ' ');
    keep date cusip sym_root sym_suffix symbol_15;
run;

data work.mast2_&yyyy.;
    length date 8 sym_root $6 sym_suffix $10 symbol_15 $15;
    set taq.mast_%SYSEVALF(&yyyy.-1):
        taq.mast_&yyyy.:
        %if %SYSEVALF(&yyyy.+1) <= &sysyear. %then %do; taq.mast_%SYSEVALF(&yyyy.+1): %end;;
    SYM_ROOT=scan(SYMBOL, 1, ' ');
    SYM_SUFFIX=scan(SYMBOL, 2, ' ');
    DATE=coalesce(DATE,FDATE,DATEF);
    SYMBOL_15=coalescec(SYMBOL_15,SYMBOL);
    keep date cusip sym_root sym_suffix symbol_15;
run;

data work.mastm_&yyyy.;
    length date 8 cusip $12 sym_root $6 sym_suffix $10 symbol_15 $15;
    set work.mast1_&yyyy. work.mast2_&yyyy.;
run;

proc sort data=work.mastm_&yyyy. NODUPKEY;
    by SYM_ROOT SYM_SUFFIX DATE;
run;

proc sql;
    create table work.mastm_crsp_&yyyy. as
    select a.date, sym_root, sym_suffix, symbol_15,
           substr(coalesce(b.ncusip, b.cusip),1,8) as cusip8,
           a.permno, a.permco, shrcd, exchcd,
           a.prc, a.ret, a.retx, a.shrout, a.vol,
           c.divamt, c.distcd, coalesce(e.SP500,0) as SP500
    from crsp.dsf a
    left join crsp.dsenames b
        on a.permno = b.permno
        and a.date between b.namedt and coalesce(b.nameendt, today())
    left join crsp.dsedist c
        on a.permno = c.permno and a.date = c.paydt
    left join (select distinct cusip, sym_root, sym_suffix, symbol_15,
                      min(date) as mindt, max(date) as maxdt
               from work.mastm_&yyyy.
               group by cusip, sym_root, sym_suffix, symbol_15) d
        on substr(d.cusip,1,8) = substr(coalesce(b.ncusip, b.cusip),1,8)
        and a.date ge d.mindt and a.date le coalesce(d.maxdt,today())
    left join (select *, 1 as SP500 from crsp.dsp500list) e
        on a.permno = e.permno and a.date between e.start and e.ending
    where year(a.date) = &yyyy. and symbol_15 is not null
    order by a.date, sym_root, sym_suffix;
quit;

proc sort data=work.mastm_crsp_&yyyy. nodupkey;
    by date sym_root sym_suffix;
run;

proc sort data=taqmsec.wrds_iid_&yyyy. out=work.wrds_iid_&yyyy.;
    by date sym_root sym_suffix;
run;

data work.taqdf_&yyyy.;
    length date 8 sym_root $6 sym_suffix $10;
    merge work.wrds_iid_&yyyy.(keep=date sym_root sym_suffix buynumtrades_lr sellnumtrades_lr
                                    oprc cprc ret_mkt_m vw_price_m mid_after_open
                                    total_vol_m total_vol_b total_vol_a)
          work.mastm_crsp_&yyyy.;
    by date sym_root sym_suffix;
    CCPrc = abs(coalesce(prc,cprc));
    y_e = divide(buynumtrades_lr-sellnumtrades_lr,buynumtrades_lr+sellnumtrades_lr);
    rename buynumtrades_lr=n_buys sellnumtrades_lr=n_sells;
    label CCPrc='Closing Price (CRSP or TAQ)' y_e='Order Imbalance (%)';
run;

proc sort data=work.taqdf_&yyyy. out=taqdf_&yyyy.x nodupkey;
    by permno date;
    where permno > .Z and shrcd in (10,11) and exchcd in (1,2,3,4);
run;

%MEND;

%TAQ_OWR_GPIN(yyyy=1993);
%TAQ_OWR_GPIN(yyyy=1994);
%TAQ_OWR_GPIN(yyyy=1995);
%TAQ_OWR_GPIN(yyyy=1996);
%TAQ_OWR_GPIN(yyyy=1997);
%TAQ_OWR_GPIN(yyyy=1998);
%TAQ_OWR_GPIN(yyyy=1999);
%TAQ_OWR_GPIN(yyyy=2000);
%TAQ_OWR_GPIN(yyyy=2001);
%TAQ_OWR_GPIN(yyyy=2002);
%TAQ_OWR_GPIN(yyyy=2003);
%TAQ_OWR_GPIN(yyyy=2004);
%TAQ_OWR_GPIN(yyyy=2005);
%TAQ_OWR_GPIN(yyyy=2006);
/* NMS Implementation Feb 2007 */
%TAQM_OWR_GPIN(yyyy=2007);
%TAQM_OWR_GPIN(yyyy=2008);
%TAQM_OWR_GPIN(yyyy=2009);
%TAQM_OWR_GPIN(yyyy=2010);
%TAQM_OWR_GPIN(yyyy=2011);
%TAQM_OWR_GPIN(yyyy=2012);
%TAQM_OWR_GPIN(yyyy=2013);
%TAQM_OWR_GPIN(yyyy=2014);
%TAQM_OWR_GPIN(yyyy=2015);
%TAQM_OWR_GPIN(yyyy=2016);
%TAQM_OWR_GPIN(yyyy=2017);
%TAQM_OWR_GPIN(yyyy=2018);
%TAQM_OWR_GPIN(yyyy=2019);

data taqdfx_all;
    set taqdf_:;
run;

proc sql;
    create table taqdfx_all1 as
    select a.*, b.vwretd, b.vwretx
    from taqdfx_all a
    left join crsp.dsiy b on a.date = b.caldt
    order by a.permno, a.date;
quit;

/* Compute and adjust OWR variables */
proc printto log='/dev/null'; run;
proc expand data=taqdfx_all1 out=taqdfx_all2 method=none;
    by permno;
    convert y_e = y_eL1 / transformout = (lag 1);
    convert ccprc = CCPrcL1 / transformout = (lag 1);
    convert mid_after_open = omF1 / transformout = (lead 1);
run;
proc printto; run;
%put expand &syslast. done;

data taqdfx_all2;
    set taqdfx_all2;
    yyyy=year(date);
    r_d = (vw_price_m-mid_after_open+coalesce(divamt,0))/mid_after_open;
    r_o = (omF1-vw_price_m)/mid_after_open;
run;

%MERGE_ASOF(a=taqdfx_all2, b=crspm3, merged=taqdfx_all3, datevar=date, num_vars=bm_log me_log);

proc printto log='/dev/null'; run;
proc reg data=taqdfx_all3 outest=_beta (drop=_: retx rename=(Intercept=alpha vwretx=beta)) noprint;
    by permno yyyy;
    model retx = vwretx;
run;
proc printto; run;

data taqdfx_all4;
    merge taqdfx_all3 _beta;
    by permno yyyy;
run;

proc sort data=taqdfx_all4 nodupkey;
    by date permno;
run;

proc printto log='/dev/null'; run;
proc reg data=taqdfx_all4 noprint;
    model r_o r_d = beta me_log bm_log;
    output out=_ret_resid(keep=permno date ur_o ur_d) r=ur_o ur_d;
    model y_e = y_eL1 me_log;
    output out=_oib_resid(keep=permno date uy_e) r=uy_e;
    by date;
run;
proc printto; run;

data taqdfx_all5;
    merge taqdfx_all4 _ret_resid _oib_resid;
    by date permno;
run;

%INCLUDE "~/git/sas/WINSORIZE_TRUNCATE.sas";
%WINSORIZE_TRUNCATE(dsetin = taqdfx_all5, dsetout = taqdfx_all6, byvar = date,
                    vars = ur_o ur_d, type = W, pctl = 1 99, filter = and exchcd eq 1);

/* Output files */
proc sort data=taqdfx_all6 out=out.taqdfx_all6(compress=no) nodupkey;
    by permno date;
proc sort data=crspm3 out=out.crspm3 nodupkey;
    by permno date;
run;
This Python script loads the SAS file and writes it to a PyTables HDF5 file, a format much better suited to repeated reads, writes, and queries. This will make parallelization much easier (see est.py).
The last piece shows an example of estimating three of the models. Given the raw data, we run each model once (a single start) for XOM in 2015 and get back a dictionary of parameter estimates. We'll come back to this after going through the model code.
import os
import pandas as pd
from importlib import reload

os.chdir('/home/nyu/eddyhu/git/pin-code')
import eo_model as eo
import gpin_model as gpin
import owr_model as owr

# setup data
df = pd.read_sas('/scratch/nyu/hue/taqdfx_all6.sas7bdat')
df['yyyy'] = df.yyyy.astype('int')
df['date'] = df.DATE
df['permno'] = df.permno.astype('int')
df['ticker'] = df.symbol_15.str.decode('UTF-8')
df.set_index('permno yyyy'.split(), inplace=True)

# keep stock-years with at least 230 non-missing observations
c = df.groupby(level=(0,1))\
      ['n_buys n_sells ur_d ur_o uy_e'.split()]\
      .count().min(axis=1)
c.name = 'count_min'
df1 = df.join(c)
df1.loc[df1.count_min>=230]\
   ['date ticker n_buys n_sells ur_d ur_o uy_e'.split()]\
   .to_hdf('/scratch/nyu/hue/taqdf_1319.h5', 'data', format='table')

d = pd.read_hdf('/scratch/nyu/hue/taqdf_1319.h5', where='permno==11850 & yyyy==2015')

# test run of each model
eo.fit(d.n_buys, d.n_sells, starts=1)
gpin.fit(d.n_buys, d.n_sells, starts=1)
owr.fit(d.uy_e, d.ur_d, d.ur_o, starts=1)
Model code
The model code includes eo_model.py, dy_model.py, gpin_model.py, and owr_model.py. These files also rely on some utility files like common.py and regressions.py.
To make things simple we will start with eo_model.py, as it is the simplest model and code. The code for dy and gpin is nearly identical in structure to eo, except for differences in parameterization, how involved the simulations are, and the likelihood functions. I will then describe owr_model.py in detail, as it involves quite a few optimization tricks.
EOModel
Let’s start with the import statements. Because Python is a general-purpose programming language, we need to import the mathematical functions we use, including basics like log and exp. common.py also imports and defines some functions, like the log factorial (lfact), using the gammaln function from scipy.
# numpy for matrix algebra
import numpy as np
from numpy import log, exp
# some scipy special mathematical functions
from scipy.special import logsumexp
from scipy.linalg import inv
# this is the main optimization library
import scipy.optimize as op
# import common functions
from common import *
Each model is defined as a Python Class. A Python Class is an object that we define, which contains attributes (data) and methods (functions). In the EOModel, the attributes include the parameters α, δ, ε, etc., and the methods include functions that simulate the PIN model, define the likelihood functions, and run the model estimation (fit()).
Every Class needs to have an __init__() function, which sets up the model Class. Let's take a look at the Class definition.
class EOModel(object):
    # because we are defining custom models, we are subclassing the most generic Python object

    def __init__(self, a, d, es, eb, u, n=1, t=252):
        # here we describe the EOModel parameters
        """Initializes parameters of an Easley and O'Hara Sequential Trade Model

        a  : $\alpha$, the unconditional probability of an information event
        d  : $\delta$, the unconditional probability of good news
        es : $\epsilon_s$, the average number of sells on a day with no news
        eb : $\epsilon_b$, the average number of buys on a day with no news
        u  : $\mu$, the average number of (additional) trades on a day with news
        n  : the number of stocks to simulate, default 1
        t  : the number of periods to simulate, default 252 (one trading year)
        """
        # Assign model parameters
        self.a, self.d, self.es, self.eb, self.u, self.N, self.T = a, d, es, eb, u, n, t
        self.states = self._draw_states()
        self.buys = np.random.poisson((eb+(self.states == 1)*u))
        self.sells = np.random.poisson((es+(self.states == -1)*u))
        self.alpha = compute_alpha(a, d, eb, es, u, self.buys, self.sells)
In addition to the standard PIN model parameters, our class includes n, the number of stocks to simulate, and t, the number of periods to simulate.
We can initialize an EOModel like this:
a = 0.41
d = 0.58
es = 2719
eb = 2672
u = 2700
N = 1000
T = 252

model = EOModel(a, d, es, eb, u, n=N, t=T)
Behind the scenes this will initialize an instance of a PIN model and simulate 1000 stock-year observations (252 days in a trading year). This happens because the __init__() function draws the states and then draws buys and sells from Poisson distributions. _draw_states() works by drawing independent binomials based on the probability of an information event α and the probability of good news δ.
    def _draw_states(self):
        """Draws the states for N stocks and T periods.

        In the Easley and O'Hara sequential trade model at the beginning of each period
        nature determines whether there is an information event with probability $\alpha$ (a).
        If there is information, nature determines whether the signal is good news with
        probability $\delta$ (d) or bad news $1-\delta$ (1-d).

        A quick way to implement this is to draw all of the event states at once as an
        `NxT` matrix from a binomial distribution with $p=\alpha$, and independently draw
        all of the news states as an `NxT` matrix from a binomial with $p=\delta$.
        An information event occurs for stock i on day t if `events[i][t]=1`, and zero
        otherwise. The news is good if `news[i][t]=1` and bad if `news[i][t]=-1`.
        The element-wise product of `events` with `news` gives a complete description of
        the states for the sequential trade model, where the state variable can take the
        values (-1,0,1) for bad news, no news, and good news respectively.

        self : EOSequentialTradeModel instance which contains parameter definitions
        """
        events = np.random.binomial(1, self.a, (self.N, self.T))
        news = np.random.binomial(1, self.d, (self.N, self.T))
        news[news == 0] = -1
        states = events*news
        return states
The last step, compute_alpha, is a function that will compute CPIEs for real or simulated data. The computation of the CPIE depends on the likelihood function definitions.
def _lf(eb, es, n_buys, n_sells):
    return -eb+n_buys*log(eb)-lfact(n_buys)-es+n_sells*log(es)-lfact(n_sells)

def _ll(a, d, eb, es, u, n_buys, n_sells):
    return np.array([log(a*(1-d))+_lf(eb,es+u,n_buys,n_sells),
                     log(a*d)+_lf(eb+u,es,n_buys,n_sells),
                     log(1-a)+_lf(eb,es,n_buys,n_sells)])

def compute_alpha(a, d, eb, es, u, n_buys, n_sells):
    '''Compute the conditional alpha given parameters, buys, and sells.
    '''
    ll = _ll(a, d, eb, es, u, n_buys, n_sells)
    llmax = ll.max(axis=0)
    y = exp(ll-llmax)
    alpha = y[:-1].sum(axis=0)/y.sum(axis=0)
    return alpha

def loglik(theta, n_buys, n_sells):
    a,d,eb,es,u = theta
    ll = _ll(a, d, eb, es, u, n_buys, n_sells)
    return sum(logsumexp(ll,axis=0))
_lf() is the Poisson log-likelihood that is common to each of the three states: good news, bad news, and no news. _ll() is the full vector of log-likelihoods for the PIN model, one element per state.
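Written out in the notation of the docstring, _lf() is just the sum of two independent Poisson log-densities, for $B$ buys with mean $\varepsilon_b$ and $S$ sells with mean $\varepsilon_s$ (the log-factorials are the lfact() terms):

$$\log f(B,S \mid \varepsilon_b,\varepsilon_s) = -\varepsilon_b + B\log\varepsilon_b - \log B! - \varepsilon_s + S\log\varepsilon_s - \log S!$$

_ll() then stacks the three state likelihoods, shifting the sell or buy intensity by $\mu$ and adding the log state probabilities:

$$\ell = \Big(\log[\alpha(1-\delta)] + \log f(B,S \mid \varepsilon_b,\ \varepsilon_s+\mu),\;\ \log[\alpha\delta] + \log f(B,S \mid \varepsilon_b+\mu,\ \varepsilon_s),\;\ \log(1-\alpha) + \log f(B,S \mid \varepsilon_b,\ \varepsilon_s)\Big)$$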
compute_alpha()
computes CPIEs, using a numerical trick. We compute
the vector of likelihoods by calling _ll()
, we get a vector of the
max across the three states, and then we scale the vector of
likelihoods by the max before computing the ratio that represents the
CPIE.
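In symbols, with the three elements of $\ell$ ordered (bad news, good news, no news) as in _ll(), and $m = \max_k \ell_k$ for a given day:

$$\mathrm{CPIE} = \frac{e^{\ell_1 - m} + e^{\ell_2 - m}}{e^{\ell_1 - m} + e^{\ell_2 - m} + e^{\ell_3 - m}}$$

Subtracting $m$ leaves the ratio unchanged but keeps the exponentials in a range where they do not underflow or overflow.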
Finally, loglik() computes the total log-likelihood that will be used in the optimization.
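This is the mixture likelihood of the PIN model, evaluated with logsumexp for the same numerical-stability reason:

$$\log L(\theta) = \sum_t \log\Big[\alpha(1-\delta)\, f(B_t,S_t \mid \varepsilon_b,\ \varepsilon_s+\mu) + \alpha\delta\, f(B_t,S_t \mid \varepsilon_b+\mu,\ \varepsilon_s) + (1-\alpha)\, f(B_t,S_t \mid \varepsilon_b,\ \varepsilon_s)\Big]$$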
At this point you are probably wondering why some of these functions are named with a leading underscore (_) and others are not. In Python this marks them as "hidden" (internal) functions. This is helpful for users who are exploring the code interactively, as we want them to see and interact only with the higher-level functions, like compute_alpha and loglik.
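As a quick illustration of the convention (nothing specific to this repo beyond the module name): a wildcard import only exposes the public names, so an interactive user sees the high-level functions but not the internal pieces.

from eo_model import *   # brings in EOModel, compute_alpha, loglik, fit, ...
print(compute_alpha)     # imported
# print(_lf)             # NameError: names starting with "_" are skipped by
#                        # a wildcard import unless listed in __all__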
The actual estimation is handled by the fit() function.
The fit() function does a number of things that seem complex but are necessary to get the numerical optimization to work well. For instance, we use up to 10 random starts by default, and each start is retried up to maxiter=100 times until the optimizer converges cleanly.
def fit(n_buys, n_sells, starts=10, maxiter=100,
        a=None, d=None, eb=None, es=None, u=None, se=None, **kwargs):
    nll = lambda *args: -loglik(*args)  # define the negative log likelihood that we will minimize
    bounds = [(0.00001,0.99999)]*2+[(0.00001,np.inf)]*3  # we will do a constrained optimization
    ranges = [(0.00001,0.99999)]*2  # we will define the min-max range for our random guesses
    # if we do not have a prior on what the estimates are, we compute them here
    a0,d0 = [x or 0.5 for x in (a,d)]  # 50% chance of information/news
    eb0,es0 = eb or np.mean(n_buys), es or np.mean(n_sells)  # expected buys/sells = mean of observed buy/sells
    oib = n_buys - n_sells
    u0 = u or np.mean(abs(oib))  # expected order imbalance = mean of absolute order imbalance
    res_final = [a0,d0,eb0,es0,u0]  # define the vector that will hold all the parameters
    stderr = np.zeros_like(res_final)  # define the vector that will hold our standard errors
    f = nll(res_final,n_buys,n_sells)  # initialize the log likelihood function with the buys/sells data
    for i in range(starts):
        # rc is going to be our return code
        rc = -1
        j = 0
        while (rc != 0) & (j <= maxiter):
            if (None in (res_final)) or i:
                # guess parameters
                a0,d0 = [np.random.uniform(l,np.nan_to_num(h)) for (l,h) in ranges]
                eb0,es0,u0 = np.random.poisson([eb,es,u])
            # do actual optimization here
            res = op.minimize(nll, [a0,d0,eb0,es0,u0], method=None,
                              bounds=bounds, args=(n_buys,n_sells))
            rc = res['status']
            # see if the optimization step violated any constraints
            check_bounds = list(imap(lambda x,y: x in y, res['x'], bounds))
            if any(check_bounds):
                rc = 3
            j+=1
        if (res['success']) & (res['fun'] <= f):
            # if everything worked fine and we have a
            # smaller (negative) likelihood then store these parameters
            f,rc = res['fun'],res['status']
            res_final = res['x'].tolist()
            # and compute standard errors
            stderr = 1/np.sqrt(inv(res['hess_inv'].todense()).diagonal())
    # output the final parameter estimates
    param_names = ['a','d','eb','es','u']
    output = dict(zip(param_names+['f','rc'], res_final+[f,rc]))
    if se:
        output = {'params': dict(zip(param_names,res_final)),
                  'se': dict(zip(param_names,stderr)),
                  'stats': {'f': f,'rc': rc}
                  }
    return output
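As a quick check, fit() can be pointed at one of the stock-years simulated by the EOModel above (a minimal sketch; the recovered numbers will differ from run to run since the data are random draws):

model = EOModel(0.41, 0.58, 2719, 2672, 2700, n=1, t=252)
est = fit(model.buys[0], model.sells[0], starts=5)
# est is a dictionary of parameter estimates plus the objective value and return code,
# e.g. {'a': ..., 'd': ..., 'eb': ..., 'es': ..., 'u': ..., 'f': ..., 'rc': 0}
print(est)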
The last function is cpie_mech(), which is very simple for the EOModel: it returns a dummy variable for whether observed turnover is higher than its average.
def cpie_mech(turn):
    mech = np.zeros_like(turn)
    mech[turn > turn.mean()] = 1
    return mech
The last piece defines the behavior when you run eo_model.py as a stand-alone script. In this case it simulates an example PIN model and runs regressions on the simulated data to show how the model identifies information. This was part of an older version of our paper but is useful for building intuition.
if __name__ == '__main__':

    import pandas as pd
    from regressions import *

    a = 0.41
    d = 0.58
    es = 2719
    eb = 2672
    u = 2700
    N = 1000
    T = 252

    model = EOModel(a,d,es,eb,u,n=N,t=T)
    buys = to_series(model.buys)
    sells = to_series(model.sells)
    aoib = abs(buys-sells)
    turn = buys+sells
    alpha = to_series(model.alpha)

    def run_regs(df):
        # run regression
        m = []
        m.append(partial_r2(df['alpha'],df[['aoib','aoib2']],
                            df[['aoib','aoib2','turn','turn2']]))
        out = pd.DataFrame(m, columns=['results'])
        out.index.names = ['model']
        return out

    regtab = pd.DataFrame({'alpha':alpha,'aoib':aoib,'aoib2':aoib**2,'turn':turn,'turn2':turn**2})
    res = run_regs(regtab)
    print(est_tab(res.results, est=['params','tvalues'],
                  stats=['rsquared','rsquared_sp']))
OWRModel
Let's start again with the import statements.
from __future__ import division
# numpy for matrix algebra
import numpy as np
from numpy import log, exp
import pandas as pd
from scipy.special import logsumexp
import scipy.optimize as op  # optimization
from numba import jit
from common import *
Many of the libraries we need are the same (numpy, scipy). For convenience we also import pandas to make handling the data a bit easier, although the code could be re-written without it.
The most important new library is numba, from which we import jit, a just-in-time compiler that we use to compile some of our matrix-algebra loops.
The Class __init__() is analogous to that of EOModel, but the parameterization is much more complex because we are dealing with multivariate normal distributions.
class OWRModel(object):

    def __init__(self,a,su,sz,si,spd,spo,n=1,t=252):
        """Initializes parameters of an Odders-White and Ready (2008) Sequential Trade Model

        a: $\alpha$, the unconditional probability of an information event
        ... #TODO#
        """
        # Assign model parameters
        self.a, self.su, self.sz, self.si, self.spd, self.spo, self.N, self.T = a, su, sz, si, spd, spo, n, t
        self.s_n, self.s_e = _compute_cov(a, su, sz, si, spd, spo)
        # pre-computing the dets and invs saves a lot of time
        self.dsn, self.isn = det3(self.s_n), inv3(self.s_n)
        self.dse, self.ise = det3(self.s_e), inv3(self.s_e)
        self.dsd = self.dsn/self.dse
        self.isd = self.isn - self.ise
        self.states = np.random.binomial(1, a, n*t)
        mean = [0]*3
        x_n = np.random.multivariate_normal(mean,self.s_n,n*t).T
        x_e = np.random.multivariate_normal(mean,self.s_e,n*t).T
        # event days (states=1) draw from the event covariance, no-news days from the no-news covariance
        self.x = x_e*self.states + x_n*(1-self.states)
        self.oib, self.ret_d, self.ret_o = self.x.reshape(3,n,t)
        self.alpha = _compute_alpha(self.x, self.a,self.dsd,self.isd)\
                         .reshape((n,t))
Here I want to highlight two functions in particular: det3 and inv3. Based on extensive profiling, I found that pre-computing the determinants and inverses saved a lot of time. This makes sense because these are expensive operations that you do not want to repeat every time you need one of these matrices. Furthermore, because these are only 3x3 matrices, I sped things up further by hand-coding the matrix algebra. numpy's matrix algebra routines are fast, but a general-purpose function will never be as fast as dead-simple hand-coded arithmetic for a fixed 3x3 case.
This may seem like overkill, but when you think about how many times these functions could potentially be called in the optimization loop, you will realize how quickly the computation time can add up.
def det2(a):
    return (a[0][0] * a[1][1]) - (a[0][1] * a[1][0])

def det3(a):
    return (a[0][0] * (a[1][1] * a[2][2] - a[2][1] * a[1][2])
           -a[1][0] * (a[0][1] * a[2][2] - a[2][1] * a[0][2])
           +a[2][0] * (a[0][1] * a[1][2] - a[1][1] * a[0][2]))

def inv3(a):
    invdet = 1/det3(a)
    m = np.zeros((3,3))
    m[0, 0] = a[1, 1] * a[2, 2] - a[2, 1] * a[1, 2]
    m[0, 1] = a[0, 2] * a[2, 1] - a[0, 1] * a[2, 2]
    m[0, 2] = a[0, 1] * a[1, 2] - a[0, 2] * a[1, 1]
    m[1, 0] = a[1, 2] * a[2, 0] - a[1, 0] * a[2, 2]
    m[1, 1] = a[0, 0] * a[2, 2] - a[0, 2] * a[2, 0]
    m[1, 2] = a[1, 0] * a[0, 2] - a[0, 0] * a[1, 2]
    m[2, 0] = a[1, 0] * a[2, 1] - a[2, 0] * a[1, 1]
    m[2, 1] = a[2, 0] * a[0, 1] - a[0, 0] * a[2, 1]
    m[2, 2] = a[0, 0] * a[1, 1] - a[1, 0] * a[0, 1]
    return m*invdet
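A quick sanity check (a sketch, with numpy imported as np as above) confirms that the hand-coded versions agree with the general-purpose numpy routines:

A = np.array([[2.0, 0.3, 0.1],
              [0.3, 1.5, 0.2],
              [0.1, 0.2, 1.0]])

print(np.allclose(det3(A), np.linalg.det(A)))   # True
print(np.allclose(inv3(A), np.linalg.inv(A)))   # True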
Next let's jump ahead a bit and talk about the OWRModel's _lf() function, along with its corresponding speed-up trick.
def _lf(x, det, inv):
    return -0.5*log(det)-0.5*_qvmv(x,inv)

@jit
def _qvmv(x,A):
    """Computes x'Ax. """
    m,n = A.shape
    qsum = 0
    for i in range(m):
        for j in range(n):
            qsum += A[i,j] * x[i] * x[j]
    return qsum
The likelihood for a multivariate normal is dead simple to write. It involves only the vector of data x, and the determinant and inverse of the covariance matrix based on the model parameters (which we have pre-computed).
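For reference, the full trivariate normal log-density is

$$\log f(x) = -\tfrac{3}{2}\log(2\pi) - \tfrac{1}{2}\log\lvert\Sigma\rvert - \tfrac{1}{2}\, x'\Sigma^{-1}x$$

_lf() drops the constant $-\tfrac{3}{2}\log(2\pi)$; it is the same in both states, so it only shifts the total log-likelihood by a constant and does not affect the optimum.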
The only "tricky" part is the quadratic form x'Ax, where A is the inverse of the covariance matrix. Computing it with numpy requires calling the matrix multiplication routine twice, which is expensive because it is a general-purpose function. As before, we can write our own dead-simple version, replacing the two multiplication calls with two loops! That is where _qvmv() comes in.
Of course, loops in Python are much slower than loops in a language like C++. But we can get near-C speeds for very simple code by compiling the function with numba's jit. All this takes is adding the @jit decorator on top of the function, which tells Python to compile this piece of code and run the compiled version.
Note: Python is an interpreted language, not a compiled one. This makes it much easier to write, and you usually never have to compile anything, but it can also make it slow. By compiling just a few pieces of code we get dramatic speed-ups while maintaining maximum flexibility.
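To see the effect on your own machine, you can time the jitted function against a plain-Python copy (a rough sketch; _qvmv_plain is just an un-decorated copy written here for comparison, and the first jitted call is made outside the timing because it triggers compilation):

import timeit
import numpy as np
from owr_model import _qvmv

def _qvmv_plain(x, A):
    # same loops as _qvmv, but without the @jit decorator
    m, n = A.shape
    qsum = 0
    for i in range(m):
        for j in range(n):
            qsum += A[i, j] * x[i] * x[j]
    return qsum

A = np.eye(3)
x = np.ones(3)
_qvmv(x, A)  # warm-up call so numba compiles before we time

print(timeit.timeit(lambda: _qvmv_plain(x, A), number=100000))
print(timeit.timeit(lambda: _qvmv(x, A), number=100000))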
Finally, to see how everything comes together, let's look at loglik():
def loglik(theta, oib, ret_d, ret_o):
    a, su, sz, si, spd, spo = theta
    s_n, s_e = _compute_cov(a, su, sz, si, spd, spo)
    dsn, isn = det3(s_n), inv3(s_n)
    dse, ise = det3(s_e), inv3(s_e)
    x = np.array([oib,ret_d,ret_o])
    t = x.shape[1]
    ll = np.zeros((2,t))
    for i in range(t):
        ll[:,i] = log(a)+_lf(x[:,i],dse,ise), log(1-a)+_lf(x[:,i],dsn,isn)
    return sum(logsumexp(ll,axis=0))
Here we end up computing the determinants and inverses again. This is necessary because at each iteration of the optimization we change the parameter vector theta, and therefore need new determinants and inverses.
And that's pretty much it! The OWRModel has its own fit() based on the specifics of the model, but there are no new tricks.
Estimation code
In this section we go through the setup for estimating the models in parallel. The main estimation code, which calls a given model's fit() function, is est.py.
Let's look at the import statements. Parallelization is done using the ipyparallel library. As mentioned before, the data are stored in PyTables, so we will use two libraries to work with them: pandas and tables. os handles operating-system tasks like changing working directories and creating files or folders. argparse parses the command-line arguments so that we can call the estimation script as, for example, python est.py owr 2015 to estimate the OWR model for 2015 without having to write additional code.
import ipyparallel as ipp
import pandas as pd
import tables as tb
import os
import argparse
The next piece handles the parsing of the arguments. The model name comes first, then the year.
parser = argparse.ArgumentParser(description='Model and year to estimate.')
parser.add_argument('model', type=str, nargs='?', default='gpin')
parser.add_argument('year', type=int, nargs='?', default=2014)
args = parser.parse_args()
print(vars(args))
The next piece sets up the ipyparallel client and finds the necessary data. Rather than send the data itself to the worker nodes, we just build the list of (permno, year) keys (idx), so each worker knows which slice of the HDF5 file to read. This reduces memory overhead.
rc = ipp.Client(cluster_id="{0}-{1}".format(args.model, args.year))
print(len(rc))
dv = rc[:]
dv.push(vars(args))
lv = rc.load_balanced_view()

h5 = tb.open_file('/scratch/nyu/hue/taqdf_1319.h5', mode='r')
df = h5.get_node('/data/table')
idx = list(set(filter(lambda x: x[1]==args.year,
                      zip(df.col('permno'), df.col('yyyy')))))
Finally, we define est(), the function that each worker node will run. Because each worker is independent, it needs its own import statements, its own connection to the data, etc. Then all it has to do is call the right fit() function and write the resulting parameter estimates to disk.
@ipp.interactive
def est(x):
    import os
    import pandas as pd
    import tables as tb
    import json
    os.chdir('/home/nyu/eddyhu/git/pin-code')
    import eo_model as eo
    import gpin_model as gpin
    import owr_model as owr
    d = pd.read_hdf('/scratch/nyu/hue/taqdf_1319.h5',
                    where='permno=={permno} & yyyy=={yyyy}'.format(permno=x[0], yyyy=x[1]))
    d = d.dropna()
    if model == 'eo':
        r = eo.fit(d.n_buys, d.n_sells, starts=1)
    if model == 'gpin':
        r = gpin.fit(d.n_buys, d.n_sells, starts=1)
    if model == 'owr':
        r = owr.fit(d.uy_e, d.ur_d, d.ur_o, starts=5)
    r.update({'permno':int(x[0]),'yyyy':int(x[1])})
    fname = '/home/nyu/eddyhu/pin-estimates/{model}/{permno}-{yyyy}.json'.format(model=model, permno=x[0], yyyy=x[1])
    with open(fname, 'w') as outfile:
        json.dump(r, outfile)
    return r

res = lv.map_async(est, idx)
res.wait()
The final piece is run_est.sh: a shell script which starts the ipyparallel cluster, calls est.py, and shuts down the cluster once we finish all the stocks in a given year.
#!/bin/bash
#$ -cwd
#$ -m abe
#$ -pe onenode 4
#$ -M [your@email.com]

model=$1;shift
year=$1;shift

source ~/miniconda3/bin/activate

ipcluster start -n 7 --cluster-id="$model-$year" &
sleep 45
ipython est.py $model $year
ipcluster stop
There are a few tricks here that are worth pointing out. The header is read as instructions by the Univa Grid Engine. -cwd tells the job scheduler to run the job in the current directory, which is where our scripts are stored. -m abe tells the scheduler to send mail if the job aborts (a), begins (b), or ends (e). -pe onenode 4 requests 4 slots on a single node, which thanks to hyperthreading gives us 8 processes. -M [your@email.com] tells the scheduler where to send those status emails.
In the body of the script we grab the arguments, activate our conda environment, start ipcluster (which manages the ipyparallel cluster), wait 45 seconds for the cluster to start, and then run our estimation script.
Note that we start only 7 engines (-n 7), leaving one process for the cluster controller. Also, we call the script using ipython rather than python. This is not strictly necessary, but it gives us some more flexibility in case we want to use ipython-specific convenience functions that are not available in base python. However, I have written the code to work with base python.
run_est.sh is called similarly to est.py, as it is really just a wrapper for the grid engine:
qsub run_est.sh owr 2015
To collect the estimates we can run a quick jq script to make a CSV file.
cd ~/git/pin-estimates/gpin
jq -r '[.permno, .yyyy, .a, .p, .eta, .r, .d, .th] | @csv' *.json > gpin_1319.csv
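From there the CSV can be pulled straight into pandas (a sketch; the column names simply mirror the field order in the jq filter above, since the output has no header row):

import pandas as pd

cols = ['permno', 'yyyy', 'a', 'p', 'eta', 'r', 'd', 'th']  # same order as the jq filter
gpin = pd.read_csv('gpin_1319.csv', header=None, names=cols)
print(gpin.groupby('yyyy')['a'].describe())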