import numpy as np
import scipy.optimize as opt
import scipy.interpolate as si
from ogcore import parameter_plots as pp
from ogcore import utils
from ogcore.parameters import Specifications
import os
import json
import urllib.request
CUR_PATH = os.path.abspath(os.path.dirname(__file__))
OUTPUT_DIR = os.path.join(CUR_PATH, "OUTPUT", "ability")
[docs]
def get_e_interp(E, S, J, lambdas, age_wgts, gini_to_match=40.7, plot=False):
"""
This function takes the calibrated lifetime earnings profiles
(abilities, e matrix) from OG-USA and then adjusts the shape of those
profiles to match the Gini coefficient for another economy. The
Gini coefficient to match is given in the argument gini_to_match.
Note that the calibrated OG-USA e matrix is of size (80, 10), where
80 is the number of ages and 10 is the number of ability types.
Users of this function specify their own number of age groups (S)
and ability types (J). The function will map the fitted functions
into these dimensions so long as the percentiles of the ability types
given in lambdas is not more refined at the top end than those in
OG-USA (which identifies up to the top 0.1%).
Args:
E (int): the age agents become economically active
S (int): number of ages to interpolate. This method assumes that
ages are evenly spaced between the beginning of age E
up to E+S, >= 3
J (int): number of ability types to interpolate
lambdas (Numpy array): distribution of population in each
ability group, length J
age_wgts (Numpy array): distribution of population in each age
group, length S
gini_to_match (float): Gini coefficient to match, default is
40.7, the Gini coefficient for PHL in 2023
https://data.worldbank.org/indicator/SI.POV.GINI
plot (bool): if True, creates plots of emat_orig and the new
interpolated emat_new
Returns:
emat_new_scaled (Numpy array): interpolated ability matrix scaled
so that population-weighted average is 1, size SxJ
"""
assert lambdas.shape[0] == J
assert age_wgts.shape[0] == S
# Load USA e matrix as a baseline
usa_params = Specifications()
usa_params.update_specifications(
json.load(
urllib.request.urlopen(
"https://raw.githubusercontent.com/PSLmodels/OG-USA/master/ogusa/ogusa_default_parameters.json"
)
)
)
# Define a function that will find the "a" in the equation:
# e_Y = e_USA * exp(a * e_USA)
# such that the e_Y produces a gini coefficient in the model that
# gives the same ratio between the model implied Gini's in the USA
# and the target country and the empirical Gini's in the USA and given
# by gin_to_match for the target country
def f(
a,
emat_orig,
age_wgts,
abil_wgts,
gini_to_match,
gini_usa_data,
gini_usa_model,
):
gini_target_model = utils.Inequality(
emat_orig * np.exp(a * emat_orig),
age_wgts,
abil_wgts,
len(age_wgts),
len(abil_wgts),
).gini()
error = (gini_to_match / gini_usa_data) - (
gini_target_model / gini_usa_model
)
return error
# Note, USA gini in the World Bank data is 41.5
# See https://data.worldbank.org/indicator/SI.POV.GINI
gini_usa_data = 41.5
# Find the model implied Gini for the USA
gini_usa_model = utils.Inequality(
usa_params.e[0, :, :],
usa_params.omega_SS,
usa_params.lambdas,
usa_params.S,
usa_params.J,
).gini()
x = opt.root_scalar(
f,
args=(
usa_params.e[0, :, :],
usa_params.omega_SS,
usa_params.lambdas,
gini_to_match,
gini_usa_data,
gini_usa_model,
),
method="bisect",
bracket=[-1, 1],
xtol=1e-10,
)
a = x.root
e_new = usa_params.e[0, :, :] * np.exp(a * usa_params.e[0, :, :])
emat_new_scaled = (
e_new
/ (
e_new
* usa_params.omega_SS.reshape(usa_params.S, 1)
* usa_params.lambdas.reshape(1, usa_params.J)
).sum()
)
# Now interpolate for the cases where S and/or J not the same in the
# country parameterization as in the default USA parameterization
if (
S == usa_params.S
and np.array_equal(
usa_params.lambdas,
lambdas,
)
is True
):
pass # will return the e_new_scaled found above since dims the same
else:
# generate vector of mid points for the Filipino ability groups
abil_midp = np.zeros(J)
pct_lb = 0.0
for j in range(J):
abil_midp[j] = pct_lb + 0.5 * lambdas[j]
pct_lb += lambdas[j]
# generate vector of mid points for the USA ability groups
M = usa_params.lambdas.shape[0]
emat_j_midp = np.zeros(M)
pct_lb = 0.0
for m in range(M):
emat_j_midp[m] = pct_lb + 0.5 * usa_params.lambdas[m]
pct_lb += usa_params.lambdas[m]
# Make sure that values in abil_midp are within interpolating
# bounds
if abil_midp.min() < emat_j_midp.min() or abil_midp.max() > (
1 - usa_params.lambdas[-1]
):
err = (
"One or more entries in abilities vector (lambdas) is outside the "
+ "allowable bounds for interpolation."
)
raise RuntimeError(err)
usa_step = 80 / usa_params.S
emat_s_midp = np.linspace(
usa_params.E + 0.5 * usa_step,
usa_params.E + usa_params.S - 0.5 * usa_step,
usa_params.S,
)
emat_j_mesh, emat_s_mesh = np.meshgrid(emat_j_midp, emat_s_midp)
newstep = 80 / S
new_s_midp = np.linspace(E + 0.5 * newstep, E + S - 0.5 * newstep, S)
new_j_mesh, new_s_mesh = np.meshgrid(abil_midp, new_s_midp)
newcoords = np.hstack(
(
emat_s_mesh.reshape((usa_params.S * usa_params.J, 1)),
emat_j_mesh.reshape((usa_params.S * usa_params.J, 1)),
)
)
emat_new = si.griddata(
newcoords,
emat_new_scaled.flatten(),
(new_s_mesh, new_j_mesh),
method="linear",
)
emat_new_scaled = (
emat_new
/ (emat_new * age_wgts.reshape(S, 1) * lambdas.reshape(1, J)).sum()
)
if plot:
kwargs = {"filesuffix": "_intrp_scaled"}
pp.plot_income_data(
new_s_midp,
abil_midp,
lambdas,
emat_new_scaled,
OUTPUT_DIR,
**kwargs,
)
return emat_new_scaled