import yfinance as yf
from . import yfc_cache_manager as yfcm
from . import yfc_dat as yfcd
from . import yfc_time as yfct
from . import yfc_utils as yfcu
from . import yfc_logging as yfcl
import numpy as np
import pandas as pd
import pytz
# import itertools
from scipy import ndimage as _ndimage
from datetime import datetime, date, time, timedelta
import dateutil
from zoneinfo import ZoneInfo
from pprint import pprint
import click
import logging
# TODOs:
# - when filling a missing interval with NaNs, try to reconstruct first
[docs]
class HistoriesManager:
# Intended as single to class to ensure:
# - only one History() object exists for each timescale/data type
# - different History() objects and communicate
def __init__(self, ticker, exchange, tzName, session, proxy):
yfcu.TypeCheckStr(ticker, "ticker")
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckStr(tzName, "tzName")
self.ticker = ticker
self.exchange = exchange
self.tzName = tzName
self.histories = {}
self.session = session
self.proxy = proxy
self.logger = None
def __del__(self):
if self.logger is not None:
# Fix OS error "Too many open files"
self.logger.handlers[0].close()
[docs]
def GetHistory(self, key):
permitted_keys = set(yfcd.intervalToString.keys()) | {"Events"}
if key not in permitted_keys:
raise ValueError(f"key='{key}' is invalid, must be one of: {permitted_keys}")
if key not in self.histories:
if key in yfcd.intervalToString.keys():
if key == yfcd.Interval.Days1:
self.histories[key] = PriceHistory(self, self.ticker, self.exchange, self.tzName, key, self.session, self.proxy, repair=True, contiguous=True)
else:
self.histories[key] = PriceHistory(self, self.ticker, self.exchange, self.tzName, key, self.session, self.proxy, repair=True, contiguous=False)
elif key == "Events":
self.histories[key] = EventsHistory(self, self.ticker, self.exchange, self.tzName, self.proxy)
else:
raise Exception(f"Not implemented code path for key='{key}'")
return self.histories[key]
[docs]
def LogEvent(self, level, group, msg):
if not yfcl.IsLoggingEnabled():
if yfcl.IsTracingEnabled():
yfcl.TracePrint(msg)
return
if not isinstance(level, str) or level not in ["debug", "info"]:
raise Exception("'level' must be str 'debug' or 'info'")
if self.logger is None:
self.logger = yfcl.GetLogger(self.ticker)
full_msg = f"{group}: {msg}"
if level == "debug":
self.logger.debug(full_msg)
else:
self.logger.info(full_msg)
[docs]
class EventsHistory:
def __init__(self, manager, ticker, exchange, tzName, proxy):
if not isinstance(manager, HistoriesManager):
raise TypeError(f"'manager' must be HistoriesManager not {type(manager)}")
yfcu.TypeCheckStr(ticker, "ticker")
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckStr(tzName, "tzName")
self.manager = manager
self.ticker = ticker
self.exchange = exchange
self.tzName = tzName
self.proxy = proxy
self.tz = ZoneInfo(self.tzName)
if yfcm.IsDatumCached(self.ticker, "dividends"):
self.divs = yfcm.ReadCacheDatum(self.ticker, "dividends").sort_index()
else:
self.divs = None
if yfcm.IsDatumCached(self.ticker, "splits"):
self.splits = yfcm.ReadCacheDatum(self.ticker, "splits").sort_index()
else:
self.splits = None
[docs]
def GetDivs(self, start=None, end=None):
if start is not None:
yfcu.TypeCheckDateStrict(start, "start")
if end is not None:
yfcu.TypeCheckDateStrict(end, "end")
if self.divs is None or self.divs.empty:
return None
if start is None and end is None:
return self.divs.copy()
tz = self.divs.index[0].tz
if start is not None:
start = pd.Timestamp(start).tz_localize(tz)
if end is not None:
end = pd.Timestamp(end).tz_localize(tz)
td_1d = timedelta(days=1)
if end is None:
slc = self.divs.loc[start:]
elif start is None:
slc = self.divs.loc[:end-td_1d]
else:
slc = self.divs.loc[start:end-td_1d]
if slc.empty:
return None
else:
return slc.copy()
[docs]
def GetDivsFetchedSince(self, dt):
yfcu.TypeCheckDatetime(dt, "dt")
if self.divs is None or self.divs.empty:
result = None
else:
f = self.divs["FetchDate"] > dt
if f.any():
result = self.divs[f].copy()
else:
result = None
return result
[docs]
def GetSplits(self, start, end=None):
yfcu.TypeCheckDateStrict(start, "start")
if end is not None:
yfcu.TypeCheckDateStrict(end, "end")
result = None
if self.splits is not None and not self.splits.empty:
start = pd.Timestamp(start).tz_localize(self.splits.index[0].tz)
if end is not None:
end = pd.Timestamp(end).tz_localize(self.splits.index[0].tz)
td_1d = timedelta(days=1)
if end is None:
slc = self.splits.loc[start:]
else:
slc = self.splits.loc[start:end-td_1d]
if slc.empty:
result = None
else:
result = slc.copy()
return result
[docs]
def GetSplitsFetchedSince(self, dt):
yfcu.TypeCheckDatetime(dt, "dt")
if self.splits is None or self.splits.empty:
result = None
else:
f = self.splits["FetchDate"] > dt
if f.any():
result = self.splits[f].copy()
else:
result = None
return result
[docs]
def UpdateSplits(self, splits_df):
n = splits_df.shape[0]
yfcl.TraceEnter(f"PM: UpdateSplits({splits_df.index.date})") if n <= 2 else yfcl.TraceEnter(f"PM: UpdateSplits(n={n})")
self_splits_modified = False
debug = False
# debug = True
yfcu.TypeCheckDataFrame(splits_df, "splits_df")
splits_df = splits_df.copy()
if not splits_df.empty:
expected_cols = ["Stock Splits", "FetchDate"]
for c in expected_cols:
if c not in splits_df.columns:
raise ValueError("UpdateSplits() 'splits_df' columns must contain: '{expected_cols}'")
# Prepare 'splits_df' for append
splits_df["Superseded split"] = 0.0
splits_df["Superseded split FetchDate"] = pd.NaT
if splits_df['Superseded split FetchDate'].dt.tz is None and self.splits is not None:
splits_df['Superseded split FetchDate'] = splits_df['Superseded split FetchDate'].dt.tz_localize(self.splits['FetchDate'].dt.tz)
for dt in splits_df.index:
new_split = splits_df.loc[dt, "Stock Splits"]
if self.splits is not None and dt in self.splits.index:
cached_split = self.splits.loc[dt, "Stock Splits"]
if debug:
yfcl.TracePrint(f"pre-existing stock-split @ {dt}: {cached_split} vs {new_split}")
diff_pct = 100*abs(cached_split-new_split)/cached_split
if diff_pct < 0.01:
# tiny difference, easier to just keep old value
splits_df = splits_df.drop(dt)
if debug:
yfcl.TracePrint("ignoring")
else:
splits_df.loc[dt, "Superseded split"] = self.splits.loc[dt, "Stock Splits"]
splits_df.loc[dt, "Superseded split FetchDate"] = self.splits.loc[dt, "FetchDate"]
self.splits = self.splits.drop(dt)
self_splits_modified = True
if debug:
yfcl.TracePrint("supersede")
cols = ["Stock Splits", "FetchDate", "Superseded split", "Superseded split FetchDate"]
if not splits_df.empty:
splits_pretty = splits_df["Stock Splits"]
splits_pretty.index = splits_pretty.index.date.astype(str)
self.manager.LogEvent("info", "SplitManager", f"{splits_pretty.shape[0]} new splits: {splits_pretty.to_dict()}")
if self.splits is None:
self.splits = splits_df[cols].copy()
else:
f_na = self.splits['Superseded split FetchDate'].isna()
if f_na.all():
# Drop column. It breaks concat, and anyway 'divs_df' will restore it.
self.splits = self.splits.drop('Superseded split FetchDate', axis=1)
self.splits = pd.concat([self.splits, splits_df[cols]], sort=True).sort_index()
yfcm.StoreCacheDatum(self.ticker, "splits", self.splits)
elif self_splits_modified:
yfcm.StoreCacheDatum(self.ticker, "splits", self.splits)
yfcl.TraceExit("UpdateSplits() returning")
[docs]
def UpdateDividends(self, divs_df):
debug = False
# debug = True
n = divs_df.shape[0]
yfcl.TraceEnter(f"PM: UpdateDividends({divs_df.index.date})") if n <= 2 else yfcl.TraceEnter(f"PM: UpdateDividends(n={n})")
self_divs_modified = False
yfcu.TypeCheckDataFrame(divs_df, "divs_df")
divs_df = divs_df.copy()
if not divs_df.empty:
expected_cols = ["Dividends", "FetchDate", "Close day before"]
# expected_cols = ["Dividends", "FetchDate", "Close today"]
for c in expected_cols:
if c not in divs_df.columns:
raise ValueError(f"AddDividends() 'divs_df' is missing column: '{c}'")
# Prepare 'divs_df' for append
divs_df["Back Adj."] = np.nan
divs_df["Superseded div"] = 0.0
divs_df["Superseded back adj."] = 0.0
divs_df["Superseded div FetchDate"] = pd.NaT
if divs_df['Superseded div FetchDate'].dt.tz is None and self.divs is not None:
divs_df['Superseded div FetchDate'] = divs_df['Superseded div FetchDate'].dt.tz_localize(self.divs['FetchDate'].dt.tz)
divs_df_dts = divs_df.index.copy()
for dt in divs_df_dts:
new_div = divs_df.loc[dt, "Dividends"]
close_before = divs_df.loc[dt, "Close day before"]
# adj = (close_before - new_div) / close_before
adj = 1.0 - new_div / close_before
# # F = P2/(P2+D)
# # http://marubozu.blogspot.com/2006/09/how-yahoo-calculates-adjusted-closing.html#c8038064975185708856
# close_today = divs_df.loc[dt, "Close today"]
# adj = close_today / (close_today + new_div)
try:
if np.isnan(adj): # todo: remove once confirm YFC bug-free
print(divs_df.loc[dt])
raise Exception("Back Adj. is NaN")
except:
print("dt=", dt)
print("T=", self.ticker)
print("divs_df:") ; print(divs_df)
raise
if debug:
fetch_dt = divs_df.loc[dt, "FetchDate"]
msg = f"new dividend: {new_div} @ {dt.date()} adj={adj:.5f} close_before={close_before:.4f} fetch={fetch_dt.strftime('%Y-%m-%d %H:%M:%S%z')}"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(f"{self.ticker}: " + msg)
divs_df.loc[dt, "Back Adj."] = adj
if self.divs is not None and dt in self.divs.index:
# Replaced cached dividend event if (i) dividend different or (ii) adj different.
cached_div = self.divs.loc[dt, "Dividends"]
cached_adj = self.divs.loc[dt, "Back Adj."]
if debug:
msg = f"pre-existing dividend @ {dt}: {cached_div} vs {new_div}"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(f"{self.ticker}: " + msg)
diff_pct = 100*abs(cached_div-new_div)/cached_div
diff_pct2 = 100*abs(cached_adj-adj)/cached_adj
diff_pct = max(diff_pct, diff_pct2)
if diff_pct < 0.01:
# tiny difference, easier to just keep old value
divs_df = divs_df.drop(dt)
if debug:
msg = "ignoring new div"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(f"{self.ticker}: " + msg)
else:
divs_df.loc[dt, "Superseded div"] = self.divs.loc[dt, "Dividends"]
divs_df.loc[dt, "Superseded back adj."] = self.divs.loc[dt, "Back Adj."]
divs_df.loc[dt, "Superseded div FetchDate"] = self.divs.loc[dt, "FetchDate"]
self.divs = self.divs.drop(dt)
self_divs_modified = True
if debug:
msg = "replacing old div"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(f"{self.ticker}: " + msg)
elif new_div == 0.0:
# Discard, was only sent to deprecate previous dividend on this date
divs_df = divs_df.drop(dt)
cols = ["Dividends", "Back Adj.", "FetchDate", "Superseded div", "Superseded back adj.", "Superseded div FetchDate"]
if not divs_df.empty:
divs_pretty = divs_df[["Dividends", "Back Adj.", "Close day before"]].copy()
divs_pretty = divs_pretty.rename(columns={'Dividends':'Div', 'Back Adj.':'Adj', 'Close day before':'Close'})
divs_pretty['Adj'] = divs_pretty['Adj'].round(3)
divs_pretty['Close'] = divs_pretty['Close'].round(3)
divs_pretty.index = divs_pretty.index.date.astype(str)
n = divs_pretty.shape[0]
n_sup = np.sum((divs_df['Superseded div']>0.0).to_numpy())
n_new = n - n_sup
self.manager.LogEvent("info", "DividendManager", f"stored {n} dividends ({n_new} new, {n_sup} superseded): {divs_pretty.to_dict(orient='index')}")
if self.divs is None:
self.divs = divs_df[cols].copy()
else:
f_na = self.divs['Superseded div FetchDate'].isna()
if f_na.all():
# Drop column. It breaks concat, and anyway 'divs_df' will restore it.
self.divs = self.divs.drop('Superseded div FetchDate', axis=1)
self.divs = pd.concat([self.divs, divs_df[cols]], sort=True).sort_index()
yfcm.StoreCacheDatum(self.ticker, "dividends", self.divs)
elif self_divs_modified:
yfcm.StoreCacheDatum(self.ticker, "dividends", self.divs)
yfcl.TraceExit("UpdateDividends() returning")
[docs]
class PriceHistory:
def __init__(self, manager, ticker, exchange, tzName, interval, session, proxy, repair=True, contiguous=False):
if isinstance(interval, str):
if interval not in yfcd.intervalStrToEnum.keys():
raise Exception("'interval' if str must be one of: {}".format(yfcd.intervalStrToEnum.keys()))
interval = yfcd.intervalStrToEnum[interval]
yfcu.TypeCheckStr(ticker, "ticker")
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckStr(tzName, "tzName")
yfcu.TypeCheckBool(repair, "repair")
yfcu.TypeCheckBool(contiguous, "contiguous")
self.manager = manager
self.ticker = ticker
self.exchange = exchange
self.tzName = tzName
self.interval = interval
self.session = session
self.proxy = proxy
self.repair = repair
self.contiguous = contiguous
self.dat = yf.Ticker(self.ticker, session=self.session)
self.tz = ZoneInfo(self.tzName)
self.itd = yfcd.intervalToTimedelta[self.interval]
self.istr = yfcd.intervalToString[self.interval]
self.interday = self.interval in [yfcd.Interval.Days1, yfcd.Interval.Week]#, yfcd.Interval.Months1, yfcd.Interval.Months3]
self.intraday = not self.interday
self.multiday = self.interday and self.interval != yfcd.Interval.Days1
# Load from cache
self.cache_key = "history-"+self.istr
self.h = self._getCachedPrices()
self._reviewNewDivs()
# A place to temporarily store new dividends, until prices have
# been repaired, then they can be sent to EventsHistory
self._debug = False
# self._debug = True
# Manage potential for infinite recursion during price repair:
self._record_stack_trace = True
# self._record_stack_trace = False
self._stack_trace = []
self._infinite_recursion_detected = False
def _getCachedPrices(self):
h = None
if yfcm.IsDatumCached(self.ticker, self.cache_key):
h = yfcm.ReadCacheDatum(self.ticker, self.cache_key)
if h is not None and h.empty:
h = None
elif h is not None:
h_modified = False
# h = yfcu.CustomNanCheckingDataFrame(h)
if "Adj Close" in h.columns:
raise Exception("Adj Close in cached h")
f_dups = h.index.duplicated()
if f_dups.any():
raise Exception("{}: These timepoints have been duplicated: {}".format(self.ticker, h.index[f_dups]))
f_na = np.isnan(h["CDF"].to_numpy())
if f_na.any():
h["CDF"] = h["CDF"].bfill().ffill()
f_na = h["CDF"].isna()
if f_na.any():
raise Exception("CDF NaN repair failed")
h_modified = True
if h_modified:
yfcm.StoreCacheDatum(self.ticker, self.cache_key, h)
return h
def _updatedCachedPrices(self, df):
yfcu.TypeCheckDataFrame(df, "df")
expected_cols = ["Open", "High", "Low", "Close", "Volume", "Dividends", "Stock Splits"]
expected_cols += ["Final?", "C-Check?", "FetchDate", "CSF", "CDF"]
expected_cols += ["LastDivAdjustDt", "LastSplitAdjustDt"]
missing_cols = [c for c in expected_cols if c not in df.columns]
if len(missing_cols) > 0:
raise Exception(f"DF missing these columns: {missing_cols}")
if df.empty:
df = None
yfcm.StoreCacheDatum(self.ticker, self.cache_key, df)
self.h = df
def _reviewNewDivs(self):
if self.interval != yfcd.Interval.Days1:
return
cached_new_divs = yfcm.ReadCacheDatum(self.ticker, "new_divs")
if cached_new_divs is not None:
if yfcm.ReadCacheMetadata(self.ticker, "new_divs", "locked") is not None:
# This is bad, means YFC was killed before 'new_divs' could be processed.
# Means potentially future new dividends have not been processed
h_divs = self.h.loc[self.h["Dividends"]!=0, ["Dividends", "FetchDate"]]
h_divs_since = h_divs[h_divs.index > cached_new_divs.index.max()]
if not h_divs_since.empty:
if 'Desplitted?' not in cached_new_divs.columns:
cached_new_divs['Desplitted?'] = False # assume
h_divs_since['Desplitted?'] = True
cached_new_divs = pd.concat([cached_new_divs, h_divs_since])
yfcm.StoreCacheDatum(self.ticker, "new_divs", cached_new_divs)
yfcm.WriteCacheMetadata(self.ticker, "new_divs", "locked", None)
[docs]
def get(self, start=None, end=None, period=None, max_age=None, trigger_at_market_close=False, repair=True, prepost=False, adjust_splits=False, adjust_divs=False, quiet=False):
if start is None and end is None and period is None:
raise ValueError("Must provide value for one of: 'start', 'end', 'period'")
if start is not None:
yfcu.TypeCheckIntervalDt(start, self.interval, "start", strict=False)
if end is not None:
yfcu.TypeCheckIntervalDt(end, self.interval, "end", strict=False)
if period is not None:
yfcu.TypeCheckPeriod(period, "period")
yfcu.TypeCheckBool(trigger_at_market_close, "trigger_at_market_close")
yfcu.TypeCheckBool(repair, "repair")
yfcu.TypeCheckBool(adjust_splits, "adjust_splits")
yfcu.TypeCheckBool(adjust_divs, "adjust_divs")
# TODO: enforce 'max_age' value provided. Only 'None' while I dev
if max_age is None:
if self.interval == yfcd.Interval.Days1:
max_age = timedelta(hours=4)
elif self.interval == yfcd.Interval.Week:
max_age = timedelta(hours=60)
# elif self.interval == yfcd.Interval.Months1:
# max_age = timedelta(days=15)
# elif self.interval == yfcd.Interval.Months3:
# max_age = timedelta(days=45)
else:
max_age = 0.5*yfcd.intervalToTimedelta[self.interval]
# YFC cannot handle pre- and post-market intraday
prepost = self.interday
yfct.SetExchangeTzName(self.exchange, self.tzName)
td_1d = timedelta(days=1)
tz_exchange = ZoneInfo(self.tzName)
dt_now = pd.Timestamp.utcnow().tz_convert(tz_exchange)
d_now_exchange = dt_now.date()
tomorrow_d = d_now_exchange + td_1d
if self.interday:
tomorrow = tomorrow_d
else:
tomorrow = datetime.combine(tomorrow_d, time(0), tz_exchange)
if start is not None:
if end is not None and start >= end:
# raise ValueError(f"start={start} must < end={end}")
return None
# if (self.interday and start >= tomorrow) or (not self.interday and start > dt_now):
if isinstance(start, datetime):
if start > dt_now:
return None
else:
if start >= tomorrow_d:
return None
debug_yf = False
debug_yfc = self._debug
# debug_yfc = True
if period is not None:
log_msg = f"PriceHistory-{self.istr}.get(tkr={self.ticker}, period={period}, max_age={max_age}, trigger_at_market_close={trigger_at_market_close}, prepost={prepost}, repair={repair})"
else:
log_msg = f"PriceHistory-{self.istr}.get(tkr={self.ticker}, start={start}, end={end}, max_age={max_age}, trigger_at_market_close={trigger_at_market_close}, prepost={prepost}, repair={repair})"
if yfcl.IsTracingEnabled():
yfcl.TraceEnter(log_msg)
elif debug_yfc:
print(log_msg)
self._applyNewEvents()
try:
yf_lag = yfcd.exchangeToYfLag[self.exchange]
except:
print(f"- ticker = {self.ticker}")
raise
pstr = None
end_d = None ; end_dt = None
if period is not None:
start_d, end_d = yfct.MapPeriodToDates(self.exchange, period, self.interval)
period = None
if self.interday:
start = start_d
end = end_d
start_dt = datetime.combine(start_d, time(0), tz_exchange)
end_dt = datetime.combine(end_d, time(0), tz_exchange)
else:
start = datetime.combine(start_d, time(0), tz_exchange)
end = None
start_dt = start
end_dt = None
else:
if self.interday:
start_dt = datetime.combine(start, time(0), tz_exchange)
else:
if isinstance(start, datetime):
start_dt = start
else:
start_dt = datetime.combine(start, time(0), tz_exchange)
start = start_dt
if end is None and pstr is None:
if self.interday:
end = d_now_exchange+td_1d
else:
sched = yfct.GetExchangeSchedule(self.exchange, d_now_exchange, d_now_exchange+td_1d)
if sched is None or (dt_now + yf_lag) < sched["open"].iloc[0]:
# Before market open
end = datetime.combine(d_now_exchange, time(0), tz_exchange)
else:
i = yfct.GetTimestampCurrentInterval(self.exchange, dt_now+yf_lag, self.interval, ignore_breaks=True)
if i is None:
# After interval
end = datetime.combine(d_now_exchange+td_1d, time(0), tz_exchange)
else:
# During interval
end = i["interval_close"]
if end is not None and end_dt is None:
if isinstance(end, datetime):
end_dt = end
else:
# end_dt = datetime.combine(end+td_1d, time(0), tz_exchange)
end_dt = datetime.combine(end, time(0), tz_exchange)
if not self.interday:
end = end_dt
if self.interday:
if isinstance(start, datetime) or isinstance(end, datetime):
raise TypeError(f"'start' and 'end' must be date type not {type(start)}, {type(end)}")
else:
if (not isinstance(start, datetime)) and (not isinstance(end, datetime)):
raise TypeError(f"'start' and 'end' must be datetime type not {type(start)}, {type(end)}")
listing_date = yfcm.ReadCacheDatum(self.ticker, "listing_date")
if listing_date is not None and start is not None:
listing_date_dt = datetime.combine(listing_date, time(0), tz_exchange)
if isinstance(start, datetime):
start = max(start, listing_date_dt)
else:
start = max(start, listing_date)
if self.h is not None:
if self.h.empty:
self.h = None
# Remove expired intervals from cache
if self.h is not None:
n = self.h.shape[0]
if self.interday:
h_interval_dts = self.h.index.date if isinstance(self.h.index[0], pd.Timestamp) else self.h.index
else:
h_interval_dts = np.array([yfct.ConvertToDatetime(dt, tz=tz_exchange) for dt in self.h.index])
h_interval_dts = np.array(h_interval_dts)
# if self.interval == yfcd.Interval.Days1:
if self.contiguous:
# Daily data is always contiguous so only need to check last row
h_interval_dt = h_interval_dts[-1]
fetch_dt = yfct.ConvertToDatetime(self.h["FetchDate"].iloc[-1], tz=tz_exchange)
f_final = self.h["Final?"].to_numpy()
f_nfinal = ~f_final
# - also treat repaired data as non-final, if fetched near to interval timepoint
# because Yahoo might now have correct data
# - and treat NaN data as repaired
f_repair = self.h["Repaired?"].to_numpy()
f_na = self.h['Close'].isna().to_numpy()
f_repair = f_repair | f_na
cutoff_dts = self.h.index + self.itd + timedelta(days=7)
# Ignore repaired data if fetched/repaired 7+ days after interval end
f_repair[self.h['FetchDate'] > cutoff_dts] = False
if f_repair.any():
f_nfinal = f_nfinal | f_repair
if f_nfinal.any():
idx0 = np.where(f_nfinal)[0][0]
repaired = f_repair[idx0]
h_interval_dt = h_interval_dts[idx0]
fetch_dt = yfct.ConvertToDatetime(self.h["FetchDate"].iloc[idx0], tz=tz_exchange)
try:
expired = yfct.IsPriceDatapointExpired(h_interval_dt, fetch_dt, repaired, max_age, self.exchange, self.interval, yf_lag=yf_lag, triggerExpiryOnClose=trigger_at_market_close)
except yfcd.TimestampOutsideIntervalException as e:
if f_na[idx0]:
# YFC must have inserted a row of NaNs, wrongly thinking exchange should have been open here.
expired = True
else:
raise e
if expired:
self.h = self.h.iloc[:idx0]
h_interval_dts = h_interval_dts[:idx0]
else:
expired = np.array([False]*n)
f_final = self.h["Final?"].to_numpy()
f_nfinal = ~f_final
# - also treat repaired data as non-final, if fetched near to interval timepoint
# because Yahoo might now have correct data
# - and treat NaN data as repaired
cutoff_dt = dt_now - timedelta(days=7)
idx = self.h.index.get_indexer([cutoff_dt], method='bfill')[0]
f_repair = self.h["Repaired?"].to_numpy()
f_na = self.h['Close'].isna().to_numpy()
f_repair = f_repair | f_na
cutoff_dts = self.h.index + self.itd + timedelta(days=7)
# Ignore repaired data if fetched/repaired 7+ days after interval end
f_repair[self.h['FetchDate'] > cutoff_dts] = False
if f_repair.any():
f_nfinal = f_nfinal | f_repair
for idx in np.where(f_nfinal)[0]:
# repaired = False
repaired = f_repair[idx]
h_interval_dt = h_interval_dts[idx]
fetch_dt = yfct.ConvertToDatetime(self.h["FetchDate"].iloc[idx], tz=tz_exchange)
try:
expired_idx = yfct.IsPriceDatapointExpired(h_interval_dt, fetch_dt, repaired, max_age, self.exchange, self.interval, yf_lag=yf_lag, triggerExpiryOnClose=trigger_at_market_close)
except yfcd.TimestampOutsideIntervalException as e:
if f_na[idx0]:
# YFC must have inserted a row of NaNs, wrongly thinking exchange should have been open here.
expired_idx = True
else:
raise e
expired[idx] = expired_idx
if expired.any():
self.h = self.h.drop(self.h.index[expired])
h_interval_dts = h_interval_dts[~expired]
if self.h.empty:
self.h = None
ranges_to_fetch = []
if self.h is None:
# Simple, just fetch the requested data
if self.contiguous:
# Ensure daily always up-to-now
h = self._fetchYfHistory(start, tomorrow, prepost, debug_yf)
else:
h = self._fetchYfHistory(start, end, prepost, debug_yf)
if h is None:
raise Exception(f"{self.ticker}: Failed to fetch date range {start}->{end}")
# Adjust
h = self._reverseYahooAdjust(h)
if self.interval == yfcd.Interval.Days1:
h_splits = h[h["Stock Splits"] != 0]
if len(h_splits) > 0:
self.manager.GetHistory("Events").UpdateSplits(h_splits)
self._updatedCachedPrices(h)
else:
# Compare request against cached data, only fetch missing/expired data
# Performance TODO: tag rows as fully contiguous to avoid searching for gaps
# Calculate ranges_to_fetch
if self.contiguous:
if self.h is None or self.h.empty:
if self.interday:
if self.multiday:
ranges_to_fetch = yfct.IdentifyMissingIntervalRanges(self.exchange, start_d, end_d+self.itd, self.interval, [], minDistanceThreshold=5)
else:
ranges_to_fetch = yfct.IdentifyMissingIntervalRanges(self.exchange, start_d, end_d, self.interval, [], minDistanceThreshold=5)
else:
ranges_to_fetch = yfct.IdentifyMissingIntervalRanges(self.exchange, start, end, self.interval, [], minDistanceThreshold=5)
else:
# Ensure that daily data always up-to-date to now
# Update: only necessary to be up-to-date to now if a fetch happens
dt_start = yfct.ConvertToDatetime(self.h.index[0], tz=tz_exchange)
dt_end = yfct.ConvertToDatetime(self.h.index[-1], tz=tz_exchange)
h_start = yfct.GetTimestampCurrentInterval(self.exchange, dt_start, self.interval, ignore_breaks=True)["interval_open"]
last_interval = yfct.GetTimestampCurrentInterval(self.exchange, dt_end, self.interval, ignore_breaks=True)
if last_interval is None:
# Possible if Yahoo returned price data when 'exchange_calendars' thinks exchange was closed
for i in range(1, 5):
last_interval = yfct.GetTimestampCurrentInterval(self.exchange, dt_end - td_1d*i, self.interval, ignore_breaks=True)
if last_interval is not None:
break
h_end = last_interval["interval_close"]
rangePre_to_fetch = None
if start < h_start:
if debug_yfc:
msg = "checking for rangePre_to_fetch"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(f"{self.ticker}: " + msg)
try:
rangePre_to_fetch = yfct.IdentifyMissingIntervalRanges(self.exchange, start, h_start, self.interval, None, ignore_breaks=True, minDistanceThreshold=5)
except yfcd.NoIntervalsInRangeException:
rangePre_to_fetch = None
if rangePre_to_fetch is not None:
if len(rangePre_to_fetch) > 1:
raise Exception("Expected only one element in rangePre_to_fetch[], but = {}".format(rangePre_to_fetch))
rangePre_to_fetch = rangePre_to_fetch[0]
#
rangePost_to_fetch = None
if self.h["Final?"].iloc[-1] and (min(yfct.CalcIntervalLastDataDt(self.exchange, dt_end, self.interval), self.h["FetchDate"].iloc[-1])+max_age <= dt_now):
# Note: if self.h["Final?"].iloc[-1] == False, then that means above expiry check didn't remove it so don't fetch anything
if debug_yfc:
msg = "checking for rangePost_to_fetch"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(f"{self.ticker}: " + msg)
if self.interday:
if rangePre_to_fetch is not None or end > h_end:
target_end_d = tomorrow_d
else:
target_end_d = end
if self.multiday:
target_end_d += self.itd # testing new code
if h_end < target_end_d:
try:
rangePost_to_fetch = yfct.IdentifyMissingIntervalRanges(self.exchange, h_end, target_end_d, self.interval, None, minDistanceThreshold=5)
except yfcd.NoIntervalsInRangeException:
rangePost_to_fetch = None
else:
if rangePre_to_fetch is not None or end > h_end:
target_end_dt = dt_now
else:
target_end_dt = end
d = target_end_dt.astimezone(tz_exchange).date()
sched = yfct.GetExchangeSchedule(self.exchange, d, d + td_1d)
if (sched is not None) and (not sched.empty) and (dt_now > sched["open"].iloc[0]):
target_end_dt = sched["close"].iloc[0]+timedelta(hours=2)
if h_end < target_end_dt:
try:
rangePost_to_fetch = yfct.IdentifyMissingIntervalRanges(self.exchange, h_end, target_end_dt, self.interval, None, minDistanceThreshold=5)
except yfcd.NoIntervalsInRangeException:
rangePost_to_fetch = None
ranges_to_fetch = []
if rangePost_to_fetch is not None:
if len(rangePost_to_fetch) > 1:
raise Exception("Expected only one element in rangePost_to_fetch[], but = {}".format(rangePost_to_fetch))
rangePost_to_fetch = rangePost_to_fetch[0]
if rangePre_to_fetch is not None:
ranges_to_fetch.append(rangePre_to_fetch)
if rangePost_to_fetch is not None:
ranges_to_fetch.append(rangePost_to_fetch)
else:
h_intervals = yfct.GetTimestampCurrentInterval_batch(self.exchange, h_interval_dts, self.interval, ignore_breaks=True)
if h_intervals is None:
h_intervals = pd.DataFrame(data={"interval_open": [], "interval_close": []})
else:
f_na = h_intervals["interval_open"].isna().to_numpy()
if f_na.any():
# Mapping Yahoo intervals -> xcal can fail now, because sometime xcal is wrong.
# Need to tolerate
h_intervals.loc[f_na, "interval_open"] = h_interval_dts[f_na]
h_intervals.loc[f_na, "interval_close"] = h_interval_dts[f_na]+self.itd
if (not h_intervals.empty) and isinstance(h_intervals["interval_open"].iloc[0], datetime):
h_interval_opens = [x.to_pydatetime().astimezone(tz_exchange) for x in h_intervals["interval_open"]]
else:
h_interval_opens = h_intervals["interval_open"].to_numpy()
try:
target_end = end
if self.multiday:
target_end += self.itd
ranges_to_fetch = yfct.IdentifyMissingIntervalRanges(self.exchange, start, target_end, self.interval, h_interval_opens, ignore_breaks=True, minDistanceThreshold=5)
if ranges_to_fetch is None:
ranges_to_fetch = []
except yfcd.NoIntervalsInRangeException:
ranges_to_fetch = []
except Exception:
print("Ticker =", self.ticker)
raise
# Prune ranges in future:
for i in range(len(ranges_to_fetch)-1, -1, -1):
r = ranges_to_fetch[i]
x = r[0]
delete_range = False
if isinstance(x, (datetime, pd.Timestamp)):
if x > dt_now:
delete_range = True
else:
sched = yfct.GetExchangeSchedule(self.exchange, x.date(), x.date() + td_1d)
delete_range = (sched is not None) and dt_now < (sched["open"].iloc[0] + yf_lag)
else:
if datetime.combine(x, time(0), tzinfo=tz_exchange) > dt_now:
delete_range = True
else:
sched = yfct.GetExchangeSchedule(self.exchange, x, x + td_1d)
delete_range = (sched is not None) and dt_now < (sched["open"].iloc[0] + yf_lag)
if delete_range:
if debug_yfc:
print("- deleting future range:", r[i])
del ranges_to_fetch[i]
else:
# Check if range ends in future, if yes then adjust to tomorrow max
y = r[1]
if isinstance(y, (datetime, pd.Timestamp)):
if y > dt_now:
ranges_to_fetch[i] = (r[0], min(dt_now.ceil('1D'), y))
elif y > d_now_exchange:
sched = yfct.GetExchangeSchedule(self.exchange, d_now_exchange, y + td_1d)
if sched is not None:
if debug_yfc:
print("- capping last range_to_fetch end to d_now_exchange")
if dt_now < sched["open"].iloc[0]:
ranges_to_fetch[i] = (r[0], d_now_exchange)
else:
ranges_to_fetch[i] = (r[0], d_now_exchange + td_1d)
# Important that ranges_to_fetch in reverse order!
ranges_to_fetch.sort(key=lambda x: x[0], reverse=True)
if debug_yfc:
print("- ranges_to_fetch:")
pprint(ranges_to_fetch)
if len(ranges_to_fetch) > 0:
if not self.h.empty:
# Ensure only one range max is after cached data:
h_last_dt = self.h.index[-1].to_pydatetime()
if not isinstance(ranges_to_fetch[0][0], datetime):
h_last_dt = h_last_dt.astimezone(tz_exchange).date()
n = 0
for r in ranges_to_fetch:
if r[0] > h_last_dt:
n += 1
if n > 1:
print("ranges_to_fetch:")
pprint(ranges_to_fetch)
raise Exception("ranges_to_fetch contains {} ranges that occur after h_last_dt={}, expected 1 max".format(n, h_last_dt))
# if not quiet:
# quiet = period is not None # YFC generated date range so don't print message
if debug_yfc:
quiet = False
# quiet = not debug_yfc
# if self.interval == yfcd.Interval.Days1:
if self.contiguous:
self._fetchAndAddRanges_contiguous(ranges_to_fetch, prepost, debug_yf, quiet=quiet)
else:
self._fetchAndAddRanges_sparse(ranges_to_fetch, prepost, debug_yf, quiet=quiet)
# repair after all fetches complete
if self.repair and repair:
f_checked = self.h["C-Check?"].to_numpy()
f_not_checked = ~f_checked
if f_not_checked.any():
# Check for 100x errors across ENTIRE table with split-adjustment applied temporarily.
# Potential problem with very sparse data, but assume most users will
# fetch "sparse" fairly contiguously.
# - apply split-adjustment
OHLC = ['Open', 'High', 'Low', 'Close']
csf = self.h['CSF'].to_numpy()
for c in OHLC:
self.h[c] *= csf
self.h = self._repairUnitMixups(self.h)
# Also repair split errors:
self.h = self._fixBadStockSplits(self.h)
# - reverse split-adjustment
csf_rcp = 1.0/self.h['CSF'].to_numpy()
for c in OHLC:
self.h[c] *= csf_rcp
ha = self.h[f_not_checked].copy()
hb = self.h[~f_not_checked]
ha = self._repairZeroPrices(ha, silent=True)
ha["C-Check?"] = True
if not hb.empty:
self.h = pd.concat([ha, hb[ha.columns]])
self.h.index = pd.to_datetime(self.h.index, utc=True).tz_convert(tz_exchange)
else:
self.h = ha
self.h = self.h.sort_index()
self._updatedCachedPrices(self.h)
# Now prices have been repaired, can send out dividends
cached_new_divs = yfcm.ReadCacheDatum(self.ticker, "new_divs")
if self.interval == yfcd.Interval.Days1 and cached_new_divs is not None and not cached_new_divs.empty:
cached_new_divs_locked = yfcm.ReadCacheMetadata(self.ticker, "new_divs", "locked")
yfcl.TracePrint(f"cached_new_divs_locked = {cached_new_divs_locked}")
if cached_new_divs_locked is None:
f_dups = cached_new_divs.index.duplicated()
if f_dups.any():
print(cached_new_divs)
raise Exception('duplicates detected in cached_new_divs')
yfcm.WriteCacheMetadata(self.ticker, "new_divs", "locked", 1)
yfcl.TracePrint("sending out new dividends ...")
# TODO: remove duplicates from _newDivs (possible when restoring file file)
divs_df = cached_new_divs
divs_df["Close day before"] = np.nan
for dt in divs_df.index:
if dt == self.h.index[0]:
hist_before = self.manager.GetHistory(yfcd.Interval.Days1).get(start=dt.date()-timedelta(days=7), end=dt.date(), adjust_splits=False, adjust_divs=False)
close_day_before = hist_before["Close"].iloc[-1]
if np.isnan(close_day_before):
raise Exception("'close_day_before' is NaN")
else:
idx = self.h.index.get_loc(dt)
close_day_before = self.h["Close"].iloc[idx-1]
if np.isnan(close_day_before):
for idx in range(idx-1, idx-9, -1):
close_day_before = self.h["Close"].iloc[idx-1]
if not np.isnan(close_day_before):
break
if np.isnan(close_day_before):
print(f"- idx={idx} dt={dt}")
print(self.h.iloc[idx-2:idx+3][["Close", "FetchDate"]])
raise Exception("'close_day_before' is NaN")
divs_df.loc[dt, "Close day before"] = close_day_before
# De-split div:
if not divs_df.loc[dt, 'Desplitted?']:
splits_post = self.manager.GetHistory('Events').GetSplits(dt.date())
if splits_post is not None:
post_csf = 1.0/splits_post["Stock Splits"].prod()
divs_df.loc[dt, 'Dividends'] /= post_csf
divs_df.loc[dt, 'Desplitted?'] = True
self.manager.GetHistory("Events").UpdateDividends(divs_df)
self._applyNewEvents()
if cached_new_divs is not None and not cached_new_divs_locked:
yfcl.TracePrint("releasing lock on cached_new_divs")
yfcm.StoreCacheDatum(self.ticker, "new_divs", None) # delete
if "Adj Close" in self.h.columns:
raise Exception("Adj Close in self.h")
if (start is not None) and (end is not None):
h_copy = self.h.loc[start_dt:end_dt-timedelta(milliseconds=1)].copy()
else:
h_copy = self.h.copy()
if adjust_splits:
for c in ["Open", "High", "Low", "Close", "Dividends"]:
h_copy[c] *= h_copy["CSF"]
h_copy["Volume"] = (h_copy["Volume"]/h_copy["CSF"]).round(0).astype('int')
h_copy = h_copy.drop("CSF", axis=1)
if adjust_divs:
for c in ["Open", "High", "Low", "Close"]:
h_copy[c] *= h_copy["CDF"]
h_copy = h_copy.drop("CDF", axis=1)
log_msg = f"PriceHistory-{self.istr}.get() returning"
if h_copy.empty:
log_msg += " empty df"
else:
log_msg += f" DF {h_copy.index[0]} -> {h_copy.index[-1]}"
if yfcl.IsTracingEnabled():
yfcl.TraceExit(log_msg)
elif debug_yfc:
print(log_msg)
return h_copy
def _fetchAndAddRanges_contiguous(self, ranges_to_fetch, prepost, debug, quiet=False):
yfcu.TypeCheckIterable(ranges_to_fetch, "ranges_to_fetch")
yfcu.TypeCheckBool(prepost, "prepost")
yfcu.TypeCheckBool(debug, "debug")
yfcu.TypeCheckBool(quiet, "quiet")
# Fetch each range, appending/prepending to cached data
if (ranges_to_fetch is None) or len(ranges_to_fetch) == 0:
# return h
return
debug_yfc = self._debug
# debug_yfc = True
log_msg = f"_fetchAndAddRanges_contiguous-{self.istr}(n={len(ranges_to_fetch)} prepost={prepost}))"
if yfcl.IsTracingEnabled():
yfcl.TraceEnter(log_msg)
elif debug_yfc:
print(log_msg)
print("- ranges_to_fetch:")
pprint(ranges_to_fetch)
tz_exchange = ZoneInfo(self.tzName)
yfct.SetExchangeTzName(self.exchange, self.tzName)
if self.h is None or self.h.empty:
h_first_dt = None ; h_last_dt = None
else:
h_first_dt = self.h.index[0].to_pydatetime()
h_last_dt = self.h.index[-1].to_pydatetime()
if not isinstance(ranges_to_fetch[0][0], datetime):
h_first_dt = h_first_dt.astimezone(tz_exchange).date()
h_last_dt = h_last_dt.astimezone(tz_exchange).date()
td_1d = timedelta(days=1)
# Because data should be contiguous, then ranges should meet some conditions:
if len(ranges_to_fetch) > 2:
pprint(ranges_to_fetch)
raise Exception("For contiguous data generated {}>2 ranges".format(len(ranges_to_fetch)))
if self.h.empty and len(ranges_to_fetch) > 1:
raise Exception("For contiguous data generated {} ranges, but h is empty".format(len(ranges_to_fetch)))
range_pre = None ; range_post = None
if self.h.empty and len(ranges_to_fetch) == 1:
range_pre = ranges_to_fetch[0]
else:
n_pre = 0 ; n_post = 0
for r in ranges_to_fetch:
if r[0] > h_last_dt:
n_post += 1
range_post = r
elif r[0] < h_first_dt:
n_pre += 1
range_pre = r
if n_pre > 1:
pprint(ranges_to_fetch)
raise Exception("For contiguous data generated {}>1 ranges before h_first_dt".format(n_pre))
if n_post > 1:
pprint(ranges_to_fetch)
raise Exception("For contiguous data generated {}>1 ranges after h_last_dt".format(n_post))
# Fetch ranges
h2_pre = None ; h2_post = None
if range_pre is not None:
r = range_pre
try:
h2_pre = self._fetchYfHistory(r[0], r[1], prepost, debug)
except yfcd.NoPriceDataInRangeException:
if self.interval == yfcd.Interval.Days1 and r[1] - r[0] == td_1d:
# If only trying to fetch 1 day of 1d data, then print warning instead of exception.
# Could add additional condition of dividend previous day (seems to mess up table).
if not quiet:
print(f"WARNING: {self.ticker}: No {yfcd.intervalToString[self.interval]}-price data fetched for {r[0]} -> {r[1]}")
h2_pre = None
elif (range_post is None) and (r[1]-r[0] < td_1d*7) and (r[1]-r[0] > td_1d*3):
# Small date range, potentially trying to fetch before listing data
h2_pre = None
else:
raise
if range_post is not None:
r = range_post
try:
h2_post = self._fetchYfHistory(r[0], r[1], prepost, debug)
except yfcd.NoPriceDataInRangeException:
# If only trying to fetch 1 day of 1d data, then print warning instead of exception.
# Could add additional condition of dividend previous day (seems to mess up table).
if self.interval == yfcd.Interval.Days1 and r[1] - r[0] == td_1d:
if not quiet:
print(f"WARNING: {self.ticker}: No {yfcd.intervalToString[self.interval]}-price data fetched for {r[0]} -> {r[1]}")
h2_post = None
else:
raise
listing_date = yfcm.ReadCacheDatum(self.ticker, "listing_date")
if listing_date is None:
listing_date = self.dat.history_metadata["firstTradeDate"]
if isinstance(listing_date, int):
listing_date = pd.to_datetime(listing_date, unit='s', utc=True).tz_convert(tz_exchange)
yfcm.StoreCacheDatum(self.ticker, "listing_date", listing_date.date())
if h2_post is not None:
# De-adjust the new data, and backport any new events in cached data
# Note: Yahoo always returns split-adjusted price, so reverse it
# Simple append to bottom of table
# 1) adjust h2_post
h2_post = self._reverseYahooAdjust(h2_post)
if debug_yfc:
print("- h2_post:")
print(h2_post)
# TODO: Problem: dividends need correct close
if self.interval == yfcd.Interval.Days1:
h2_post_splits = h2_post[h2_post["Stock Splits"] != 0][["Stock Splits", "FetchDate"]].copy()
if not h2_post_splits.empty:
self.manager.GetHistory("Events").UpdateSplits(h2_post_splits)
# Update: moving UpdateDividends() to after repair
# Backport new events across entire h table
self._applyNewEvents()
if h2_post is not None and not isinstance(h2_post.index, pd.DatetimeIndex):
raise Exception("h2_post.index not DatetimeIndex")
if "Adj Close" in h2_post.columns:
raise Exception("Adj Close in h2_post")
self.h = pd.concat([self.h, h2_post[self.h.columns]])
self.h.index = pd.to_datetime(self.h.index, utc=True).tz_convert(tz_exchange)
if h2_pre is not None:
if debug_yfc:
print("- prepending new data")
# Simple prepend to top of table
h2_pre = self._reverseYahooAdjust(h2_pre)
if self.interval == yfcd.Interval.Days1:
h2_pre_splits = h2_pre[h2_pre["Stock Splits"] != 0][["Stock Splits", "FetchDate"]].copy()
if not h2_pre_splits.empty:
self.manager.GetHistory("Events").UpdateSplits(h2_pre_splits)
# h2_pre = h2_pre.drop(["Dividends", "Stock Splits"], axis=1)
if "Adj Close" in h2_pre.columns:
raise Exception("Adj Close in h2_pre")
self.h = pd.concat([self.h, h2_pre[self.h.columns]])
self.h.index = pd.to_datetime(self.h.index, utc=True).tz_convert(tz_exchange)
self.h = self.h.sort_index()
self._updatedCachedPrices(self.h)
log_msg = "_fetchAndAddRanges_contiguous() returning"
if yfcl.IsTracingEnabled():
yfcl.TraceExit(log_msg)
elif debug_yfc:
print("- h:")
print(self.h)
print(log_msg)
def _fetchAndAddRanges_sparse(self, ranges_to_fetch, prepost, debug, quiet=False):
yfcu.TypeCheckIterable(ranges_to_fetch, "ranges_to_fetch")
yfcu.TypeCheckBool(prepost, "prepost")
yfcu.TypeCheckBool(debug, "debug")
yfcu.TypeCheckBool(quiet, "quiet")
# Fetch each range, but can be careless regarding de-adjust because
# getting events from the carefully-managed daily data
if (ranges_to_fetch is None) or len(ranges_to_fetch) == 0:
# return h
return
debug_yfc = self._debug
# debug_yfc = True
log_msg = f"_fetchAndAddRanges_sparse(prepost={prepost})"
if yfcl.IsTracingEnabled():
yfcl.TraceEnter(log_msg)
elif debug_yfc:
print(log_msg)
tz_exchange = self.tz
td_1d = timedelta(days=1)
dtnow = pd.Timestamp.utcnow().tz_convert(tz_exchange)
# Backport events that occurred since last adjustment:
self._applyNewEvents()
# Ensure have daily data covering all ranges_to_fetch, so they can be de-splitted
r_start_earliest = ranges_to_fetch[0][0]
for rstart, rend in ranges_to_fetch:
r_start_earliest = min(rstart, r_start_earliest)
r_start_earliest_d = r_start_earliest.date() if isinstance(r_start_earliest, datetime) else r_start_earliest
if debug_yfc:
print("- r_start_earliest = {}".format(r_start_earliest))
# Trigger price sync:
histDaily = self.manager.GetHistory(yfcd.Interval.Days1)
# histDaily.get(start=r_start_earliest_d, max_age=td_1d)
histDaily.get(start=r_start_earliest_d, max_age=td_1d, repair=False)
# Fetch each range, and adjust for splits that occurred after
for rstart, rend in ranges_to_fetch:
fetch_start = rstart
fetch_end = rend
# if not self.interday: # and fetch_start.date() == fetch_end.date():
# # Intraday fetches behave better when time = midnight
# fetch_start = fetch_start.floor("1D")
# fetch_end = fetch_end.ceil("1D")
# Update: data reliability now fixed by ChunkDatesIntoYfFetches()
if debug_yfc:
print("- fetching {} -> {}".format(fetch_start, fetch_end))
try:
h2 = self._fetchYfHistory(fetch_start, fetch_end, prepost, debug)
except yfcd.NoPriceDataInRangeException:
# If only trying to fetch 1 day of 1d data, then print warning instead of exception.
# Could add additional condition of dividend previous day (seems to mess up table).
ignore = False
if self.interval == yfcd.Interval.Days1 and fetch_end - fetch_start == td_1d:
ignore = True
elif self.intraday and fetch_start.date() == dtnow.date():
ignore = True
elif self.interval == yfcd.Interval.Mins1 and fetch_end - fetch_start <= timedelta(minutes=10):
ignore = True
if ignore:
if not quiet:
# print("WARNING: No {}-price data fetched for ticker {} between dates {} -> {}".format(yfcd.intervalToString[self.interval], self.ticker, rstart, rend))
print(f"WARNING: {self.ticker}: No {yfcd.intervalToString[self.interval]}-price data fetched for {rstart} -> {rend}")
h2 = None
continue
else:
raise
if h2 is None:
# raise Exception("YF returned None for: tkr={}, interval={}, start={}, end={}".format(self.ticker, self.interval, rstart, rend))
# raise Exception(f"yfinance.history() returned None ({self.ticker} {self.istr} {rstart}->{rend})")
raise yfcd.NoPriceDataInRangeException(self.ticker, self.istr, rstart, rend)
# raise Exception(f"yfinance.history() returned None ({self.ticker} {self.istr} {fetch_start}->{fetch_end})")
# Ensure h2 is split-adjusted. Sometimes Yahoo returns unadjusted data
h2 = self._reverseYahooAdjust(h2)
if debug_yfc:
print("- h2 adjusted:")
print(h2[["Close", "Dividends", "Volume", "CSF", "CDF"]])
if fetch_start != rstart:
h2 = h2[h2.index >= rstart]
if fetch_end != rend:
h2 = h2[h2.index < rend+self.itd]
if "Adj Close" in h2.columns:
raise Exception("Adj Close in h2")
try:
self.h = self.h[yfcu.np_isin_optimised(self.h.index, h2.index, invert=True)]
except Exception:
print("self.h.shape:", self.h.shape)
print("h2.shape:", h2.shape)
raise
self.h = pd.concat([self.h, h2[self.h.columns]])
self.h.index = pd.to_datetime(self.h.index, utc=True).tz_convert(tz_exchange)
f_dups = self.h.index.duplicated()
if f_dups.any():
raise Exception(f"{self.ticker}: Adding range {rstart}->{rend} has added duplicate timepoints have been duplicated: {self.h.index[f_dups]}")
self.h = self.h.sort_index()
self._updatedCachedPrices(self.h)
log_msg = "_fetchAndAddRanges_sparse() returning"
if yfcl.IsTracingEnabled():
yfcl.TraceExit(log_msg)
elif debug_yfc:
print(log_msg)
def _verifyCachedPrices(self, rtol=0.0001, vol_rtol=0.004, correct=False, discard_old=False, quiet=True, debug=False):
correct_values = [False, 'one', 'all']
if correct not in correct_values:
raise TypeError(f"'correct' must be one of: {correct_values}")
yfcu.TypeCheckBool(discard_old, "discard_old")
yfcu.TypeCheckBool(quiet, "quiet")
yfcu.TypeCheckBool(debug, "debug")
if self.h is None or self.h.empty:
return True
if debug:
quiet = False
yfcl.TraceEnter(f"PM::_verifyCachedPrices-{self.istr}(correct={correct}, debug={debug})")
# New code: hopefully this will correct bad CDF in 1wk etc
self._applyNewEvents()
h = self.h.copy() # working copy for comparison with YF
h_modified = False
h_new = self.h.copy() # copy for storing changes
# Keep track of problems:
f_diff_all = pd.Series(np.full(h.shape[0], False), h.index, name="None")
n = h.shape[0]
# Ignore non-final data because will differ to Yahoo
h_lastRow = h.iloc[-1]
# Apply stock-split adjustment to match YF
for c in ["Open", "Close", "Low", "High", "Dividends"]:
h[c] = h[c].to_numpy() * h["CSF"].to_numpy()
h["Volume"] = (h["Volume"].to_numpy() / h["CSF"].to_numpy()).round().astype('int')
td_1d = pd.Timedelta("1D")
dt_now = pd.Timestamp.utcnow().tz_convert(ZoneInfo("UTC"))
def _aggregate_yfdf_daily(df):
df2 = df.copy()
df2["_date"] = df2.index.date
df2.loc[df2["Stock Splits"] == 0, "Stock Splits"] = 1
if "CDF" in df.columns:
df2 = df2.groupby("_date").agg(
Open=("Open", "first"),
Close=("Close", "last"),
Low=("Low", "min"),
High=("High", "max"),
Volume=("Volume", "sum"),
Dividends=("Dividends", "sum"),
StockSplits=("Stock Splits", "prod"),
CDF=("CDF", "prod"),
CSF=("CSF", "prod"),
FetchDate=("FetchDate", "first")).rename(columns={"StockSplits": "Stock Splits"})
else:
df2 = df2.groupby("_date").agg(
Open=("Open", "first"),
Close=("Close", "last"),
AdjClose=("Adj Close", "last"),
Low=("Low", "min"),
High=("High", "max"),
Volume=("Volume", "sum"),
Dividends=("Dividends", "sum"),
StockSplits=("Stock Splits", "prod")).rename(columns={"StockSplits": "Stock Splits", "AdjClose": "Adj Close"})
df2.loc[df2["Stock Splits"] == 1, "Stock Splits"] = 0
df2.index.name = df.index.name
df2.index = pd.to_datetime(df2.index).tz_localize(df.index.tz)
return df2
# For intraday data older than Yahoo limit, compare aggregated against 1d
if self.intraday:
dt_now_local = dt_now.tz_convert(self.tzName)
if self.interval == yfcd.Interval.Hours1:
max_lookback_days = 365*2
elif self.interval == yfcd.Interval.Mins1:
# max_lookback_days = 7
max_lookback_days = 30
else:
max_lookback_days = 60
max_lookback = timedelta(days=max_lookback_days)
max_lookback -= timedelta(minutes=5) # allow time for server processing
fetch_start_min = dt_now_local - max_lookback
if self.intraday:
fetch_start_min = fetch_start_min.ceil("1D")
if h.index[0] < fetch_start_min:
# h_old = h.loc[:fetch_start_min-timedelta(seconds=1)]
# h_old_1d = _aggregate_yfdf_daily(h_old.drop(["Final?", "C-Check?", "LastDivAdjustDt", "LastSplitAdjustDt"], axis=1))
# if self.interval == yfcd.Interval.Hours1:
# df_yf = self.dat.history(interval="1d", start=h_old.index[0].date(), end=h_old.index[-1].date()+td_1d, auto_adjust=False, repair="silent")
# else:
# df_yf = self.dat.history(interval="1h", start=h_old.index[0].date(), end=h_old.index[-1].date()+td_1d, auto_adjust=False, repair="silent")
# df_yf = _aggregate_yfdf_daily(df_yf)
# # Aggregated data will almost always differ from actual daily, so only
# # look for big errors
# f_old_diff = yfcu.VerifyPricesDf(h_old_1d, df_yf, self.interval, rtol=0.2, debug=True)
# raise Exception("- investigate")
# f_old_diff = yfcu.VerifyPricesDf(h_old_1d, df_yf, self.interval, rtol=0.2)
# if f_old_diff.any():
# msg = f"Significant differences detected between old {self.istr} data and 1d."
# print()
# # f_diff_all = f_diff_all | f_old_diff
# bad_dates = f_old_diff.index.date[f_old_diff]
# f_diff_all[np.isin(f_diff_all.index.date, bad_dates)] = True
if not isinstance(discard_old, bool):
msg = f"{self.ticker}: Some {self.istr} data is now older than {max_lookback_days} days" +\
" so cannot compare to Yahoo. Discard old data?"
discard_old = click.confirm(msg, default=False)
if discard_old:
f_discard = pd.Series(h.index < fetch_start_min, h.index)
if f_discard.any():
if debug:
print(f"Discarding {np.sum(f_discard)}/{n} old rows because can't verify (fetch_start_min={fetch_start_min})")
f_diff_all = f_diff_all | f_discard
f_diff_all = f_diff_all.rename("Discard")
h = h.loc[fetch_start_min:]
if self.interval == yfcd.Interval.Days1:
# Also verify dividends
divs_df = self.manager.GetHistory("Events").GetDivs()
if divs_df is not None:
divs_df = divs_df[(divs_df['Dividends']!=0.0).to_numpy()]
if divs_df.empty:
divs_df = None
if not h.empty:
# Fetch YF data
start_dt = h.index[0]
last_dt = h.index[-1]
end_dt = last_dt + self.itd
fetch_start = start_dt.date()
if self.itd > timedelta(days=1):
fetch_end = last_dt.date()+yfcd.intervalToTimedelta[self.interval]
else:
fetch_end = last_dt.date()+td_1d
# Sometimes Yahoo doesn't return full trading data for last day if end = day after.
# Add some more days to avoid problem.
fetch_end += 3*td_1d
fetch_end = min(fetch_end, dt_now.tz_convert(self.tzName).ceil("D").date())
repair = True
if self.intraday:
if self.interval == yfcd.Interval.Mins1:
# Fetch in 7-day batches
df_yf = None
td_7d = timedelta(days=7)
fetch_end_batch = fetch_end
fetch_start_batch = fetch_end - td_7d
while fetch_end_batch > fetch_start:
df_yf_batch = self.dat.history(interval=self.istr, start=fetch_start_batch, end=fetch_end_batch, auto_adjust=False, repair=repair, keepna=True)
if "Repaired?" not in df_yf_batch.columns:
df_yf_batch["Repaired?"] = False
if df_yf is None:
df_yf = df_yf_batch
else:
df_yf = pd.concat([df_yf, df_yf_batch], axis=0)
#
fetch_end_batch -= td_7d
fetch_start_batch -= td_7d
fetch_start_batch = max(fetch_start_batch, fetch_start)
#
df_yf = df_yf.sort_index()
else:
df_yf = self.dat.history(interval=self.istr, start=fetch_start, end=fetch_end, auto_adjust=False, repair=repair, keepna=True)
if "Repaired?" not in df_yf.columns:
df_yf["Repaired?"] = False
df_yf = df_yf.loc[start_dt:]
df_yf = df_yf[df_yf.index < end_dt]
# Yahoo doesn't div-adjust intraday
df_yf_1d = self.dat.history(interval="1d", start=df_yf.index[0].date(), end=df_yf.index[-1].date()+td_1d, auto_adjust=False)
if "Repaired?" not in df_yf_1d.columns:
df_yf_1d["Repaired?"] = False
df_yf["_indexBackup"] = df_yf.index
df_yf["_date"] = df_yf.index.date
df_yf_1d["_date"] = df_yf_1d.index.date
#
df_yf_1d["Adj"] = df_yf_1d["Adj Close"].to_numpy() / df_yf_1d["Close"].to_numpy()
df_yf = df_yf.merge(df_yf_1d[["Adj", "_date"]], how="left", on="_date")
df_yf["Adj Close"] = df_yf["Close"].to_numpy() * df_yf["Adj"].to_numpy()
df_yf = df_yf.drop("Adj", axis=1)
#
df_yf.index = df_yf["_indexBackup"]
df_yf = df_yf.drop(["_indexBackup", "_date"], axis=1)
else:
if self.interval == yfcd.Interval.Days1 and divs_df is not None and not divs_df.empty:
# Also use YF data to verify dividends.
fetch_start = min(fetch_start, divs_df.index[0].date())
df_yf = self.dat.history(interval=self.istr, start=fetch_start, end=fetch_end, auto_adjust=False, repair=repair, keepna=True)
if "Repaired?" not in df_yf.columns:
df_yf["Repaired?"] = False
if df_yf.empty:
raise Exception(f"{self.ticker}: YF fetch failed for {self.istr} {fetch_start} -> {fetch_end}")
# Make special adjustments for dividends / stock splits released TODAY
if self.interval == yfcd.Interval.Days1:
if not np.isnan(df_yf["Dividends"].iloc[-1]) and df_yf["Dividends"].iloc[-1] > 0:
expect_new_div_missing = False
if np.isnan(df_yf["Close"].iloc[-1]):
expect_new_div_missing = True
elif h_lastRow.name == df_yf.index[-1] and h_lastRow["Dividends"] == 0.0:
expect_new_div_missing = True
if expect_new_div_missing:
# YFC won't have record of this dividend because occurs tonight/tomorrow, so remove it's adjustment from prices
rev_adj = df_yf["Close"].iloc[-2] / df_yf["Adj Close"].iloc[-2]
if debug:
msg = f"- removing dividend from df_yf-{self.istr}: {df_yf.index[-1]} adj={1.0/rev_adj:.4f}"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(f"{self.ticker}: " + msg)
df_yf["Adj Close"] *= rev_adj
df_yf = df_yf.drop(df_yf.index[-1])
if df_yf.index[-1] == h_lastRow.name and df_yf["Stock Splits"].iloc[-1] != 0 and h_lastRow["Stock Splits"] == 0:
# YFC doesn't have record of today's split yet so remove effect
rev_adj = df_yf["Stock Splits"].iloc[-1]
if debug:
msg = f"- removing split from df_yf-{self.istr}: {df_yf.index[-1]} adj={1.0/rev_adj}"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(f"{self.ticker}: " + msg)
for c in ["Open", "High", "Low", "Close", "Adj Close"]:
df_yf[c] *= rev_adj
df_yf["Volume"] /= rev_adj
df_yf.loc[df_yf.index[-1], "Stock Splits"] = 0.0
if df_yf is None or df_yf.empty:
raise Exception(f"Fetching reference yfinance data failed (interval={self.istr}, start_dt={start_dt}, last_dt={last_dt})")
if self.interval == yfcd.Interval.Week:
# Ensure data aligned to Monday:
if not df_yf.index[0].weekday() == 0:
n = 0
while n < 3:
fetch_start -= timedelta(days=2)
df_yf = self.dat.history(interval=self.istr, start=fetch_start, end=fetch_end, auto_adjust=False, repair=repair)
if "Repaired?" not in df_yf.columns:
df_yf["Repaired?"] = False
n += 1
if df_yf.index[0].weekday() == 0:
break
if not df_yf.index[0].weekday() == 0:
raise Exception("Failed to get Monday-aligned weekly data from YF")
df_yf = df_yf.loc[h.index[0]:]
if not self.interday:
# Volume not split-adjusted
ss = df_yf["Stock Splits"].copy()
ss[(ss == 0.0) | ss.isna()] = 1.0
ss_rcp = 1.0 / ss
csf = ss_rcp.sort_index(ascending=False).cumprod().sort_index(ascending=True).shift(-1, fill_value=1.0)
df_yf["Volume"] = df_yf["Volume"].to_numpy() / csf
if self.interval != yfcd.Interval.Days1 and correct in ['one', 'all']:
# Copy over any missing dividends
c = "Dividends"
h_divs = h.loc[h[c] != 0.0, c].copy().dropna()
yf_divs = df_yf.loc[df_yf[c] != 0.0, c]
dts_missing_from_cache = yf_divs.index[~yf_divs.index.isin(h_divs.index)]
dts_missing_from_cache = [dt for dt in dts_missing_from_cache if dt in h.index]
if len(dts_missing_from_cache) > 0:
if debug:
msg = f"CORRECTING: Cache missing these dividends: {dts_missing_from_cache}"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(f"{self.ticker}: " + msg)
for dt in dts_missing_from_cache:
# Correct here
h.loc[dt, c] = yf_divs.loc[dt]
h_new.loc[dt, c] = yf_divs.loc[dt]
h_modified = True
# Copy over any missing stock splits
c = "Stock Splits"
h_ss = h.loc[h[c] != 0.0, c].copy().dropna()
yf_ss = df_yf.loc[df_yf[c] != 0.0, c]
dts_missing_from_cache = yf_ss.index[~yf_ss.index.isin(h_ss.index)]
dts_missing_from_cache = [dt for dt in dts_missing_from_cache if dt in h.index]
if len(dts_missing_from_cache) > 0:
if debug:
msg = f"CORRECTING: Cache missing these stock splits: {dts_missing_from_cache}"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(f"{self.ticker}: " + msg)
for dt in dts_missing_from_cache:
# Correct here
h.loc[dt, c] = yf_ss.loc[dt]
h_new.loc[dt, c] = yf_ss.loc[dt]
h_modified = True
if self.interval == yfcd.Interval.Days1 and divs_df is not None and not divs_df.empty:
# Verify dividends
yf_divs = df_yf['Dividends'][df_yf['Dividends']!=0.0]
f_orphan = np.full(divs_df.shape[0], False)
for i in range(divs_df.shape[0]):
div_dt = divs_df.index[i]
if div_dt not in yf_divs.index:
f_orphan[i] = True
if f_orphan.any():
if correct in ['one', 'all']:
print(f'Dropping these orphan dividends: {divs_df.index.date[f_orphan]}')
orphan_divs = divs_df[f_orphan].copy()
orphan_divs['Dividends'] = 0.0
orphan_divs['Close day before'] = 1.0
# print("- divs_df:") ; print(divs_df[['Dividends', 'Superseded div']])
self.manager.GetHistory("Events").UpdateDividends(orphan_divs)
else:
if not quiet:
divs_orphan = divs_df[f_orphan][['Dividends']]
divs_orphan.index = divs_orphan.index.date
print('- detected orphan dividends:', divs_orphan.to_dict())
for dt in divs_df.index[f_orphan]:
f_diff_all[dt] = True
f_diff = yfcu.VerifyPricesDf(h, df_yf, self.interval, rtol=rtol, vol_rtol=vol_rtol, quiet=quiet, debug=debug)
if f_diff.any():
if not f_diff_all.any():
f_diff_all = (f_diff_all | f_diff).rename(f_diff.name)
else:
f_diff_all = f_diff_all | f_diff
if not f_diff_all.any():
if h_modified:
# yfcm.StoreCacheDatum(self.ticker, self.cache_key, h)
yfcm.StoreCacheDatum(self.ticker, self.cache_key, h_new)
self.h = self._getCachedPrices()
yfcl.TraceExit(f"PM::_verifyCachedPrices-{self.istr}() returning True")
return True
h = h_new
if correct in ['one', 'all']:
drop_dts = f_diff_all.index[f_diff_all]
drop_dts_ages = dt_now - drop_dts
if self.interval == yfcd.Interval.Week:
f = drop_dts_ages > timedelta(days=8)
else:
f = drop_dts_ages > timedelta(days=4) # allow for weekend
drop_dts_not_recent = drop_dts[f]
drop_dts_ages = drop_dts_ages[f]
msg = f"{self.ticker}: {self.istr}-prices problems"
if self.contiguous:
# Daily must always be contiguous, so drop everything from first diff
if len(drop_dts_not_recent) > 0:
if len(drop_dts_not_recent) == 1:
msg += f": dropping {drop_dts_not_recent[0].date()}"
else:
if self.interday:
msg += f": dropping all rows from {drop_dts_not_recent[0].date()}"
else:
msg += f": dropping all rows from {drop_dts_not_recent[0]}"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(f"{self.ticker}: " + msg)
h = h[h.index < drop_dts[0]]
h_modified = True
else:
n = self.h.shape[0]
n_drop = np.sum(f_diff_all)
if len(drop_dts_not_recent) > 0:
if len(drop_dts_not_recent) < 10:
if self.interday:
msg += f": dropping {drop_dts_not_recent.date.astype(str)}"
else:
msg += f": dropping {drop_dts_not_recent.tz_localize(None)}"
else:
msg += f": dropping {n_drop}/{n} rows"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(f"{self.ticker}: " + msg)
h2 = h.drop(drop_dts)
if h.shape[0]-h2.shape[0] != n_drop:
raise Exception("here")
h = h2
h_modified = True
if h.empty:
h = None
h_modified = True
else:
if debug:
n = np.sum(f_diff_all)
if n < 5:
msg = "differences found but not correcting: "
if self.interday:
msg += f"{f_diff_all.index[f_diff_all].date}"
else:
msg += f"{f_diff_all.index[f_diff_all]}"
else:
msg = f"{np.sum(f_diff_all)} differences found but not correcting"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(f"{self.ticker}: " + msg)
if correct in ['one', 'all'] and f_diff_all.name == "Div-Adjust" and self.interval != yfcd.Interval.Days1:
# All differences caused by bad dividend data.
# To fix, need to force a re-fetch of 1d data.
hist1d = self.manager.GetHistory(yfcd.Interval.Days1)
h1d = hist1d._getCachedPrices()
end_d = f_diff_all.index[-1].date()
# But only if of 1d data fetched at least 24 hours ago, because maybe
# we just fixed the 1d data.
f_fetched_recently = h1d['FetchDate'] > (dt_now - timedelta(days=1))
f_fetched_recently_not = ~f_fetched_recently
if f_fetched_recently_not.any():
f_fetched_recently_not_last_idx = np.where(f_fetched_recently_not)[0][-1]
end_d = h1d.index[f_fetched_recently_not_last_idx]
msg = f"hist-{self.istr} is discarding 1d data before {end_d} to force re-fetch of dividends"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(f"{self.ticker}: " + msg)
h1d = h1d[str(end_d):]
hist1d._updatedCachedPrices(h1d)
if h_modified:
yfcm.StoreCacheDatum(self.ticker, self.cache_key, h)
self.h = self._getCachedPrices()
yfcl.TraceExit(f"PM::_verifyCachedPrices-{self.istr}() returning False")
return False
def _fetchYfHistory(self, start, end, prepost, debug, verify_intervals=True, disable_yfc_metadata=False):
if start is None and end is None:
raise ValueError("Must provide value for one of: 'start', 'end'")
if start is not None:
yfcu.TypeCheckIntervalDt(start, self.interval, "start")
if end is not None:
yfcu.TypeCheckIntervalDt(end, self.interval, "end")
yfcu.TypeCheckBool(prepost, "prepost")
yfcu.TypeCheckBool(debug, "debug")
debug_yfc = False
# debug_yfc = True
log_msg = f"PM::_fetchYfHistory-{self.istr}({start}->{end}, prepost={prepost})"
if yfcl.IsTracingEnabled():
yfcl.TraceEnter(log_msg)
elif debug_yfc:
print("")
print(log_msg)
tz_exchange = self.tz
td_1d = timedelta(days=1)
dt_now = pd.Timestamp.utcnow().tz_convert(ZoneInfo("UTC"))
if self.intraday:
maxLookback = yfcd.yfMaxFetchLookback[self.interval] - timedelta(seconds=10)
if maxLookback is not None:
start = max(start, dt_now - maxLookback)
if start >= end:
return None
fetch_start = start
fetch_end = end
if end is not None:
# If 'fetch_end' in future then cap to exchange midnight
dtnow_exchange = dt_now.tz_convert(tz_exchange)
if isinstance(end, datetime):
end_dt = end
# end_d = end.astimezone(tz_exchange).date()
end_d = None
else:
end_d = end
end_dt = datetime.combine(end, time(0), tz_exchange)
if end_dt > dt_now:
exchange_midnight_dt = datetime.combine(dtnow_exchange.date()+td_1d, time(0), tz_exchange)
if isinstance(end, datetime):
fetch_end = exchange_midnight_dt
else:
fetch_end = exchange_midnight_dt.date()
if start is not None:
if isinstance(start, datetime):
start_dt = start
# start_d = start.astimezone(tz_exchange).date()
start_d = None
else:
start_d = start
start_dt = datetime.combine(start, time(0), tz_exchange)
if (fetch_start is not None) and (fetch_end <= fetch_start):
return None
if fetch_start is not None:
if not isinstance(fetch_start, (datetime, pd.Timestamp)):
fetch_start_dt = datetime.combine(fetch_start, time(0), self.tz)
else:
fetch_start_dt = fetch_start
if fetch_end is not None:
if not isinstance(fetch_end, (datetime, pd.Timestamp)):
fetch_end_dt = datetime.combine(fetch_end, time(0), tz_exchange)
else:
fetch_end_dt = fetch_end
if fetch_start is not None:
if self.interval == yfcd.Interval.Week:
# Ensure aligned to week start:
fetch_start -= timedelta(days=fetch_start.weekday())
td_1d = timedelta(days=1)
td_14d = timedelta(days=14)
if self.interval == yfcd.Interval.Days1:
# Add padding days to ensure Yahoo returns correct Volume
s = yfct.GetExchangeSchedule(self.exchange, fetch_start - td_14d, fetch_end + td_14d)
fetch_start_pad = s.iloc[s.index.get_indexer([str(fetch_start)], method="ffill")[0]-1].name.date()
first_fetch_failed = False
try:
df = self._fetchYfHistory_dateRange(fetch_start_pad, fetch_end, prepost, debug)
df = df.loc[str(fetch_start):].copy()
except yfcd.NoPriceDataInRangeException as e:
first_fetch_failed = True
ex = e
if first_fetch_failed and fetch_end is not None:
# Try with wider date range, maybe entire range is just before listing date
second_fetch_failed = False
df_wider = None
listing_date_check_tol = yfcd.listing_date_check_tols[self.interval]
fetch_start -= 2*listing_date_check_tol
fetch_end += 2*listing_date_check_tol
if debug_yfc:
msg = "- first fetch failed, trying again with wider range: {} -> {}".format(fetch_start, fetch_end)
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
try:
df_wider = self._fetchYfHistory_dateRange(fetch_start, fetch_end, prepost, debug)
if debug_yfc:
msg = "- second fetch returned:"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
print(df_wider)
except Exception as e:
if "Data doesn't exist for startDate" in str(e):
second_fetch_failed = True
elif "No data found for this date range" in str(e):
second_fetch_failed = True
else:
raise e
if df_wider is not None:
if debug_yfc:
print("- detected listing date =", df_wider.index[0].date())
yfcm.StoreCacheDatum(self.ticker, "listing_date", df_wider.index[0].date())
df = df_wider
if fetch_start is not None:
df = df.loc[fetch_start_dt:]
if fetch_end is not None:
df = df.loc[:fetch_end_dt-timedelta(milliseconds=1)]
if first_fetch_failed:
if second_fetch_failed:
# Hopefully code never comes here
raise ex
else:
# Requested date range was just before stock listing date,
# but wider range crosses over so can continue
pass
elif self.interday:
# Add padding days to ensure Yahoo returns correct Volume
s = yfct.GetExchangeSchedule(self.exchange, fetch_start - 2*self.itd, fetch_end + 2*self.itd)
fetch_start_pad = s.iloc[s.index.get_indexer([str(fetch_start)], method="ffill")[0]-1].name.date()
fetch_end_pad = s.iloc[s.index.get_indexer([str(fetch_end)], method="bfill")[0]+1].name.date()
df = self._fetchYfHistory_dateRange(fetch_start_pad, fetch_end_pad, prepost, debug)
df = df.loc[str(fetch_start) : str(fetch_end-td_1d)].copy()
else:
# Intraday
fetch_ranges = [(fetch_start, fetch_end)]
if self.intraday:
# Add padding days to ensure Yahoo returns correct Volume
maxRange = yfcd.yfMaxFetchRange[self.interval]
if maxRange is not None:
s = yfct.GetExchangeSchedule(self.exchange, start_dt.date() - td_14d, end_dt.date() + td_14d)
s = s.iloc[s.index.get_indexer([str(start_dt.date())], method="ffill")[0]-1:]
s = s.iloc[:s.index.get_indexer([str(end_dt.date())], method="bfill")[0]+1+1]
lag = yfcd.exchangeToYfLag[self.exchange]
if start_dt > s["close"].iloc[1]+lag:
s = s.drop(s.index[0])
if end_dt < s["open"].iloc[-2]+lag:
s = s.drop(s.index[-1])
# fetch_ranges = yfcu.ChunkDatesIntoYfFetches(start_d, end_d, s, maxRange.days, overlapDays=2)
fetch_ranges = yfcu.ChunkDatesIntoYfFetches(s, maxRange.days, overlapDays=2)
if debug_yfc:
print("- fetch_ranges:")
pprint(fetch_ranges)
# Don't need to fetch all of padding days, just the end/start of session
# fetch_ranges[0][0] = s["close"].iloc[0] - timedelta(hours=2)
# fetch_ranges[-1][1] = s["open"].iloc[-1] + timedelta(hours=2)
# fetch_ranges[0]["fetch start"] = s["close"].iloc[0] - timedelta(hours=2)
# Update: need start further back for low-volume tickers
fetch_ranges[0]["fetch start"] = s["open"].iloc[0]
fetch_ranges[-1]["fetch end"] = s["open"].iloc[-1] + timedelta(hours=2)
maxLookback = yfcd.yfMaxFetchLookback[self.interval] - timedelta(seconds=10)
if maxLookback is not None:
maxLookback_dt = (dt_now - maxLookback).tz_convert(tz_exchange)
for i in range(len(fetch_ranges)-1, -1, -1):
if fetch_ranges[i]["fetch start"] < maxLookback_dt:
if debug_yfc:
print("- capping start to maxLookback_dt")
# fetch_ranges[i]["fetch start"] = maxLookback_dt
fetch_ranges[i]["fetch start"] = maxLookback_dt.ceil("D")
fetch_ranges[i]["core start"] = fetch_ranges[i]["fetch start"] + td_1d
if fetch_ranges[i]["fetch start"] >= fetch_ranges[i]["fetch end"]:
del fetch_ranges[i]
df = None
for r in fetch_ranges:
if debug_yfc:
print("- fetching:")
print(r)
fetch_start = r["fetch start"]
fetch_end = r["fetch end"]
dfr = self._fetchYfHistory_dateRange(fetch_start, fetch_end, prepost, debug)
# Discard padding days:
dfr = dfr.loc[r["core start"]: r["core end"] - timedelta(milliseconds=1)]
if debug_yfc:
print("- dfr after discarding padding days:")
print(dfr[[c for c in ["Open", "Low", "High", "Close", "Dividends", "Volume"] if c in dfr.columns]])
if df is None:
df = dfr
else:
df = pd.concat([df, dfr[df.columns]])
if df.index.duplicated().any():
raise Exception("df contains duplicated dates")
fetch_dt_utc = pd.Timestamp.utcnow().tz_convert(ZoneInfo("UTC"))
if (df is not None) and (df.index.tz is not None) and (not isinstance(df.index.tz, ZoneInfo)):
# Convert to ZoneInfo
df.index = df.index.tz_convert(tz_exchange)
if debug_yfc:
if df is None:
msg = "- YF returned None"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
else:
# pass
msg = "- YF returned table:"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
print(df[[c for c in ["Open", "Low", "High", "Close", "Dividends", "Volume"] if c in df.columns]])
# Detect listing day
listing_day = yfcm.ReadCacheDatum(self.ticker, "listing_date")
if listing_day is None:
if self.interval == yfcd.Interval.Days1:
found_listing_day = False
listing_day = None
if df is not None and not df.empty:
tol = yfcd.listing_date_check_tols[self.interval]
fetch_start_d = fetch_start.date() if isinstance(fetch_start, datetime) else fetch_start
if (df.index[0].date() - fetch_start_d) > tol:
# Yahoo returned data starting significantly after requested start date, indicates
# request is before stock listed on exchange
found_listing_day = True
if debug_yfc:
msg = "- found_listing_day = {}".format(found_listing_day)
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
if found_listing_day:
listing_day = df.index[0].date()
if debug_yfc:
msg = "YFC: inferred listing_date = {}".format(listing_day)
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
yfcm.StoreCacheDatum(self.ticker, "listing_date", listing_day)
if (listing_day is not None) and first_fetch_failed:
if end <= listing_day:
# Aha! Requested date range was entirely before listing
if debug_yfc:
msg = "- requested date range was before listing date"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
return None
if found_listing_day and start is not None:
# Apply to fetch start
if isinstance(start, datetime):
listing_date = datetime.combine(listing_day, time(0), self.tz)
start = max(start, listing_date)
else:
start = max(start, listing_day)
start_d = start
if df is None:
received_interval_starts = None
else:
if self.interday:
received_interval_starts = df.index.date
else:
received_interval_starts = df.index.to_pydatetime()
try:
intervals_missing_df = yfct.IdentifyMissingIntervals(self.exchange, start, end, self.interval, received_interval_starts, ignore_breaks=True)
except yfcd.NoIntervalsInRangeException:
intervals_missing_df = None
if (intervals_missing_df is not None) and (not intervals_missing_df.empty):
# First, ignore any missing intervals today
# For missing intervals during last 2 weeks, if few in number, then fill with NaNs
# For missing intervals older than 2 weeks, fill all with NaNs
if debug_yfc:
n = intervals_missing_df.shape[0]
if n <= 3:
msg = f"YF data missing {n} intervals: {intervals_missing_df['open'].to_numpy()}"
else:
msg = f"YF data missing {n} intervals"
yfcl.TracePrint('- ' + msg) if yfcl.IsTracingEnabled() else print('- ' + msg)
cutoff_d = date.today() - timedelta(days=14)
if self.interday:
f_recent = intervals_missing_df["open"].to_numpy() > cutoff_d
else:
f_recent = intervals_missing_df["open"].dt.date > cutoff_d
intervals_missing_df_recent = intervals_missing_df[f_recent]
intervals_missing_df_old = intervals_missing_df[~f_recent]
missing_intervals_to_add = None
if not intervals_missing_df_old.empty:
missing_intervals_to_add = intervals_missing_df_old["open"].to_numpy()
if not intervals_missing_df_recent.empty:
# If very few intervals and not today (so Yahoo should have data),
# then assume no trading occurred and insert NaN rows.
# Normally Yahoo has already filled with NaNs but sometimes they forget/are late
nm = intervals_missing_df_recent.shape[0]
if self.interday:
threshold = 1
else:
if self.itd <= timedelta(minutes=2):
threshold = 10
elif self.itd <= timedelta(minutes=5):
threshold = 3
else:
threshold = 2
if nm <= threshold:
if debug_yfc:
msg = "- found missing intervals, inserting nans:"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
print(intervals_missing_df_recent)
if missing_intervals_to_add is None:
missing_intervals_to_add = intervals_missing_df_recent["open"].to_numpy()
else:
missing_intervals_to_add = np.append(missing_intervals_to_add, intervals_missing_df_recent["open"].to_numpy())
if missing_intervals_to_add is not None:
n = missing_intervals_to_add.shape[0]
if n <= 3:
msg = f"insertings NaNs for {n} missing intervals: {missing_intervals_to_add}"
else:
msg = f"insertings NaNs for {n} missing intervals"
if debug_yfc:
yfcl.TracePrint('- ' + msg) if yfcl.IsTracingEnabled() else print('- ' + msg)
else:
self.manager.LogEvent("info", "PriceManager", msg)
nm = missing_intervals_to_add.shape[0]
df_missing = pd.DataFrame(data={k: [np.nan]*nm for k in yfcd.yf_data_cols}, index=missing_intervals_to_add)
df_missing['Volume'] = 0 # Needs to be int type
if "Repaired?" in df.columns:
df_missing["Repaired?"] = False
df_missing.index = pd.to_datetime(df_missing.index)
if self.interday:
df_missing.index = df_missing.index.tz_localize(tz_exchange)
for c in ["Volume", "Dividends", "Stock Splits", "Capital Gains"]:
df_missing[c] = 0
if df is None:
df = df_missing
else:
df = pd.concat([df, df_missing[df.columns]])
df.index = pd.to_datetime(df.index, utc=True).tz_convert(tz_exchange)
df = df.sort_index()
# Improve tolerance to calendar missing a recent new holiday:
if df is None or df.empty:
return None
n = df.shape[0]
fetch_dt = fetch_dt_utc.replace(tzinfo=ZoneInfo("UTC"))
if self.interval == yfcd.Interval.Days1:
# Update: move checking for new dividends to here, before discarding out-of-range data
df_divs = df[df["Dividends"] != 0][["Dividends"]].copy()
if not df_divs.empty:
df_divs['FetchDate'] = fetch_dt_utc
df_divs['Desplitted?'] = False
if debug_yfc:
print("- df_divs:")
print(df_divs)
cached_new_divs = yfcm.ReadCacheDatum(self.ticker, "new_divs")
if cached_new_divs is not None:
if 'Desplitted?' not in cached_new_divs.columns:
cached_new_divs['Desplitted?'] = False
df_divs = df_divs[~df_divs.index.isin(cached_new_divs.index)]
if not df_divs.empty:
divs_pretty = df_divs['Dividends'].copy()
divs_pretty.index = divs_pretty.index.date
self.manager.LogEvent("info", "DividendManager", f"detected {divs_pretty.shape[0]} new dividends: {divs_pretty} (before reversing adjust)")
if yfcm.ReadCacheMetadata(self.ticker, "new_divs", "locked") is not None:
# locked
pass
else:
cached_new_divs = pd.concat([cached_new_divs, df_divs])
yfcm.StoreCacheDatum(self.ticker, "new_divs", cached_new_divs)
else:
yfcm.StoreCacheDatum(self.ticker, "new_divs", df_divs)
# Remove any out-of-range data:
if (n > 0):
# NOTE: YF has a bug-fix pending merge: https://github.com/ranaroussi/yfinance/pull/1012
if end is not None:
if self.interday:
df = df[df.index.date < end_d]
else:
df = df[df.index < end_dt]
n = df.shape[0]
#
# And again for pre-start data:
if start is not None:
if self.interday:
df = df[df.index.date >= start_d]
else:
df = df[df.index >= start_dt]
n = df.shape[0]
# Verify that all datetimes match up with actual intervals:
if n == 0:
raise yfcd.NoPriceDataInRangeException(self.ticker, self.istr, start, end)
else:
if self.interday:
f = df.index.time != time(0)
if f.any():
print(df[f])
raise Exception("Interday data contains times in index")
yfIntervalStarts = df.index.date
else:
yfIntervalStarts = df.index.to_pydatetime()
#
if self.intraday and (self.exchange in yfcd.exchangesWithBreaks):
# Discard any intervals fully within a break
f_in_break = yfct.TimestampInBreak_batch(self.exchange, yfIntervalStarts, self.interval)
if f_in_break.any():
# Discard these
if debug_yfc:
msg = "- dropping rows in break times"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
yfIntervalStarts = yfIntervalStarts[~f_in_break]
df = df[~f_in_break]
n = df.shape[0]
#
intervals = yfct.GetTimestampCurrentInterval_batch(self.exchange, yfIntervalStarts, self.interval, discardTimes=self.interday, ignore_breaks=True)
f_na = intervals["interval_open"].isna().to_numpy()
if verify_intervals and f_na.any():
ts = intervals["interval_open"]
if len(ts) != len(set(ts)):
dups = ts[ts.duplicated(keep=False)]
# Drop rows that map to duplicate intervals if no trading occurred.
f_no_trades = (df["Volume"] == 0) & ((df["Low"] == df["High"]) | df["Close"].isna())
drop_dts = None
for i in dups:
dts = intervals.index[intervals["interval_open"] == i]
dts_is_nan = np.array([f_no_trades[df.index.get_loc(dt)] for dt in dts])
if dts_is_nan.all():
# Keep first, drop others
drop_dts_sub = dts[1:]
else:
# Keep non-nan, drop nans
drop_dts_sub = dts[dts_is_nan]
drop_dts = drop_dts_sub if drop_dts is None else np.append(drop_dts, drop_dts_sub)
# print("dropping:", drop_dts)
yfIntervalStarts = np.delete(yfIntervalStarts, [df.index.get_loc(dt) for dt in drop_dts])
intervals = intervals.drop(drop_dts)
df = df.drop(drop_dts)
n = df.shape[0]
f_na = intervals["interval_open"].isna().to_numpy()
if not self.interday:
# For some exchanges (e.g. JSE) Yahoo returns intraday timestamps right on market close.
# - remove if volume 0
# - else, merge with previous interval
df2 = df.copy() ; df2["_date"] = df2.index.date ; df2["_intervalStart"] = df2.index
sched = yfct.GetExchangeSchedule(self.exchange, df2["_date"].min(), df2["_date"].max()+td_1d)
rename_cols = {"open": "market_open", "close": "market_close"}
sched.columns = [rename_cols[c] if c in rename_cols else c for c in sched.columns]
sched_df = sched.copy()
sched_df["_date"] = sched_df.index.date
df2 = df2.merge(sched_df, on="_date", how="left")
df2.index = df.index
f_close = (df2["_intervalStart"] == df2["market_close"]).to_numpy()
f_close = f_close & f_na
f_vol0 = df2["Volume"] == 0
f_drop = f_vol0 & f_close
if f_drop.any():
if debug_yfc:
msg = "- dropping 0-volume rows starting at market close"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
yfIntervalStarts = yfIntervalStarts[~f_drop]
intervals = intervals[~f_drop]
df = df[~f_drop]
df2 = df2[~f_drop]
n = df.shape[0]
f_na = intervals["interval_open"].isna().to_numpy()
#
f_close = (df2["_intervalStart"] == df2["market_close"]).to_numpy()
f_close = f_close & f_na
if f_close.any():
# Must merge with previous interval. Tricky!
df3 = df2[f_close]
df3_index_rev = sorted(list(df3.index), reverse=True)
for dt in df3_index_rev:
i = df.index.get_loc(dt)
if i == 0:
# Can't fix
continue
dt_before = df.index[i-1]
if (dt-dt_before) <= self.itd:
# Merge
df_rows = df.iloc[i-1:i+1]
df.loc[dt_before, "Low"] = df_rows["Low"].dropna().min()
df.loc[dt_before, "High"] = df_rows["High"].dropna().max()
df.loc[dt_before, "Open"] = df_rows["Open"].dropna()[0]
df.loc[dt_before, "Close"] = df_rows["Close"].dropna()[-1]
df.loc[dt_before, "Adj Close"] = df_rows["Adj Close"].dropna()[-1]
df.loc[dt_before, "Volume"] = df_rows["Volume"].dropna().sum()
yfIntervalStarts = np.delete(yfIntervalStarts, i)
intervals = intervals.drop(dt)
df = df.drop(dt)
n = df.shape[0]
f_na = intervals["interval_open"].isna().to_numpy()
else:
# Previous interval too far, must insert a new interval
raise Exception("this code path not tested, review")
dt_correct = dt_before + self.itd
if dt_correct >= dt:
raise Exception(f"dt_correct={dt_correct} >= dt={dt} , expected <")
if dt_correct.date() != dt.date():
raise Exception(f"dt_correct={dt_correct} & dt={dt} , expected same day")
df.loc[dt_correct] = df.loc[dt] ; df = df.drop(dt).sort_index()
yfIntervalStarts[i] = dt_correct
intervals = intervals.drop(dt)
intervals.loc[dt_correct] = {"interval_open": dt_correct, "interval_close": df2.loc[dt, "market_close"]}
f_na = intervals["interval_open"].isna().to_numpy()
if f_na.any():
f_no_divs_splits = (df['Dividends']==0).to_numpy() & (df['Stock Splits']==0).to_numpy()
# For some national holidays when exchange closed, Yahoo fills in row. Clue is 0 volume.
# Solution = drop:
f_na_zeroVol = f_na & (df["Volume"] == 0).to_numpy()
f_na_zeroVol = f_na_zeroVol & f_no_divs_splits
if f_na_zeroVol.any():
if debug_yfc:
msg = "- dropping {} 0-volume rows with no matching interval".format(sum(f_na_zeroVol))
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
f_drop = f_na_zeroVol
yfIntervalStarts = yfIntervalStarts[~f_drop]
intervals = intervals[~f_drop]
df = df[~f_drop]
n = df.shape[0]
f_na = intervals["interval_open"].isna().to_numpy()
f_no_divs_splits = (df['Dividends']==0).to_numpy() & (df['Stock Splits']==0).to_numpy()
# ... another clue is row is identical to previous trading day
if f_na.any():
f_drop = np.array([False]*n)
for i in np.where(f_na)[0]:
if i > 0:
dt = df.index[i]
last_dt = df.index[i-1]
if (df.loc[dt, yfcd.yf_data_cols] == df.loc[last_dt, yfcd.yf_data_cols]).all():
f_drop[i] = True
if f_drop.any():
if debug_yfc:
msg = "- dropping rows with no interval that are identical to previous row"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
yfIntervalStarts = yfIntervalStarts[~f_drop]
intervals = intervals[~f_drop]
df = df[~f_drop]
n = df.shape[0]
f_na = intervals["interval_open"].isna().to_numpy()
f_no_divs_splits = (df['Dividends']==0).to_numpy() & (df['Stock Splits']==0).to_numpy()
# ... and another clue is Open=High=Low=0.0
if f_na.any():
f_zero = (df['Open']==0).to_numpy() & (df['Low']==0).to_numpy() & (df['High']==0).to_numpy()
f_zero = f_zero & f_no_divs_splits
f_na_zero = f_na & f_zero
if f_na_zero.any():
if debug_yfc:
msg = "- dropping {} price=0 rows with no matching interval".format(sum(f_na_zero))
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
f_drop = f_na_zero
yfIntervalStarts = yfIntervalStarts[~f_drop]
intervals = intervals[~f_drop]
df = df[~f_drop]
n = df.shape[0]
f_na = intervals["interval_open"].isna().to_numpy()
if f_na.any() and self.interval == yfcd.Interval.Mins1:
# If 1-minute interval at market close, then merge with previous minute
indices = sorted(np.where(f_na)[0], reverse=True)
for idx in indices:
dt = df.index[idx]
sched = yfct.GetExchangeSchedule(self.exchange, dt.date(), dt.date()+td_1d)
if dt.time() == sched["close"].iloc[0].time():
if idx == 0:
# Discard
print("discarding")
pass
else:
print("merging")
# Merge with previous
dt1 = df.index[idx-1]
df.loc[dt1, "Close"] = df["Close"].iloc[idx]
df.loc[dt1, "High"] = df["High"].iloc[idx-1:idx+1].max()
df.loc[dt1, "Low"] = df["Low"].iloc[idx-1:idx+1].min()
df.loc[dt1, "Volume"] = df["Volume"].iloc[idx-1:idx+1].sum()
df = df.drop(dt)
intervals = intervals.drop(dt)
yfIntervalStarts = np.delete(yfIntervalStarts, idx)
f_na = intervals["interval_open"].isna().to_numpy()
if f_na.any():
df_na = df[f_na][["Close", "Volume", "Dividends", "Stock Splits"]]
n = df_na.shape[0]
warning_msg = f"Failed to map these Yahoo intervals to xcal: (tkr={self.ticker}, exchange={self.exchange}, xcal={yfcd.exchangeToXcalExchange[self.exchange]})."
warning_msg += " Normally happens when 'exchange_calendars' is wrong so inform developers."
print("")
print(warning_msg)
print(df_na)
msg = "Accept into cache anyway?"
if False:
accept = True
else:
accept = click.confirm(msg, default=False)
if accept:
for idx in np.where(f_na)[0]:
dt = intervals.index[idx]
if self.interday:
intervals.loc[dt, "interval_open"] = df.index[idx].date()
intervals.loc[dt, "interval_close"] = df.index[idx].date() + self.itd
else:
intervals.loc[dt, "interval_open"] = df.index[idx]
intervals.loc[dt, "interval_close"] = df.index[idx] + self.itd
else:
raise Exception("Problem with dates returned by Yahoo, see above")
if df is None or df.empty:
return None
df = df.copy()
if not disable_yfc_metadata:
lastDataDts = yfct.CalcIntervalLastDataDt_batch(self.exchange, intervals["interval_open"].to_numpy(), self.interval)
if f_na.any():
# Hacky solution to handle xcal having incorrect schedule, for valid Yahoo data
lastDataDts[f_na] = intervals.index[f_na] + self.itd
if self.intraday:
lastDataDts[f_na] += yfct.GetExchangeDataDelay(self.exchange)
# For some exchanges, Yahoo has trades that occurred soon afer official market close, e.g. Johannesburg:
if self.exchange in ["JNB"]:
lastDataDts[f_na] += timedelta(minutes=15)
else:
# Add ~10 hours to ensure hit next market open
lastDataDts[f_na] += timedelta(hours=10)
data_final = fetch_dt >= lastDataDts
df["Final?"] = data_final
# df["FetchDate"] = pd.Timestamp(fetch_dt_utc).tz_localize("UTC")
df["FetchDate"] = fetch_dt_utc
df["C-Check?"] = False
log_msg = f"PM::_fetchYfHistory() returning DF {df.index[0]} -> {df.index[-1]}"
if yfcl.IsTracingEnabled():
yfcl.TraceExit(log_msg)
elif debug_yfc:
print(log_msg)
return df
def _fetchYfHistory_dateRange(self, start, end, prepost, debug):
yfcu.TypeCheckIntervalDt(start, self.interval, "start")
yfcu.TypeCheckIntervalDt(end, self.interval, "end")
yfcu.TypeCheckBool(prepost, "prepost")
yfcu.TypeCheckBool(debug, "debug")
debug_yfc = False
# debug_yfc = True
log_msg = f"PM::_fetchYfHistory_dateRange-{self.istr}(start={start} , end={end} , prepost={prepost})"
if yfcl.IsTracingEnabled():
yfcl.TraceEnter(log_msg)
elif debug_yfc:
print("")
print(log_msg)
fetch_start = start
fetch_end = end
if not isinstance(fetch_start, (datetime, pd.Timestamp)):
fetch_start_dt = datetime.combine(fetch_start, time(0), self.tz)
else:
fetch_start_dt = fetch_start
history_args = {"period": None,
"interval": self.istr,
"start": fetch_start, "end": fetch_end,
"prepost": prepost,
"actions": True, # Always fetch
"keepna": True,
"repair": True,
"auto_adjust": False, # store raw data, adjust myself
"back_adjust": False, # store raw data, adjust myself
"proxy": self.proxy,
"rounding": False, # store raw data, round myself
"raise_errors": True}
if debug:
yf_logger = logging.getLogger('yfinance')
yf_logger.setLevel(logging.DEBUG) # verbose: print errors & debug info
if debug_yfc:
if (not isinstance(fetch_start, datetime)) or fetch_start.time() == time(0):
start_str = fetch_start.strftime("%Y-%m-%d")
else:
start_str = fetch_start.strftime("%Y-%m-%d %H:%M:%S")
if (not isinstance(fetch_end, datetime)) or fetch_end.time() == time(0):
end_str = fetch_end.strftime("%Y-%m-%d")
else:
end_str = fetch_end.strftime("%Y-%m-%d %H:%M:%S")
msg = f"- {self.ticker}: fetching {self.istr} {start_str} -> {end_str}"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
first_fetch_failed = False
df = None
try:
if debug_yfc:
msg = f"- fetch_start={fetch_start} ; fetch_end={fetch_end}"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
df = self.dat.history(**history_args)
df = df.sort_index()
if "Repaired?" not in df.columns:
df["Repaired?"] = False
if debug_yfc:
if df is None:
msg = "- YF returned None"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
else:
msg = "- YF returned table:"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
print(df[[c for c in ["Open", "Low", "High", "Close", "Dividends", "Volume"] if c in df.columns]])
if df is None or df.empty:
raise Exception("No data found for this date range")
except Exception as e:
first_fetch_failed = True
if "Data doesn't exist for startDate" in str(e):
raise yfcd.NoPriceDataInRangeException(self.ticker, self.istr, start, end)
elif "No data found for this date range" in str(e):
raise yfcd.NoPriceDataInRangeException(self.ticker, self.istr, start, end)
elif "No price data found, symbol may be delisted" in str(e):
raise yfcd.NoPriceDataInRangeException(self.ticker, self.istr, start, end)
else:
print("df:")
print(df)
raise e
if not first_fetch_failed and fetch_start is not None:
log_msg = f"requested from YF: {self.istr} {history_args['start']} -> {history_args['end']}"
if self.interday:
log_msg += f", received: {df.index[0].date()} -> {df.index[-1].date()}"
else:
log_msg += f", received: {df.index[0]} -> {df.index[-1]}"
self.manager.LogEvent("info", "PriceManager", log_msg)
df = df.loc[fetch_start_dt:]
if df.empty:
first_fetch_failed = True
raise yfcd.NoPriceDataInRangeException(self.ticker, self.istr, start, end)
# Check that weekly aligned to Monday. If not, shift start date back and re-fetch
if self.interval == yfcd.Interval.Week and (not df.empty) and (df.index[0].weekday() != 0):
# Despite fetch_start aligned to Monday, sometimes Yahoo returns weekly
# data starting a different day. Shifting back a little fixes
shift_backs = [2, 4]
for d in shift_backs:
fetch_start2 = fetch_start - timedelta(days=d)
history_args["start"] = fetch_start2
if debug_yfc:
msg = "- weekly data not aligned to Monday, re-fetching from {}".format(fetch_start2)
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
df = self.dat.history(**history_args)
if "Repaired?" not in df.columns:
df["Repaired?"] = False
if self.interval == yfcd.Interval.Week and (df.index[0].weekday() == 0):
log_msg = f"requested from YF: {self.istr} {history_args['start']} -> {history_args['end']}"
log_msg += f", received: {df.index[0]} -> {df.index[-1]}"
self.manager.LogEvent("info", "PriceManager", log_msg)
if isinstance(start, datetime):
start_dt = start
else:
start_dt = datetime.combine(start, time(0), self.tz)
df = df.loc[start_dt:]
break
if self.interval == yfcd.Interval.Week and (df.index[0].weekday() != 0):
# print("Date range requested: {} -> {}".format(fetch_start, fetch_end))
print(df)
raise Exception("Weekly data returned by YF doesn't begin Monday but {}".format(df.index[0].weekday()))
if df is not None and df.empty:
df = None
if df is None:
log_msg = "PM::_fetchYfHistory_dateRange() returning None"
else:
log_msg = f"PM::_fetchYfHistory_dateRange() returning DF {df.index[0]} -> {df.index[-1]}"
if yfcl.IsTracingEnabled():
yfcl.TraceExit(log_msg)
elif debug_yfc:
print(log_msg)
# df = yfcu.CustomNanCheckingDataFrame(df)
return df
def _reconstruct_intervals_batch(self, df, tag=-1):
if not isinstance(df, pd.DataFrame):
raise Exception("'df' must be a Pandas DataFrame not", type(df))
if self.interval == yfcd.Interval.Mins1:
return df
# Reconstruct values in df using finer-grained price data. Delimiter marks what to reconstruct
debug = False
# debug = True
price_cols = [c for c in ["Open", "High", "Low", "Close", "Adj Close"] if c in df]
data_cols = price_cols + ["Volume"]
log_msg = f"PM::_reconstruct_intervals_batch-{self.istr}(dt0={df.index[0]})"
yfcl.TraceEnter(log_msg)
# If interval is weekly then can construct with daily. But if smaller intervals then
# restricted to recent times:
min_lookback = None
# - daily = hourly restricted to last 730 days
if self.interval == yfcd.Interval.Week:
# Correct by fetching week of daily data
sub_interval = yfcd.Interval.Days1
td_range = timedelta(days=7)
elif self.interval == yfcd.Interval.Days1:
# Correct by fetching day of hourly data
sub_interval = yfcd.Interval.Hours1
td_range = timedelta(days=1)
min_lookback = timedelta(days=730)
elif self.interval == yfcd.Interval.Hours1:
# Correct by fetching hour of 30m data
sub_interval = yfcd.Interval.Mins30
td_range = timedelta(hours=1)
min_lookback = timedelta(days=60)
elif self.interval == yfcd.Interval.Mins30:
# Correct by fetching hour of 15m data
sub_interval = yfcd.Interval.Mins15
td_range = timedelta(minutes=30)
min_lookback = timedelta(days=60)
elif self.interval == yfcd.Interval.Mins15:
# Correct by fetching hour of 5m data
sub_interval = yfcd.Interval.Mins5
td_range = timedelta(minutes=15)
min_lookback = timedelta(days=60)
elif self.interval == yfcd.Interval.Mins5:
# Correct by fetching hour of 2m data
sub_interval = yfcd.Interval.Mins2
td_range = timedelta(minutes=5)
min_lookback = timedelta(days=60)
elif self.interval == yfcd.Interval.Mins2:
# Correct by fetching hour of 1m data
sub_interval = yfcd.Interval.Mins1
td_range = timedelta(minutes=2)
min_lookback = timedelta(days=30)
else:
msg = f"WARNING: Have not implemented repair for '{self.interval}' interval. Contact developers"
yfcl.TracePrint(msg) if yfcl.IsTracingEnabled() else print(msg)
log_msg = "PM::_reconstruct_intervals_batch() returning"
yfcl.TraceExit(log_msg)
return df
sub_interday = sub_interval in [yfcd.Interval.Days1, yfcd.Interval.Week]#, yfcd.Interval.Months1, yfcd.Interval.Months3]
sub_intraday = not sub_interday
df = df.sort_index()
f_repair = df[data_cols].to_numpy() == tag
f_repair_rows = f_repair.any(axis=1)
# Ignore old intervals for which Yahoo won't return finer data:
# if sub_interval == yfcd.Interval.Hours1:
# f_recent = date.today() - df.index.date < timedelta(days=730)
# f_repair_rows = f_repair_rows & f_recent
# elif sub_interval in [yfcd.Interval.Mins30, yfcd.Interval.Mins15]:
# f_recent = date.today() - df.index.date < timedelta(days=60)
# f_repair_rows = f_repair_rows & f_recent
if min_lookback is None:
min_dt = None
else:
min_dt = pd.Timestamp.utcnow().tz_convert(ZoneInfo("UTC")) - min_lookback
if debug:
print(f"- min_dt={min_dt} interval={self.interval} sub_interval={sub_interval}")
if min_dt is not None:
f_recent = df.index > min_dt
f_repair_rows = f_repair_rows & f_recent
if not f_repair_rows.any():
if debug:
print("- data too old to repair")
return df
dts_to_repair = df.index[f_repair_rows]
# indices_to_repair = np.where(f_repair_rows)[0]
if len(dts_to_repair) == 0:
return df
df_v2 = df.copy()
if "Repaired?" not in df_v2.columns:
df_v2["Repaired?"] = False
df_good = df[~df[price_cols].isna().any(axis=1)]
f_tag = df_v2[price_cols].to_numpy() == tag
# Group nearby NaN-intervals together to reduce number of Yahoo fetches
dts_groups = [[dts_to_repair[0]]]
# last_dt = dts_to_repair[0]
# last_ind = indices_to_repair[0]
# td = yfcd.intervalToTimedelta[self.interval]
# if self.interval == yfcd.Interval.Months1:
# grp_td_threshold = timedelta(days=28)
# elif self.interval == yfcd.Interval.Week:
# grp_td_threshold = timedelta(days=28)
# elif self.interval == yfcd.Interval.Days1:
# grp_td_threshold = timedelta(days=14)
# elif self.interval == yfcd.Interval.Hours1:
# grp_td_threshold = timedelta(days=7)
# else:
# grp_td_threshold = timedelta(days=2)
# # grp_td_threshold = timedelta(days=7)
# for i in range(1, len(dts_to_repair)):
# ind = indices_to_repair[i]
# dt = dts_to_repair[i]
# if (dt-dts_groups[-1][-1]) < grp_td_threshold:
# dts_groups[-1].append(dt)
# elif ind - last_ind <= 3:
# dts_groups[-1].append(dt)
# else:
# dts_groups.append([dt])
# last_dt = dt
# last_ind = ind
# for i in range(1, len(dts_to_repair)):
# ind = indices_to_repair[i]
# dt = dts_to_repair[i]
# if (dt-dts_groups[-1][-1]) < grp_td_threshold:
# dts_groups[-1].append(dt)
# elif ind - last_ind <= 3:
# dts_groups[-1].append(dt)
# else:
# dts_groups.append([dt])
# last_dt = dt
# last_ind = ind
# if self.interval == yfcd.Interval.Months1:
# grp_max_size = dateutil.relativedelta.relativedelta(years=2)
# elif self.interval == yfcd.Interval.Week:
if self.interval == yfcd.Interval.Week:
grp_max_size = dateutil.relativedelta.relativedelta(years=2)
elif self.interval == yfcd.Interval.Days1:
grp_max_size = dateutil.relativedelta.relativedelta(years=2)
elif self.interval == yfcd.Interval.Hours1:
grp_max_size = dateutil.relativedelta.relativedelta(years=1)
else:
grp_max_size = timedelta(days=30)
if debug:
print("- grp_max_size =", grp_max_size)
for i in range(1, len(dts_to_repair)):
# ind = indices_to_repair[i]
dt = dts_to_repair[i]
if dt.date() < dts_groups[-1][0].date()+grp_max_size:
dts_groups[-1].append(dt)
else:
dts_groups.append([dt])
# last_dt = dt
# last_ind = ind
if debug:
print("Repair groups:")
for g in dts_groups:
print(f"- {g[0]} -> {g[-1]}")
# Add some good data to each group, so can calibrate later:
# for i in range(len(dts_groups)):
# g = dts_groups[i]
# g0 = g[0]
# i0 = df_good.index.get_loc(g0)
# if i0 > 0:
# dts_groups[i].insert(0, df_good.index[i0-1])
# gl = g[-1]
# il = df_good.index.get_loc(gl)
# if il < len(df_good)-1:
# dts_groups[i].append(df_good.index[il+1])
for i in range(len(dts_groups)):
g = dts_groups[i]
g0 = g[0]
i0 = df_good.index.get_indexer([g0], method="nearest")[0]
if i0 > 0:
if (min_dt is None or df_good.index[i0-1] >= min_dt) \
and ((not self.intraday) or df_good.index[i0-1].date() == g0.date()):
i0 -= 1
gl = g[-1]
il = df_good.index.get_indexer([gl], method="nearest")[0]
if il < len(df_good)-1:
if (not self.intraday) or df_good.index[il+1].date() == gl.date():
il += 1
good_dts = df_good.index[i0:il+1]
dts_groups[i] += good_dts.to_list()
dts_groups[i].sort()
n_fixed = 0
for g in dts_groups:
df_block = df[df.index.isin(g)]
if debug:
print("df_block:") ; print(df_block)
start_dt = g[0]
start_d = start_dt.date()
if sub_interval == yfcd.Interval.Hours1 and (date.today()-start_d) > timedelta(days=729):
# Don't bother requesting more price data, Yahoo will reject
continue
elif sub_interval in [yfcd.Interval.Mins30, yfcd.Interval.Mins15] and (date.today()-start_d) > timedelta(days=59):
# Don't bother requesting more price data, Yahoo will reject
continue
if self._record_stack_trace:
# Log function calls to detect and manage infinite recursion
fn_tuple = ("_reconstruct_intervals_batch()", f"dt0={df.index[0]}", f"interval={self.interval}")
if fn_tuple in self._stack_trace:
# Detected a potential recursion loop
reconstruct_detected = False
for i in range(len(self._stack_trace)-1, -1, -1):
if "_reconstruct_intervals_batch" in str(self._stack_trace[i]):
reconstruct_detected = True
break
if reconstruct_detected:
self._infinite_recursion_detected = True
self._stack_trace.append(fn_tuple)
if self.interday:
log_msg = f"repairing {self.istr} block {g[0].date()} -> {g[-1].date()+timedelta(days=1)}"
else:
log_msg = f"repairing {self.istr} block {g[0]} -> {g[-1]}"
self.manager.LogEvent("info", "PriceManager", log_msg)
# Infinite loop potential here via repair:
# - request fine-grained data e.g. 1H
# - 1H requires accurate dividend data
# - triggers fetch of 1D data which must be kept contiguous
# - triggers fetch of older 1D data which requires repair using 1H data -> recursion loop
# Solution:
# 1) add tuple to fn stack buy with YF=True
# 2) if that tuple already in stack then raise Exception
if self.interval == yfcd.Interval.Week:
fetch_start = start_d - td_range # need previous week too
fetch_end = g[-1].date() + td_range
elif self.interval == yfcd.Interval.Days1:
fetch_start = start_d
fetch_end = g[-1].date() + td_range
else:
fetch_start = g[0]
fetch_end = g[-1] + td_range
# print(f"fetch_start={fetch_start} fetch_end={fetch_end}")
# prepost = self.interval == yfcd.Interval.Days1
prepost = self.interday
if debug:
print(f"- fetch_start={fetch_start}, fetch_end={fetch_end} prepost={prepost}")
if self._infinite_recursion_detected:
for i in range(len(self._stack_trace)):
print(" "*i + str(self._stack_trace[i]))
raise Exception("WARNING: Infinite recursion detected (see stack trace above). Switch to fetching prices direct from YF")
print("WARNING: Infinite recursion detected (see stack trace above). Switch to fetching prices direct from YF")
# df_fine = self.dat.history(start=fetch_start, end=fetch_end, interval=yfcd.intervalToString[sub_interval], auto_adjust=False, prepost=prepost, keepna=True)
# elif self.interval in [yfcd.Interval.Days1]: # or self._infinite_recursion_detected:
# # Assume infinite recursion will happen
# df_fine = self.dat.history(start=fetch_start, end=fetch_end, interval=yfcd.intervalToString[sub_interval], auto_adjust=False, repair=True)
else:
if prepost and sub_intraday:
# YFC cannot handle intraday pre- and post-market, so fetch via yfinance
if debug:
# print("- fetching df_fine direct from YF")
print(f"- - fetch_start={fetch_start} fetch_end={fetch_end}")
df_fine_old = self.dat.history(start=fetch_start, end=fetch_end, interval=yfcd.intervalToString[sub_interval], auto_adjust=True, prepost=prepost)
hist_sub = self.manager.GetHistory(sub_interval)
if not isinstance(fetch_start, datetime):
fetch_start = datetime.combine(fetch_start, time(0), ZoneInfo(self.tzName))
if not isinstance(fetch_end, datetime):
fetch_end = datetime.combine(fetch_end, time(0), ZoneInfo(self.tzName))
if debug:
print("- fetching df_fine via _fetchYfHistory() wrapper")
print(f"- - fetch_start={fetch_start} fetch_end={fetch_end}")
try:
df_fine = hist_sub._fetchYfHistory(start=fetch_start, end=fetch_end, prepost=prepost, debug=False, verify_intervals=False, disable_yfc_metadata=True)
except yfcd.NoPriceDataInRangeException as e:
if debug:
print("- fetch of fine price data failed:" + str(e))
continue
if df_fine is not None:
adj = (df_fine["Adj Close"]/df_fine["Close"]).to_numpy()
for c in ["Open", "Low", "High", "Close"]:
df_fine[c] *= adj
df_fine = df_fine[["Open", "Low", "High", "Close", "Volume", "Dividends", "Stock Splits"]]
if debug:
print("df_fine_old:")
print(df_fine_old)
print("df_fine:")
print(df_fine)
# raise Exception("here")
else:
if debug:
print("- fetching df_fine via YFC")
hist_sub = self.manager.GetHistory(sub_interval)
df_fine = hist_sub.get(fetch_start, fetch_end, adjust_splits=False, adjust_divs=False, repair=False, prepost=prepost)
# df_fine["Adj Close"] = df_fine["Close"] * df_fine["CDF"]
if debug:
print("- df_fine:")
print(df_fine)
if df_fine is None or len(df_fine) == 0:
print("YF: WARNING: Cannot reconstruct because Yahoo not returning data in interval")
if self._record_stack_trace:
# Pop stack trace
if len(self._stack_trace) == 0:
raise Exception("Failing to correctly push/pop stack trace (is empty too early)")
if not self._stack_trace[-1] == fn_tuple:
for i in range(len(self._stack_trace)):
print(" "*i + str(self._stack_trace[i]))
raise Exception("Failing to correctly push/pop stack trace (see above)")
self._stack_trace.pop(len(self._stack_trace) - 1)
continue
df_fine["ctr"] = 0
if self.interval == yfcd.Interval.Week:
# df_fine["Week Start"] = df_fine.index.tz_localize(None).to_period("W-SUN").start_time
weekdays = ["MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN"]
week_end_day = weekdays[(df_block.index[0].weekday() + 7 - 1) % 7]
df_fine["Week Start"] = df_fine.index.tz_localize(None).to_period("W-"+week_end_day).start_time
grp_col = "Week Start"
elif self.interval == yfcd.Interval.Days1:
df_fine["Day Start"] = pd.to_datetime(df_fine.index.date)
grp_col = "Day Start"
else:
df_fine.loc[df_fine.index.isin(df_block.index), "ctr"] = 1
df_fine["intervalID"] = df_fine["ctr"].cumsum()
df_fine = df_fine.drop("ctr", axis=1)
grp_col = "intervalID"
# df_fine = df_fine[~df_fine[price_cols].isna().all(axis=1)]
df_new = df_fine.groupby(grp_col).agg(
Open=("Open", "first"),
Close=("Close", "last"),
# AdjClose=("Adj Close", "last"),
Low=("Low", "min"),
High=("High", "max"),
Volume=("Volume", "sum"))
# CSF=("CSF", "first"),
# CDF=("CDF", "first"))#.rename(columns={"AdjClose": "Adj Close"})
if grp_col in ["Week Start", "Day Start"]:
df_new.index = df_new.index.tz_localize(df_fine.index.tz)
else:
df_fine["diff"] = df_fine["intervalID"].diff()
new_index = np.append([df_fine.index[0]], df_fine.index[df_fine["intervalID"].diff() > 0])
df_new.index = new_index
# Calibrate! Check whether 'df_fine' has different split-adjustment.
# If different, then adjust to match 'df'
df_block_calib = df_block[price_cols]
# df_new_calib = df_new[df_new.index.isin(df_block_calib.index)][price_cols]
# df_block_calib = df_block_calib[df_block_calib.index.isin(df_new_calib)]
common_index = np.intersect1d(df_block_calib.index, df_new.index)
df_new_calib = df_new[price_cols].loc[common_index]
df_block_calib = df_block_calib.loc[common_index]
calib_filter = (df_block_calib != tag).to_numpy()
if not calib_filter.any():
# Can't calibrate so don't attempt repair
if self._record_stack_trace:
# Pop stack trace
if len(self._stack_trace) == 0:
raise Exception("Failing to correctly push/pop stack trace (is empty too early)")
if not self._stack_trace[-1] == fn_tuple:
for i in range(len(self._stack_trace)):
print(" "*i + str(self._stack_trace[i]))
raise Exception("Failing to correctly push/pop stack trace (see above)")
self._stack_trace.pop(len(self._stack_trace) - 1)
continue
if debug:
print("calib_filter:") ; print(calib_filter)
print("df_new_calib:") ; print(df_new_calib)
print("df_block_calib:") ; print(df_block_calib)
# Avoid divide-by-zero warnings printing:
df_new_calib = df_new_calib.to_numpy()
df_block_calib = df_block_calib.to_numpy()
for j in range(len(price_cols)):
c = price_cols[j]
f = ~calib_filter[:, j]
if f.any():
df_block_calib[f, j] = 1
df_new_calib[f, j] = 1
ratios = (df_block_calib / df_new_calib)[calib_filter]
ratio = np.mean(ratios)
#
ratio_rcp = round(1.0 / ratio, 1)
ratio = round(ratio, 1)
if ratio == 1 and ratio_rcp == 1:
# Good!
pass
else:
if ratio > 1:
# data has different split-adjustment than fine-grained data
# Adjust fine-grained to match
df_new[price_cols] *= ratio
df_new["Volume"] /= ratio
df_new["Volume"] = df_new["Volume"].round(0).astype('int')
elif ratio_rcp > 1:
# data has different split-adjustment than fine-grained data
# Adjust fine-grained to match
df_new[price_cols] *= 1.0 / ratio_rcp
df_new["Volume"] *= ratio_rcp
df_new["Volume"] = df_new["Volume"].round(0).astype('int')
# Repair!
bad_dts = df_block.index[(df_block[price_cols] == tag).any(axis=1)]
for idx in bad_dts:
if idx not in df_new.index:
# Yahoo didn't return finer-grain data for this interval,
# so probably no trading happened.
# print("no fine data")
continue
df_new_row = df_new.loc[idx]
if self.interval == yfcd.Interval.Week:
df_last_week = df_new.iloc[df_new.index.get_loc(idx)-1]
df_fine = df_fine.loc[idx:]
df_bad_row = df.loc[idx]
bad_fields = df_bad_row.index[df_bad_row == tag].to_numpy()
if "High" in bad_fields:
df_v2.loc[idx, "High"] = df_new_row["High"]
if "Low" in bad_fields:
df_v2.loc[idx, "Low"] = df_new_row["Low"]
if "Open" in bad_fields:
if self.interval == yfcd.Interval.Week and idx != df_fine.index[0]:
# Exchange closed Monday. In this case, Yahoo sets Open to last week close
df_v2.loc[idx, "Open"] = df_last_week["Close"]
df_v2.loc[idx, "Low"] = min(df_v2.loc[idx, "Open"], df_v2.loc[idx, "Low"])
else:
df_v2.loc[idx, "Open"] = df_new_row["Open"]
if "Close" in bad_fields:
df_v2.loc[idx, "Close"] = df_new_row["Close"]
# Should not need to copy over CDF & CSF, because
# correct values are already merged in from daily
# df_v2.loc[idx, "CDF"] = df_new_row["CDF"]
# df_v2.loc[idx, "CSF"] = df_new_row["CSF"]
if "Volume" in bad_fields:
df_v2.loc[idx, "Volume"] = df_new_row["Volume"]
df_v2.loc[idx, "Repaired?"] = True
n_fixed += 1
# Restore un-repaired bad values
f_nan = df_v2[price_cols].isna().to_numpy()
f_failed = f_tag & f_nan
for j in range(len(price_cols)):
f = f_failed[:, j]
if f.any():
c = price_cols[j]
df_v2.loc[f, c] = df.loc[f, c]
if self._record_stack_trace:
# Pop stack trace
if len(self._stack_trace) == 0:
raise Exception("Failing to correctly push/pop stack trace (is empty too early)")
if not self._stack_trace[-1] == fn_tuple:
for i in range(len(self._stack_trace)):
print(" "*i + str(self._stack_trace[i]))
raise Exception("Failing to correctly push/pop stack trace (see above)")
self._stack_trace.pop(len(self._stack_trace) - 1)
log_msg = "PM::_reconstruct_intervals_batch() returning"
yfcl.TraceExit(log_msg)
return df_v2
def _repairUnitMixups(self, df, silent=False):
df2 = self._fixUnitSwitch(df)
df3 = self._repairSporadicUnitMixups(df2, silent)
return df3
def _repairSporadicUnitMixups(self, df, silent=False):
yfcu.TypeCheckDataFrame(df, "df")
# Sometimes Yahoo returns few prices in cents/pence instead of $/£
# I.e. 100x bigger
# Easy to detect and fix, just look for outliers = ~100x local median
if df.empty:
return df
if df.shape[0] == 1:
# Need multiple rows to confidently identify outliers
return df
log_msg_enter = f"PM::_repairSporadicUnitMixups-{self.istr}()"
log_msg_exit = f"PM::_repairSporadicUnitMixups-{self.istr}() returning"
yfcl.TraceEnter(log_msg_enter)
df2 = df.copy()
data_cols = ["High", "Open", "Low", "Close"] # Order important, separate High from Low
data_cols = [c for c in data_cols if c in df2.columns]
f_zeroes = (df2[data_cols] == 0).any(axis=1)
if f_zeroes.any():
df2_zeroes = df2[f_zeroes]
df2 = df2[~f_zeroes]
else:
df2_zeroes = None
if df2.shape[0] <= 1:
yfcl.TraceExit(log_msg_exit)
return df
df2_data = df2[data_cols].to_numpy()
median = _ndimage.median_filter(df2_data, size=(3, 3), mode="wrap")
ratio = df2_data / median
ratio_rounded = (ratio / 20).round() * 20 # round ratio to nearest 20
f = ratio_rounded == 100
ratio_rcp = 1.0/ratio
ratio_rcp_rounded = (ratio_rcp / 20).round() * 20 # round ratio to nearest 20
f_rcp = (ratio_rounded == 100) | (ratio_rcp_rounded == 100)
f_either = f | f_rcp
if not f_either.any():
yfcl.TraceExit(log_msg_exit)
return df
# Mark values to send for repair
tag = -1.0
for i in range(len(data_cols)):
fi = f_either[:, i]
c = data_cols[i]
df2.loc[fi, c] = tag
n_before = (df2_data == tag).sum()
try:
df2 = self._reconstruct_intervals_batch(df2, tag=tag)
except Exception:
if len(self._stack_trace) > 0:
self._stack_trace.pop(len(self._stack_trace) - 1)
raise
n_after = (df2[data_cols].to_numpy() == tag).sum()
if n_after > 0:
# This second pass will *crudely* "fix" any remaining errors in High/Low
# simply by ensuring they don't contradict e.g. Low = 100x High.
f = (df2[data_cols].to_numpy() == tag) & f
for i in range(f.shape[0]):
fi = f[i, :]
if not fi.any():
continue
idx = df2.index[i]
for c in ['Open', 'Close']:
j = data_cols.index(c)
if fi[j]:
df2.loc[idx, c] = df.loc[idx, c] * 0.01
c = "High" ; j = data_cols.index(c)
if fi[j]:
df2.loc[idx, c] = df2.loc[idx, ["Open", "Close"]].max()
c = "Low" ; j = data_cols.index(c)
if fi[j]:
df2.loc[idx, c] = df2.loc[idx, ["Open", "Close"]].min()
f_rcp = (df2[data_cols].to_numpy() == tag) & f_rcp
for i in range(f_rcp.shape[0]):
fi = f_rcp[i, :]
if not fi.any():
continue
idx = df2.index[i]
for c in ['Open', 'Close']:
j = data_cols.index(c)
if fi[j]:
df2.loc[idx, c] = df.loc[idx, c] * 100.0
c = "High" ; j = data_cols.index(c)
if fi[j]:
df2.loc[idx, c] = df2.loc[idx, ["Open", "Close"]].max()
c = "Low" ; j = data_cols.index(c)
if fi[j]:
df2.loc[idx, c] = df2.loc[idx, ["Open", "Close"]].min()
n_after_crude = (df2[data_cols].to_numpy() == tag).sum()
n_fixed = n_before - n_after_crude
n_fixed_crudely = n_after - n_after_crude
if not silent and n_fixed > 0:
report_msg = f"{self.ticker}: fixed {n_fixed}/{n_before} currency unit mixups "
if n_fixed_crudely > 0:
report_msg += f"({n_fixed_crudely} crudely) "
report_msg += f"in {self.interval} price data"
print(report_msg)
# Restore original values where repair failed
f_either = df2[data_cols].to_numpy() == tag
for j in range(len(data_cols)):
fj = f_either[:, j]
if fj.any():
c = data_cols[j]
df2.loc[fj, c] = df.loc[fj, c]
if df2_zeroes is not None:
df2 = pd.concat([df2, df2_zeroes]).sort_index()
df2.index = pd.to_datetime()
yfcl.TraceExit(log_msg_exit)
return df2
def _fixUnitSwitch(self, df):
# Sometimes Yahoo returns few prices in cents/pence instead of $/£
# I.e. 100x bigger
# 2 ways this manifests:
# - random 100x errors spread throughout table
# - a sudden switch between $<->cents at some date
# This function fixes the second.
# Eventually Yahoo fixes but could take them 2 weeks.
if self.exchange == 'KUW':
# Kuwaiti Dinar divided into 1000 not 100
n = 1000.0
else:
n = 100.0
return self._fixPricesSuddenChange(df, n)
def _fixBadStockSplits(self, df):
# Original logic only considered latest split adjustment could be missing, but
# actually **any** split adjustment can be missing. So check all splits in df.
#
# Improved logic looks for BIG daily price changes that closely match the
# **nearest future** stock split ratio. This indicates Yahoo failed to apply a new
# stock split to old price data.
#
# There is a slight complication, because Yahoo does another stupid thing.
# Sometimes the old data is adjusted twice. So cannot simply assume
# which direction to reverse adjustment - have to analyse prices and detect.
# Not difficult.
if df.empty:
return df
if not self.interday:
return df
df = df.sort_index() # scan splits oldest -> newest
split_f = df['Stock Splits'].to_numpy() != 0
if not split_f.any():
return df
for split_idx in np.where(split_f)[0]:
split_dt = df.index[split_idx]
split = df.loc[split_dt, 'Stock Splits']
if split_dt == df.index[0]:
continue
# logger.debug(f'price-repair-split: Checking split {split:.4f} @ {split_dt.date()} for possible repair')
cutoff_idx = min(df.shape[0], split_idx+1) # add one row after to detect big change
df_pre_split = df.iloc[0:cutoff_idx+1]
df_pre_split_repaired = self._fixPricesSuddenChange(df_pre_split, split, correct_volume=True)
# Merge back in:
if cutoff_idx == df.shape[0]-1:
df = df_pre_split_repaired
else:
df = pd.concat([df_pre_split_repaired.sort_index(), df.iloc[cutoff_idx+1:]])
return df
def _fixPricesSuddenChange(self, df, change, correct_volume=False):
log_func = f"PM::_fixPricesSuddenChange-{self.istr}(change={change:.2f})"
yfcl.TraceEnter(log_func)
df2 = df.sort_index(ascending=False)
split = change
split_rcp = 1.0 / split
if change in [100.0, 0.01]:
fix_type = '100x error'
start_min = None
else:
fix_type = 'bad split'
f = df2['Stock Splits'].to_numpy() != 0.0
start_min = (df2.index[f].min() - dateutil.relativedelta.relativedelta(years=1)).date()
OHLC = ['Open', 'High', 'Low', 'Close']
# Do not attempt repair of the split is small,
# could be mistaken for normal price variance
if 0.8 < split < 1.25:
yfcl.TraceExit(log_func + ": aborting, split too near 1.0")
return df
n = df2.shape[0]
df_debug = df2.copy()
df_debug = df_debug.drop(['Volume', 'Dividends', 'Repaired?'], axis=1, errors='ignore')
df_debug = df_debug.drop(['CDF', 'CSF', 'C-Check?', 'LastDivAdjustDt', 'LastSplitAdjustDt'], axis=1, errors='ignore')
debug_cols = ['Open', 'Close']
df_debug = df_debug.drop([c for c in OHLC if c not in debug_cols], axis=1, errors='ignore')
# Calculate daily price % change. To reduce effect of price volatility,
# calculate change for each OHLC column.
if self.interday and self.interval != yfcd.Interval.Days1 and split not in [100.0, 100, 0.001]:
# Avoid using 'Low' and 'High'. For multiday intervals, these can be
# very volatile so reduce ability to detect genuine stock split errors
_1d_change_x = np.full((n, 2), 1.0)
price_data = df2[['Open','Close']].to_numpy()
f_zero = price_data == 0.0
else:
_1d_change_x = np.full((n, 4), 1.0)
price_data = df2[OHLC].to_numpy()
f_zero = price_data == 0.0
if f_zero.any():
price_data[f_zero] = 1.0
# Update: if a VERY large dividend is paid out, then can be mistaken for a 1:2 stock split.
# Fix = use adjusted prices
for j in range(price_data.shape[1]):
price_data[:,j] *= df2['CDF']
_1d_change_x[1:] = price_data[1:, ] / price_data[:-1, ]
f_zero_num_denom = f_zero | np.roll(f_zero, 1, axis=0)
if f_zero_num_denom.any():
_1d_change_x[f_zero_num_denom] = 1.0
if self.interday and self.interval != yfcd.Interval.Days1:
# average change
_1d_change_minx = np.average(_1d_change_x, axis=1)
else:
# # change nearest to 1.0
# diff = np.abs(_1d_change_x - 1.0)
# j_indices = np.argmin(diff, axis=1)
# _1d_change_minx = _1d_change_x[np.arange(n), j_indices]
# Still distorted by extreme-low high/low. So try median:
_1d_change_minx = np.median(_1d_change_x, axis=1)
f_na = np.isnan(_1d_change_minx)
if f_na.any():
# Possible if data was too old for reconstruction.
_1d_change_minx[f_na] = 1.0
df_debug['1D change X'] = _1d_change_minx
df_debug['1D change X'] = df_debug['1D change X'].round(2).astype('str')
# If all 1D changes are closer to 1.0 than split, exit
split_max = max(split, split_rcp)
if np.max(_1d_change_minx) < (split_max - 1) * 0.5 + 1 and np.min(_1d_change_minx) > 1.0 / ((split_max - 1) * 0.5 + 1):
reason = "changes too near 1.0"
reason += f" (_1d_change_minx = {np.min(_1d_change_minx):.2f} -> {np.max(_1d_change_minx):.2f})"
yfcl.TracePrint(reason)
yfcl.TraceExit(log_func + " aborting")
return df
# Calculate the true price variance, i.e. remove effect of bad split-adjustments.
# Key = ignore 1D changes outside of interquartile range
q1, q3 = np.percentile(_1d_change_minx, [25, 75])
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
f = (_1d_change_minx >= lower_bound) & (_1d_change_minx <= upper_bound)
avg = np.mean(_1d_change_minx[f])
sd = np.std(_1d_change_minx[f])
# Now can calculate SD as % of mean
sd_pct = sd / avg
msg = f"Estimation of true 1D change stats: mean = {avg:.2f}, StdDev = {sd:.4f} ({sd_pct*100.0:.1f}% of mean)"
self.manager.LogEvent('debug', 'price-repair-split-'+self.istr, msg)
# Only proceed if split adjustment far exceeds normal 1D changes
largest_change_pct = 5 * sd_pct
if self.interday and self.interval != yfcd.Interval.Days1:
largest_change_pct *= 3
# if self.interval in [yfcd.Interval.Months1, yfcd.Interval.Months3]:
# largest_change_pct *= 2
if max(split, split_rcp) < 1.0 + largest_change_pct:
msg = "Split ratio too close to normal price volatility. Won't repair"
self.manager.LogEvent('debug', 'price-repair-split-'+self.istr, msg)
# msg = f"price-repair-split: my workings:" + '\n' + str(df_debug)
# self.manager.LogEvent('debug', 'price-repair-split-'+self.istr, msg)
yfcl.TraceExit(log_func + ": aborting, changes to near normal price volatility")
return df
# Now can detect bad split adjustments
# Set threshold to halfway between split ratio and largest expected normal price change
r = _1d_change_minx / split_rcp
split_max = max(split, split_rcp)
threshold = (split_max + 1.0 + largest_change_pct) * 0.5
msg = f"split_max={split_max:.3f} largest_change_pct={largest_change_pct:.4f} threshold={threshold:.3f}"
self.manager.LogEvent('debug', 'price-repair-split-'+self.istr, msg)
if 'Repaired?' not in df2.columns:
df2['Repaired?'] = False
if self.interday and self.interval != yfcd.Interval.Days1:
# Yahoo creates multi-day intervals using potentiall corrupt data, e.g.
# the Close could be 100x Open. This means have to correct each OHLC column
# individually
correct_columns_individually = True
else:
correct_columns_individually = False
if correct_columns_individually:
_1d_change_x = np.full((n, 4), 1.0)
price_data = df2[OHLC].replace(0.0, 1.0).to_numpy()
_1d_change_x[1:] = price_data[1:, ] / price_data[:-1, ]
else:
_1d_change_x = _1d_change_minx
r = _1d_change_x / split_rcp
f_down = _1d_change_x < 1.0 / threshold
f_up = _1d_change_x > threshold
f = f_down | f_up
if not correct_columns_individually:
df_debug['r'] = r
df_debug['f_down'] = f_down
df_debug['f_up'] = f_up
df_debug['r'] = df_debug['r'].round(2).astype('str')
else:
for j in range(len(OHLC)):
c = OHLC[j]
if c in debug_cols:
df_debug[c + '_r'] = r[:, j]
df_debug[c + '_f_down'] = f_down[:, j]
df_debug[c + '_f_up'] = f_up[:, j]
df_debug[c + '_r'] = df_debug[c + '_r'].round(2).astype('str')
if not f.any():
yfcl.TraceExit(log_func + ": aborting, did not detect split errors")
return df
# If stock is currently suspended and not in USA, then usually Yahoo introduces
# 100x errors into suspended intervals. Clue is no price change and 0 volume.
# Better to use last active trading interval as baseline.
f_no_activity = (df2['Low'] == df2['High']) & (df2['Volume']==0)
f_no_activity = f_no_activity | df2[OHLC].isna().all(axis=1)
appears_suspended = f_no_activity.any() and np.where(f_no_activity)[0][0]==0
f_active = ~f_no_activity
# First, ideally, look for 2 consecutive intervals of activity that are not
# affected by change errors
if f.ndim == 1:
f_active = f_active & (~f)
else:
f_active = f_active & (~f.any(axis=1))
f_active = f_active & np.roll(f_active, 1)
if not f_active.any():
# First plan failed, will have to settle for most recent active interval
f_active = ~f_no_activity
f_active = f_active & np.roll(f_active, 1)
idx_latest_active = np.where(f_active)[0]
if len(idx_latest_active) == 0:
idx_latest_active = None
else:
idx_latest_active = int(idx_latest_active[0])
msg = f'appears_suspended={appears_suspended} idx_latest_active={idx_latest_active}'
if idx_latest_active is not None:
msg += f' ({df2.index[idx_latest_active].date()})'
self.manager.LogEvent('debug', 'price-repair-split-'+self.istr, msg)
# Update: if any 100x changes are soon after a stock split, so could be confused with split error, then abort
threshold_days = 30
f_splits = df2['Stock Splits'].to_numpy() != 0.0
if change in [100.0, 0.01] and f_splits.any():
indices_A = np.where(f_splits)[0]
indices_B = np.where(f)[0]
if not len(indices_A) or not len(indices_B):
yfcl.TraceExit(log_func)
return None
gaps = indices_B[:, None] - indices_A
# Because data is sorted in DEscending order, need to flip gaps
gaps *= -1
f_pos = gaps > 0
if f_pos.any():
gap_min = gaps[f_pos].min()
gap_td = self.itd * gap_min
if isinstance(gap_td, dateutil.relativedelta.relativedelta):
threshold = dateutil.relativedelta.relativedelta(days=threshold_days)
else:
threshold = timedelta(days=threshold_days)
if gap_td < threshold:
msg = 'price-repair-split: 100x changes are too soon after stock split events, aborting'
self.manager.LogEvent('info', 'price-repair-split-'+self.istr, msg)
yfcl.TraceExit(log_func)
return df
# if self.interday:
# df_debug.index = df_debug.index.date
# for c in ['FetchDate']:
# df_debug[c] = df_debug[c].dt.strftime('%Y-%m-%d %H:%M:%S%z')
# # f_change_happened = df_debug['f_down'] | df_debug['f_up']
# f_change_happened = df_debug['High_f_down'] | df_debug['High_f_up'] | df_debug['Low_f_down'] | df_debug['Low_f_up']
# f_change_happened = f_change_happened | np.roll(f_change_happened, 1) | np.roll(f_change_happened, -1) | np.roll(f_change_happened, 2) | np.roll(f_change_happened, -2)
# f_change_happened[0] = True ; f_change_happened[-1] = True
# df_debug = df_debug[f_change_happened]
# # # df_debug = df_debug.loc[df.index.date <= date(2023, 2, 13)]['Close'].to_numpy()
# # # df_debug = df_debug.iloc[42*5 : 46*5]
# # # df_debug = df.sort_index().loc['2023-06-29':'2023-07-04'][OHLC].sort_index(ascending=False)
# msg = f"price-repair-split: my workings:" + '\n' + str(df_debug)
# self.manager.LogEvent('debug', 'price-repair-split-'+self.istr, msg)
# quit()
def map_signals_to_ranges(f, f_up, f_down):
# Ensure 0th element is False, because True is nonsense
if f[0]:
f = np.copy(f) ; f[0] = False
f_up = np.copy(f_up) ; f_up[0] = False
f_down = np.copy(f_down) ; f_down[0] = False
if not f.any():
return []
true_indices = np.where(f)[0]
ranges = []
for i in range(len(true_indices) - 1):
if i % 2 == 0:
if split > 1.0:
adj = 'split' if f_down[true_indices[i]] else '1.0/split'
else:
adj = '1.0/split' if f_down[true_indices[i]] else 'split'
ranges.append((true_indices[i], true_indices[i + 1], adj))
if len(true_indices) % 2 != 0:
if split > 1.0:
adj = 'split' if f_down[true_indices[-1]] else '1.0/split'
else:
adj = '1.0/split' if f_down[true_indices[-1]] else 'split'
ranges.append((true_indices[-1], len(f), adj))
return ranges
if idx_latest_active is not None:
idx_rev_latest_active = df.shape[0] - 1 - idx_latest_active
msg = f'idx_latest_active={idx_latest_active}, idx_rev_latest_active={idx_rev_latest_active}'
self.manager.LogEvent('debug', 'price-repair-split-'+self.istr, msg)
if correct_columns_individually:
f_corrected = np.full(n, False)
if correct_volume:
# If Open or Close is repaired but not both,
# then this means the interval has a mix of correct
# and errors. A problem for correcting Volume,
# so use a heuristic:
# - if both Open & Close were Nx bad => Volume is Nx bad
# - if only one of Open & Close are Nx bad => Volume is 0.5*Nx bad
f_open_fixed = np.full(n, False)
f_close_fixed = np.full(n, False)
OHLC_correct_ranges = [None, None, None, None]
for j in range(len(OHLC)):
c = OHLC[j]
idx_first_f = np.where(f)[0][0]
if appears_suspended and (idx_latest_active is not None and idx_latest_active >= idx_first_f):
# Suspended midway during data date range.
# 1: process data before suspension in index-ascending (date-descending) order.
# 2: process data after suspension in index-descending order. Requires signals to be reversed,
# then returned ranges to also be reversed, because this logic was originally written for
# index-ascending (date-descending) order.
fj = f[:, j]
f_upj = f_up[:, j]
f_downj = f_down[:, j]
ranges_before = map_signals_to_ranges(fj[idx_latest_active:], f_upj[idx_latest_active:], f_downj[idx_latest_active:])
if len(ranges_before) > 0:
# Shift each range back to global indexing
for i in range(len(ranges_before)):
r = ranges_before[i]
ranges_before[i] = (r[0] + idx_latest_active, r[1] + idx_latest_active, r[2])
f_rev_downj = np.flip(np.roll(f_upj, -1)) # correct
f_rev_upj = np.flip(np.roll(f_downj, -1)) # correct
f_revj = f_rev_upj | f_rev_downj
ranges_after = map_signals_to_ranges(f_revj[idx_rev_latest_active:], f_rev_upj[idx_rev_latest_active:], f_rev_downj[idx_rev_latest_active:])
if len(ranges_after) > 0:
# Shift each range back to global indexing:
for i in range(len(ranges_after)):
r = ranges_after[i]
ranges_after[i] = (r[0] + idx_rev_latest_active, r[1] + idx_rev_latest_active, r[2])
# Flip range to normal ordering
for i in range(len(ranges_after)):
r = ranges_after[i]
ranges_after[i] = (n-r[1], n-r[0], r[2])
ranges = ranges_before ; ranges.extend(ranges_after)
else:
ranges = map_signals_to_ranges(f[:, j], f_up[:, j], f_down[:, j])
if start_min is not None:
# Prune ranges that are older than start_min
for i in range(len(ranges)-1, -1, -1):
r = ranges[i]
if df2.index[r[0]].date() < start_min:
msg = f'price-repair-split: Pruning {c} range {df2.index[r[0]]}->{df2.index[r[1]-1]} because too old.'
self.manager.LogEvent('info', 'price-repair-split-'+self.istr, msg)
del ranges[i]
if len(ranges) > 0:
OHLC_correct_ranges[j] = ranges
count = sum([1 if x is not None else 0 for x in OHLC_correct_ranges])
if count == 0:
pass
elif count == 1:
# If only 1 column then assume false positive
idxs = [i if OHLC_correct_ranges[i] else -1 for i in range(len(OHLC))]
idx = np.where(np.array(idxs) != -1)[0][0]
col = OHLC[idx]
msg = f'price-repair-split: Potential {fix_type} detected only in column {col}, so treating as false positive (ignore)'
self.manager.LogEvent('info', 'price-repair-split-'+self.istr, msg)
else:
# Only correct if at least 2 columns require correction.
for j in range(len(OHLC)):
c = OHLC[j]
ranges = OHLC_correct_ranges[j]
if ranges is None:
ranges = []
for r in ranges:
if r[2] == 'split':
m = split ; m_rcp = split_rcp
else:
m = split_rcp ; m_rcp = split
if self.interday:
msg = f"Corrected bad split adjustment on col={c} range=[{df2.index[r[0]].date()}:{df2.index[r[1]-1].date()}] m={m:.4f}"
else:
msg = f"Corrected bad split adjustment on col={c} range=[{df2.index[r[0]]}:{df2.index[r[1]-1]}] m={m:.4f}"
self.manager.LogEvent('info', 'price-repair-split-'+self.istr, msg)
df2.iloc[r[0]:r[1], df2.columns.get_loc(c)] *= m
if correct_volume:
if c == 'Open':
f_open_fixed[r[0]:r[1]] = True
elif c == 'Close':
f_close_fixed[r[0]:r[1]] = True
f_corrected[r[0]:r[1]] = True
if correct_volume:
f_open_and_closed_fixed = f_open_fixed & f_close_fixed
f_open_xor_closed_fixed = np.logical_xor(f_open_fixed, f_close_fixed)
if f_open_and_closed_fixed.any():
df2.loc[f_open_and_closed_fixed, "Volume"] *= m_rcp
if f_open_xor_closed_fixed.any():
df2.loc[f_open_xor_closed_fixed, "Volume"] *= 0.5 * m_rcp
df2.loc[f_corrected, 'Repaired?'] = True
else:
idx_first_f = np.where(f)[0][0]
if appears_suspended and (idx_latest_active is not None and idx_latest_active >= idx_first_f):
# Suspended midway during data date range.
# 1: process data before suspension in index-ascending (date-descending) order.
# 2: process data after suspension in index-descending order. Requires signals to be reversed,
# then returned ranges to also be reversed, because this logic was originally written for
# index-ascending (date-descending) order.
ranges_before = map_signals_to_ranges(f[idx_latest_active:], f_up[idx_latest_active:], f_down[idx_latest_active:])
if len(ranges_before) > 0:
# Shift each range back to global indexing
for i in range(len(ranges_before)):
r = ranges_before[i]
ranges_before[i] = (r[0] + idx_latest_active, r[1] + idx_latest_active, r[2])
f_rev_down = np.flip(np.roll(f_up, -1))
f_rev_up = np.flip(np.roll(f_down, -1))
f_rev = f_rev_up | f_rev_down
ranges_after = map_signals_to_ranges(f_rev[idx_rev_latest_active:], f_rev_up[idx_rev_latest_active:], f_rev_down[idx_rev_latest_active:])
if len(ranges_after) > 0:
# Shift each range back to global indexing:
for i in range(len(ranges_after)):
r = ranges_after[i]
ranges_after[i] = (r[0] + idx_rev_latest_active, r[1] + idx_rev_latest_active, r[2])
# Flip range to normal ordering
for i in range(len(ranges_after)):
r = ranges_after[i]
ranges_after[i] = (n-r[1], n-r[0], r[2])
ranges = ranges_before ; ranges.extend(ranges_after)
else:
ranges = map_signals_to_ranges(f, f_up, f_down)
if start_min is not None:
# Prune ranges that are older than start_min
for i in range(len(ranges)-1, -1, -1):
r = ranges[i]
if df2.index[r[0]].date() < start_min:
msg = f'price-repair-split: Pruning range {df2.index[r[0]]}->{df2.index[r[1]-1]} because too old.'
self.manager.LogEvent('debug', 'price-repair-split-'+self.istr, msg)
del ranges[i]
for r in ranges:
if r[2] == 'split':
m = split ; m_rcp = split_rcp
else:
m = split_rcp ; m_rcp = split
msg = f"price-repair-split: range={r} m={m}"
self.manager.LogEvent('debug', 'price-repair-split-'+self.istr, msg)
for c in ['Open', 'High', 'Low', 'Close']:
df2.iloc[r[0]:r[1], df2.columns.get_loc(c)] *= m
if correct_volume:
df2.iloc[r[0]:r[1], df2.columns.get_loc("Volume")] *= m_rcp
df2.iloc[r[0]:r[1], df2.columns.get_loc('Repaired?')] = True
if r[0] == r[1] - 1:
if self.interday:
msg = f"Corrected bad split adjustment on interval {df2.index[r[0]].date()} m={m:.4f}"
else:
msg = f"Corrected bad split adjustment on interval {df2.index[r[0]]} m={m:.4f}"
else:
# Note: df2 sorted with index descending
start = df2.index[r[1] - 1]
end = df2.index[r[0]]
if self.interday:
msg = f"Corrected bad split adjustment across intervals {start.date()} -> {end.date()} (inclusive) m={m:.4f}"
else:
msg = f"Corrected bad split adjustment across intervals {start} -> {end} (inclusive) m={m:.4f}"
self.manager.LogEvent('info', 'price-repair-split-'+self.istr, msg)
if correct_volume:
df2['Volume'] = df2['Volume'].round(0).astype('int')
yfcl.TraceExit(log_func + " returning")
return df2.sort_index()
def _repairZeroPrices(self, df, silent=False):
yfcu.TypeCheckDataFrame(df, "df")
# Sometimes Yahoo returns prices=0 when obviously wrong e.g. Volume > 0 and Close > 0.
# Easy to detect and fix
if df.empty:
return df
if df.shape[0] == 1:
# Need multiple rows to confidently identify outliers
return df
log_msg_enter = f"PM::_repairZeroPrices-{self.istr}(date_range={df.index[0]}->{df.index[-1]+self.itd})"
log_msg_exit = f"PM::_repairZeroPrices-{self.istr}() returning"
yfcl.TraceEnter(log_msg_enter)
df2 = df.copy()
price_cols = [c for c in ["Open", "High", "Low", "Close", "Adj Close"] if c in df2.columns]
f_zero_or_nan = (df2[price_cols] == 0.0).to_numpy() | df2[price_cols].isna().to_numpy()
# Check whether worth attempting repair
if f_zero_or_nan.any(axis=1).sum() == 0:
yfcl.TraceExit(log_msg_exit + " (no bad data)")
return df
if f_zero_or_nan.sum() == len(price_cols)*len(df2):
# Need some good data to calibrate
yfcl.TraceExit(log_msg_exit + " (insufficient calibration data)")
return df
# - avoid repair if many zeroes/NaNs
pct_zero_or_nan = f_zero_or_nan.sum() / (len(price_cols)*len(df2))
if f_zero_or_nan.any(axis=1).sum() > 2 and pct_zero_or_nan > 0.05:
yfcl.TraceExit(log_msg_exit + " (too much bad data)")
return df
data_cols = price_cols + ["Volume"]
# Mark values to send for repair
tag = -1.0
for i in range(len(price_cols)):
c = price_cols[i]
df2.loc[f_zero_or_nan[:, i], c] = tag
# If volume=0 or NaN for bad prices, then tag volume for repair
if self.ticker.endswith("=X"):
# FX, volume always 0
pass
else:
df2.loc[f_zero_or_nan.any(axis=1) & (df2["Volume"] == 0), "Volume"] = tag
df2.loc[f_zero_or_nan.any(axis=1) & (df2["Volume"].isna()), "Volume"] = tag
# print(df2[f_zero_or_nan.any(axis=1)])
n_before = (df2[data_cols].to_numpy() == tag).sum()
# print("n_before =", n_before)
try:
df2 = self._reconstruct_intervals_batch(df2, tag=tag)
except Exception:
if len(self._stack_trace) > 0:
self._stack_trace.pop(len(self._stack_trace) - 1)
raise
n_after = (df2[data_cols].to_numpy() == tag).sum()
n_fixed = n_before - n_after
msg = f"Fixed {n_fixed}/{n_before} price=0.0 errors in {self.istr} price data"
if not silent and n_fixed > 0:
print(f"{self.ticker}: " + msg)
else:
self.manager.LogEvent("info", "PriceManager", msg)
# Restore original values where repair failed (i.e. remove tag values)
f = df2[data_cols].to_numpy() == tag
for j in range(len(data_cols)):
fj = f[:, j]
if fj.any():
c = data_cols[j]
df2.loc[fj, c] = df.loc[fj, c]
yfcl.TraceExit(log_msg_exit)
return df2
def _reverseYahooAdjust(self, df):
yfcu.TypeCheckDataFrame(df, "df")
# Yahoo returns data split-adjusted so reverse that.
#
# Except for hourly/minute data, Yahoo isn't consistent with adjustment:
# - prices only split-adjusted if date range contains a split
# - dividend appears split-adjusted
# Easy to fix using daily data:
# - divide prices by daily to determine if split-adjusted
# - copy dividends from daily
# Finally, add 'CSF' & 'CDF' columns to allow cheap on-demand adjustment
if not isinstance(df, pd.DataFrame):
raise Exception("'df' must be pd.DataFrame not {}".format(type(df)))
debug = False
# debug = True
log_msg = f"PM::_reverseYahooAdjust-{self.istr}, {df.index[0]}->{df.index[-1]}"
if yfcl.IsTracingEnabled():
yfcl.TraceEnter(log_msg)
elif debug:
print("")
print(log_msg)
if debug:
print(df[["Open", "Low", "High", "Close", "Volume"]])
cdf = None
csf = None
if self.interval != yfcd.Interval.Days1:
# Trigger update of daily price data, to get all events
histDaily = self.manager.GetHistory(yfcd.Interval.Days1)
df_daily_raw = histDaily.get(start=df.index[0].date(), repair=False)
# Step 1: ensure intraday price data is always split-adjusted
td_7d = timedelta(days=7)
if not self.interday:
# Get daily price data during and after 'df'
df_daily = df_daily_raw.copy()
for c in ["Open", "Close", "Low", "High"]:
df_daily[c] *= df_daily["CSF"]
df_daily["Volume"] /= df_daily["CSF"]
df_daily = df_daily.drop("CSF", axis=1)
if df_daily is None or df_daily.empty:
# df = df.drop("Adj Close", axis=1)
df["CSF"] = 1.0
df["CDF"] = 1.0
return df
f_post = df_daily.index.date > df.index[-1].date()
df_daily_during = df_daily[~f_post].copy()
df_daily_post = df_daily[f_post].copy()
df_daily_during.index = df_daily_during.index.date ; df_daily_during.index.name = "_date"
# Also get raw daily data from cache
df_daily_raw = df_daily_raw[df_daily_raw.index.date >= df.index[0].date()]
#
f_post = df_daily_raw.index.date > df.index[-1].date()
df_daily_raw_during = df_daily_raw[~f_post].copy()
df_daily_raw_during_d = df_daily_raw_during.copy()
df_daily_raw_during_d.index = df_daily_raw_during_d.index.date ; df_daily_raw_during_d.index.name = "_date"
if df_daily_post.empty:
csf_post = 1.0
else:
csf_post = yfcu.GetCSF0(df_daily_post)
expectedRatio = 1.0 / csf_post
ss_ratio = expectedRatio
ss_ratioRcp = 1.0 / ss_ratio
#
price_data_cols = ["Open", "Close", "Adj Close", "Low", "High"]
if ss_ratio > 1.01:
for c in price_data_cols:
df[c] *= ss_ratioRcp
if debug:
print("Applying 1:{} stock-split".format(round(ss_ratio, 2)))
elif ss_ratioRcp > 1.01:
for c in price_data_cols:
df[c] *= ss_ratio
if debug:
print("Applying {.2f}:1 reverse-split-split".format(ss_ratioRcp))
# Note: volume always returned unadjusted
# Yahoo messes up dividend adjustment too so copy correct dividend from daily,
# but only to first time periods of each day:
df["_date"] = df.index.date
df["_indexBackup"] = df.index
# Copy over CSF and CDF too from daily
df = pd.merge(df, df_daily_raw_during_d[["CDF", "CSF"]], how="left", on="_date", validate="many_to_one")
df.index = df["_indexBackup"] ; df.index.name = None ; df = df.drop(["_indexBackup", "_date"], axis=1)
cdf = df["CDF"]
df["Adj Close"] = df["Close"] * cdf
csf = df["CSF"]
if not df_daily_post.empty:
post_csf = yfcu.GetCSF0(df_daily_post)
elif self.interval == yfcd.Interval.Week:
df_daily = histDaily.get(start=df.index[-1].date()+td_7d, repair=False)
if (df_daily is not None) and (not df_daily.empty):
post_csf = yfcu.GetCSF0(df_daily)
if debug:
print("- post_csf of daily date range {}->{} = {}".format(df_daily.index[0], df_daily.index[-1], post_csf))
# elif self.interval in [yfcd.Interval.Months1, yfcd.Interval.Months3]:
# raise Exception("not implemented")
# If 'df' does not contain all stock splits until present, then
# set 'post_csf' to cumulative stock split factor just after last 'df' date
splits_post = self.manager.GetHistory("Events").GetSplits(start=df.index[-1].date()+timedelta(days=1))
if splits_post is not None:
post_csf = 1.0/splits_post["Stock Splits"].prod()
else:
post_csf = None
# Cumulative dividend factor
if cdf is None:
f_nna = ~df["Close"].isna()
if not f_nna.any():
cdf = 1.0
else:
cdf = np.full(df.shape[0], np.nan)
cdf[f_nna] = df.loc[f_nna, "Adj Close"] / df.loc[f_nna, "Close"]
cdf = pd.Series(cdf).bfill().ffill().to_numpy()
# In rare cases, Yahoo is not calculating 'Adj Close' correctly
if self.interday:
divs_since = self.manager.GetHistory("Events").GetDivs(start=df.index[-1].date()+self.itd)
if divs_since is not None and not divs_since.empty:
# Check that 'Adj Close' reflects all future dividends
expected_adj = divs_since['Back Adj.'].prod()
if self.interday and self.interval != yfcd.Interval.Days1:
if df['Dividends'].iloc[-1] != 0.0:
dt = df.index[-1]
# Note: df hasn't been de-splitted yet
hist_before = self.manager.GetHistory(yfcd.Interval.Days1).get(start=dt.date()-timedelta(days=7), end=dt.date(), adjust_splits=True, adjust_divs=False)
close = hist_before['Close'].iloc[-1]
adj_adj = 1 - df['Dividends'].iloc[-1] / close
if adj_adj < 1.0:
msg = f"Adjusting expected_adj={expected_adj:.3f} by last-row div={df['Dividends'].iloc[-1]} adj={adj_adj:.3f} @ {dt.date()}"
self.manager.LogEvent("info", '_reverseYahooAdjust', msg)
expected_adj *= adj_adj
actual_adj = cdf[-1]
if not np.isnan(actual_adj):
diff_pct = abs(actual_adj / expected_adj - 1.0)
if diff_pct > 0.005:
msg = f'expected_adj={expected_adj:.4f} != actual_adj={actual_adj:.4f}, correcting (last dt = {df.index[-1].date()})'
self.manager.LogEvent("info", '_reverseYahooAdjust', msg)
divs_since.index = divs_since.index.date
# Bad. Dividends have occurred after this price data, but 'Adj Close' is missing adjustment(s).
# Fix
cdf *= expected_adj / actual_adj
# Cumulative stock-split factor
if csf is None:
ss = df["Stock Splits"].copy()
ss[(ss == 0.0) | ss.isna()] = 1.0
ss_rcp = 1.0 / ss
csf = ss_rcp.sort_index(ascending=False).cumprod().sort_index(ascending=True).shift(-1, fill_value=1.0)
if post_csf is not None:
csf *= post_csf
csf_rcp = 1.0 / csf
# Reverse Yahoo's split adjustment:
data_cols = ["Open", "High", "Low", "Close", "Dividends"]
for dc in data_cols:
df[dc] = df[dc] * csf_rcp
if not self.interday:
# Don't need to de-split volume data because Yahoo always returns interday volume unadjusted
pass
else:
df["Volume"] *= csf
if df["Volume"].dtype != 'int64':
df["Volume"] = df["Volume"].round(0).astype('int')
# Drop 'Adj Close', replace with scaling factors:
df = df.drop("Adj Close", axis=1)
df["CSF"] = csf
df["CDF"] = cdf
h_lastDivAdjustDt = pd.Timestamp.utcnow().tz_convert(ZoneInfo("UTC"))
h_lastSplitAdjustDt = h_lastDivAdjustDt
df["LastDivAdjustDt"] = h_lastDivAdjustDt
df["LastSplitAdjustDt"] = h_lastSplitAdjustDt
if debug:
print("- unadjusted:")
print(df[["Close", "Dividends", "Volume", "CSF", "CDF"]])
f = df["Dividends"] != 0.0
if f.any():
print("- dividends:")
print(df.loc[f, ["Open", "Low", "High", "Close", "Dividends", "Volume", "CSF", "CDF"]])
print("")
log_msg = f"PM::_reverseYahooAdjust-{self.istr}() returning"
if yfcl.IsTracingEnabled():
yfcl.TraceExit(log_msg)
elif debug:
print(log_msg)
if debug:
print(df[["Open", "Low", "High", "Close", "Dividends", "Volume", "CSF"]])
return df
def _applyNewEvents(self):
if self.h is None or self.h.empty:
return
# debug = False
# debug = True
h_modified = False
log_msg = f"PM::_applyNewEvents()-{self.istr}"
if yfcl.IsTracingEnabled():
yfcl.TraceEnter(log_msg)
# Backport new splits across entire h table
lastSplitAdjustDt_min = self.h["LastSplitAdjustDt"].min()
splits_since = self.manager.GetHistory("Events").GetSplitsFetchedSince(lastSplitAdjustDt_min)
if splits_since is not None and not splits_since.empty:
LastSplitAdjustDt_new = self.h["LastSplitAdjustDt"].copy()
f_sup = splits_since["Superseded split"] != 0.0
if f_sup.any():
for dt in splits_since.index[f_sup]:
split = splits_since.loc[dt]
f1 = self.h.index < dt
diff1 = (self.h["LastSplitAdjustDt"] - split["Superseded split FetchDate"]).abs()
f2 = (diff1 < pd.Timedelta("15s")).to_numpy()
diff2 = (self.h["FetchDate"] - split["Superseded split FetchDate"]).abs()
f3 = (diff2 < pd.Timedelta("15s")).to_numpy()
f = f1 & (f2 | f3)
if not f.any():
if self.interval != yfcd.Interval.Days1:
# Probably ok, assuming superseded split was never applied to this price data
continue
print(split)
raise Exception("For superseded split above, failed to identify rows to undo. Problem?")
else:
# Next check: expect cached CSF != 1.0
log_msg = f"{self.istr}: Reversing split [dt={dt.date()} {split['Superseded split']} fetch={split['Superseded split FetchDate'].strftime('%Y-%m-%d %H:%M:%S%z')}]"
indices = np.where(f)[0]
log_msg += " from intervals "
if self.interday:
log_msg += f"{self.h.index[indices[0]].date()} -> {self.h.index[indices[-1]].date()} (inc)"
else:
log_msg += f"{self.h.index[indices[0]]} -> {self.h.index[indices[-1]]}"
h_lastRow = self.h.loc[self.h.index[indices[-1]]]
log_msg += f". Last CSF = {h_lastRow['CSF']:.5f} @ {h_lastRow['LastSplitAdjustDt'].strftime('%Y-%m-%d %H:%M:%S%z')}"
self.manager.LogEvent("info", "PriceManager", log_msg)
self.h.loc[f, "CSF"] *= split["Stock Splits"]
LastSplitAdjustDt_new[f] = np.maximum(split['FetchDate'], LastSplitAdjustDt_new[f])
for dt in splits_since.index:
split = splits_since.loc[dt]
f1 = self.h.index < dt
f2 = self.h["LastSplitAdjustDt"] < split["FetchDate"]
f = f1 & f2
if f.any():
log_msg = f"{self.istr}: Applying split [dt={dt.date()} {split['Stock Splits']} fetch={split['FetchDate'].strftime('%Y-%m-%d %H:%M:%S%z')}]"
indices = np.where(f)[0]
log_msg += " across intervals "
if self.interday:
log_msg += f"{self.h.index[indices[0]].date()} -> {self.h.index[indices[-1]].date()} (inc)"
else:
log_msg += f"{self.h.index[indices[0]]} -> {self.h.index[indices[-1]]}"
h_lastRow = self.h.loc[self.h.index[indices[-1]]]
log_msg += f". Last CSF = {h_lastRow['CSF']:.5f} @ {h_lastRow['LastSplitAdjustDt'].strftime('%Y-%m-%d %H:%M:%S%z')}"
self.manager.LogEvent("info", "PriceManager", log_msg)
if isinstance(self.h["CSF"].iloc[0], (int, np.int64)):
self.h["CSF"] = self.h["CSF"].astype(float)
self.h.loc[f, "CSF"] /= split["Stock Splits"]
LastSplitAdjustDt_new.loc[f] = np.maximum(LastSplitAdjustDt_new[f], split["FetchDate"])
self.h["LastSplitAdjustDt"] = LastSplitAdjustDt_new
h_modified = True
# Backport new divs across entire h table
lastDivAdjustDt_min = self.h["LastDivAdjustDt"].min()
if isinstance(lastDivAdjustDt_min.tzinfo, pytz.BaseTzInfo):
self.h["LastDivAdjustDt"] = self.h["LastDivAdjustDt"].dt.tz_convert(self.h.index.tz)
h_modified = True
lastDivAdjustDt_min = self.h["LastDivAdjustDt"].min()
divs_since = self.manager.GetHistory("Events").GetDivsFetchedSince(lastDivAdjustDt_min)
if divs_since is not None and not divs_since.empty:
LastDivAdjustDt_new = self.h["LastDivAdjustDt"].copy()
f_sup = divs_since["Superseded back adj."] != 0.0
if f_sup.any():
for dt in divs_since.index[f_sup]:
div = divs_since.loc[dt]
f1 = self.h.index < dt
# Update: new strategy
# Instead of last adjust being the superseded dividend,
# set condition as last adjust being before this new dividend
f2 = ((div["FetchDate"] - self.h["LastDivAdjustDt"]) > pd.Timedelta('1m')).to_numpy()
f3 = ((div["FetchDate"] - self.h["FetchDate"]) > pd.Timedelta('1m')).to_numpy()
f = f1 & (f2 & f3)
if not f.any():
if self.interval != yfcd.Interval.Days1:
# Probably ok, assuming superseded div was never applied to this price data
continue
else:
diff = (self.h["FetchDate"] - div["FetchDate"]).abs()
f_recent = (diff < pd.Timedelta("15s")).to_numpy()
if f_recent[f1].all():
# All price data that could be affected by new dividend, was just
# fetched with that dividend. So can safely ignore.
continue
# print(div)
# raise Exception(f"{self.ticker}: {self.istr}: For superseded div above, failed to identify rows to undo. Problem?")
else:
# Next check: expect cached CDF < 1.0:
f1 = self.h.loc[f, "CDF"] >= 1.0
if f1.any():
# This can happen with recent multiday intervals and that's ok, and will correct manually.
f1_oldest_idx = np.where(f1)[0][-1]
f1_oldest_dt = self.h.index[f1_oldest_idx]
is_f1_oldest_dt_recent = (pd.Timestamp.utcnow() - f1_oldest_dt) < (1.5*self.itd)
if self.interday and self.interval != yfcd.Interval.Days1 and is_f1_oldest_dt_recent:
# Yup, that's what happened
f[f1_oldest_idx:] = False
f1 = self.h.loc[f, "CDF"] >= 1.0
# if f1.any():
# print(self.h[f1|np.roll(f1,-1)][['Close', 'CDF', 'CSF', 'FetchDate']])
# print(div)
# raise Exception(f"{self.ticker}: {self.istr}: For superseded div above, attempting to undo div-adjust from rows where CDF=1. Investigate.")
log_msg = f"{self.istr}: Reversing div [dt={dt.date()} {div['Superseded div']} adj={div['Superseded back adj.']:.5f} fetch={div['Superseded div FetchDate'].strftime('%Y-%m-%d %H:%M:%S%z')}]"
indices = np.where(f)[0]
log_msg += " from intervals "
if self.interday:
log_msg += f"{self.h.index[indices[0]].date()} -> {self.h.index[indices[-1]].date()} (inc)"
else:
log_msg += f"{self.h.index[indices[0]]} -> {self.h.index[indices[-1]]}"
h_lastRow = self.h.loc[self.h.index[indices[-1]]]
log_msg += f". Last CDF = {h_lastRow['CDF']:.5f} @ {h_lastRow['LastDivAdjustDt'].strftime('%Y-%m-%d %H:%M:%S%z')}"
self.manager.LogEvent("info", "PriceManager", log_msg)
self.h.loc[f, "CDF"] /= div["Superseded back adj."]
LastDivAdjustDt_new[f] = np.maximum(div['FetchDate'], LastDivAdjustDt_new[f])
for dt in divs_since.index:
div = divs_since.loc[dt]
f1 = self.h.index < dt
f2 = self.h["LastDivAdjustDt"] < div["FetchDate"]
f = f1 & f2
if f.any():
log_msg = f"{self.istr}: Applying div [dt={dt.date()} {div['Dividends']} adj={div['Back Adj.']:.5f} fetch={div['FetchDate'].strftime('%Y-%m-%d %H:%M:%S%z')}]"
indices = np.where(f)[0]
log_msg += " across intervals "
if self.interday:
log_msg += f"{self.h.index[indices[0]].date()} -> {self.h.index[indices[-1]].date()} (inc)"
else:
log_msg += f"{self.h.index[indices[0]]} -> {self.h.index[indices[-1]]}"
h_lastRow = self.h.loc[self.h.index[indices[-1]]]
log_msg += f". Last CDF = {h_lastRow['CDF']:.5f} @ {h_lastRow['LastDivAdjustDt'].strftime('%Y-%m-%d %H:%M:%S%z')}"
self.manager.LogEvent("info", "PriceManager", log_msg)
self.h.loc[f, "CDF"] *= div["Back Adj."]
LastDivAdjustDt_new[f] = np.maximum(LastDivAdjustDt_new[f], div["FetchDate"])
self.h["LastDivAdjustDt"] = LastDivAdjustDt_new
h_modified = True
if h_modified:
f1 = self.h.loc[f, "CDF"] > 1.0
if f1.any():
self.loc[f,'CDF'] = 1.0
self._updatedCachedPrices(self.h)
log_msg = "PM::_applyNewEvents() returning"
if yfcl.IsTracingEnabled():
yfcl.TraceExit(log_msg)