Source code for ogzaf.income

"""
-----------------------------------------------------------------
Functions for created the matrix of ability levels, e.  This can
only be used for looking at the 25, 50, 70, 80, 90, 99, and 100th
percentiles, as it uses fitted polynomials to those percentiles.
-----------------------------------------------------------------
"""

import numpy as np
import scipy.optimize as opt
import scipy.interpolate as si
from ogcore import parameter_plots as pp


[docs] def arctan_func(xvals, a, b, c): r""" This function generates predicted ability levels given data (xvals) and parameters a, b, and c, from the following arctan function: .. math:: y = (-a / \pi) * \arctan(b * x + c) + (a / 2) Args: xvals (Numpy array): data inputs to arctan function a (scalar): scale parameter for arctan function b (scalar): curvature parameter for arctan function c (scalar): shift parameter for arctan function Returns: yvals (Numpy array): predicted values (output) of arctan function """ yvals = (-a / np.pi) * np.arctan(b * xvals + c) + (a / 2) return yvals
[docs] def arctan_deriv_func(xvals, a, b, c): r""" This function generates predicted derivatives of arctan function given data (xvals) and parameters a, b, and c. The functional form of the derivative of the function is the following: .. math:: y = - (a * b) / (\pi * (1 + (b * xvals + c)^2)) Args: xvals (Numpy array): data inputs to arctan derivative function a (scalar): scale parameter for arctan function b (scalar): curvature parameter for arctan function c (scalar): shift parameter for arctan function Returns: yvals (Numpy array): predicted values (output) of arctan derivative function """ yvals = -(a * b) / (np.pi * (1 + (b * xvals + c) ** 2)) return yvals
[docs] def arc_error(abc_vals, params): """ This function returns a vector of errors in the three criteria on which the arctan function is fit to predict extrapolated ability in ages 81 to 100.:: 1) The arctan function value at age 80 must match the estimated original function value at age 80. 2) The arctan function slope at age 80 must match the estimated original function slope at age 80. 3) The level of ability at age 100 must be a given fraction (abil_deprec) below the ability level at age 80. Args: abc_vals (tuple): contains (a,b,c) * a (scalar): scale parameter for arctan function * b (scalar): curvature parameter for arctan function * c (scalar): shift parameter for arctan function params (tuple): contains (first_point, coef1, coef2, coef3, abil_deprec) * first_point (scalar): ability level at age 80, > 0 * coef1 (scalar): coefficient in log ability equation on linear term in age * coef2 (scalar): coefficient in log ability equation on quadratic term in age * coef3 (scalar): coefficient in log ability equation on cubic term in age * abil_deprec (scalar): ability depreciation rate between ages 80 and 100, in (0, 1). Returns: error_vec (Numpy array): errors ([error1, error2, error3]) * error1 (scalar): error between ability level at age 80 from original function minus the predicted ability at age 80 from the arctan function given a, b, and c * error2 (scalar): error between the slope of the original function at age 80 minus the slope of the arctan function at age 80 given a, b, and c * error3 (scalar): error between the ability level at age 100 predicted by the original model value times abil_deprec minus the ability predicted by the arctan function at age 100 given a, b, and c """ a, b, c = abc_vals first_point, coef1, coef2, coef3, abil_deprec = params error1 = first_point - arctan_func(80, a, b, c) if (3 * coef3 * 80**2 + 2 * coef2 * 80 + coef1) < 0: error2 = ( 3 * coef3 * 80**2 + 2 * coef2 * 80 + coef1 ) * first_point - arctan_deriv_func(80, a, b, c) else: error2 = -0.02 * first_point - arctan_deriv_func(80, a, b, c) error3 = abil_deprec * first_point - arctan_func(100, a, b, c) error_vec = np.array([error1, error2, error3]) return error_vec
[docs] def arctan_fit(first_point, coef1, coef2, coef3, abil_deprec, init_guesses): """ This function fits an arctan function to the last 20 years of the ability levels of a particular ability group to extrapolate abilities by trying to match the slope in the 80th year and the ability depreciation rate between years 80 and 100. Args: first_point (scalar): ability level at age 80, > 0 coef1 (scalar): coefficient in log ability equation on linear term in age coef2 (scalar): coefficient in log ability equation on quadratic term in age coef3 (scalar): coefficient in log ability equation on cubic term in age abil_deprec (scalar): ability depreciation rate between ages 80 and 100, in (0, 1) init_guesses (Numpy array): initial guesses Returns: abil_last (Numpy array): extrapolated ability levels for ages 81 to 100, length 20 """ params = [first_point, coef1, coef2, coef3, abil_deprec] solution = opt.root(arc_error, init_guesses, args=params, method="lm") [a, b, c] = solution.x old_ages = np.linspace(81, 100, 20) abil_last = arctan_func(old_ages, a, b, c) return abil_last
[docs] def get_e_interp(S, age_wgts, age_wgts_80, abil_wgts, plot_path=None): """ This function takes a source matrix of lifetime earnings profiles (abilities, emat) of size (80, 7), where 80 is the number of ages and 7 is the number of ability types in the source matrix, and interpolates new values of a new S x J sized matrix of abilities using linear interpolation. [NOTE: For this application, cubic spline interpolation introduces too much curvature.] This function also includes the two cases in which J = 9 and J = 10 that include higher lifetime earning percentiles calibrated using Piketty and Saez (2003). Args: S (int): number of ages to interpolate. This method assumes that ages are evenly spaced between the beginning of the 21st year and the end of the 100th year, >= 3 age_wgts (Numpy array): distribution of population in each age for the interpolated ages, length S age_wgts_80 (Numpy array): percent of population in each one-year age from 21 to 100, length 80 abil_wgts (Numpy array): distribution of population in each ability group, length J plot_path (str)): if True, creates plots of emat_orig and the new interpolated emat_new Returns: emat_new_scaled (Numpy array): interpolated ability matrix scaled so that population-weighted average is 1, size SxJ """ # Get original 80 x 7 ability matrix abil_wgts_orig = np.array([0.25, 0.25, 0.2, 0.1, 0.1, 0.09, 0.01]) emat_orig = get_e_orig(age_wgts_80, abil_wgts_orig, plot_path) if ( S == 80 and np.array_equal( np.squeeze(abil_wgts), np.array([0.25, 0.25, 0.2, 0.1, 0.1, 0.09, 0.01]), ) is True ): emat_new_scaled = emat_orig elif ( S == 80 and np.array_equal( np.squeeze(abil_wgts), np.array( [0.25, 0.25, 0.2, 0.1, 0.1, 0.09, 0.005, 0.004, 0.0009, 0.0001] ), ) is True ): emat_new = np.zeros((S, len(abil_wgts))) emat_new[:, :7] = emat_orig # Create profiles for top 0.5%, top 0.1% and top 0.01% using # Piketty and Saez estimates # (https://eml.berkeley.edu/~saez/pikettyqje.pdf) # updated for 2018 to create scaling factor # assumption is that profile shape of these top 3 groups are # same as the top 1% estimated in tax data, just scaled up by # ratio determined from P&S 2018 estimates (Table 0, ex cap gains) emat_new[:, 5] = emat_orig[:, -2] * 1.25 emat_new[:, 6] = emat_orig[:, -1] * 0.458759521 * 2.75 emat_new[:, 7] = emat_orig[:, -1] * 0.847252448 * 3.5 emat_new[:, 8] = emat_orig[:, -1] * 2.713698465 * 3.5 emat_new[:, 9] = emat_orig[:, -1] * 18.74863983 * 4.0 emat_new_scaled = ( emat_new / ( emat_new * age_wgts.reshape(80, 1) * abil_wgts.reshape(1, 10) ).sum() ) elif ( S == 80 and np.array_equal( np.squeeze(abil_wgts), np.array([0.25, 0.25, 0.2, 0.1, 0.1, 0.09, 0.005, 0.004, 0.001]), ) is True ): emat_new = np.zeros((S, len(abil_wgts))) emat_new[:, :7] = emat_orig # Create profiles for top 0.5%, top 0.1% using # Piketty and Saez estimates # (https://eml.berkeley.edu/~saez/pikettyqje.pdf) # updated for 2018 to create scaling factor # assumption is that profile shape of these top 3 groups are # same as the top 1% estimated in tax data, just scaled up by # ratio determined from P&S 2018 estimates (Table 0, ex cap gains) emat_new[:, 6] = emat_orig[:, -1] * 0.458759521 emat_new[:, 7] = emat_orig[:, -1] * 0.847252448 emat_new[:, 8] = emat_orig[:, -1] * 4.317192601 emat_new_scaled = ( emat_new / ( emat_new * age_wgts.reshape(80, 1) * abil_wgts.reshape(1, 9) ).sum() ) else: # generate abil_midp vector J = abil_wgts.shape[0] abil_midp = np.zeros(J) pct_lb = 0.0 for j in range(J): abil_midp[j] = pct_lb + 0.5 * abil_wgts[j] pct_lb += abil_wgts[j] # Make sure that values in abil_midp are within interpolating # bounds set by the hard coded abil_wgts_orig if abil_midp.min() < 0.125 or abil_midp.max() > 0.995: err = ( "One or more entries in abils vector is outside the " + "allowable bounds." ) raise RuntimeError(err) emat_j_midp = np.array( [0.125, 0.375, 0.600, 0.750, 0.850, 0.945, 0.995] ) emat_s_midp = np.linspace(20.5, 99.5, 80) emat_j_mesh, emat_s_mesh = np.meshgrid(emat_j_midp, emat_s_midp) newstep = 80 / S new_s_midp = np.linspace(20 + 0.5 * newstep, 100 - 0.5 * newstep, S) new_j_mesh, new_s_mesh = np.meshgrid(abil_midp, new_s_midp) newcoords = np.hstack( ( emat_s_mesh.reshape((80 * 7, 1)), emat_j_mesh.reshape((80 * 7, 1)), ) ) emat_new = si.griddata( newcoords, emat_orig.flatten(), (new_s_mesh, new_j_mesh), method="linear", ) emat_new_scaled = ( emat_new / ( emat_new * age_wgts.reshape(S, 1) * abil_wgts.reshape(1, J) ).sum() ) if plot_path is not None: kwargs = {"path": plot_path, "filesuffix": "_intrp_scaled"} pp.plot_income_data( new_s_midp, abil_midp, abil_wgts, emat_new_scaled, plot_path, **kwargs, ) return emat_new_scaled
[docs] def get_e_orig(age_wgts, abil_wgts, plot_path=None): r""" This function generates the 80 x 7 matrix of lifetime earnings ability profiles, corresponding to annual ages from 21 to 100 and to paths based on income percentiles 0-25, 25-50, 50-70, 70-80, 80-90, 90-99, 99-100. The ergodic population distribution is an input in order to rescale the paths so that the weighted average equals 1. The base curves are the ones in OG-USA, which are then adjusted for ZAF. The polynomials are of the form .. math:: \ln(abil) = \alpha + \beta_{1}\text{age} + \beta_{2}\text{age}^2 + \beta_{3}\text{age}^3 To calibrate for ZAF, the USA curves are adjusted in 2 ways (in this order) 1) Adjustment by income (J): adjust the gaps between the J-income earning curves using data from WID. 2) Adjustment by age (S): adjust the shape/distribution of each J-income earning profile curve using data from NTA. The methodology is described here: https://github.com/EAPD-DRB/OG-ZAF/issues/18#issuecomment-1368580323 Args: age_wgts (Numpy array): ergodic age distribution, length S abil_wgts (Numpy array): population weights in each lifetime earnings group, length J plot_path (str): Path to save plots to Returns: e_orig_scaled (Numpy array): = lifetime ability profiles scaled so that population-weighted average is 1, size SxJ """ # Return and error if age_wgts is not a vector of size (80,) if age_wgts.shape[0] != 80: err = "Vector age_wgts does not have 80 elements." raise RuntimeError(err) # Return and error if abil_wgts is not a vector of size (7,) if abil_wgts.shape[0] != 7: err = "Vector abil_wgts does not have 7 elements." raise RuntimeError(err) # 1) Generate polynomials using USA data and use them to get income profiles for # ages 21 to 80. one = np.array( [ -0.09720122, 0.05995294, 0.17654618, 0.21168263, 0.21638731, 0.04500235, 0.09229392, ] ) two = np.array( [ 0.00247639, -0.00004086, -0.00240656, -0.00306555, -0.00321041, 0.00094253, 0.00012902, ] ) three = np.array( [ -0.00001842, -0.00000521, 0.00001039, 0.00001438, 0.00001579, -0.00001470, -0.00001169, ] ) const = np.array( [ 3.41e00, 0.69689692, -0.78761958, -1.11e00, -0.93939272, 1.60e00, 1.89e00, ] ) ages_short = np.tile(np.linspace(21, 80, 60).reshape((60, 1)), (1, 7)) log_abil_paths = ( const + (one * ages_short) + (two * (ages_short**2)) + (three * (ages_short**3)) ) # New estimated coefficients for ZAF after adjustment by income (J) and by age (S) const = np.array( [ 1.10766851280735, -1.47205271208099, -2.79826519632522, -2.84592025503416, -2.33264177437992, 0.820108734133472, 0.573684959034946, ] ) one = np.array( [ -0.0577752937758472, 0.0993788662241527, 0.215972106224152, 0.251108556224153, 0.255813236224153, 0.0844282762241525, 0.131719846224152, ] ) two = np.array( [ 0.00313926193376278, 0.000622011933762788, -0.00174368806623721, -0.00240267806623721, -0.00254753806623722, 0.00160540193376279, 0.000791891933762785, ] ) three = np.array( [ -0.000035350068460927, -2.21400684609271e-05, -6.54006846092713e-06, -2.55006846092713e-06, -1.14006846092704e-06, -3.16300684609271e-05, -2.86200684609271e-05, ] ) # compute the lifetime income profiles using the new coefficients ages_short_adj = np.tile(np.linspace(21, 80, 60).reshape((60, 1)), (1, 7)) log_abil_paths_adj = ( const + (one * ages_short_adj) + (two * (ages_short_adj**2)) + (three * (ages_short_adj**3)) ) abil_paths_adj = np.exp(log_abil_paths_adj) e_orig = np.zeros((80, 7)) e_orig[:60, :] = abil_paths_adj e_orig[60:, :] = 0.0 # 2) Forecast (with some art) the path of the final 20 years of # ability types. This following variable is what percentage of # ability at age 80 ability falls to at age 100. In general, we # wanted people to lose half of their ability over a 20-year # period. The first entry is 0.47, though, because nothing higher # would converge. The second-to-last is 0.7 because this group # actually has a slightly higher ability at age 80 than the last # group, so this value makes it decrease more so it ends up being # monotonic. abil_deprec = np.array([0.47, 0.5, 0.5, 0.5, 0.5, 0.7, 0.5]) # Initial guesses for the arctan. They're pretty sensitive. init_guesses = np.array( [ [58, 0.0756438545595, -5.6940142786], [27, 0.069, -5], [35, 0.06, -5], [37, 0.339936555352, -33.5987329144], [70.5229181668, 0.0701993896947, -6.37746859905], [35, 0.06, -5], [35, 0.06, -5], ] ) for j in range(7): e_orig[60:, j] = arctan_fit( e_orig[59, j], one[j], two[j], three[j], abil_deprec[j], init_guesses[j], ) # 3) Rescale the lifetime earnings path matrix so that the # population weighted average equals 1. e_orig_scaled = ( e_orig / (e_orig * age_wgts.reshape(80, 1) * abil_wgts.reshape(1, 7)).sum() ) if plot_path is not None: ages_long = np.linspace(21, 100, 80) abil_midp = np.array([12.5, 37.5, 60.0, 75.0, 85.0, 94.5, 99.5]) # Plot original unscaled 80 x 7 ability matrix kwargs = {"path": plot_path, "filesuffix": "_orig_unscaled"} pp.plot_income_data(ages_long, abil_midp, abil_wgts, e_orig, **kwargs) # Plot original scaled 80 x 7 ability matrix kwargs = {"path": plot_path, "filesuffix": "_orig_scaled"} pp.plot_income_data( ages_long, abil_midp, abil_wgts, e_orig_scaled, **kwargs, ) return e_orig_scaled