import yfinance as yf
from . import yfc_dat as yfcd
from . import yfc_cache_manager as yfcm
from . import yfc_utils as yfcu
import numpy as np
import pandas as pd
import scipy.stats as stats
from time import sleep
from datetime import datetime, date, timedelta
from dateutil.relativedelta import relativedelta
import os
from statistics import mean
import math
from decimal import Decimal
from pprint import pprint
d_today = date.today()
yf_spam_window = timedelta(days=7)
# give Yahoo time to update their financials
yf_min_grace_days_period = timedelta(days=2)
yf_max_grace_days_period = timedelta(days=28)
company_release_delay = timedelta(days=2)
print_fetches = False
# print_fetches = True
[docs]
def sort_estimates(lst):
if len(lst) < 2:
return lst
pivot_index = len(lst) // 2
pivot = lst[pivot_index]
less = []
greater = []
for i, val in enumerate(lst):
if i == pivot_index:
continue
try:
less_than = val < pivot
except yfcd.AmbiguousComparisonException:
if hasattr(val, "prob_lt"):
less_than = val.prob_lt(pivot) > 0.5
else:
less_than = pivot.prob_gt(val) > 0.5
if less_than:
less.append(val)
else:
greater.append(val)
return sort_estimates(less) + [pivot] + sort_estimates(greater)
[docs]
class EarningsRelease():
def __init__(self, interval, period_end, release_date, full_year_end):
if not isinstance(period_end, (date, yfcd.DateEstimate)):
raise Exception("'period_end' must be a 'yfcd.DateEstimate' or date object or None, not {0}".format(type(period_end)))
if (release_date is not None):
if not isinstance(release_date, (date, yfcd.DateEstimate)):
raise Exception("'release_date' must be a 'yfcd.DateEstimate' or date object or None, not {0}".format(type(release_date)))
if release_date < period_end:
raise Exception("release_date={0} cannot occur before period_end={1}".format(release_date, period_end))
if release_date > (period_end + timedelta(days=90)):
raise Exception("release_date={0} shouldn't occur 90 days after period_end={1}".format(release_date, period_end))
if not isinstance(full_year_end, date):
raise Exception("'full_year_end' must be a date object or None, not {0}".format(type(full_year_end)))
self.interval = interval
self.period_end = period_end
self.release_date = release_date
self.full_year_end = full_year_end
def __str__(self):
s = f'{self.interval} earnings'
s += f" ending {self.period_end}"
s += " released"
s += " ?" if self.release_date is None else f" {self.release_date}"
return s
def __repr__(self):
return self.__str__()
def __lt__(self, other):
return self.period_end < other.period_end or (self.period_end == other.period_end and self.release_date < other.release_date)
def __le__(self, other):
return (self == other) or (self < other)
def __eq__(self, other):
return self.period_end == other.period_end and self.release_date == other.release_date
def __gt__(self, other):
return self.period_end > other.period_end or (self.period_end == other.period_end and self.release_date > other.release_date)
def __ge__(self, other):
return (self == other) or (self > other)
[docs]
def is_end_of_year(self):
r_is_end_of_year = False
rpe = self.period_end
diff = (rpe - self.full_year_end)
diff += timedelta(days=365) # just in case is negative
diff = diff % timedelta(days=365)
try:
if (diff > timedelta(days=-15) and diff < timedelta(days=15)) or\
(diff > timedelta(days=350) and diff < timedelta(days=370)):
# Aligns with annual release date
r_is_end_of_year = True
except yfcd.AmbiguousComparisonException:
r_is_end_of_year = True
return r_is_end_of_year
interval_str_to_days = {}
interval_str_to_days['ANNUAL'] = yfcd.ComparableRelativedelta(years=1)
interval_str_to_days['HALF'] = yfcd.ComparableRelativedelta(months=6)
interval_str_to_days['QUART'] = yfcd.ComparableRelativedelta(months=3)
[docs]
class FinancialsManager:
def __init__(self, ticker, exchange, tzName, session):
yfcu.TypeCheckStr(ticker, "ticker")
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckStr(tzName, "tzName")
self.ticker = ticker
self.exchange = exchange
self.tzName = tzName
self.session = session
self.dat = yf.Ticker(self.ticker, session=self.session)
# self._earnings = None
# self._quarterly_earnings = None
self._income_stmt = None
self._quarterly_income_stmt = None
self._balance_sheet = None
self._quarterly_balance_sheet = None
self._cashflow = None
self._quarterly_cashflow = None
self._earnings_dates = None
self._calendar = None
self._calendar_clean = None
self._pruned_tbl_cache = {}
self._fin_tbl_cache = {}
[docs]
def get_income_stmt(self, refresh=True):
if self._income_stmt is not None:
return self._income_stmt
self._income_stmt = self._get_fin_table(yfcd.Financials.IncomeStmt, yfcd.ReportingPeriod.Full, refresh)
return self._income_stmt
[docs]
def get_quarterly_income_stmt(self, refresh=True):
if self._quarterly_income_stmt is not None:
return self._quarterly_income_stmt
self._quarterly_income_stmt = self._get_fin_table(yfcd.Financials.IncomeStmt, yfcd.ReportingPeriod.Interim, refresh)
return self._quarterly_income_stmt
[docs]
def get_balance_sheet(self, refresh=True):
if self._balance_sheet is not None:
return self._balance_sheet
self._balance_sheet = self._get_fin_table(yfcd.Financials.BalanceSheet, yfcd.ReportingPeriod.Full, refresh)
return self._balance_sheet
[docs]
def get_quarterly_balance_sheet(self, refresh=True):
if self._quarterly_balance_sheet is not None:
return self._quarterly_balance_sheet
self._quarterly_balance_sheet = self._get_fin_table(yfcd.Financials.BalanceSheet, yfcd.ReportingPeriod.Interim, refresh)
return self._quarterly_balance_sheet
[docs]
def get_cashflow(self, refresh=True):
if self._cashflow is not None:
return self._cashflow
self._cashflow = self._get_fin_table(yfcd.Financials.CashFlow, yfcd.ReportingPeriod.Full, refresh)
return self._cashflow
[docs]
def get_quarterly_cashflow(self, refresh=True):
if self._quarterly_cashflow is not None:
return self._quarterly_cashflow
self._quarterly_cashflow = self._get_fin_table(yfcd.Financials.CashFlow, yfcd.ReportingPeriod.Interim, refresh)
return self._quarterly_cashflow
def _get_fin_table(self, finType, period, refresh=True):
debug = False
# debug = True
if debug:
print(f"_get_fin_table({finType}, {period}, refresh={refresh})")
if not isinstance(finType, yfcd.Financials):
raise Exception('Argument finType must be type Financials')
if not isinstance(period, yfcd.ReportingPeriod):
raise Exception('Argument period must be type ReportingPeriod')
cache_key = (finType, period, refresh)
if cache_key in self._fin_tbl_cache:
return self._fin_tbl_cache[cache_key]
if not refresh:
cache_key2 = (finType, period, True)
if cache_key2 in self._fin_tbl_cache:
return self._fin_tbl_cache[cache_key2]
if period == yfcd.ReportingPeriod.Interim:
name = 'quarterly_'
else:
name = ''
if finType == yfcd.Financials.IncomeStmt:
name += 'income_stmt'
elif finType == yfcd.Financials.BalanceSheet:
name += 'balance_sheet'
elif finType == yfcd.Financials.CashFlow:
name += 'cashflow'
df, md = None, None
if yfcm.IsDatumCached(self.ticker, name):
df, md = yfcm.ReadCacheDatum(self.ticker, name, True)
mod_dt = None
if md is None or len(md) == 0:
# Fix metadata
fp = yfcm.GetFilepath(self.ticker, name)
mod_dt = datetime.fromtimestamp(os.path.getmtime(fp)).astimezone()
md = {'FetchDates':{}}
for dt in df.columns:
md['FetchDates'][dt] = mod_dt
yfcm.WriteCacheMetadata(self.ticker, name, 'FetchDates', md['FetchDates'])
md['LastFetch'] = mod_dt
yfcm.WriteCacheMetadata(self.ticker, name, 'LastFetch', md['LastFetch'])
elif 'FetchDates' not in md:
if mod_dt is None:
fp = yfcm.GetFilepath(self.ticker, name)
mod_dt = datetime.fromtimestamp(os.path.getmtime(fp)).astimezone()
for dt in df.columns:
md['FetchDates'][dt] = mod_dt
yfcm.WriteCacheMetadata(self.ticker, name, 'FetchDates', md['FetchDates'])
elif 'LastFetch' not in md:
if mod_dt is None:
fp = yfcm.GetFilepath(self.ticker, name)
mod_dt = datetime.fromtimestamp(os.path.getmtime(fp)).astimezone()
md['LastFetch'] = mod_dt
yfcm.WriteCacheMetadata(self.ticker, name, 'LastFetch', md['LastFetch'])
if md['LastFetch'].tzinfo is None:
md['LastFetch'] = md['LastFetch'].astimezone()
yfcm.WriteCacheMetadata(self.ticker, name, 'LastFetch', md['LastFetch'])
do_fetch = False
if df is None:
do_fetch = True
elif refresh:
dt_now = pd.Timestamp.utcnow().tz_convert(self.tzName)
if df.empty:
# Nothing to estimate releases on, so just periodically check
try:
age = dt_now - md["LastFetch"]
except Exception:
print(md)
raise
if age > pd.Timedelta(days=30):
do_fetch = True
else:
td_1d = pd.Timedelta(1, unit='D')
releases = self.get_release_dates(period, refresh=False)
if debug:
print("- releases:") ; pprint(releases)
if releases is None:
# Use crude logic to estimate when to re-fetch
if 'LastFetch' in md.keys():
do_fetch = md['LastFetch'] < (dt_now - td_1d*30)
else:
do_fetch = True
else:
next_release = None
# last_d = df.columns.max().date()
# Update: analyse pruned dates:
last_d = self._prune_yf_financial_df(df).columns.max().date()
for r in releases:
# Release is newer than cache
try:
if r.period_end <= last_d:
continue
except yfcd.AmbiguousComparisonException:
# Treat as match
continue
try:
fetched_long_after_release = md['LastFetch'].date() > (r.release_date + yf_max_grace_days_period)
except yfcd.AmbiguousComparisonException:
fetched_long_after_release = False
if not fetched_long_after_release:
next_release = r
break
if next_release is None:
pprint(releases)
print("- last_d =", last_d)
raise Exception('Failed to determine next release after cached financials')
if debug:
print("- last_d =", last_d, ", last_fetch =", md['LastFetch'])
print("- next_release:", next_release)
rd = next_release.release_date
try:
next_release_in_future = rd > d_today
except yfcd.AmbiguousComparisonException:
next_release_in_future = False
if debug:
print("- next_release_in_future =", next_release_in_future)
if not next_release_in_future:
try:
fair_to_expect_Yahoo_updated = (d_today-rd) >= yf_min_grace_days_period
except yfcd.AmbiguousComparisonException:
fair_to_expect_Yahoo_updated = True
if debug:
print("- fair_to_expect_Yahoo_updated =", fair_to_expect_Yahoo_updated)
if fair_to_expect_Yahoo_updated:
if debug:
print("- expect new release, but did we already fetch recently?")
if md['LastFetch'] < (dt_now - yf_spam_window):
do_fetch = True
if debug:
print("- do_fetch =", do_fetch)
if do_fetch:
if print_fetches:
msg = f"{self.ticker}: fetching {name}"
if md is not None:
msg += f" (last fetch = {md['LastFetch']})"
print(msg)
df_new = getattr(self.dat, name)
fetch_dt = pd.Timestamp.utcnow().tz_convert(self.tzName)
if md is None:
md = {'FetchDates':{}}
for dt in df_new.columns:
md['FetchDates'][dt] = fetch_dt
md['LastFetch'] = fetch_dt
if df is None or df.empty:
df = df_new
elif df_new is not None and not df_new.empty:
df_pruned = df.drop([c for c in df.columns if c in df_new], axis=1)
df_new_pruned = df_new.drop([c for c in df_new.columns if c in df], axis=1)
if df_pruned.empty and df_new_pruned.empty:
if hasattr(next_release.release_date, 'confidence') and next_release.release_date.confidence == yfcd.Confidence.Low:
# Probably not released yet
pass
# else:
# # Update: also check if a large amount of time has passed since release.
# # Will Yahoo ever have it?
# td_since_release = d_today - next_release.release_date
# try:
# Yahoo_very_late = td_since_release > yf_max_grace_days_period
# except yfcd.AmbiguousComparisonException:
# Yahoo_very_late = False
# if Yahoo_very_late:
# # print("- next_release:", next_release)
# # print("- df:", df.columns, df.shape)
# # print("- df_new:", df_new.columns, df_new.shape)
# # print("- metadata old:") ; pprint(md_old)
# # print("- td_since_release:", td_since_release)
# ok = click.confirm(f"WARNING: Yahoo very late uploading newer {finType} for {self.ticker}, is this acceptable?", default=False)
# if ok:
# # print(f"WARNING: Yahoo missing newer financials for {self.ticker}")
# pass
# else:
# # print("- next_release:", next_release)
# # print("- df:", df.columns, df.shape)
# # print("- df_new:", df_new.columns, df_new.shape)
# # print("- metadata old:") ; pprint(md_old)
# raise Exception(f'Why asking Yahoo for {finType} when nothing new ready?')
elif not df_new.empty:
if df_pruned.empty:
df = df_new
else:
# Before merging, check for new/missing fields. Insert any with value NaN.
missing_keys = [k for k in df_pruned.index if k not in df_new.index]
new_keys = [k for k in df_new.index if k not in df_pruned.index]
actions = []
for k in missing_keys:
actions.append((k, "missing", df_pruned.index.get_loc(k)))
for k in new_keys:
actions.append((k, "new", df_new.index.get_loc(k)))
actions = sorted(actions, key=lambda x: x[2])
for a in actions:
k = a[0]
if a[1] == 'missing':
empty_row = pd.DataFrame(data={c:[np.nan] for c in df_new.columns}, index=[k])
idx = df_pruned.index.get_loc(k)
df_new = pd.concat([df_new.iloc[:idx], empty_row, df_new.iloc[idx:]])
else:
empty_row = pd.DataFrame(data={c:[np.nan] for c in df_pruned.columns}, index=[k])
idx = df_new.index.get_loc(k)
df_pruned = pd.concat([df_pruned.iloc[:idx], empty_row, df_pruned.iloc[idx:]])
df_new = df_new.reindex(df_pruned.index)
df = pd.concat([df_new, df_pruned], axis=1)
yfcm.StoreCacheDatum(self.ticker, name, df, metadata=md)
self._fin_tbl_cache[cache_key] = df
return df
def _get_interval_from_table(self, tbl):
debug = False
# debug = True
if debug:
print("_get_interval_from_table()")
dates = tbl.columns
# Ensure only well-populated columns are retained, corresponding to report releases
tbl = self._prune_yf_financial_df(tbl)
tbl = tbl[tbl.columns.sort_values(ascending=False)]
dates = tbl.columns
if debug:
print("- tbl:") ; print(tbl)
if len(dates) <= 1:
return yfcd.TimedeltaEstimate(yfcd.ComparableRelativedelta(months=6), yfcd.Confidence.Medium)
interval = None
intervals = [(dates[i-1] - dates[i]).days for i in range(1,len(dates))]
intervals = np.array(intervals)
# Cluster actual intervals
def safe_add_to_cluster(clusters, num, std_pct_threshold):
for c in clusters:
c2 = np.append(c, num)
if (np.std(c2) / np.mean(c2)) < std_pct_threshold:
c.append(num)
return True
return False
def cluster_numbers(numbers, std_pct):
clusters = []
for n in sorted(numbers):
if not clusters or not safe_add_to_cluster(clusters, n, std_pct):
clusters.append([n])
return clusters
clusters = cluster_numbers(intervals, 0.05)
# Map clusters to legal intervals
tol = 10
intervals = []
for i in range(len(clusters)-1, -1, -1):
m = np.mean(clusters[i])
if abs(m-365) < tol:
intervals.append(yfcd.ComparableRelativedelta(years=1))
elif abs(m-182) < tol:
intervals.append(yfcd.ComparableRelativedelta(months=6))
elif abs(m-91) < tol:
intervals.append(yfcd.ComparableRelativedelta(months=3))
elif abs(m-274) < tol:
# 9 months, nonsense, but implies quarterly
intervals.append(yfcd.TimedeltaEstimate(yfcd.ComparableRelativedelta(months=3), yfcd.Confidence.Medium))
else:
del clusters[i]
if len(intervals) == 1:
# good!
return intervals[0]
else:
# Return the smallest. In case of ambiguous comparison, keep most confident.
best = intervals[0]
for i in range(1, len(intervals)):
i2 = intervals[i]
try:
best = min(best, i2)
except yfcd.AmbiguousComparisonException:
best_confidence = best.confidence if hasattr(best, 'confidence') else yfcd.Confidence.High
i2_confidence = i2.confidence if hasattr(i2, 'confidence') else yfcd.Confidence.High
if i2_confidence > best_confidence:
best = i2
return best
def _get_interval(self, finType, refresh=True):
debug = False
# debug = True
if debug:
print(f"_get_interval({finType})")
if not isinstance(finType, yfcd.Financials):
raise Exception('Argument finType must be type Financials')
tbl = self._get_fin_table(finType, yfcd.ReportingPeriod.Interim, refresh)
return self._get_interval_from_table(tbl)
[docs]
def get_release_dates(self, period, as_df=False, refresh=True, check=False):
# First, check cache:
if period == yfcd.ReportingPeriod.Full:
cache_key = "full"
elif period == yfcd.ReportingPeriod.Interim:
cache_key = "interim"
else:
raise Exception(f"Unknown period value '{period}'")
cache_key += "-release-dates"
releases, md = None, None
if yfcm.IsDatumCached(self.ticker, cache_key):
releases, md = yfcm.ReadCacheDatum(self.ticker, cache_key, True)
if len(releases) == 0:
releases = None
max_age = pd.Timedelta(yfcm._option_manager.max_ages.calendar)
dt_now = pd.Timestamp.now()
d_exchange = pd.Timestamp.utcnow().tz_convert(self.tzName).date()
if releases is None:
if md is None:
do_calc = True
else:
do_calc = md['CalcDate'] < (dt_now - max_age)
else:
do_calc = False
# Check if cached release dates need a recalc
if md['CalcDate'] < (dt_now - max_age):
prev_r, next_r = None, None
for i in range(len(releases)-1):
r0 = releases[i]
r1 = releases[i+1]
try:
r_is_history = r0.release_date < d_exchange
except yfcd.AmbiguousComparisonException:
r_is_history = r0.release_date.prob_lt(d_exchange) > 0.9
if r_is_history:
prev_r = r0
next_r = r1
if hasattr(prev_r, 'confidence'):
do_calc = True
elif hasattr(next_r, 'confidence'):
try:
d_exchange < next_r.release_date
except yfcd.AmbiguousComparisonException:
print("- next release date is estimated, time to recalc:", next_r)
do_calc = True
# print("- releases:") ; pprint(releases)
# print("- md:") ; pprint(md)
# raise Exception('review cached release dates')
if do_calc:
releases = self._calc_release_dates(period, refresh, check)
md = {'CalcDate':pd.Timestamp.now()}
if releases is None:
yfcm.StoreCacheDatum(self.ticker, cache_key, [], metadata=md)
else:
yfcm.StoreCacheDatum(self.ticker, cache_key, releases, metadata=md)
if releases is None:
return None
if not as_df:
return releases
period_ends = []
period_ends_est = []
release_dates = []
release_dates_est = []
delays = []
for r in releases:
rpe = r.period_end ; rrd = r.release_date
if rpe is None or rrd is None:
print(r)
raise Exception('Release missing dates')
period_ends.append(rpe if isinstance(rpe, date) else rpe.date)
period_ends_est.append(rpe.confidence if isinstance(rpe, yfcd.DateEstimate) else yfcd.Confidence.High)
dt1 = rpe if isinstance(rpe, date) else rpe.date
if isinstance(rrd, yfcd.DateRange):
rrd_range = rrd.end - rrd.start
release_dates_est.append(yfcd.Confidence.High)
release_dates.append((rrd.start, rrd.end))
midpoint = rrd.start + timedelta(days=rrd_range.days//2)
delays.append(midpoint - dt1)
elif isinstance(rrd, yfcd.yfcd.DateRangeEstimate):
release_dates_est.append(rrd.confidence)
rrd_range = rrd.end - rrd.start
midpoint = rrd.start + rrd_range*0.5
if isinstance(midpoint, datetime):
midpoint = midpoint.date()
release_dates.append((rrd.start, rrd.end))
delays.append(midpoint - dt1)
else:
release_dates.append(rrd if isinstance(rrd, date) else rrd.date)
release_dates_est.append(rrd.confidence if isinstance(rrd, yfcd.DateEstimate) else yfcd.Confidence.High)
dt2 = rrd if isinstance(rrd, date) else rrd.date
delays.append(dt2 - dt1)
df = pd.DataFrame({'Period end':period_ends, 'PE confidence':period_ends_est, 'Release date':release_dates, 'RD confidence':release_dates_est, 'Delay':delays})
df['Period end'] = pd.to_datetime(df['Period end'])
df['Period end'] = df['Period end'].dt.tz_localize(self.tzName)
df = df.set_index('Period end')
# Set timezone
# release_dates_formatted = []
# for i in range(df.shape[0]):
# idx = df.index[i]
# x = df['Release date'].iloc[i]
# if isinstance(x, tuple):
# x = (pd.to_datetime(x[0]).tz_localize(self.tzName), pd.to_datetime(x[1]).tz_localize(self.tzName))
# else:
# x = pd.to_datetime(x).tz_localize(self.tzName)
# release_dates_formatted.append(x)
# df['Release date'] = release_dates_formatted
return df
def _calc_release_dates(self, period, refresh=True, check=False):
debug = False
# debug = True
if debug:
print(f"_calc_release_dates({period}, refresh={refresh})")
if not isinstance(period, yfcd.ReportingPeriod):
raise Exception('Argument period must be type ReportingPeriod')
yfcu.TypeCheckBool(refresh, 'refresh')
yfcu.TypeCheckBool(check, 'check')
# Get period ends
tbl = None
finType = None
for f in yfcd.Financials:
t = self._get_fin_table(f, period, refresh)
t = self._prune_yf_financial_df(t)
if tbl is None:
tbl = t ; finType = f
elif t is not None and t.shape[1] > tbl.shape[1]:
tbl_wasnt_empty = not tbl.empty
tbl = t ; finType = f
if tbl_wasnt_empty:
break
if tbl is None or tbl.empty:
return None
tbl_cols = tbl.columns
if isinstance(tbl_cols[0], (datetime, pd.Timestamp)):
tbl_cols = [c.date() for c in tbl_cols]
period_ends = [d.date() for d in tbl.columns if d.date() <= d_today]
period_ends.sort(reverse=True)
if debug:
print("- period_ends:")
for x in period_ends:
print(x)
# Get calendar
cal_release_dates = self._get_calendar_dates(refresh)
if debug:
if len(cal_release_dates) == 0:
print("- calendar empty")
else:
print("- cal_release_dates:")
for x in cal_release_dates:
print(x)
# Get earnings dates
edf = self.get_earnings_dates(start=tbl.columns.min().date(), refresh=refresh, clean=False)
# Get full year end date
tbl = None
for f in yfcd.Financials:
t = self._get_fin_table(f, yfcd.ReportingPeriod.Full, refresh=False) # minimise fetches
t = self._prune_yf_financial_df(t)
if t is not None and not t.empty:
tbl = t
break
if tbl is None and refresh:
for f in yfcd.Financials:
t = self._get_fin_table(f, yfcd.ReportingPeriod.Full, refresh)
t = self._prune_yf_financial_df(t)
if t is not None and not t.empty:
tbl = t
break
if not tbl.empty:
year_end = tbl.columns.max().date()
else:
year_end = None
if pd.isna(year_end):
print(tbl.iloc[0:4])
raise Exception("'year_end' is NaN")
if debug:
print("- year_end =", year_end)
# Clean earnings dates
if (edf is None) or (edf.shape[0]==0):
if debug:
print("- earnings_dates table is empty")
release_dates = cal_release_dates
else:
# Prune old dates
f_old = edf.index.date < period_ends[-1]
if f_old.any():
edf = edf[~f_old]
if edf.shape[0] > 1:
# Drop dates that occurred just before another
edf = edf.sort_index(ascending=True)
d = edf.index.to_series().diff()
d.iloc[0] = pd.Timedelta(999, unit='d')
x_near = np.abs(d) < pd.Timedelta(5, "days")
if x_near.any():
edf = edf[~x_near]
edf = edf.sort_index(ascending=False)
release_dates = cal_release_dates
for i in range(edf.shape[0]):
dt = edf.index[i].date()
r = edf.iloc[i]
td = None
if td is None:
if pd.isnull(r["Reported EPS"]) and pd.isnull(r["Surprise(%)"]) and not r['Date confirmed?']:
td = yfcd.DateEstimate(dt, yfcd.Confidence.Medium)
else:
td = dt
# Protect against duplicating entries in calendar
duplicate = False
for c in release_dates:
diff = c - td
try:
duplicate = diff > timedelta(days=-20) and diff < timedelta(days=20)
except yfcd.AmbiguousComparisonException:
p1 = diff.prob_gt(timedelta(days=-20))
p2 = diff.prob_lt(timedelta(days=20))
duplicate = p1 > 0.9 and p2 > 0.9
if duplicate:
break
if not duplicate:
release_dates.append(td)
if debug:
print("- edf:")
print(edf)
release_dates.sort(reverse=True)
print("- release_dates:")
pprint(release_dates)
# Deduce interval
if period == yfcd.ReportingPeriod.Full:
interval_td = interval_str_to_days['ANNUAL']
else:
interval_td = self._get_interval(finType, refresh)
if debug:
print(f"- interval_td = {interval_td}")
# Now combine known dates into 'Earnings Releases':
if debug:
print("# Now combine known dates into 'Earnings Releases':")
releases = []
for d in period_ends:
r = EarningsRelease(interval_td, d, None, year_end)
releases.append(r)
if debug:
releases.sort()
print("> releases with known period-end-dates:")
pprint(releases)
# Fill gap between last release and now+9mo with estimated releases
if debug:
print("# Fill gap between last release and now with estimated releases")
releases.sort(reverse=True)
last_release = releases[0]
if debug:
print("- last_release:", last_release)
ct = 0
while True:
ct += 1
if ct > 10:
for r in releases:
print(r)
print("interval_td = {0}".format(interval_td))
raise Exception("Infinite loop detected while estimating next financial report")
next_period_end = yfcd.DateEstimate(interval_td + last_release.period_end, yfcd.Confidence.High)
r = EarningsRelease(interval_td, next_period_end, None, year_end)
releases.insert(0, r)
last_release = r
if debug:
print("Inserting:", r)
try:
if r.period_end > (d_today+timedelta(days=270)):
break
except yfcd.AmbiguousComparisonException:
p = r.period_end.prob_gt(d_today+timedelta(days=270))
if p > 0.9:
break
if debug:
releases.sort()
print("# Intermediate set of releases:")
pprint(releases)
if release_dates is None or len(release_dates) == 0:
if debug:
print("No release dates in Yahoo so estimating all with Low confidence")
for i in range(len(releases)):
releases[i].release_date = yfcd.DateEstimate(releases[i].period_end+timedelta(days=5)+yfcd.confidence_to_buffer[yfcd.Confidence.Low], yfcd.Confidence.Low)
return releases
release_dates.sort()
# Add more releases to ensure their date range fully overlaps with release dates
release_dates.sort()
releases.sort()
ct = 0
while True:
try:
gt_than = releases[0].period_end > release_dates[0]
except yfcd.AmbiguousComparisonException:
if hasattr(releases[0].period_end, 'prob_gt'):
p = releases[0].period_end.prob_gt(release_dates[0])
else:
p = release_dates[0].prob_lt(releases[0].period_end)
gt_than = p > 0.9
if not gt_than:
break
ct += 1
if ct > 100:
raise Exception("Infinite loop detected while adding release objects")
prev_period_end = releases[-1].period_end - interval_td
conf = yfcd.Confidence.High
if isinstance(prev_period_end, date):
prev_period_end = yfcd.DateEstimate(prev_period_end, conf)
else:
prev_period_end = yfcd.DateEstimate(prev_period_end.date, min(prev_period_end.confidence, conf))
r = EarningsRelease(interval_td, prev_period_end, None, year_end)
releases.insert(0, r)
if debug:
print("Inserting:", r)
ct = 0
while True:
try:
less_than = releases[-1].period_end+interval_td < release_dates[-1]
except yfcd.AmbiguousComparisonException:
p = (releases[-1].period_end+interval_td).prob_lt(release_dates[-1])
less_than = p > 0.5
if not less_than:
break
ct += 1
if ct > 20:
raise Exception("Infinite loop detected while adding release objects")
next_period_end = releases[-1].period_end + interval_td
if isinstance(next_period_end, date):
next_period_end = yfcd.DateEstimate(next_period_end, yfcd.Confidence.Medium)
else:
next_period_end = yfcd.DateEstimate(next_period_end.date, min(next_period_end.confidence, yfcd.Confidence.Medium))
r = EarningsRelease(interval_td, next_period_end, None, year_end)
releases.append(r)
if debug:
print("Appending:", r)
# Fill in gaps in periods with estimates:
for i in range(len(releases)-2, -1, -1):
while True:
r0 = releases[i]
r1 = releases[i+1]
try:
diff = r1.period_end - r0.period_end
gap_too_large = (diff/1.5) > interval_td
except yfcd.AmbiguousComparisonException:
gap_too_large = False
if gap_too_large:
new_r = EarningsRelease(interval_td, r1.period_end - interval_td, None, year_end)
if debug:
print(f"Inserting release estimate into gap: {new_r} (diff={diff}, interval_td={interval_td}, {type(interval_td)})")
releases.insert(i+1, new_r)
else:
break
if debug:
releases.sort()
print("# Final set of releases:")
pprint(releases)
# Assign known dates to appropriate release(s) without dates
if debug:
print("# Assigning known dates to releases ...")
releases = sort_estimates(releases)
release_dates.sort()
for i in range(len(release_dates)):
dt = release_dates[i]
if debug:
print("- dt =", dt)
# Find most recent period-end:
rj = 0
for j in range(1, len(releases)):
try:
if releases[j].period_end > (dt-company_release_delay):
break
except yfcd.AmbiguousComparisonException:
if hasattr(releases[j].period_end, "prob_gt"):
p = releases[j].period_end.prob_gt(dt-company_release_delay)
else:
p = (dt-company_release_delay).prob_lt(releases[j].period_end)
if debug:
print(f" - prob. that {releases[j].period_end} > {dt-company_release_delay} = {100.0*p:.1f}%")
if p > 0.5:
break
rj = j
r = releases[rj]
if debug:
print(" - rj =", rj, ", r =", r)
if r.release_date is not None:
# Already assigned an earlier release date.
dt_is_for_same_period = False
if isinstance(dt, date) and dt in edf.index.date:
if isinstance(r.release_date, date) and r.release_date in edf.index.date:
# Not great because assumes two consecutive earnings don't report same EPS
dt_is_for_same_period1 = edf['Reported EPS'].loc[str(dt)].iloc[0] == edf['Reported EPS'].loc[str(r.release_date)].iloc[0]
dt_is_for_same_period2 = edf['EPS Estimate'].loc[str(dt)].iloc[0] == edf['EPS Estimate'].loc[str(r.release_date)].iloc[0]
dt_is_for_same_period = dt_is_for_same_period1 and dt_is_for_same_period2
if dt_is_for_same_period:
# Assume the earlier release was just a preliminary cash-flow update, and that
# this later release is the full financials report
if debug:
print(f" - assume earlier release {r.release_date} was just a preliminary cash-flow update, and that")
print(f" - this later release {dt} is the full financials report")
r.release_date = dt
continue
dt_is_better = False
if not hasattr(dt, 'confidence'):
if hasattr(r.release_date, 'confidence'):
# Maybe the previously-assigned date was estimate, from bad Yahoo data
dt_is_better = True
else:
if hasattr(r.release_date, 'confidence') and dt.confidence > r.release_date.confidence:
dt_is_better = True
if debug:
print(" - dt_is_better =", dt_is_better)
if dt_is_better:
if debug:
print(f" - dt={dt} is more accurate than date already assigned to {r}. so overwrite with dt")
print(f" - discarding previously assigned dt={r.release_date}")
r.release_date = dt
continue
try:
quarterly = interval_td <= timedelta(days=100)
except yfcd.AmbiguousComparisonException:
quarterly = True
if not quarterly:
# Not quarterly releases so can't safely reassign dates.
# Probably this date 'dt' is for a cashflow update, not
# full earnings release.
# So treat this date 'dt' unassignable.
if debug:
print(f" - because not quarterly, have to discard unassigned date {dt}")
continue
r_is_end_of_year = r.is_end_of_year()
if debug:
print(" - r_is_end_of_year =", r_is_end_of_year)
# if r_is_end_of_year and rj > 0:
# # For annual reports, allow reassigned date to previous release,
# # because annual reports can take longer to release.
# if (releases[rj-1].release_date is not None) and (not dt_is_better):
# # print("- dt =", dt)
# # print("- dt_is_for_same_period =", dt_is_for_same_period)
# # print("- r_is_end_of_year =", r_is_end_of_year)
# # print("- this release:", releases[rj])
# # print("- previous release:", releases[rj-1])
# # print("- edf:")
# # print(edf.drop(['FetchDate'], axis=1))
# # print(edf.columns)
# # print("- release_dates:") ; pprint(release_dates)
# # raise Exception('Expected prior report to not be assigned date')
# # Update:
# # If this date not better, then just discard.
# pass
# else:
# if debug:
# print(f" - reassigning dt={releases[rj].release_date} to previous report {releases[rj-1]}")
# releases[rj-1].release_date = r.release_date
# r.release_date = dt
# else:
# if debug:
# print(f" - discarding previously assigned dt={releases[rj].release_date}")
# Update: refactor logic
if r_is_end_of_year or dt_is_better:
# First, decide whether to reassign assigned date to previous release
if rj > 0 and releases[rj-1].release_date is None:
# if debug:
# print(f" - reassigning dt={releases[rj].release_date} to previous report {releases[rj-1]}")
# releases[rj-1].release_date = r.release_date
#
# But what if I don't? Might be causing trouble no benefit
pass
else:
if debug:
print(f" - discarding previously assigned dt={releases[rj].release_date}")
r.release_date = dt
if debug:
print(f" - assigning dt={dt} to report={r}")
r.release_date = dt
if debug:
releases.sort()
print("> releases with known release dates:")
for r in releases:
print(r)
# Discard date assignments where delays are much higher than average
delays = [(r.release_date - r.period_end) for r in releases if r.release_date is not None]
if len(delays) >= 3:
delays = sort_estimates(delays)
median_delay = delays[len(delays)//2]
delays = [(r.release_date - r.period_end) if r.release_date is not None else timedelta(0) for r in releases]
outliers = np.ones(len(delays), dtype=bool)
for i in range(len(delays)):
r = releases[i]
r_is_end_of_year = r.is_end_of_year()
if debug:
print(" - r_is_end_of_year =", r_is_end_of_year)
threshold = median_delay*3
if r_is_end_of_year:
threshold += timedelta(days=30)
try:
outliers[i] = delays[i] > threshold
except yfcd.AmbiguousComparisonException:
if hasattr(delays[i], 'prob_gt'):
p = delays[i].prob_gt(threshold)
else:
p = threshold.prob_lt(delays[i])
outliers[i] = p > 0.9
for i in np.where(outliers)[0]:
if debug:
print(f"discarding a release date because delay far above median {median_delay}:", releases[i])
releases[i].release_date = None
# For any releases still without release dates, estimate with the following heuristics:
# 1 - if release 12 months before/after has a date (or a multiple of 12), use that +/- 12 months
# 2 - else used previous release + interval
if debug:
print("# Estimating release dates from other releases at similar time-of-year")
report_delay = None
releases.sort()
if any([r.release_date is None for r in releases]):
for try_interval in [365, 365//2, 365//4]:
itd = timedelta(days=try_interval)
for i in range(len(releases)):
if releases[i].release_date is None:
# Need to find a similar release to extrapolate date from
date_set = False
for i2 in range(len(releases)):
if i2==i:
continue
if releases[i2].release_date is not None:
if period == yfcd.ReportingPeriod.Full:
tolerance = timedelta(days=40)
else:
tolerance = timedelta(days=10)
if releases[i2].period_end > releases[i].period_end:
rem = (releases[i2].period_end - releases[i].period_end) % itd
else:
rem = (releases[i].period_end - releases[i2].period_end) % itd
try:
m1 = rem < tolerance
except yfcd.AmbiguousComparisonException:
m1 = rem.prob_lt(tolerance) > 0.9
try:
m2 = abs(rem-itd) < tolerance
except yfcd.AmbiguousComparisonException:
m2 = abs(rem-itd).prob_lt(tolerance) > 0.9
match = m1 or m2
if match:
if debug:
print(f"- matching '{releases[i]}' with '{releases[i2]}' for interval '{try_interval}'")
delay = releases[i2].release_date - releases[i2].period_end
dt = delay + releases[i].period_end
r_is_end_of_year = releases[i].is_end_of_year()
if debug:
print(" - r_is_end_of_year =", r_is_end_of_year)
if r_is_end_of_year and try_interval != 365:
# Annual reports take longer than interims, so add on some more days
if debug:
print(" - adding 14d to dt")
dt += timedelta(days=14)
if not hasattr(dt, 'confidence'):
if r_is_end_of_year and try_interval != 365:
confidence = yfcd.Confidence.Low
else:
confidence = yfcd.Confidence.Medium
if isinstance(dt, date):
dt = yfcd.DateEstimate(dt, confidence)
elif isinstance(dt, yfcd.DateRange):
dt = yfcd.DateRangeEstimate(dt.start, dt.end, confidence)
else:
raise Exception('Need to ensure this value has confidence:', dt)
else:
if r_is_end_of_year and try_interval != 365:
confidences = [yfcd.Confidence.Low]
else:
confidences = [yfcd.Confidence.Medium]
if isinstance(releases[i2].period_end, (yfcd.DateEstimate, yfcd.DateRangeEstimate)):
confidences.append(releases[i2].period_end.confidence)
if isinstance(releases[i2].release_date, (yfcd.DateEstimate, yfcd.DateRangeEstimate)):
confidences.append(releases[i2].release_date.confidence)
dt.confidence = min(confidences)
if i > 0 and (releases[i-1].release_date is not None):
too_close_to_previous = False
try:
if isinstance(releases[i-1].release_date, yfcd.DateEstimate):
too_close_to_previous = releases[i-1].release_date.isclose(dt)
else:
if releases[i-1].is_end_of_year():
threshold = timedelta(days=1)
else:
threshold = timedelta(days=30)
if debug:
diff = dt-releases[i-1].release_date
print(f" - diff = {diff}")
print(f" - threshold = {threshold}")
too_close_to_previous = (dt-releases[i-1].release_date) < threshold
except yfcd.AmbiguousComparisonException:
p = (dt-releases[i-1].release_date).prob_lt(threshold)
too_close_to_previous = p > 0.9
if too_close_to_previous:
if debug:
print(f" - dt '{dt}' would be too close to previous release date '{releases[i-1]}'")
# Too close to last release date
continue
releases[i].release_date = dt
date_set = True
if debug:
print(" - estimated release date {} of period-end {} from period-end {}".format(releases[i].release_date, releases[i].period_end, releases[i2].period_end))
break
if date_set and (report_delay is not None):
releases[i].release_date.date += report_delay
if debug:
print("> releases after estimating release dates:")
for r in releases:
print(r)
any_release_has_date = False
for r in releases:
if r.release_date is not None:
any_release_has_date = True
break
if not any_release_has_date:
if debug:
print(f"- unable to map all {period} financials to release dates")
return None
# Check for any releases still missing a release date that could be the Last earnings release:
if any([r.release_date is None for r in releases]):
for i in range(len(releases)):
r = releases[i]
if r.release_date is None:
problem = False
if i == len(releases)-1:
problem = True
else:
r2 = releases[i+1]
if (r2.release_date is not None) and (r2.release_date > d_today):
problem = True
if problem:
print(r)
raise Exception("A release that could be last is missing release date")
if debug:
print("> releases after estimating release dates:")
for r in releases:
print(r)
if check:
self._check_release_dates(releases, finType, period, refresh)
return releases
def _check_release_dates(self, releases, finType, period, refresh):
# if period == yfcd.ReportingPeriod.Full:
# interval_td = interval_str_to_days['ANNUAL']
# else:
# interval_td = self._get_interval(finType, refresh)
# Ignore releases with no date:
# - can happen with nonsense financials dates from Yahoo that
# even my prune function couldn't safely remove
releases = [r for r in releases if r.release_date is not None]
for i0 in range(len(releases)-1):
r0 = releases[i0]
r0rd = r0.release_date
if hasattr(r0rd, 'confidence') and r0rd.confidence == yfcd.Confidence.Low:
continue
for i1 in range(i0+1, len(releases)):
r1 = releases[i1]
r1rd = r1.release_date
if hasattr(r1rd, 'confidence') and r1rd.confidence == yfcd.Confidence.Low:
continue
#
if isinstance(r0rd, date) and isinstance(r1rd, date):
isclose = r0rd == r1rd
elif isinstance(r0rd, date):
isclose = r1rd.isclose(r0rd)
else:
isclose = r0rd.isclose(r1rd)
if isclose:
print(r0)
print(r1)
raise Exception(f'{self.ticker} Release dates have been assigned multiple times')
if not r0.is_end_of_year():
try:
# bad_order = r0.release_date > r1.period_end
bad_order = r0.release_date > (r1.period_end+timedelta(days=7))
except yfcd.AmbiguousComparisonException:
p = r0.release_date.prob_gt(r1.period_end+timedelta(days=7))
bad_order = p > 0.9
# try:
# bad_order = bad_order and ((r1.period_end - r0.period_end)*2.0 > interval_td)
# except yfcd.AmbiguousComparisonException:
# bad_order = False
if bad_order:
pprint(releases)
print(r0)
print(r1)
raise Exception(f'{self.ticker} Some releases dates are after next period ends')
#
for r in releases:
try:
is_negative = r.release_date < r.period_end
except yfcd.AmbiguousComparisonException:
p = r.release_date.prob_lt(r.period_end)
is_negative = p > 0.9
if is_negative:
diff = r.release_date - r.period_end
print("- rd =", r.release_date, type(r.release_date))
print("- pe =", r.period_end, type(r.period_end))
print("- diff =", diff, type(diff))
print(r)
raise Exception('Release dates contains negative delays')
def _prune_yf_financial_df(self, df):
debug = False
# debug = True
if df is None or df.empty:
return df
## Fiddly to put dates into a list and sort without reordering dataframe and without down-casting the date types!
dates = [d for d in df.columns]
dates.sort()
cache_key = tuple([df.index[0]] + dates)
if cache_key in self._pruned_tbl_cache:
return self._pruned_tbl_cache[cache_key]
# Drop duplicated columns
if len(set(dates)) != len(dates):
## Search for duplicated columns
df = df.T.drop_duplicates().T
dates = [d for d in df.columns]
dates.sort()
# Drop mostly-NaN duplicated dates:
df_modified = False
if len(set(dates)) != len(dates):
for dt in set(dates):
dff = df[dt]
if len(dff.shape) == 2 and dff.shape[1] == 2:
# This date is duplicated, so count NaNs:
n_dups = dff.shape[1]
dt_indices = np.where(df.columns == dt)[0]
is_mostly_nans = np.array([False]*n_dups)
for i in range(n_dups):
dt_idx = dt_indices[i]
is_mostly_nans[i] = df.iloc[:,dt_idx].isnull().sum() > int(df.shape[0]*0.75)
if is_mostly_nans.sum() == n_dups-1:
## All but one column are mostly nans, perfect!
drop_indices = dt_indices[is_mostly_nans]
indices = np.array(range(df.shape[1]))
keep_indices = indices[~np.isin(indices, drop_indices)]
df = df.iloc[:,keep_indices].copy()
df_modified = True
dff = df[dt]
if len(dff.shape) == 2 and dff.shape[1] == 2:
# Date still duplicated.
# Find instance with most non-nan values; if
# all other instances are equal or nan then drop.
n_dups = dff.shape[1]
dt_indices = np.where(df.columns == dt)[0]
nan_counts = np.zeros(n_dups)
for i in range(n_dups):
dt_idx = dt_indices[i]
nan_counts[i] = df.iloc[:,dt_idx].isnull().sum()
idx_min_na = 0
for i in range(1,n_dups):
if nan_counts[i] < nan_counts[idx_min_na]:
idx_min_na = i
drop_indices = []
for i in range(n_dups):
if i == idx_min_na:
continue
min_idx = dt_indices[idx_min_na]
dt_idx = dt_indices[i]
f_match = df.iloc[:,dt_idx].isnull() | (df.iloc[:,dt_idx]==df.iloc[:,min_idx])
if f_match.all():
drop_indices.append(dt_idx)
if len(drop_indices)>0:
indices = np.array(range(df.shape[1]))
keep_indices = indices[~np.isin(indices, drop_indices)]
df = df.iloc[:,keep_indices].copy()
df_modified = True
if df_modified:
dates = [d for d in df.columns]
dates.sort()
# If duplicated date columns is very similar, then drop right-most:
df_modified = False
if len(set(dates)) != len(dates):
for dt in set(dates):
dff = df[dt]
if len(dff.shape) == 2 and dff.shape[1] == 2:
dff.columns = [str(dff.columns[i])+str(i) for i in range(dff.shape[1])]
# r = dff.diff(axis=1)
r = (dff[dff.columns[0]] - dff[dff.columns[1]]).abs() / dff[dff.columns[0]]
r = r.sum()
if r < 0.15:
df = df.drop(dt, axis=1)
df[dt] = dff[dff.columns[0]]
df_modified = True
if df_modified:
dates = [d for d in df.columns]
dates.sort()
if len(set(dates)) != len(dates):
print(df)
print("Dates: {}".format(dates))
raise Exception("Duplicate dates found in financial df")
# Search for mostly-nan columns, where the non-nan values are exact match to an adjacent column.
# Replace those nans with adjacent column values.
# Optimise:
df_isnull = df.isnull()
df_isnull_sums = df_isnull.sum()
nan_threshold = int(df.shape[0]*0.75)
for i1 in range(1, len(dates)):
d1 = dates[i1]
d0 = dates[i1-1]
d0_mostly_nans = df_isnull_sums[d0] > nan_threshold
d1_mostly_nans = df_isnull_sums[d1] > nan_threshold
if d0_mostly_nans and not d1_mostly_nans:
f = (~df_isnull[d0]) & (~df_isnull[d1])
if np.sum(f) >= 2:
# At least two actual values
if np.array_equal(df.loc[f,d0], df.loc[f,d1]):
# and those values match
df[d0] = df[d1].copy()
elif d1_mostly_nans and not d0_mostly_nans:
f = (~df_isnull[d1]) & (~df_isnull[d0])
if np.sum(f) >= 2:
# At least two actual values
if np.array_equal(df.loc[f,d1], df.loc[f,d0]):
# and those values match
df[d1] = df[d0].copy()
# Drop mostly-nan columns:
df_modified = False
for i in range(len(dates)-1, -1, -1):
d = dates[i]
# if df[d].isnull().sum() == df.shape[0]:
# # Full of nans, drop column:
if np.sum(df[d].isnull()) > nan_threshold:
# Mostly nans, drop column
if debug:
print(f"_prune_yf_financial_df(): column {d} is mostly NaNs")
df = df.drop(d, axis=1)
df_modified = True
if df_modified:
dates = [d for d in df.columns]
dates.sort()
# # Then drop all columns devoid of data (NaN and 0.0):
# for i in range(len(dates)-1, -1, -1):
# d = dates[i]
# fnan = df[d].isnull()
# fzero = df[d]==0.0
# if sum(np_or(fnan, fzero)) == df.shape[0]:
# # Completely devoid of data, drop column
# df = df.drop(d, axis=1)
# Search for populated columns, where values are very similar.
similarity_pct_threshold = 0.8
for i in range(len(dates)-2, -1, -1):
d1 = dates[i+1]
d0 = dates[i]
delta = d1 - d0
similarity_pct = np.sum(df[d0] == df[d1]) / df.shape[0]
if df.shape[0] > 10 and delta < timedelta(days=45) and similarity_pct > similarity_pct_threshold:
if debug:
print(f"{d0.date()} very similar & close to {d1.date()}, discarding later")
# df = df.drop(d1, axis=1)
# Instead of arbitrarily dropping one date, be smart.
# Keep the one that makes most sense relative to distances to other dates
diffs0 = [] ; diffs1 = []
if i > 0:
diffs0.append((dates[i] - dates[i-1]).days)
diffs1.append((dates[i+1] - dates[i-1]).days)
if i < (len(dates)-2):
diffs0.append((dates[i+2] - dates[i]).days)
diffs1.append((dates[i+2] - dates[i+1]).days)
diffs0 = [min(abs(d-91), abs(d-182), abs(d-365)) for d in diffs0]
diffs1 = [min(abs(d-91), abs(d-182), abs(d-365)) for d in diffs1]
if mean(diffs0) < mean(diffs1):
df = df.drop(d1, axis=1)
else:
df = df.drop(d0, axis=1)
dates = [d for d in df.columns]
dates.sort()
if len(set(dates)) != len(dates):
print(f"Dates: {dates}")
raise Exception("Duplicate dates found in financial df")
# Remove columns which YF created by backfilling
df = df[df.columns.sort_values(ascending=False)]
dates = [d for d in df.columns]
for i1 in range(1, len(dates)):
d0 = dates[i1-1]
d1 = dates[i1]
d0_values = df[d0].copy()
d1_values = df[d1].copy()
d0_values.loc[d0_values.isna()] = 0.0
d1_values.loc[d1_values.isna()] = 0.0
if np.array_equal(d0_values.values, d1_values.values):
if debug:
print(f"_prune_yf_financial_df(): column {d0} appears backfilled by Yahoo")
df = df.drop(d0, axis=1)
df = df[df.columns.sort_values(ascending=True)]
if df.empty:
raise Exception("_prune_yf_financial_df() has removed all columns")
self._pruned_tbl_cache[cache_key] = df
return df
def _earnings_interval(self, with_report, refresh=True):
# Use cached data to deduce interval regardless of 'refresh'.
# If refresh=True, only refresh if cached data not good enough.
yfcu.TypeCheckBool(with_report, 'with_report')
yfcu.TypeCheckBool(refresh, 'refresh')
debug = False
# debug = True
if debug:
print(f'_earnings_interval(with_report={with_report}, refresh={refresh})')
interval = None
inference_successful = False
if not with_report:
edf = self.get_earnings_dates(start=d_today-timedelta(days=730), refresh=False)
if (edf is None or edf.shape[0] <= 3) and refresh:
edf = self.get_earnings_dates(start=d_today-timedelta(days=730), refresh=refresh)
if edf is not None and edf.shape[0] > 3:
# First, remove duplicates:
deltas = np.flip((np.diff(np.flip(edf.index.date)) / pd.Timedelta(1, unit='D')))
f = np.append(deltas > 0.5, True)
edf = edf[f].copy()
edf_old = edf[edf.index.date < date.today()]
if edf_old.shape[0] > 3:
edf = edf_old.copy()
deltas = (np.diff(np.flip(edf.index.date)) / pd.Timedelta(1, unit='D'))
if (deltas == deltas[0]).all():
# Identical, perfect
interval_days = deltas[0]
std_pct_mean = 0.0
else:
# Discard large outliers
z_scores = np.abs(stats.zscore(deltas))
deltas_pruned = deltas[z_scores < 1.4]
# Discard small deltas
deltas_pruned = deltas_pruned[deltas_pruned > 10.0]
std_pct_mean = np.std(deltas) / np.mean(deltas)
interval_days = np.mean(deltas_pruned)
if debug:
print("- interval_days:", interval_days)
if std_pct_mean < 0.68:
tol = 20
if abs(interval_days-365) < tol:
interval = 'ANNUAL'
elif abs(interval_days-182) < tol:
interval = 'HALF'
elif abs(interval_days-91) < tol:
interval = 'QUART'
if interval is not None:
return interval_str_to_days[interval]
if debug:
print("- insufficient data in earnings_dates, analysing financials columns")
tbl_bs = self.get_quarterly_balance_sheet(refresh=False)
tbl_fi = self.get_quarterly_income_stmt(refresh=False)
tbl_cf = self.get_quarterly_cashflow(refresh=False)
if refresh:
if tbl_bs is None:
tbl_bs = self.get_quarterly_balance_sheet(refresh)
if tbl_fi is None:
tbl_fi = self.get_quarterly_income_stmt(refresh)
if tbl_cf is None:
tbl_cf = self.get_quarterly_cashflow(refresh)
tbl_bs = self._prune_yf_financial_df(tbl_bs)
tbl_fi = self._prune_yf_financial_df(tbl_fi)
tbl_cf = self._prune_yf_financial_df(tbl_cf)
if with_report:
# Expect all 3x financials present
if tbl_bs is None or tbl_bs.empty or tbl_fi is None or tbl_fi.empty or tbl_cf is None or tbl_cf.empty:
# Cannot be sure, but can estimate from any present table
if tbl_bs is not None and not tbl_bs.empty:
tbl = tbl_bs
elif tbl_fi is not None and not tbl_fi.empty:
tbl = tbl_fi
else:
tbl = tbl_cf
else:
tbl = tbl_bs
else:
# Use whichever is available with most columns
tbl = tbl_bs
if tbl_fi is not None and len(tbl_fi.columns) > len(tbl.columns):
tbl = tbl_fi
if tbl_cf is not None and len(tbl_cf.columns) > len(tbl.columns):
tbl = tbl_cf
if debug:
print("- tbl:") ; print(tbl)
if tbl is not None and not tbl.empty and tbl.shape[0] > 1:
return self._get_interval_from_table(tbl)
if not inference_successful:
interval = yfcd.TimedeltaEstimate(interval_str_to_days['HALF'], yfcd.Confidence.Medium)
return interval
[docs]
def get_earnings_dates(self, start, refresh=True, clean=True):
start_dt, start = yfcu.ProcessUserDt(start, self.tzName)
yfcu.TypeCheckDateStrict(start, 'start')
yfcu.TypeCheckBool(refresh, 'refresh')
yfcu.TypeCheckBool(clean, 'clean')
debug = False
# debug = True
if debug:
print(f"get_earnings_dates(start={start}, refresh={refresh})")
dt_now = pd.Timestamp.utcnow().tz_convert(self.tzName)
last_fetch = None
if self._earnings_dates is None:
if yfcm.IsDatumCached(self.ticker, "earnings_dates"):
if debug:
print("- retrieving earnings dates from cache")
self._earnings_dates, md = yfcm.ReadCacheDatum(self.ticker, "earnings_dates", True)
if md is None:
md = {}
if self._earnings_dates is None:
# Fine, just means last call failed to get earnings_dates
pass
else:
if 'LastFetch' not in md:
raise Exception("f{self.ticker}: Why earnings_dates metadata missing 'LastFetch'?")
fp = yfcm.GetFilepath(self.ticker, "earnings_dates")
last_fetch = datetime.fromtimestamp(os.path.getmtime(fp)).astimezone()
md['LastFetch'] = last_fetch
yfcm.WriteCacheMetadata(self.ticker, "earnings_dates", 'LastFetch', md['LastFetch'])
if self._earnings_dates.empty:
self._earnings_dates = None
else:
edf_clean = self._clean_earnings_dates(self._earnings_dates, refresh)
if len(edf_clean) < len(self._earnings_dates):
# This is ok, because since the last fetch, the calendar can be updated which then allows resolving a
# near-duplication in earnings_dates.
yfcm.StoreCacheDatum(self.ticker, "earnings_dates", edf_clean)
self._earnings_dates = edf_clean
last_fetch = yfcm.ReadCacheMetadata(self.ticker, "earnings_dates", "LastFetch")
if debug:
print("- last_fetch =", last_fetch)
# Ensure column 'Date confirmed?' is present, and update with calendar
df_modified = False
if self._earnings_dates is not None:
if 'Date confirmed?' not in self._earnings_dates.columns:
self._earnings_dates['Date confirmed?'] = False
df_modified = True
cal = self.get_calendar(refresh)
if cal is not None and len(cal['Earnings Date']) == 1:
x = cal['Earnings Date'][0]
for dt in self._earnings_dates.index:
if abs(dt.date() - x) < timedelta(days=7):
# Assume same release
try:
if not self._earnings_dates['Date confirmed?'].loc[dt]:
self._earnings_dates.loc[dt, 'Date confirmed?'] = True
df_modified = True
break
except Exception:
print("- dt:", dt)
print("- edf:") ; print(self._earnings_dates)
raise
if not refresh:
if df_modified:
yfcm.StoreCacheDatum(self.ticker, "earnings_dates", self._earnings_dates)
if debug:
print("get_earnings_dates() returning")
if self._earnings_dates is not None:
if start_dt > self._earnings_dates.index[-1]:
return self._earnings_dates.sort_index().loc[start_dt:].sort_index(ascending=False).copy()
else:
return self._earnings_dates.copy()
else:
return None
# Limit spam:
yf_start_date = yfcm.ReadCacheMetadata(self.ticker, 'earnings_dates', 'start_date')
if debug:
print("- yf_start_date =", yf_start_date)
if last_fetch is not None:
if (last_fetch + pd.Timedelta('14d')) > dt_now:
# Avoid spamming Yahoo for data it doesn't have (empty earnings_dates).
if self._earnings_dates is None:
# Already attempted a fetch recently, Yahoo has nothing.
if debug:
print("avoiding refetch")
refresh = False
# Avoid spamming Yahoo for new future dates
if self._earnings_dates is not None:
if yf_start_date is not None:
# Cache has all previous earnings dates
refresh = False
elif start > self._earnings_dates.index.date[-1]:
refresh = False
if debug:
print("- refresh =", refresh)
if refresh:
ei = self._earnings_interval(with_report=False, refresh=False)
if isinstance(ei, yfcd.TimedeltaEstimate):
# Don't care about confidence
ei = ei.td
elif isinstance(ei, yfcd.TimedeltaRangeEstimate):
ei = mean([ei.td1, ei.td2])
if isinstance(ei, (yfcd.ComparableRelativedelta, relativedelta)):
# Convert to normal Timedelta, don't need 100% precision
if ei.months == 3:
ei = pd.Timedelta('91d')
elif ei.months == 6:
ei = pd.Timedelta('182d')
elif ei.months == 12 or ei.years==1:
# ei = pd.Timedelta('365d')
# Don't believe it
ei = pd.Timedelta('182d')
else:
raise Exception(ei, type(ei))
lookahead_dt = dt_now + pd.Timedelta('365d')
if debug:
print("- ei =", ei)
print("- lookahead_dt =", lookahead_dt)
next_rd = None
if self._earnings_dates is None or (start_dt < self._earnings_dates.index[-1] and yf_start_date is None):
total_refetch = True
n_intervals_to_fetch = int(math.floor(Decimal(1.25*(lookahead_dt - start_dt) / ei)))
else:
total_refetch = False
df = self._earnings_dates.copy()
f_na = df['Reported EPS'].isna().to_numpy()
f_nna = ~f_na
f_expired = f_na & (df.index < dt_now) & ((dt_now - df['FetchDate']) > pd.Timedelta('7d')).to_numpy()
n = df.shape[0]
if debug:
print("- n =", n)
n_intervals_missing_after = int(math.floor(Decimal((lookahead_dt - df.index[0]) / ei)))
any_expired = f_expired.any()
if debug:
print("- n_intervals_missing_after =", n_intervals_missing_after)
print("- any_expired =", any_expired)
if not any_expired:
# ToDo: avoid refetching if next earnings after last fetch is (far) in future.
if f_nna.any():
if debug:
print("- checking against release dates ...")
rds = self.get_release_dates(yfcd.ReportingPeriod.Interim, as_df=False, refresh=False)
if rds is not None:
latest_certain_dt = df.index[np.where(f_nna)[0][0]].date()
for i in range(len(rds)):
try:
in_future = rds[i].release_date > latest_certain_dt
except yfcd.AmbiguousComparisonException:
p = rds[i].release_date.prob_gt(latest_certain_dt)
in_future = p > 0.9
if in_future:
next_rd = rds[i]
break
try:
next_rd_in_future = next_rd.release_date > (d_today + timedelta(days=7))
except yfcd.AmbiguousComparisonException:
p = next_rd.release_date.prob_gt(d_today + timedelta(days=7))
next_rd_in_future = p > 0.9
if next_rd_in_future:
# Avoid fetching while far from next earnings release
n_intervals_missing_after = 0
n_intervals_to_fetch = n_intervals_missing_after
else:
earliest_expired_idx = np.where(f_expired)[0][-1]
n_intervals_expired = earliest_expired_idx + 1
n_intervals_to_fetch = n_intervals_expired + n_intervals_missing_after
if n_intervals_to_fetch > 0:
# Ensure always fetching more than necessary
n_intervals_to_fetch += 8
if debug:
print("- n_intervals_to_fetch =", n_intervals_to_fetch)
if n_intervals_to_fetch > 0:
if debug:
print("- total_refetch =", total_refetch)
try:
new_df = self._fetch_earnings_dates(n_intervals_to_fetch, refresh)
except Exception:
print("- self._earnings_dates:") ; print(self._earnings_dates)
print("- start:", start)
print("- yf_start_date:", yf_start_date)
print("- last_fetch:", last_fetch)
print("- ei:", ei)
print("- next_rd:", next_rd)
print("- n_intervals_to_fetch:", n_intervals_to_fetch)
raise
# Sanity test:
if new_df is not None and not new_df.empty:
edf_clean = self._clean_earnings_dates(new_df, refresh)
if len(edf_clean) < len(new_df):
print("- edf:") ; print(new_df[['EPS Estimate', 'Reported EPS', 'FetchDate']])
print("- after clean:") ; print(edf_clean[['EPS Estimate', 'Reported EPS', 'FetchDate']])
raise Exception(f'{self.ticker}: We literally just fetched earnings dates, why not cleaned?')
yfcm.StoreCacheDatum(self.ticker, "earnings_dates", edf_clean)
yfcm.WriteCacheMetadata(self.ticker, "earnings_dates", 'LastFetch', dt_now)
if debug:
print("- new_df:") ; print(new_df)
if new_df is not None and not new_df.empty:
if self._earnings_dates is not None:
df_old = self._earnings_dates[self._earnings_dates.index < (new_df.index[-1]-timedelta(days=14))]
if not df_old.empty:
new_df = pd.concat([new_df, df_old])
if debug:
print("- new_df:") ; print(new_df)
self._earnings_dates = new_df
df_modified = True
if df_modified:
if self._earnings_dates is None:
yfcm.StoreCacheDatum(self.ticker, "earnings_dates", pd.DataFrame())
else:
yfcm.StoreCacheDatum(self.ticker, "earnings_dates", self._earnings_dates)
df = None
if debug:
print("get_earnings_dates() returning")
if self._earnings_dates is not None:
if start_dt > self._earnings_dates.index[-1]:
df = self._earnings_dates.sort_index().loc[start_dt:].sort_index(ascending=False)
else:
df = self._earnings_dates
if clean:
df = df.drop(["FetchDate", "Date confirmed?"], axis=1, errors='ignore')
return df.copy()
else:
return None
def _clean_earnings_dates(self, edf, refresh=True):
edf = edf.sort_index(ascending=False)
# In rare cases, Yahoo has duplicated a date with different company name.
# Retain the row with most data.
for i in range(len(edf)-1, 0, -1):
if edf.index[i-1] == edf.index[i]:
mask = np.ones(len(edf), dtype=bool)
if edf.iloc[i-1].isna().sum() > edf.iloc[i].isna().sum():
# Discard row i-1
mask[i-1] = False
else:
# Discard row i
mask[i] = False
edf = edf[mask].copy()
for i in range(len(edf)-2, -1, -1):
if (edf.index[i]-edf.index[i+1]) < timedelta(days=7):
# One must go
if edf['FetchDate'].iloc[i] > edf['FetchDate'].iloc[i+1]:
edf = edf.drop(edf.index[i+1])
elif edf['FetchDate'].iloc[i+1] > edf['FetchDate'].iloc[i]:
edf = edf.drop(edf.index[i])
else:
cal = self.get_calendar(refresh)
if cal is None:
# print(edf.iloc[i:i+2])
# raise Exception('Review how to handle 2x almost-equal earnings dates.')
# pass # Can't do anything with certainty
# Keep earlier
if edf.index[i] < edf.index[i+1]:
edf = edf.drop(edf.index[i+1])
else:
edf = edf.drop(edf.index[i])
else:
# Cross-check against calendar
dts = cal['Earnings Date']
if len(dts) == 1 and dts[0] in [edf.index[i].date(), edf.index[i+1].date()]:
if edf.index[i].date() == dts[0]:
edf = edf.drop(edf.index[i+1])
else:
edf = edf.drop(edf.index[i])
else:
# print(edf.iloc[i:i+2])
# raise Exception('Review how to handle 2x almost-equal earnings dates.')
# pass # Can't do anything with certainty
# Keep earlier
if edf.index[i] < edf.index[i+1]:
edf = edf.drop(edf.index[i+1])
else:
edf = edf.drop(edf.index[i])
return edf
def _fetch_earnings_dates(self, limit, refresh=True):
yfcu.TypeCheckInt(limit, "limit")
yfcu.TypeCheckBool(refresh, "refresh")
debug = False
# debug = True
if debug:
print(f"{self.ticker}: _fetch_earnings_dates(limit={limit}, refresh={refresh})")
elif print_fetches:
print(f"{self.ticker}: fetching {limit} earnings dates")
repeat_fetch = False
try:
df = self.dat.get_earnings_dates(limit)
except KeyError as e:
if "Earnings Date" in str(e):
# Rarely, Yahoo returns a completely different table for earnings dates.
# Try again.
repeat_fetch = True
else:
raise
if repeat_fetch:
sleep(1)
# Avoid cache this time, but add sleeps to maintain rate-limiting
df = yf.Ticker(self.ticker).get_earnings_dates(limit)
sleep(1)
if df is None or df.empty:
if debug:
print("- Yahoo returned None")
return None
df['FetchDate'] = pd.Timestamp.utcnow().tz_convert(self.tzName)
if df.shape[0] < limit:
if debug:
print("- detected earnings_dates start at", df.index.min())
yfcm.WriteCacheMetadata(self.ticker, 'earnings_dates', 'start_date', df.index.min())
cal = self.get_calendar(refresh)
df['Date confirmed?'] = False
if cal is not None and len(cal['Earnings Date']) == 1:
x = cal['Earnings Date'][0]
for dt in df.index:
if abs(dt.date() - x) < timedelta(days=7):
# Assume same release
df.loc[dt, 'Date confirmed?'] = True
break
df = self._clean_earnings_dates(df, refresh)
return df
[docs]
def get_calendar(self, refresh=True):
yfcu.TypeCheckBool(refresh, 'refresh')
max_age = pd.Timedelta(yfcm._option_manager.max_ages.calendar)
if self._calendar is None:
if yfcm.IsDatumCached(self.ticker, "calendar"):
self._calendar = yfcm.ReadCacheDatum(self.ticker, "calendar")
self._calendar_clean = dict(self._calendar)
del self._calendar_clean['FetchDate']
if len(self._calendar_clean.keys()) == 0:
self._calendar_clean = None
if (self._calendar is not None) and (self._calendar["FetchDate"] + max_age) > pd.Timestamp.now():
return self._calendar_clean
if not refresh:
return self._calendar_clean
if print_fetches:
print(f"{self.ticker}: Fetching calendar (last fetch = {self._calendar['FetchDate'].date()})")
c = self.dat.calendar
c["FetchDate"] = pd.Timestamp.now()
if self._calendar is not None:
# Check calendar is not downgrade
diff = len(c) - len(self._calendar)
if diff < -1:
# More than 1 element disappeared
msg = "When fetching new calendar, data has disappeared\n"
msg += "- cached calendar:\n"
msg += f"{self._calendar}" + "\n"
msg += "- new calendar:\n"
msg += f"{c}" + "\n"
raise Exception(msg)
if c is not None:
yfcm.StoreCacheDatum(self.ticker, "calendar", c)
self._calendar = c
self._calendar_clean = dict(self._calendar)
del self._calendar_clean['FetchDate']
if len(self._calendar_clean.keys()) == 0:
self._calendar_clean = None
return self._calendar_clean
def _get_calendar_dates(self, refresh=True):
yfcu.TypeCheckBool(refresh, 'refresh')
debug = False
# debug = True
if debug:
print(f"_get_calendar_dates(refresh={refresh})")
cal = self.get_calendar(refresh)
if cal is None or len(cal) == 0:
return None
if debug:
print(f"- cal = {cal}")
cal_release_dates = []
cal_release_dates.sort()
last = None
for d in cal["Earnings Date"]:
if last is None:
last = d
else:
diff = d - last
if debug:
print(f"- diff = {diff}")
if diff <= timedelta(days=15):
# Looks like a date range so tag last-added date as estimate. And change data to be middle of range
last = yfcd.DateRange(last, d)
cal_release_dates.append(last)
last = None
else:
print("- cal_release_dates:") ; print(cal_release_dates)
print("- diff =", diff)
raise Exception(f"Implement/rejig this execution path (tkr={self.ticker})")
if last is not None:
cal_release_dates.append(last)
if debug:
print(f"- cal_release_dates = {cal_release_dates}")
if debug:
if len(cal_release_dates) == 0:
print("- cal_release_dates: EMPTY")
else:
print("- cal_release_dates:")
for e in cal_release_dates:
print(e)
return cal_release_dates