Duarte, Hu, and Young (2019) JFE Code

PIN Code

Note: This code is provided as-is, and this write-up is for illustrative purposes. Since the publication of the paper we have received numerous requests for code in different languages, so I decided to revisit the code, update it for Python 3, and make it available for those interested in learning how the estimation works.

This code runs on the WRDS Cloud; it prepares the data and estimates the models of information asymmetry in Duarte, Hu, and Young (2019, JFE). Unlike the paper, the data here are based on the WRDS Intraday Indicators, but the variable construction and filtering are otherwise very similar.

Prepare Data

This SAS code constructs the yearly stock-day files necessary to estimate the various structural models. To save time, I am using various SAS macros that can be found here.

It requires access to CRSP (for market cap), COMPUSTAT (for book values), and TAQ, specifically the WRDS Intraday Indicators, which provide daily order imbalance, volume, and intraday and overnight returns.

The final file will be out.taqdfx_all6.

/* this first piece merges CRSP/COMPUSTAT */

%INCLUDE "~/git/sas/CC_LINK.sas";
%CC_LINK(dsetin=comp.funda,
    dsetout=compx,
    datevar=datadate,
    keep_vars=at lt);

data crspm6;
    set crsp.msf;
    where month(date)=6;
    ME6=abs(prc*shrout);
    keep permno date ME6;
run;
data crspm;
    set crsp.msf;
    ME=abs(prc*shrout);
    datadate=date;
    keep permno datadate date ME;
run;

/* MERGE_ASOF merges the most recent 
observation in dataset B into dataset A */
%INCLUDE "~/git/sas/MERGE_ASOF.sas";
%MERGE_ASOF(a=crspm,b=crspm6,
    merged=crspm2,
    datevar=date,
    num_vars=ME6);
%MERGE_ASOF(a=crspm2,b=compx,
    merged=crspm3,
    datevar=datadate,
    num_vars=BE ME_COMP at lt gp);
data crspm3;
    set crspm3;
    BM = BE/ME6;
    bm_log = log(BM);
    me_log = log(ME);
run;

proc print data=crspm3(obs=25) width=min;
    where permno=11850 and year(date) between 1993 and 2018;
    var permno date me: bm:;
run;

/* This macro creates yearly stock-day files
pulling from both master files and then WRDS IID 
for the second-level TAQ data */
%MACRO TAQ_OWR_GPIN(yyyy=2004);
data work.mastm_&yyyy. ;
    set %if &yyyy > 1993
    %then %do;
    taq.mast_%SYSEVALF(&yyyy.-1):
    %end;
    taq.mast_&yyyy.:
    taq.mast_%SYSEVALF(&yyyy.+1):;
    SYM_ROOT=scan(SYMBOL, 1, ' ');
    SYM_SUFFIX=scan(SYMBOL, 2, ' ');
    DATE=coalesce(FDATE,DATEF);
    format date yymmdd10.;
run;
proc sort data=work.mastm_&yyyy. NODUPKEY;
    by SYMBOL DATE;
run;
proc sql;
    create table work.mastm_crsp_&yyyy. as
    select a.date, sym_root, sym_suffix, symbol,
    substr(coalesce(b.ncusip, b.cusip),1,8) as cusip8,
    a.permno, a.permco, shrcd, exchcd,
    a.prc, a.ret, a.retx, a.shrout, a.vol, c.divamt, c.distcd,
    coalesce(e.SP500,0) as SP500
    from crsp.dsf a
    left join
    crsp.dsenames b
    on a.permno = b.permno
    and a.date between b.namedt and coalesce(b.nameendt, today())
    left join
    crsp.dsedist c
    on a.permno = c.permno
    and a.date = c.paydt
    left join
    (select distinct cusip, sym_root, sym_suffix, symbol,
    min(date) as mindt, max(date) as maxdt
    from work.mastm_&yyyy.
    group by cusip, sym_root, sym_suffix, symbol) d
    on substr(d.cusip,1,8) = substr(coalesce(b.ncusip, b.cusip),1,8)
    and a.date ge d.mindt
    and a.date le coalesce(d.maxdt,today())
    left join
    (select *, 1 as SP500 from crsp.dsp500list) e
    on a.permno = e.permno
    and a.date between e.start and e.ending
    where year(a.date) = &yyyy.
    and symbol is not null
    order by a.date, sym_root, sym_suffix;
quit;
proc sort data=work.mastm_crsp_&yyyy. nodupkey;
    by date sym_root sym_suffix;
run;
proc sort data=taq.wrds_iid_&yyyy.
    out=work.wrds_iid_&yyyy.;
    by date symbol;
run;    
data work.taqdf_&yyyy.;
    length date 8;
    merge work.wrds_iid_&yyyy.(keep=date symbol
    buynumtrades_lri sellnumtrades_lri
    FPrice OPrice CPrc: ret_mkt_t
    vwap_m 
    SumVolume_m SumVolume_b SumVolume_a)
    work.mastm_crsp_&yyyy.;
    by date symbol;
    /* make names consistent with TAQMSEC */
    CCPrc = abs(coalesce(prc,cprc,cprc2));
    mid_after_open = coalesce((oprice+fprice)/2,oprice,fprice);
    y_e = divide(buynumtrades_lri-sellnumtrades_lri,buynumtrades_lri+sellnumtrades_lri);
    symbol_15=symbol;
     rename buynumtrades_lri = n_buys
    sellnumtrades_lri = n_sells
    vwap_m = vw_price_m
    ret_mkt_t = ret_mkt_m
    SumVolume_m = total_vol_m
    SumVolume_b = total_vol_b
    SumVolume_a = total_vol_a;
    label CCPrc='Closing Price (CRSP or TAQ)' y_e='Order Imbalance (%)';
run;
proc sort data=work.taqdf_&yyyy. out=taqdf_&yyyy.x nodupkey;
    by permno date;
    where permno > .Z
    and shrcd in (10,11)
    and exchcd in (1,2,3,4);
run;
%MEND;    

/* This macro creates yearly stock-day files
pulling from both master files and then WRDS IID 
for the millisecond-level TAQ data */
%MACRO TAQM_OWR_GPIN(yyyy=2014);
%let sysyear= %sysfunc(year("&sysdate"d));    
data work.mast1_&yyyy.;
    length date 8 sym_root $6 sym_suffix $10 symbol_15 $15;        
    set taqmsec.mastm_%SYSEVALF(&yyyy.-1):
    taqmsec.mastm_&yyyy.:
    %if %SYSEVALF(&yyyy.+1) <= &sysyear. %then %do;
    taqmsec.mastm_%SYSEVALF(&yyyy.+1):
    %end;;
    SYM_ROOT=scan(SYMBOL_15, 1, ' ');
    SYM_SUFFIX=scan(SYMBOL_15, 2, ' ');
    keep date cusip sym_root sym_suffix symbol_15;
    run;
data work.mast2_&yyyy. ;
    length date 8 sym_root $6 sym_suffix $10 symbol_15 $15;        
    set taq.mast_%SYSEVALF(&yyyy.-1):
    taq.mast_&yyyy.:
    %if %SYSEVALF(&yyyy.+1) <= &sysyear. %then %do;
    taq.mast_%SYSEVALF(&yyyy.+1):
    %end;;        
    SYM_ROOT=scan(SYMBOL, 1, ' ');
    SYM_SUFFIX=scan(SYMBOL, 2, ' ');
    DATE=coalesce(DATE,FDATE,DATEF);
    SYMBOL_15=coalescec(SYMBOL_15,SYMBOL);
    keep date cusip sym_root sym_suffix symbol_15;
run;
data work.mastm_&yyyy.;
    length date 8 cusip $12
    sym_root $6 sym_suffix $10 symbol_15 $15;    
    set work.mast1_&yyyy. work.mast2_&yyyy.;
run;
proc sort data=work.mastm_&yyyy. NODUPKEY;
    by SYM_ROOT SYM_SUFFIX DATE;
run;
proc sql;
    create table work.mastm_crsp_&yyyy. as
    select a.date, sym_root, sym_suffix, symbol_15,
    substr(coalesce(b.ncusip, b.cusip),1,8) as cusip8,
    a.permno, a.permco, shrcd, exchcd,
    a.prc, a.ret, a.retx, a.shrout, a.vol, c.divamt, c.distcd,
    coalesce(e.SP500,0) as SP500
    from crsp.dsf a
    left join
    crsp.dsenames b
    on a.permno = b.permno
    and a.date between b.namedt and coalesce(b.nameendt, today())
    left join
    crsp.dsedist c
    on a.permno = c.permno
    and a.date = c.paydt
    left join
    (select distinct cusip, sym_root, sym_suffix, symbol_15,
    min(date) as mindt, max(date) as maxdt
    from work.mastm_&yyyy.
    group by cusip, sym_root, sym_suffix, symbol_15) d
    on substr(d.cusip,1,8) = substr(coalesce(b.ncusip, b.cusip),1,8)
    and a.date ge d.mindt
    and a.date le coalesce(d.maxdt,today())
    left join
    (select *, 1 as SP500 from crsp.dsp500list) e
    on a.permno = e.permno
    and a.date between e.start and e.ending
    where year(a.date) = &yyyy.
    and symbol_15 is not null
    order by a.date, sym_root, sym_suffix;
quit;
proc sort data=work.mastm_crsp_&yyyy. nodupkey;
    by date sym_root sym_suffix;
run;
proc sort data=taqmsec.wrds_iid_&yyyy.
    out=work.wrds_iid_&yyyy.;
    by date sym_root sym_suffix;
run;        
data work.taqdf_&yyyy.;
    length date 8 sym_root $6 sym_suffix $10;
    merge work.wrds_iid_&yyyy.(keep=date sym_root sym_suffix
    buynumtrades_lr sellnumtrades_lr oprc cprc ret_mkt_m
    vw_price_m mid_after_open
    total_vol_m total_vol_b total_vol_a)
    work.mastm_crsp_&yyyy.;
    by date sym_root sym_suffix;
    CCPrc = abs(coalesce(prc,cprc));
    y_e = divide(buynumtrades_lr-sellnumtrades_lr,buynumtrades_lr+sellnumtrades_lr);
    rename buynumtrades_lr=n_buys sellnumtrades_lr=n_sells;
    label CCPrc='Closing Price (CRSP or TAQ)' y_e='Order Imbalance (%)';
run;
proc sort data=work.taqdf_&yyyy. out=taqdf_&yyyy.x nodupkey;
    by permno date;
    where permno > .Z
    and shrcd in (10,11)
    and exchcd in (1,2,3,4);
run;
%MEND;

%TAQ_OWR_GPIN(yyyy=1993);
%TAQ_OWR_GPIN(yyyy=1994);
%TAQ_OWR_GPIN(yyyy=1995);
%TAQ_OWR_GPIN(yyyy=1996);
%TAQ_OWR_GPIN(yyyy=1997);
%TAQ_OWR_GPIN(yyyy=1998);
%TAQ_OWR_GPIN(yyyy=1999);
%TAQ_OWR_GPIN(yyyy=2000);
%TAQ_OWR_GPIN(yyyy=2001);
%TAQ_OWR_GPIN(yyyy=2002);
%TAQ_OWR_GPIN(yyyy=2003);
%TAQ_OWR_GPIN(yyyy=2004);
%TAQ_OWR_GPIN(yyyy=2005);
%TAQ_OWR_GPIN(yyyy=2006);
/* NMS Implementation Feb 2007 */
%TAQM_OWR_GPIN(yyyy=2007);
%TAQM_OWR_GPIN(yyyy=2008);
%TAQM_OWR_GPIN(yyyy=2009);
%TAQM_OWR_GPIN(yyyy=2010);
%TAQM_OWR_GPIN(yyyy=2011);
%TAQM_OWR_GPIN(yyyy=2012);
%TAQM_OWR_GPIN(yyyy=2013);
%TAQM_OWR_GPIN(yyyy=2014);
%TAQM_OWR_GPIN(yyyy=2015);
%TAQM_OWR_GPIN(yyyy=2016);
%TAQM_OWR_GPIN(yyyy=2017);
%TAQM_OWR_GPIN(yyyy=2018);
%TAQM_OWR_GPIN(yyyy=2019);

data taqdfx_all;
    set taqdf_:;
run;

proc sql;
    create table taqdfx_all1 as
    select a.*, b.vwretd, b.vwretx
    from taqdfx_all a
    left join crsp.dsiy b
    on a.date = b.caldt
    order by a.permno, a.date;
quit;

/* Compute and adjust OWR variables */
proc printto log='/dev/null';run;
proc expand data=taqdfx_all1
    out=taqdfx_all2
    method=none;
    by permno;
    convert y_e = y_eL1 / transformout = (lag 1);
    convert ccprc = CCPrcL1 / transformout = (lag 1);
    convert mid_after_open = omF1 / transformout = (lead 1);
run;
proc printto;run;
%put expand &syslast. done;

data taqdfx_all2;
    set taqdfx_all2;
    yyyy=year(date);
    r_d = (vw_price_m-mid_after_open+coalesce(divamt,0))/mid_after_open;
    r_o = (omF1-vw_price_m)/mid_after_open;
run;

%MERGE_ASOF(a=taqdfx_all2,b=crspm3,
    merged=taqdfx_all3,
    datevar=date,
    num_vars=bm_log me_log);

proc printto log='/dev/null';run;
proc reg data=taqdfx_all3 outest=_beta
    (drop=_: retx rename=(Intercept=alpha vwretx=beta)) noprint;
    by permno yyyy;
    model retx = vwretx;
run;
proc printto;run;

data taqdfx_all4;
    merge taqdfx_all3 _beta;
    by permno yyyy;
run;
proc sort data=taqdfx_all4 nodupkey;
    by date permno;
run;

proc printto log='/dev/null';run;
proc reg data=taqdfx_all4 noprint;
      model r_o r_d = beta me_log bm_log;
      output out=_ret_resid(keep=permno date ur_o ur_d) r=ur_o ur_d;
      model y_e = y_eL1 me_log;
      output out=_oib_resid(keep=permno date uy_e) r=uy_e;
      by date;
run;
proc printto;run;

data taqdfx_all5;
    merge taqdfx_all4 _ret_resid _oib_resid;
    by date permno;
run;

%INCLUDE "~/git/sas/WINSORIZE_TRUNCATE.sas";
%WINSORIZE_TRUNCATE(dsetin = taqdfx_all5, 
    dsetout = taqdfx_all6, 
    byvar = date, 
    vars = ur_o ur_d, 
    type = W, 
    pctl = 1 99,
    filter = and exchcd eq 1);

/* Output files */
proc sort data=taqdfx_all6
    out=out.taqdfx_all6(compress=no) nodupkey;
    by permno date;
run;
proc sort data=crspm3
    out=out.crspm3 nodupkey;
    by permno date;
run;

This Python script loads the SAS file and writes it to a PyTables HDF5 file, a format much better suited to repeated reads, writes, and queries. This makes parallelization much easier (see est.py).

The last piece shows an example of estimating three of the models: given the prepared data, we run one iteration for XOM (permno 11850) in 2015 and get back a dictionary of parameter estimates for each model. We'll come back to the output after going through the model code.

import os
import pandas as pd
from importlib import reload
os.chdir('/home/nyu/eddyhu/git/pin-code')
import eo_model as eo
import gpin_model as gpin
import owr_model as owr

# setup data
df = pd.read_sas('/scratch/nyu/hue/taqdfx_all6.sas7bdat')
df['yyyy'] = df.yyyy.astype('int')
df['date'] = df.DATE
df['permno'] = df.permno.astype('int')
df['ticker'] = df.symbol_15.str.decode('UTF-8')
df.set_index('permno yyyy'.split(),inplace=True)
c = df.groupby(level=(0,1))\
    ['n_buys n_sells ur_d ur_o uy_e'.split()]\
    .count().min(axis=1)
c.name = 'count_min'
df1 = df.join(c)
df1.loc[df1.count_min>=230]\
    ['date ticker n_buys n_sells ur_d ur_o uy_e'.split()]\
    .to_hdf('/scratch/nyu/hue/taqdf_1319.h5','data',format='table')

d = pd.read_hdf('/scratch/nyu/hue/taqdf_1319.h5',where='permno==11850 & yyyy==2015')

# test run of each model
eo.fit(d.n_buys,d.n_sells,starts=1)
gpin.fit(d.n_buys,d.n_sells,starts=1)
owr.fit(d.uy_e,d.ur_d,d.ur_o,starts=1)

Model code

The model code includes eo_model.py, dy_model.py, gpin_model.py, and owr_model.py. These files also rely on some utility files like common.py and regressions.py.

To keep things simple we will start with eo_model.py, as it is the simplest model and code. The code for dy and gpin is structurally almost identical to eo, differing only in the parameterization, the degree to which simulations are involved, and the likelihood functions.

I will describe owr_model.py in detail as it involves quite a few optimization tricks.

EOModel

Let’s start with the import statements. Because Python is a general-purpose programming language, we need to import the mathematical functions we use, including basics like the logarithm and exponential. common.py also imports and defines some helpers, such as the log factorial (lfact), built on scipy’s gammaln function.

# numpy for matrix algebra
import numpy as np
from numpy import log, exp

# some scipy special mathematical functions
from scipy.special import logsumexp
from scipy.linalg import inv

# this is the main optimization library
import scipy.optimize as op

# import common functions
from common import *

Each model is defined as a Python class, an object that bundles attributes (data) and methods (functions). In EOModel, the attributes include the parameters α, δ, ε, etc., and the methods include functions that simulate the PIN model, define the likelihood, and run the estimation (fit()).

Each class has an __init__() method, which sets up the model. Let’s take a look at the class definition.

class EOModel(object): # because we are defining custom models, we are subclassing the most generic Python object

    def __init__(self,a,d,es,eb,u,n=1,t=252): # here we describe the EOModel parameters
        """Initializes parameters of an Easley and O'Hara Sequential Trade Model
        
        a : $\alpha$, the unconditional probability of an information event
        d : $\delta$, the unconditional probability of good news
        es : $\epsilon_s$, the average number of sells on a day with no news
        eb : $\epsilon_b$, the average number of buys on a day with no news
        u : $\mu$, the average number of (additional) trades on a day with news

        n : the number of stocks to simulate, default 1
        t : the number of periods to simulate, default 252 (one trading year)
        """

        # Assign model parameters
        self.a, self.d, self.es, self.eb, self.u, self.N, self.T = a, d, es, eb, u, n, t
        self.states = self._draw_states()
        self.buys = np.random.poisson((eb+(self.states == 1)*u))
        self.sells = np.random.poisson((es+(self.states == -1)*u))
        self.alpha = compute_alpha(a, d, eb, es, u, self.buys, self.sells)

In addition to the standard PIN model parameters, our class includes n, the number of stocks to simulate, and t, the number of periods to simulate.

We can initialize an EOModel like this:

a = 0.41
d = 0.58
es = 2719
eb = 2672
u = 2700

N = 1000
T = 252

model = EOModel(a,d,es,eb,u,n=N,t=T)

Behind the scenes this will initialize an instance of a PIN model and simulate 1000 stock-year observations (252 days in a trading year). This happens because the __init__() function draws the states and then draws buys and sells from Poisson distributions. _draw_states() works by drawing independent binomials based on the probability of an information event α and the probability of good news δ.

    def _draw_states(self):
        """Draws the states for N stocks and T periods.

        In the Easley and O'Hara sequential trade model at the beginning of each period nature determines whether there is an information event with probability $\alpha$ (a). If there is information, nature determines whether the signal is good news with probability $\delta$ (d) or bad news $1-\delta$ (1-d).

        A quick way to implement this is to draw all of the event states at once as an `NxT` matrix from a binomial distribution with $p=\alpha$, and independently draw all of the news states as an `NxT` matrix from a binomial with $p=\delta$. 
        
        An information event occurs for stock i on day t if `events[i][t]=1`, and zero otherwise. The news is good if `news[i][t]=1` and bad if `news[i][t]=-1`. 

        The element-wise product of `events` with `news` gives a complete description of the states for the sequential trade model, where the state variable can take the values (-1,0,1) for bad news, no news, and good news respectively.

        self : EOSequentialTradeModel instance which contains parameter definitions
        """
        events = np.random.binomial(1, self.a, (self.N,self.T))
        news = np.random.binomial(1, self.d, (self.N,self.T))
        news[news == 0] = -1

        states = events*news

        return states
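
For example, continuing the snippet above (a quick check, not part of the repo), we can confirm the shape of the simulated draws and the state coding:

# Quick look at the simulated data (continuing the example above)
print(model.states.shape)                     # (1000, 252): one row per stock, one column per day
print(np.unique(model.states))                # [-1  0  1]: bad news, no news, good news
print(model.buys.shape, model.alpha.shape)    # buys/sells and CPIEs are also (1000, 252)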

The last step, compute_alpha, is a function that computes CPIEs (conditional probabilities of an information event) for real or simulated data. The CPIE computation depends on the likelihood function definitions.

def _lf(eb, es, n_buys, n_sells):
    return -eb+n_buys*log(eb)-lfact(n_buys)-es+n_sells*log(es)-lfact(n_sells)

def _ll(a, d, eb, es, u, n_buys, n_sells):
    return np.array([log(a*(1-d))+_lf(eb,es+u,n_buys,n_sells), 
                   log(a*d)+_lf(eb+u,es,n_buys,n_sells), 
                   log(1-a)+_lf(eb,es,n_buys,n_sells)])
            
def compute_alpha(a, d, eb, es, u, n_buys, n_sells):
    '''Compute the conditional alpha given parameters, buys, and sells.

    '''
    ll = _ll(a, d, eb, es, u, n_buys, n_sells)    
    llmax = ll.max(axis=0)
    y = exp(ll-llmax)
    alpha = y[:-1].sum(axis=0)/y.sum(axis=0)
    
    return alpha

def loglik(theta, n_buys, n_sells):
    a,d,eb,es,u = theta
    ll = _ll(a, d, eb, es, u, n_buys, n_sells)
    
    return sum(logsumexp(ll,axis=0))

_lf() represents the Poisson log-likelihood component that is common to each of the three states: good news, bad news, and no news.

_ll() returns the full vector of state log-likelihoods (log state probability plus the Poisson log-likelihood) for the PIN model.

compute_alpha() computes CPIEs using a numerical trick: we compute the vector of log-likelihoods with _ll(), take the maximum across the three states, and subtract that maximum before exponentiating. This scaling prevents numerical underflow (the same idea behind the log-sum-exp trick) and cancels out of the ratio that defines the CPIE.
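
To see why the scaling matters, here is a tiny standalone illustration (the numbers are made up): exponentiating large-magnitude log-likelihoods directly underflows to zero, while subtracting the maximum first leaves the CPIE ratio unchanged and well defined.

import numpy as np

# hypothetical log-likelihoods for one day: [bad news, good news, no news]
ll = np.array([-1100.0, -1105.0, -1102.0])

naive = np.exp(ll)[:-1].sum() / np.exp(ll).sum()   # exp underflows to 0 -> 0/0 -> nan
y = np.exp(ll - ll.max())                          # subtract the max before exponentiating
stable = y[:-1].sum() / y.sum()                    # well-defined CPIE (about 0.88 here)

print(naive, stable)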

Finally, loglik() computes the total likelihood that will be used in the optimization.

At this point you are probably wondering why some of these functions have names that start with an underscore (_) and others do not. In Python, a leading underscore conventionally marks a function as internal (“hidden”). This is helpful for users exploring the code interactively, since we want them to see and interact only with the higher-level functions, like compute_alpha and loglik.

The actual estimation is handled by the fit() function.

The fit() function does a number of things that look complex but are necessary to get the numerical optimization to work well.

For instance, we use up to 10 random starts, and within each start we retry the optimization, with fresh random guesses, up to maxiter=100 times until the optimizer reports success.

def fit(n_buys, n_sells, starts=10, maxiter=100, 
        a=None, d=None, eb=None, es=None, u=None,
        se=None, **kwargs):

    nll = lambda *args: -loglik(*args) # define the negative log likelihood that we will minimize
    bounds = [(0.00001,0.99999)]*2+[(0.00001,np.inf)]*3 # we will do a constrained optimization
    ranges = [(0.00001,0.99999)]*2 # we will define the min-max range for our random guesses
    
    # if we do not have a prior on what the estimates are, we compute them here
    a0,d0 = [x or 0.5 for x in (a,d)] # 50% chance of information/news
    eb0,es0 = eb or np.mean(n_buys), es or np.mean(n_sells) # expected buys/sells = mean of observed buy/sells
    oib = n_buys - n_sells
    u0 = u or np.mean(abs(oib)) # expected order imbalance = mean of absolute order imbalance

    res_final = [a0,d0,eb0,es0,u0] # define the vector that will hold all the parameters
    stderr = np.zeros_like(res_final) # define the vector that will hold our standard errors
    f = nll(res_final,n_buys,n_sells) # evaluate the negative log likelihood at the initial guesses
    for i in range(starts):
        # rc is going to be our return code
        rc = -1
        j = 0
        while (rc != 0) & (j <= maxiter):
            if (None in (res_final)) or i:
                # guess parameters
                a0,d0 = [np.random.uniform(l,np.nan_to_num(h)) for (l,h) in ranges]
                # draw count-parameter guesses around the priors, or around
                # data-based moments when no priors were supplied
                eb0,es0,u0 = np.random.poisson([eb or np.mean(n_buys),
                                                es or np.mean(n_sells),
                                                u or np.mean(abs(oib))])
            # do actual optimization here
            res = op.minimize(nll, [a0,d0,eb0,es0,u0], method=None,
                              bounds=bounds, args=(n_buys,n_sells))
            rc = res['status']
            # see if the optimization step violated any constraints
            check_bounds = list(imap(lambda x,y: x in y, res['x'], bounds))
            if any(check_bounds):
                rc = 3
            j+=1
        if (res['success']) & (res['fun'] <= f):
            # if everything worked fine and we have a 
            # smaller (negative) likelihood then store these parameters
            f,rc = res['fun'],res['status']
            res_final = res['x'].tolist()
            # and compute standard errors
            stderr = 1/np.sqrt(inv(res['hess_inv'].todense()).diagonal())

    # output the final parameter estimates
    param_names = ['a','d','eb','es','u']
    output = dict(zip(param_names+['f','rc'],
                    res_final+[f,rc]))
    if se:
        output = {'params': dict(zip(param_names,res_final)),
                  'se': dict(zip(param_names,stderr)),
                  'stats':{'f': f,'rc': rc}
                 }
    return output
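
As a quick sanity check (an illustration, not part of the original script), we can feed one simulated stock-year from the earlier example back into fit() and see that the estimates land near the parameters used in the simulation:

# Illustration only: estimate the EO model on one simulated stock-year.
# `model` is the EOModel instance created earlier; row 0 is one stock's 252 days.
est = fit(model.buys[0], model.sells[0], starts=1)
print({k: round(v, 3) for k, v in est.items()})   # keys: a, d, eb, es, u, f, rc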

The last function is cpie_mech(), which is very simple for EOModel: a dummy variable for whether observed turnover is above its mean.

def cpie_mech(turn):
    mech = np.zeros_like(turn)
    mech[turn > turn.mean()] = 1
    return mech

The last piece defines what happens when you run eo_model.py as a stand-alone script. In that case it simulates an example PIN model and runs regressions on the simulated data to show how the model identifies information. This was part of an older version of our paper, but it is useful for building intuition.

if __name__ == '__main__':
    
    import pandas as pd
    from regressions import *

    a = 0.41
    d = 0.58
    es = 2719
    eb = 2672
    u = 2700

    N = 1000
    T = 252

    model = EOModel(a,d,es,eb,u,n=N,t=T)

    buys = to_series(model.buys)
    sells = to_series(model.sells)
    
    aoib = abs(buys-sells)
    turn = buys+sells
    alpha = to_series(model.alpha)

    def run_regs(df):
        # run regression
        m = []
        m.append(partial_r2(df['alpha'],df[['aoib','aoib2']], df[['aoib','aoib2','turn','turn2']]))
        out = pd.DataFrame(m, columns=['results'])
        out.index.names = ['model']
        return out

    regtab = pd.DataFrame({'alpha':alpha,'aoib':aoib,'aoib2':aoib**2,'turn':turn,'turn2':turn**2})
    
    res = run_regs(regtab)

    print(est_tab(res.results, est=['params','tvalues'], stats=['rsquared','rsquared_sp']))

OWRModel

Let’s start again with the import statements.

from __future__ import division

# numpy for matrix algebra
import numpy as np
from numpy import log, exp

import pandas as pd

from scipy.special import logsumexp
import scipy.optimize as op

# optimization
from numba import jit

from common import *

Many of the libraries we need are the same (numpy, scipy). For convenience we also import pandas to make handling the data a bit easier, although the code could be re-written without it.

The most important new library is numba, from which we import jit, a just-in-time compiler that we use to compile some of our matrix-algebra loops.

The class’s __init__() is analogous to that of EOModel, but the parameterization is much more complex because we are dealing with multivariate normal distributions.

class OWRModel(object):
    
    def __init__(self,a,su,sz,si,spd,spo,n=1,t=252):
        """Initializes parameters of an Odders-White and Ready (2008) Sequential Trade Model

        a: $\alpha$, the unconditional probability of an information event
        ... #TODO#
        """

        # Assign model parameters
        self.a, self.su, self.sz, self.si, self.spd, self.spo, self.N, self.T = a, su, sz, si, spd, spo, n, t
        self.s_n, self.s_e = _compute_cov(a, su, sz, si, spd, spo)

        # pre-computing the dets and invs saves a lot of time
        self.dsn, self.isn = det3(self.s_n), inv3(self.s_n)
        self.dse, self.ise = det3(self.s_e), inv3(self.s_e)
        self.dsd = self.dsn/self.dse
        self.isd = self.isn - self.ise

        self.states = np.random.binomial(1, a, n*t)
        
        mean = [0]*3
        x_n = np.random.multivariate_normal(mean,self.s_n,n*t).T
        x_e = np.random.multivariate_normal(mean,self.s_e,n*t).T
        self.x = x_e*self.states + x_n*(1-self.states)  # event days draw from s_e, non-event days from s_n
        self.oib, self.ret_d, self.ret_o = self.x.reshape(3,n,t)
        self.alpha = _compute_alpha(self.x,
                                    self.a,self.dsd,self.isd)\
                                    .reshape((n,t))

Here I want to highlight two functions in particular: det3 and inv3. Based on extensive profiling, I found that pre-computing the determinants and inverses saves a lot of time. This makes sense because these are expensive operations that you do not want to repeat every time you need one of these matrices. Furthermore, because these are only 3x3 matrices, I sped things up further by hand-coding the matrix algebra. numpy’s linear algebra routines are fast, but a general-purpose routine will never be as fast as dead-simple hand-coded arithmetic for a fixed 3x3 matrix.

This may seem like overkill, but when you think about how many times these functions could potentially be called in the optimization loop, you will realize how quickly the computation time can add up.

def det2(a):
    return (a[0][0] * a[1][1]) - (a[0][1] * a[1][0])

def det3(a):
    return (a[0][0] * (a[1][1] * a[2][2] - a[2][1] * a[1][2])
           -a[1][0] * (a[0][1] * a[2][2] - a[2][1] * a[0][2])
           +a[2][0] * (a[0][1] * a[1][2] - a[1][1] * a[0][2]))

def inv3(a):
    invdet = 1/det3(a)
    m = np.zeros((3,3))
    m[0, 0] = a[1, 1] * a[2, 2] - a[2, 1] * a[1, 2]
    m[0, 1] = a[0, 2] * a[2, 1] - a[0, 1] * a[2, 2]
    m[0, 2] = a[0, 1] * a[1, 2] - a[0, 2] * a[1, 1]
    m[1, 0] = a[1, 2] * a[2, 0] - a[1, 0] * a[2, 2]
    m[1, 1] = a[0, 0] * a[2, 2] - a[0, 2] * a[2, 0]
    m[1, 2] = a[1, 0] * a[0, 2] - a[0, 0] * a[1, 2]
    m[2, 0] = a[1, 0] * a[2, 1] - a[2, 0] * a[1, 1]
    m[2, 1] = a[2, 0] * a[0, 1] - a[0, 0] * a[2, 1]
    m[2, 2] = a[0, 0] * a[1, 1] - a[1, 0] * a[0, 1]
    return m*invdet
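
As a quick way to convince yourself these are correct (again, just an illustration; it assumes det3 and inv3 from owr_model.py are in scope), compare them against numpy's general-purpose routines:

# Illustration: the hand-coded 3x3 determinant and inverse match numpy
import numpy as np

A = np.array([[2.0, 0.5, 0.1],
              [0.5, 1.0, 0.2],
              [0.1, 0.2, 1.5]])

assert np.isclose(det3(A), np.linalg.det(A))
assert np.allclose(inv3(A), np.linalg.inv(A))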

Next let’s jump ahead a bit and talk about the OWRModel’s _lf() function, along with its corresponding speed-up trick.

def _lf(x, det, inv):
    return -0.5*log(det)-0.5*_qvmv(x,inv)

@jit
def _qvmv(x,A):
    """Computes x'Ax.
    """
    m,n = A.shape
    qsum = 0
    
    for i in range(m):
        for j in range(n):
            qsum += A[i,j] * x[i] * x[j]
            
    return qsum

The likelihood for a multivariate normal is dead simple to write. It involves only the vector of data x and the determinant and inverse of the covariance matrix implied by the model parameters (which we have pre-computed). The additive constant is dropped because it affects neither the optimization nor the CPIE ratio.

The only “tricky” part is the quadratic form x'Ax, where A is the inverse of the covariance matrix. Computing it with numpy takes two matrix-multiplication calls, which is relatively expensive because those are general-purpose routines.

As before, we can write our own dead-simple version, replacing two multiplication calls with two loops! That is where _qvmv() comes in.

Of course, loops in Python are much slower than loops in a language like C++. But we can get near-C speeds for very simple code by compiling the function with numba’s jit. All it takes is adding the @jit decorator on top of the function, which tells Python to compile this piece of code and run the compiled version.

Note: Python is an interpreted language, not a compiled one. That makes it much easier to write, and you usually never have to compile anything, but it can also make it slow. By compiling just a few hot pieces of code we get dramatic speed-ups while maintaining maximum flexibility.
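
Here is a small check (illustration only, assuming _qvmv from owr_model.py is in scope) that the hand-coded quadratic form agrees with its numpy equivalent; the first call also triggers numba's compilation:

# Illustration: _qvmv(x, A) computes the same quantity as x @ A @ x
import numpy as np

A = np.random.rand(3, 3)
x = np.random.rand(3)

assert np.isclose(_qvmv(x, A), x @ A @ x)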

Finally, to see how everything comes together, let’s look at loglik():

def loglik(theta, oib, ret_d, ret_o):
    a, su, sz, si, spd, spo = theta
    s_n, s_e = _compute_cov(a, su, sz, si, spd, spo)
    dsn, isn = det3(s_n), inv3(s_n)
    dse, ise = det3(s_e), inv3(s_e)
    
    x = np.array([oib,ret_d,ret_o])
    t = x.shape[1]
    ll = np.zeros((2,t))
    for i in range(t):
        ll[:,i] = log(a)+_lf(x[:,i],dse,ise), log(1-a)+_lf(x[:,i],dsn,isn)
    
    return sum(logsumexp(ll,axis=0))

Here we end up computing the determinants and inverses again. This is necessary because each iteration of the optimization changes the parameter vector theta, and therefore the covariance matrices and their determinants and inverses.

And that’s pretty much it! OWRModel has its own fit() tailored to the specifics of the model, but it involves no new tricks.

Estimation code

In this section we go through the setup for estimating the models in parallel. The main estimation code, which calls a given model’s fit() function, is est.py.

Let’s look at the import statements. Parallelization is done with the ipyparallel library. As mentioned before, the data are stored in PyTables, so we use two libraries to work with them: pandas and tables. os handles operating-system tasks like changing working directories and creating files or folders. argparse parses the command-line arguments so that we can call the estimation script as, for example, python est.py owr 2015 to estimate the owr model for 2015 without writing additional code.

import ipyparallel as ipp
import pandas as pd
import tables as tb
import os
import argparse

The next piece handles the parsing of the arguments. The model name comes first, then the year.

parser = argparse.ArgumentParser(description='Model and year to estimate.')
parser.add_argument('model', type=str, nargs='?', default='gpin')
parser.add_argument('year', type=int, nargs='?', default=2014)
args = parser.parse_args()
print(vars(args))

The next piece sets up the ipyparallel client and finds the necessary data. Rather than send the data itself to the worker nodes, we send each worker the (permno, year) keys it needs (the list idx) so that it knows which slice of the HDF5 file to read. This reduces memory overhead.

rc = ipp.Client(cluster_id="{0}-{1}".format(args.model,args.year))
print(len(rc))
dv = rc[:]
dv.push(vars(args))
lv = rc.load_balanced_view()

h5 = tb.open_file('/scratch/nyu/hue/taqdf_1319.h5', mode='r')
df = h5.get_node('/data/table')
idx = list(set(filter(lambda x: x[1]==args.year, zip(df.col('permno'),df.col('yyyy')))))

Finally, we define est(), the function that each worker node will run. Because each worker is independent, it needs its own import statements, its own connection to the data, and so on. Then all it has to do is call the right fit() function and write the resulting parameter estimates to disk.

@ipp.interactive
def est(x):
    import os
    import pandas as pd
    import tables as tb
    import json
    os.chdir('/home/nyu/eddyhu/git/pin-code')

    import eo_model as eo
    import gpin_model as gpin
    import owr_model as owr
    
    d = pd.read_hdf('/scratch/nyu/hue/taqdf_1319.h5',
                    where='permno=={permno} & yyyy=={yyyy}'.format(permno=x[0], yyyy=x[1]))
    d = d.dropna()
    if model == 'eo':
        r = eo.fit(d.n_buys, d.n_sells, starts=1)
    if model == 'gpin':
        r = gpin.fit(d.n_buys, d.n_sells, starts=1)
    if model == 'owr':
        r = owr.fit(d.uy_e, d.ur_d, d.ur_o, starts=5)
    r.update({'permno':int(x[0]),'yyyy':int(x[1])})
    fname = '/home/nyu/eddyhu/pin-estimates/{model}/{permno}-{yyyy}.json'.format(model=model, permno=x[0], yyyy=x[1])
    with open(fname, 'w') as outfile:
        json.dump(r, outfile)
    return r

res = lv.map_async(est, idx)
res.wait()

The final piece is run_est.sh: a shell script which starts the ipyparallel cluster, calls est.py, and shuts down the cluster once we finish all the stocks in a given year.

#!/bin/bash
#$ -cwd
#$ -m abe
#$ -pe onenode 4
#$ -M [your@email.com]
model=$1;shift
year=$1;shift
source ~/miniconda3/bin/activate
ipcluster start -n 7 --cluster-id="$model-$year" &
sleep 45
ipython est.py $model $year
ipcluster stop

There are a few tricks here that are worth pointing out. The header is read as instructions to the Univa Grid Engine. -cwd tells the job scheduler to start the job in the current directory, which is where our scripts are stored. -m abe tells the scheduler to send an email when the job aborts, begins, or ends. -pe onenode 4 requests 4 slots, which thanks to hyperthreading gives us 8 processes. -M [your@email.com] tells the scheduler where to send those status emails.

In the body of the script we grab the arguments, activate our conda environment, start ipcluster (which manages the ipyparallel cluster), wait 45 seconds for the cluster to start, and then run our estimation script.

Note that we start only -n 7 compute engines, leaving one process for the cluster controller. Also, we call the script with ipython rather than python. This is not strictly necessary, but it gives us some extra flexibility in case we want to use IPython-specific convenience functions that are not available in base Python; the code as written, however, works with base Python.

run_est.sh is called similarly to est.py, as it is really just a wrapper for the grid engine:

qsub run_est.sh owr 2015

To collect the estimates we can run a quick jq script to make a csv file.

cd ~/git/pin-estimates/gpin
jq -r '[.permno, .yyyy, .a, .p, .eta, .r, .d, .th] | @csv' *.json > gpin_1319.csv