import yfinance as yf
from . import yfc_cache_manager as yfcm
from . import yfc_dat as yfcd
from . import yfc_utils as yfcu
from . import yfc_logging as yfcl
from . import yfc_time as yfct
from . import yfc_prices_manager as yfcp
from . import yfc_financials_manager as yfcf
import numpy as np
import pandas as pd
import datetime
from dateutil.relativedelta import relativedelta
from zoneinfo import ZoneInfo
import os
import re
# from time import perf_counter
# TODO: Ticker: add method to delete ticker from cache
[docs]
class Ticker:
def __init__(self, ticker, session=None):
self.ticker = ticker.upper()
self.session = session
self.dat = yf.Ticker(self.ticker, session=self.session)
self._yf_lag = None
self._histories_manager = None
self._info = None
self._fast_info = None
self._splits = None
self._shares = None
self._major_holders = None
self._institutional_holders = None
self._sustainability = None
self._recommendations = None
self._calendar = None
self._isin = None
self._options = None
self._news = None
self._debug = False
# self._debug = True
self._tz = None
self._exchange = None
exchange, tz_name = self._getExchangeAndTz()
self._financials_manager = yfcf.FinancialsManager(ticker, exchange, tz_name, session=self.session)
[docs]
def history(self,
interval="1d",
max_age=None, # defaults to half of interval
period=None,
start=None, end=None, prepost=False, actions=True,
adjust_splits=True, adjust_divs=True,
keepna=False,
proxy=None, rounding=False,
debug=True, quiet=False,
trigger_at_market_close=False):
"""Example docstring for Ticker.history() function
Arguments:
interval - ...
"""
# t0 = perf_counter()
if prepost:
raise Exception("pre and post-market caching currently not implemented. If you really need it raise an issue on Github")
debug_yfc = self._debug
# debug_yfc = True
if start is not None or end is not None:
log_msg = f"Ticker::history(tkr={self.ticker} interval={interval} start={start} end={end} max_age={max_age} trigger_at_market_close={trigger_at_market_close} adjust_splits={adjust_splits}, adjust_divs={adjust_divs})"
else:
log_msg = f"Ticker::history(tkr={self.ticker} interval={interval} period={period} max_age={max_age} trigger_at_market_close={trigger_at_market_close} adjust_splits={adjust_splits}, adjust_divs={adjust_divs})"
yfcl.TraceEnter(log_msg)
td_1d = datetime.timedelta(days=1)
exchange, tz_name = self._getExchangeAndTz()
tz_exchange = ZoneInfo(tz_name)
yfct.SetExchangeTzName(exchange, tz_name)
dt_now = pd.Timestamp.utcnow()
# Type checks
if max_age is not None:
if isinstance(max_age, str):
if max_age.endswith("wk"):
max_age = re.sub("wk$", "w", max_age)
max_age = pd.Timedelta(max_age)
if not isinstance(max_age, (datetime.timedelta, pd.Timedelta)):
raise Exception("Argument 'max_age' must be Timedelta or equivalent string")
if period is not None:
if start is not None or end is not None:
raise Exception("Don't set both 'period' and 'start'/'end'' arguments")
if isinstance(period, str):
if period in ["max", "ytd"]:
period = yfcd.periodStrToEnum[period]
else:
if period.endswith("wk"):
period = re.sub("wk$", "w", period)
if period.endswith("y"):
period = relativedelta(years=int(re.sub("y$", "", period)))
elif period.endswith("mo"):
period = relativedelta(months=int(re.sub("mo", "", period)))
else:
period = pd.Timedelta(period)
if not isinstance(period, (yfcd.Period, datetime.timedelta, pd.Timedelta, relativedelta)):
raise Exception(f"Argument 'period' must be one of: 'max', 'ytd', Timedelta or equivalent string. Not {type(period)}")
if isinstance(interval, str):
if interval not in yfcd.intervalStrToEnum.keys():
raise Exception("'interval' if str must be one of: {}".format(yfcd.intervalStrToEnum.keys()))
interval = yfcd.intervalStrToEnum[interval]
if not isinstance(interval, yfcd.Interval):
raise Exception("'interval' must be yfcd.Interval")
start_d = None ; end_d = None
start_dt = None ; end_dt = None
interday = interval in [yfcd.Interval.Days1, yfcd.Interval.Week]#, yfcd.Interval.Months1, yfcd.Interval.Months3]
if start is not None:
start_dt, start_d = self._process_user_dt(start)
if start_dt > dt_now:
return None
if interval == yfcd.Interval.Week:
# Note: if start is on weekend then Yahoo can return weekly data starting
# on Saturday. This breaks YFC, start must be Monday! So fix here:
if start_dt is None:
# Working with simple dates, easy
if start_d.weekday() in [5, 6]:
start_d += datetime.timedelta(days=7-start_d.weekday())
else:
wd = start_d.weekday()
if wd in [5, 6]:
start_d += datetime.timedelta(days=7-wd)
start_dt = datetime.datetime.combine(start_d, datetime.time(0), tz_exchange)
if end is not None:
end_dt, end_d = self._process_user_dt(end)
if start_dt is not None and end_dt is not None and start_dt >= end_dt:
raise ValueError("start must be < end")
if debug_yfc:
print("- start_dt={} , end_dt={}".format(start_dt, end_dt))
if (start_dt is not None) and start_dt == end_dt:
return None
if max_age is None:
if interval == yfcd.Interval.Days1:
max_age = datetime.timedelta(hours=4)
elif interval == yfcd.Interval.Week:
max_age = datetime.timedelta(hours=60)
# elif interval == yfcd.Interval.Months1:
# max_age = datetime.timedelta(days=15)
# elif interval == yfcd.Interval.Months3:
# max_age = datetime.timedelta(days=45)
else:
max_age = 0.5*yfcd.intervalToTimedelta[interval]
if start is not None:
max_age = min(max_age, dt_now-start_dt)
if period is not None:
if isinstance(period, (datetime.timedelta, pd.Timedelta)):
if (dt_now - max_age) < (dt_now - period):
raise Exception(f"max_age={max_age} must be less than period={period}")
elif period == yfcd.Period.Ytd:
dt_now_ex = dt_now.tz_convert(tz_exchange)
dt_year_start = pd.Timestamp(year=dt_now_ex.year, month=1, day=1).tz_localize(tz_exchange)
if (dt_now - max_age) < dt_year_start:
raise Exception(f"max_age={max_age} must be less than days since this year start")
elif start is not None:
if (dt_now - max_age) < start_dt:
raise Exception(f"max_age={max_age} must be closer to now than start={start}")
if start_dt is not None:
try:
sched_14d = yfct.GetExchangeSchedule(exchange, start_dt.date(), start_dt.date()+14*td_1d)
except Exception as e:
if "Need to add mapping" in str(e):
raise Exception("Need to add mapping of exchange {} to xcal (ticker={})".format(exchange, self.ticker))
else:
raise
if sched_14d is None:
raise Exception("sched_14d is None for date range {}->{} and ticker {}".format(start_dt.date(), start_dt.date()+14*td_1d, self.ticker))
if sched_14d["open"].iloc[0] > dt_now:
# Requested date range is in future
return None
else:
sched_14d = None
# All date checks passed so can begin fetching
if ((start_d is None) or (end_d is None)) and (start_dt is not None) and (end_dt is not None):
# if start_d/end_d not set then start/end are datetimes, so need to inspect
# schedule opens/closes to determine days
if sched_14d is not None:
sched = sched_14d.iloc[0:1]
else:
sched = yfct.GetExchangeSchedule(exchange, start_dt.date(), end_dt.date()+td_1d)
n = sched.shape[0]
start_d = start_dt.date() if start_dt < sched["open"].iloc[0] else start_dt.date()+td_1d
end_d = end_dt.date()+td_1d if end_dt >= sched["close"].iloc[n-1] else end_dt.date()
else:
if exchange not in yfcd.exchangeToXcalExchange:
raise Exception("Need to add mapping of exchange {} to xcal (ticker={})".format(exchange, self.ticker))
if self._histories_manager is None:
self._histories_manager = yfcp.HistoriesManager(self.ticker, exchange, tz_name, self.session, proxy)
# t1_setup = perf_counter()
hist = self._histories_manager.GetHistory(interval)
if period is not None:
h = hist.get(start=None, end=None, period=period, max_age=max_age, trigger_at_market_close=trigger_at_market_close, quiet=quiet)
elif interday:
h = hist.get(start_d, end_d, period=None, max_age=max_age, trigger_at_market_close=trigger_at_market_close, quiet=quiet)
else:
h = hist.get(start_dt, end_dt, period=None, max_age=max_age, trigger_at_market_close=trigger_at_market_close, quiet=quiet)
if (h is None) or h.shape[0] == 0:
msg = f"YFC: history() exiting without price data (tkr={self.ticker}"
if start_dt is not None or end_dt is not None:
msg += f" start_dt={start_dt} end_dt={end_dt}"
else:
msg += f" period={period}"
msg += f" max_age={max_age}"
msg += f" interval={yfcd.intervalToString[interval]})"
raise Exception(msg)
# t2_sync = perf_counter()
f_dups = h.index.duplicated()
if f_dups.any():
raise Exception("{}: These timepoints have been duplicated: {}".format(self.ticker, h.index[f_dups]))
# Present table for user:
h_copied = False
if (start_dt is not None) and (end_dt is not None):
h = h.loc[start_dt:end_dt-datetime.timedelta(milliseconds=1)].copy()
h_copied = True
if not keepna:
price_data_cols = [c for c in yfcd.yf_data_cols if c in h.columns]
mask_nan_or_zero = (np.isnan(h[price_data_cols].to_numpy()) | (h[price_data_cols].to_numpy() == 0)).all(axis=1)
if mask_nan_or_zero.any():
h = h.drop(h.index[mask_nan_or_zero])
h_copied = True
# t3_filter = perf_counter()
if h.shape[0] == 0:
h = None
else:
if adjust_splits:
if not h_copied:
h = h.copy()
for c in ["Open", "Close", "Low", "High", "Dividends"]:
h[c] = np.multiply(h[c].to_numpy(), h["CSF"].to_numpy())
h["Volume"] = np.round(np.divide(h["Volume"].to_numpy(), h["CSF"].to_numpy()), 0).astype('int')
if adjust_divs:
if not h_copied:
h = h.copy()
for c in ["Open", "Close", "Low", "High"]:
h[c] = np.multiply(h[c].to_numpy(), h["CDF"].to_numpy())
else:
if not h_copied:
h = h.copy()
h["Adj Close"] = np.multiply(h["Close"].to_numpy(), h["CDF"].to_numpy())
h = h.drop(["CSF", "CDF"], axis=1)
if rounding:
# Round to 4 sig-figs
if not h_copied:
h = h.copy()
f_na = h["Close"].isna()
na = f_na.any()
if na:
f_nna = ~f_na
if not f_nna.any():
raise Exception(f"{self.ticker}: price table is entirely NaNs. Delisted?" +" \n" + log_msg)
last_close = h["Close"][f_nna].iloc[-1]
else:
last_close = h["Close"].iloc[-1]
rnd = yfcu.CalculateRounding(last_close, 4)
for c in ["Open", "Close", "Low", "High"]:
if na:
h.loc[f_nna, c] = np.round(h.loc[f_nna, c].to_numpy(), rnd)
else:
h[c] = np.round(h[c].to_numpy(), rnd)
if debug_yfc:
print("- h:")
cols = [c for c in ["Close", "Dividends", "Volume", "CDF", "CSF"] if c in h.columns]
print(h[cols])
if "Dividends" in h.columns:
f = h["Dividends"] != 0.0
if f.any():
print("- dividends:")
print(h.loc[f, cols])
print("")
yfcl.TraceExit("Ticker::history() returning")
# t4_adjust = perf_counter()
# t_setup = t1_setup - t0
# t_sync = t2_sync - t1_setup
# t_filter = t3_filter - t2_sync
# t_adjust = t4_adjust - t3_filter
# t_sum = t_setup + t_sync + t_filter + t_adjust
# print("TIME: {:.4f}s: setup={:.4f} sync={:.4f} filter={:.4f} adjust={:.4f}".format(t_sum, t_setup, t_sync, t_filter, t_adjust))
# t_setup *= 100/t_sum
# t_sync *= 100/t_sum
# t_cache *= 100/t_sum
# t_filter *= 100/t_sum
# t_adjust *= 100/t_sum
# print("TIME %: setup={:.1f}% sync={:.1f}% filter={:.1f}% adjust={:.1f}%".format(t_setup, t_sync, t_filter, t_adju
return h
def _getCachedPrices(self, interval, proxy=None):
if self._histories_manager is None:
exchange, tz_name = self._getExchangeAndTz()
self._histories_manager = yfcp.HistoriesManager(self.ticker, exchange, tz_name, self.session, proxy)
if isinstance(interval, str):
if interval not in yfcd.intervalStrToEnum.keys():
raise Exception("'interval' if str must be one of: {}".format(yfcd.intervalStrToEnum.keys()))
interval = yfcd.intervalStrToEnum[interval]
return self._histories_manager.GetHistory(interval).h
def _getExchangeAndTz(self):
if self._tz is not None and self._exchange is not None:
return self._exchange, self._tz
exchange, tz_name = None, None
try:
exchange = self.get_info('9999d')['exchange']
if "exchangeTimezoneName" in self.get_info('9999d'):
tz_name = self.get_info('9999d')["exchangeTimezoneName"]
else:
tz_name = self.get_info('9999d')["timeZoneFullName"]
except Exception:
md = yf.Ticker(self.ticker, session=self.session).history_metadata
if 'exchangeName' in md.keys():
exchange = md['exchangeName']
if 'exchangeTimezoneName' in md.keys():
tz_name = md['exchangeTimezoneName']
if exchange is None or tz_name is None:
raise Exception(f"{self.ticker}: exchange and timezone not available")
self._tz = tz_name
self._exchange = exchange
return self._exchange, self._tz
[docs]
def verify_cached_prices(self, rtol=0.0001, vol_rtol=0.005, correct='none', discard_old=False, quiet=True, debug=False, debug_interval=None):
if debug:
quiet = False
if debug_interval is not None and isinstance(debug_interval, str):
debug_interval = yfcd.intervalStrToEnum[debug_interval]
fn_locals = locals()
del fn_locals["self"]
interval = yfcd.Interval.Days1
cache_key = "history-"+yfcd.intervalToString[interval]
if not yfcm.IsDatumCached(self.ticker, cache_key):
return True
yfcl.TraceEnter(f"Ticker::verify_cached_prices(tkr={self.ticker} {fn_locals})")
if self._histories_manager is None:
exchange, tz_name = self._getExchangeAndTz()
self._histories_manager = yfcp.HistoriesManager(self.ticker, exchange, tz_name, self.session, proxy=None)
v = True
# First verify 1d
dt0 = self._histories_manager.GetHistory(interval)._getCachedPrices().index[0]
self.history(start=dt0.date(), quiet=quiet, trigger_at_market_close=True) # ensure have all dividends
v = self._verify_cached_prices_interval(interval, rtol, vol_rtol, correct, discard_old, quiet, debug)
if debug_interval == yfcd.Interval.Days1:
yfcl.TraceExit(f"Ticker::verify_cached_prices() returning {v} (1st pass)")
return v
if not v:
if debug or not correct:
yfcl.TraceExit(f"Ticker::verify_cached_prices() returning {v} (1st pass)")
return v
if correct in ['one', 'all']:
# Rows were removed so re-fetch. Only do for 1d data
self.history(start=dt0.date(), quiet=quiet)
# repeat verification, because 'fetch backporting' may be buggy
v2 = self._verify_cached_prices_interval(interval, rtol, vol_rtol, correct, discard_old, quiet, debug)
if not v2 and debug:
yfcl.TraceExit(f"Ticker::verify_cached_prices() returning {v2} (post-correction)")
return v2
if not v2:
yfcl.TraceExit(f"Ticker::verify_cached_prices() returning {v2} (post-correction)")
return v2
if not v:
# Stop after correcting first problem, because user won't have been shown the next problem yet
yfcl.TraceExit(f"Ticker::verify_cached_prices() returning {v} (corrected but user should review next problem)")
return v
if debug_interval is not None:
if debug_interval == yfcd.Interval.Days1:
intervals = []
else:
intervals = [debug_interval]
debug = True
else:
intervals = yfcd.Interval
for interval in intervals:
if interval == yfcd.Interval.Days1:
continue
istr = yfcd.intervalToString[interval]
cache_key = "history-"+istr
if not yfcm.IsDatumCached(self.ticker, cache_key):
continue
vi = self._verify_cached_prices_interval(interval, rtol, vol_rtol, correct, discard_old, quiet, debug)
yfcl.TracePrint(f"{istr}: vi={vi}")
if not vi and correct != 'all':
# Stop after correcting first problem, because user won't have been shown the next problem yet
yfcl.TraceExit(f"Ticker::verify_cached_prices() returning {vi}")
return vi
v = v and vi
yfcl.TraceExit(f"Ticker::verify_cached_prices() returning {v}")
return v
def _verify_cached_prices_interval(self, interval, rtol=0.0001, vol_rtol=0.005, correct=False, discard_old=False, quiet=True, debug=False):
if debug:
quiet = False
fn_locals = locals()
del fn_locals["self"]
if isinstance(interval, str):
if interval not in yfcd.intervalStrToEnum.keys():
raise Exception("'interval' if str must be one of: {}".format(yfcd.intervalStrToEnum.keys()))
interval = yfcd.intervalStrToEnum[interval]
istr = yfcd.intervalToString[interval]
cache_key = "history-"+istr
if not yfcm.IsDatumCached(self.ticker, cache_key):
return True
yfcl.TraceEnter(f"Ticker::_verify_cached_prices_interval(tkr={self.ticker}, {fn_locals})")
if self._histories_manager is None:
exchange, tz_name = self._getExchangeAndTz()
self._histories_manager = yfcp.HistoriesManager(self.ticker, exchange, tz_name, self.session, proxy=None)
v = self._histories_manager.GetHistory(interval)._verifyCachedPrices(rtol, vol_rtol, correct, discard_old, quiet, debug)
yfcl.TraceExit(f"Ticker::_verify_cached_prices_interval() returning {v}")
return v
def _process_user_dt(self, dt):
exchange, tz_name = self._getExchangeAndTz()
return yfcu.ProcessUserDt(dt, tz_name)
@property
def info(self):
return self.get_info()
[docs]
def get_info(self, max_age=None):
if self._info is not None:
return self._info
if max_age is None:
max_age = pd.Timedelta(yfcm._option_manager.max_ages.info)
elif not isinstance(max_age, (datetime.timedelta, pd.Timedelta)):
max_age = pd.Timedelta(max_age)
if max_age < pd.Timedelta(0):
raise Exception(f"'max_age' must be positive timedelta not {max_age}")
md = None
if yfcm.IsDatumCached(self.ticker, "info"):
self._info, md = yfcm.ReadCacheDatum(self.ticker, "info", True)
if 'FetchDate' not in self._info.keys():
# Old bug meant this could happen
fp = yfcm.GetFilepath(self.ticker, 'info')
mod_dt = datetime.datetime.fromtimestamp(os.path.getmtime(fp))
self._info['FetchDate'] = mod_dt
md['LastCheck'] = mod_dt
yfcm.StoreCacheDatum(self.ticker, "info", self._info, metadata=md)
if self._info is not None:
if md is None:
md = {}
if 'LastCheck' not in md.keys():
# Old bug meant this could happen
md['LastCheck'] = self._info['FetchDate']
yfcm.WriteCacheMetadata(self.ticker, "info", 'LastCheck', md['LastCheck'])
if max(self._info['FetchDate'], md['LastCheck']) + max_age > pd.Timestamp.now():
return self._info
i = self.dat.info
i['FetchDate'] = pd.Timestamp.now()
if self._info is not None:
# Check new info is not downgrade
diff = len(i) - len(self._info)
diff_pct = float(diff) / float(len(self._info))
if diff_pct < -0.1 and diff < -10:
msg = 'When fetching new info, significant amount of data has disappeared\n'
missing_keys = [k for k in self._info.keys() if k not in i.keys()]
new_keys = [k for k in i.keys() if k not in self._info.keys()]
msg += "- missing: "
msg += str({k:self._info[k] for k in missing_keys}) + '\n'
msg += "- new: "
msg += str({k:i[k] for k in new_keys}) + '\n'
# msg += "\nKeep new data?"
# keep = click.confirm(msg, default=False)
# if not keep:
# return self._info
#
msg += "\nDiscarding fetched info."
print(f'{self.ticker}: {msg}')
yfcm.WriteCacheMetadata(self.ticker, "info", 'LastCheck', i['FetchDate'])
return self._info
self._info = i
if md is None:
md = {}
md['LastCheck'] = i['FetchDate']
yfcm.StoreCacheDatum(self.ticker, "info", self._info, metadata=md)
exchange, tz_name = self._getExchangeAndTz()
yfct.SetExchangeTzName(exchange, tz_name)
return self._info
@property
def fast_info(self):
if self._fast_info is not None:
return self._fast_info
if yfcm.IsDatumCached(self.ticker, "fast_info"):
try:
self._fast_info = yfcm.ReadCacheDatum(self.ticker, "fast_info")
except Exception:
pass
else:
return self._fast_info
# self._fast_info = self.dat.fast_info
self._fast_info = {}
for k in self.dat.fast_info.keys():
try:
self._fast_info[k] = self.dat.fast_info[k]
except Exception as e:
if "decrypt" in str(e):
pass
else:
print(f"TICKER = {self.ticker}")
raise
yfcm.StoreCacheDatum(self.ticker, "fast_info", self._fast_info)
yfct.SetExchangeTzName(self._fast_info["exchange"], self._fast_info["timezone"])
return self._fast_info
@property
def splits(self):
if self._splits is not None:
return self._splits
if yfcm.IsDatumCached(self.ticker, "splits"):
self._splits = yfcm.ReadCacheDatum(self.ticker, "splits")
return self._splits
self._splits = self.dat.splits
yfcm.StoreCacheDatum(self.ticker, "splits", self._splits)
return self._splits
[docs]
def get_shares(self, start=None, end=None, max_age='30d'):
debug = False
# debug = True
max_age = pd.Timedelta(max_age)
# Process dates
exchange, tz = self._getExchangeAndTz()
dt_now = pd.Timestamp.utcnow().tz_convert(tz)
if start is not None:
start_dt, start_d = self._process_user_dt(start)
start = start_d
if end is not None:
end_dt, end_d = self._process_user_dt(end)
end = end_d
if end is None:
end_dt = dt_now
end = dt_now.date()
if start is None:
start = end - pd.Timedelta(days=548) # 18 months
start_dt = end_dt - pd.Timedelta(days=548)
if start >= end:
raise Exception("Start date must be before end")
if debug:
print("- start =", start, " end =", end)
if self._shares is None:
if yfcm.IsDatumCached(self.ticker, "shares"):
if debug:
print("- init shares from cache")
self._shares = yfcm.ReadCacheDatum(self.ticker, "shares")
if self._shares is None or self._shares.empty:
# Loaded from cache. Either re-fetch or return None
lastFetchDt = yfcm.ReadCacheMetadata(self.ticker, "shares", 'LastFetch')
do_fetch = (lastFetchDt is None) or ((dt_now - lastFetchDt) > max_age)
if do_fetch:
self._shares = self._fetch_shares(start, end)
if self._shares is None:
self._shares = pd.DataFrame()
yfcm.StoreCacheDatum(self.ticker, "shares", self._shares, metadata={'LastFetch':pd.Timestamp.utcnow().tz_convert(tz)})
if self._shares.empty:
return None
else:
return None
if debug:
print("- self._shares:", self._shares.index[0], '->', self._shares.index[-1])
td_1d = datetime.timedelta(days=1)
last_row = self._shares.iloc[-1]
if pd.isna(last_row['Shares']):# and last_row['FetchDate'].date() == last_row.name:
if debug:
print("- dropping last row from cached")
self._shares = self._shares.drop(self._shares.index[-1])
if not isinstance(self._shares.index, pd.DatetimeIndex):
self._shares.index = pd.to_datetime(self._shares.index).tz_localize(tz)
if self._shares['Shares'].dtype == 'float':
# Convert to Int, and add a little to avoid rounding errors
self._shares['Shares'] = (self._shares['Shares']+0.01).round().astype('Int64')
if start < self._shares.index[0].date():
df_pre = self._fetch_shares(start, self._shares.index[0])
yfcm.WriteCacheMetadata(self.ticker, "shares", 'LastFetch', pd.Timestamp.utcnow().tz_convert(tz))
if df_pre is not None:
self._shares = pd.concat([df_pre, self._shares])
if (end-td_1d) > self._shares.index[-1].date() and \
(end - self._shares.index[-1].date()) > max_age:
df_post = self._fetch_shares(self._shares.index[-1] + td_1d, end)
yfcm.WriteCacheMetadata(self.ticker, "shares", 'LastFetch', pd.Timestamp.utcnow().tz_convert(tz))
if df_post is not None:
self._shares = pd.concat([self._shares, df_post])
self._shares = self._shares
yfcm.StoreCacheDatum(self.ticker, "shares", self._shares)
f_na = self._shares['Shares'].isna()
shares = self._shares[~f_na]
if start is not None:
i0 = np.searchsorted(shares.index, start_dt)
else:
i0 = None
if end is not None:
i1 = np.searchsorted(shares.index, end_dt)
else:
i1 = None
if i0 is not None and i1 is not None:
return shares.iloc[i0:i1]
elif i0 is not None:
return shares[i0:]
elif i1 is not None:
return shares[:i1]
return shares
def _fetch_shares(self, start, end):
td_1d = datetime.timedelta(days=1)
exchange, tz = self._getExchangeAndTz()
if isinstance(end, datetime.datetime):
end_dt = end
end_d = end.date()
else:
end_dt = pd.Timestamp(end).tz_localize(tz)
end_d = end
if isinstance(start, datetime.datetime):
start_dt = start
start_d = start.date()
else:
start_dt = pd.Timestamp(start).tz_localize(tz)
start_d = start
end_d = min(end_d, datetime.date.today() + td_1d)
df = self.dat.get_shares_full(start_d, end_d)
if df is None:
return df
if df.empty:
return None
# Convert to Pandas Int for NaN support
df = df.astype('Int64')
# Currently, yfinance uses ceil(end), so fix:
if df.index[-1].date() == end_d:
df.drop(df.index[-1])
if df.empty:
return None
fetch_dt = pd.Timestamp.utcnow().tz_convert(tz)
df = pd.DataFrame(df, columns=['Shares'])
if start_d < df.index[0].date():
df.loc[start_dt, 'Shares'] = np.nan
if (end_d-td_1d) > df.index[-1].date():
df.loc[end_dt, 'Shares'] = np.nan
df = df.sort_index()
df['FetchDate'] = fetch_dt
return df
@property
def major_holders(self):
if self._major_holders is not None:
return self._major_holders
if yfcm.IsDatumCached(self.ticker, "major_holders"):
self._major_holders = yfcm.ReadCacheDatum(self.ticker, "major_holders")
return self._major_holders
self._major_holders = self.dat.major_holders
yfcm.StoreCacheDatum(self.ticker, "major_holders", self._major_holders)
return self._major_holders
@property
def institutional_holders(self):
if self._institutional_holders is not None:
return self._institutional_holders
if yfcm.IsDatumCached(self.ticker, "institutional_holders"):
self._institutional_holders = yfcm.ReadCacheDatum(self.ticker, "institutional_holders")
return self._institutional_holders
self._institutional_holders = self.dat.institutional_holders
yfcm.StoreCacheDatum(self.ticker, "institutional_holders", self._institutional_holders)
return self._institutional_holders
@property
def earnings(self):
return self._financials_manager.get_earnings()
@property
def quarterly_earnings(self):
return self._financials_manager.get_quarterly_earnings()
@property
def income_stmt(self):
return self._financials_manager.get_income_stmt()
@property
def quarterly_income_stmt(self):
return self._financials_manager.get_quarterly_income_stmt()
@property
def financials(self):
return self._financials_manager.get_income_stmt()
@property
def quarterly_financials(self):
return self._financials_manager.get_quarterly_income_stmt()
@property
def balance_sheet(self):
return self._financials_manager.get_balance_sheet()
@property
def quarterly_balance_sheet(self):
return self._financials_manager.get_quarterly_balance_sheet()
@property
def cashflow(self):
return self._financials_manager.get_cashflow()
@property
def quarterly_cashflow(self):
return self._financials_manager.get_quarterly_cashflow()
[docs]
def get_earnings_dates(self, limit=12):
return self._financials_manager.get_earnings_dates(limit)
[docs]
def get_release_dates(self, period='quarterly', as_df=False, check=True):
if period not in ['annual', 'quarterly']:
raise ValueError(f'period argument must be "annual" or "quarterly", not "{period}"')
if period == 'annual':
period = yfcd.ReportingPeriod.Full
else:
period = yfcd.ReportingPeriod.Interim
releases = self._financials_manager.get_release_dates(period, as_df, refresh=True, check=check)
if releases is None:
return releases
if as_df:
# Format:
releases['Period end uncertainty'] = '0d'
f = releases['PE confidence'] == yfcd.Confidence.Medium
if f.any():
releases.loc[f, 'Period end uncertainty'] = '+-7d'
f = releases['PE confidence'] == yfcd.Confidence.Low
if f.any():
releases.loc[f, 'Period end uncertainty'] = '+-45d'
releases = releases.drop('PE confidence', axis=1)
releases['Release date uncertainty'] = '0d'
f = releases['RD confidence'] == yfcd.Confidence.Medium
if f.any():
releases.loc[f, 'Release date uncertainty'] = '+/-7d'
f = releases['RD confidence'] == yfcd.Confidence.Low
if f.any():
releases.loc[f, 'Release date uncertainty'] = '+/-45d'
releases = releases.drop('RD confidence', axis=1)
releases = releases.drop('Delay', axis=1)
return releases
@property
def sustainability(self):
if self._sustainability is not None:
return self._sustainability
if yfcm.IsDatumCached(self.ticker, "sustainability"):
self._sustainability = yfcm.ReadCacheDatum(self.ticker, "sustainability")
return self._sustainability
self._sustainability = self.dat.sustainability
yfcm.StoreCacheDatum(self.ticker, "sustainability", self._sustainability)
return self._sustainability
@property
def recommendations(self):
if self._recommendations is not None:
return self._recommendations
if yfcm.IsDatumCached(self.ticker, "recommendations"):
self._recommendations = yfcm.ReadCacheDatum(self.ticker, "recommendations")
return self._recommendations
self._recommendations = self.dat.recommendations
yfcm.StoreCacheDatum(self.ticker, "recommendations", self._recommendations)
return self._recommendations
@property
def calendar(self):
return self._financials_manager.get_calendar()
@property
def inin(self):
if self._inin is not None:
return self._inin
if yfcm.IsDatumCached(self.ticker, "inin"):
self._inin = yfcm.ReadCacheDatum(self.ticker, "inin")
return self._inin
self._inin = self.dat.inin
yfcm.StoreCacheDatum(self.ticker, "inin", self._inin)
return self._inin
@property
def options(self):
if self._options is not None:
return self._options
if yfcm.IsDatumCached(self.ticker, "options"):
self._options = yfcm.ReadCacheDatum(self.ticker, "options")
return self._options
self._options = self.dat.options
yfcm.StoreCacheDatum(self.ticker, "options", self._options)
return self._options
@property
def news(self):
if self._news is not None:
return self._news
if yfcm.IsDatumCached(self.ticker, "news"):
self._news = yfcm.ReadCacheDatum(self.ticker, "news")
return self._news
self._news = self.dat.news
yfcm.StoreCacheDatum(self.ticker, "news", self._news)
return self._news
@property
def yf_lag(self):
if self._yf_lag is not None:
return self._yf_lag
exchange, tz_name = self._getExchangeAndTz()
exchange_str = "exchange-{0}".format(exchange)
if yfcm.IsDatumCached(exchange_str, "yf_lag"):
self._yf_lag = yfcm.ReadCacheDatum(exchange_str, "yf_lag")
if self._yf_lag:
return self._yf_lag
# Just use specified lag
specified_lag = yfcd.exchangeToYfLag[exchange]
self._yf_lag = specified_lag
return self._yf_lag
[docs]
def verify_cached_tickers_prices(session=None, rtol=0.0001, vol_rtol=0.005, correct=False, halt_on_fail=True, resume_from_tkr=None, debug_tkr=None, debug_interval=None):
"""
:Parameters:
session:
Recommend providing a 'requests_cache' session, in case
you have to abort and resume verification (likely).
correct:
False, 'one', 'all'
resume_from_tkr: str
Resume verification from this ticker (alphabetical order).
Because maybe you had to abort verification partway.
debug_tkr: str
Only verify this ticker.
Because maybe you want to investigate a difference.
"""
if debug_interval is not None and isinstance(debug_interval, str):
if debug_interval not in yfcd.intervalStrToEnum.keys():
raise Exception("'debug_interval' if str must be one of: {}".format(yfcd.intervalStrToEnum.keys()))
debug_interval = yfcd.intervalStrToEnum[debug_interval]
d = yfcm.GetCacheDirpath()
tkrs = [x for x in os.listdir(d) if not x.startswith("exchange-") and os.path.isdir(os.path.join(d, x)) and '_' not in x]
debug = debug_tkr is not None
if debug_tkr is not None:
debug_tkr = debug_tkr.upper()
tkrs = [debug_tkr]
else:
tkrs = sorted(tkrs)
if resume_from_tkr is not None:
resume_from_tkr = resume_from_tkr.upper()
resume_after_tkr = False
if resume_from_tkr.endswith("+1"):
resume_after_tkr = True
resume_from_tkr = resume_from_tkr.replace("+1", "")
i = np.searchsorted(np.array(tkrs), resume_from_tkr, side="left")
if resume_after_tkr:
i += 1
tkrs = tkrs[i:]
if debug_tkr is not None:
tkrs = [debug_tkr]
tqdm_loaded = False
try:
from tqdm import tqdm
t = tqdm(range(len(tkrs)))
tqdm_loaded = True
except ModuleNotFoundError:
print("Install Python module 'tqdm' to print progress bar + estimated time to completion")
t = range(len(tkrs))
for i in t:
tkr = tkrs[i]
if tqdm_loaded:
t.set_description("Verifying " + tkr)
else:
print(f"{tkr} : {i+1}/{len(tkrs)}")
dat = Ticker(tkr, session=session)
try:
discard_old = correct in ['one', 'all']
v = dat.verify_cached_prices(rtol=rtol, vol_rtol=vol_rtol, correct=correct, discard_old=discard_old, quiet=not debug, debug=debug, debug_interval=debug_interval)
except yfcd.NoPriceDataInRangeException as e:
print(str(e) + " - is it delisted? Aborting verification so you can investigate.")
return
if debug:
return
if correct in ['one', 'all']:
v = dat.verify_cached_prices(rtol=rtol, vol_rtol=vol_rtol, correct=correct, discard_old=False, quiet=True, debug=debug, debug_interval=debug_interval)
if not v and correct != 'all':
v = dat.verify_cached_prices(rtol=rtol, vol_rtol=vol_rtol, correct=False, discard_old=False, quiet=False, debug=True, debug_interval=debug_interval)
if halt_on_fail:
raise Exception(f"{tkr}: verify failing")
else:
print(f"{tkr}: verify failing")