from pprint import pprint
import sqlite3 as sql
from copy import deepcopy
from functools import lru_cache
from datetime import datetime, date, time, timedelta
from zoneinfo import ZoneInfo
import pandas as pd
import numpy as np
import exchange_calendars as xcal
from yfinance_cache import yfc_logging as yfcl
from . import yfc_dat as yfcd
from . import yfc_cache_manager as yfcm
from . import yfc_utils as yfcu
exchangeTzCache = {}
[docs]
def GetExchangeTzName(exchange):
yfcu.TypeCheckStr(exchange, "exchange")
if exchange not in exchangeTzCache:
tz = yfcm.ReadCacheDatum("exchange-"+exchange, "tz")
if tz is None:
raise Exception("Do not know timezone for exchange '{}'".format(exchange))
exchangeTzCache[exchange] = tz
else:
tz = exchangeTzCache[exchange]
return tz
[docs]
def SetExchangeTzName(exchange, tz):
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckStr(tz, "tz")
exchange_lock = yfcd.exchange_locks[exchange]
with exchange_lock:
tzc = yfcm.ReadCacheDatum("exchange-"+exchange, "tz")
if tzc is not None:
if tzc != tz:
# Different names but maybe same tz
tzc_zi = ZoneInfo(tzc)
tz_zi = ZoneInfo(tz)
dt = datetime.now()
if tz_zi.utcoffset(dt) != tzc_zi.utcoffset(dt):
print("tz_zi = {} ({})".format(tz_zi, type(tz_zi)))
print("tzc_zi = {} ({})".format(tzc_zi, type(tzc_zi)))
raise Exception("For exchange '{}', new tz {} != cached tz {}".format(exchange, tz, tzc))
else:
exchangeTzCache[exchange] = tz
yfcm.StoreCacheDatum("exchange-"+exchange, "tz", tz)
calCache = {}
schedCache = {}
schedDbMetadata = {}
db_mem = sql.connect(":memory:")
schedIntervalsCache = {}
# TODO: Ensure all methods support Monthly intervals, e.g. GetTimestampCurrentInterval
[docs]
def GetExchangeDataDelay(exchange):
yfcu.TypeCheckStr(exchange, "exchange")
d = yfcm.ReadCacheDatum("exchange-"+exchange, "yf_lag")
if d is None:
d = yfcd.exchangeToYfLag[exchange]
return d
[docs]
def Simple247xcal(opens, closes):
yfcu.TypeCheckDatetimeIndex(opens, 'opens')
yfcu.TypeCheckDatetimeIndex(closes, 'closes')
# Use any xcal calendar as base
cal = xcal.get_calendar('NYSE')
cal._opens = opens
cal._closes = closes
cal.schedule = pd.DataFrame(data={"open":opens, "close":closes}, index=opens)
cal.schedule.index = cal.schedule.index.tz_localize(None)
cal.schedule['break_start'] = pd.NaT
cal.schedule['break_end'] = pd.NaT
cal.opens_nanos = np.array([x.value for x in opens])
cal.closes_nanos = np.array([x.value for x in closes])
cal._late_opens = []
cal._early_closes = []
cal._break_starts = None
cal._break_ends = None
cal.break_starts_nanos = cal.schedule['break_start'].values.astype(np.int64)
cal.break_ends_nanos = cal.schedule['break_end'].values.astype(np.int64)
return cal
[docs]
def JoinTwoXcals(cal1, cal2):
# TODO: Check no overlap, and also they are close together implying no business days between
if cal1.schedule.index[0] > cal2.schedule.index[0]:
# Flip
tmp = cal1
cal1 = cal2
cal2 = tmp
cal12 = deepcopy(cal1)
def _safeAppend(a1, a2):
if a1 is not None and a2 is not None:
return np.append(a1, a2)
elif a1 is not None:
return a1
else:
return a2
#
cal12._opens = _safeAppend(cal1._opens, cal2._opens)
cal12._break_starts = _safeAppend(cal1._break_starts, cal2._break_starts)
cal12._break_ends = _safeAppend(cal1._break_ends, cal2._break_ends)
cal12._closes = _safeAppend(cal1._closes, cal2._closes)
#
cal12.opens_nanos = _safeAppend(cal1.opens_nanos, cal2.opens_nanos)
cal12.break_starts_nanos = _safeAppend(cal1.break_starts_nanos, cal2.break_starts_nanos)
cal12.break_ends_nanos = _safeAppend(cal1.break_ends_nanos, cal2.break_ends_nanos)
cal12.closes_nanos = _safeAppend(cal1.closes_nanos, cal2.closes_nanos)
#
cal12._late_opens = _safeAppend(cal1._late_opens, cal2._late_opens)
cal12._early_closes = _safeAppend(cal1._early_closes, cal2._early_closes)
#
cal12.schedule = pd.concat([cal1.schedule, cal2.schedule])
return cal12
[docs]
@lru_cache(maxsize=1000)
def GetCalendarViaCache(exchange, start, end=None):
global calCache
if isinstance(start, date):
if start.month == 1 and start.day < 5:
if end is None:
end = start
start = start.year - 1
else:
start = start.year
if end is not None and isinstance(end, date):
if end.month == 12 and end.day > 25:
end = end.year + 1
else:
end = end.year
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckYear(start, "start")
if end is not None:
yfcu.TypeCheckYear(end, "end")
if end is None:
end = date.today().year
cache_key = "exchange-"+exchange
if exchange == 'CCC':
# Binance, 24/7
cal_name = exchange
else:
if exchange not in yfcd.exchangeToXcalExchange:
raise Exception("Need to add mapping of exchange {} to xcal".format(exchange))
cal_name = yfcd.exchangeToXcalExchange[exchange]
cal = None
exchange_lock = yfcd.exchange_locks[exchange]
def _customModSchedule(cal):
tz = ZoneInfo(GetExchangeTzName(exchange))
df = cal.schedule
df["open"] = df["open"].dt.tz_convert(tz)
df["close"] = df["close"].dt.tz_convert(tz)
if (exchange in yfcd.exchangesWithAuction) and ("auction" not in df.columns):
df["auction"] = df["close"] + yfcd.exchangeAuctionDelay[exchange]
df["idx_nanos"] = df.index.values.astype("int64")
if exchange == "ASX":
# Yahoo sometimes returns trading data occurring
# between 4pm and 4:01pm. TradingView agress.
# Have to assume this is real data.
f = df["close"].dt.time == time(16)
if f.any():
# df.loc[f, "close"] += timedelta(minutes=1)
closes = df["close"].to_numpy()
closes[f] += timedelta(minutes=1)
df["close"] = closes
cal.closes_nanos = df["close"].values.astype("int64")
return cal
with exchange_lock:
# Load from cache
if cal_name in calCache:
cal = calCache[cal_name]
elif yfcm.IsDatumCached(cache_key, "cal"):
cal, md = yfcm.ReadCacheDatum(cache_key, "cal", True)
if xcal.__version__ != md["version"]:
cal = None
elif 'np version' not in md or md['np version'] != np.__version__:
cal = None
# Calculate missing data
pre_range = None ; post_range = None
if cal is not None:
cached_range = (cal.schedule.index[0].year, cal.schedule.index[-1].year)
if start < cached_range[0]:
pre_range = (start, cached_range[0]-1)
if end > cached_range[1]:
post_range = (cached_range[1]+1, end)
else:
pre_range = (start, end)
# Fetch missing data
if pre_range is not None:
start = date(pre_range[0], 1, 1)
end = date(pre_range[1], 12, 31)
if exchange == 'CCC':
opens = pd.date_range(start=start, end=end, freq='1d').tz_localize(ZoneInfo(GetExchangeTzName(exchange)))
ends = opens + pd.Timedelta('1d')
pre_cal = Simple247xcal(opens, ends)
else:
pre_cal = xcal.get_calendar(cal_name, start=start, end=end)
pre_cal = _customModSchedule(pre_cal)
if cal is None:
cal = pre_cal
else:
cal = JoinTwoXcals(pre_cal, cal)
if post_range is not None:
start = date(post_range[0], 1, 1)
end = date(post_range[1], 12, 31)
if exchange == 'CCC':
opens = pd.date_range(start=start, end=end, freq='1d').tz_localize(ZoneInfo(GetExchangeTzName(exchange)))
ends = opens + pd.Timedelta('1d')
post_cal = Simple247xcal(opens, ends)
else:
post_cal = xcal.get_calendar(cal_name, start=start, end=end)
post_cal = _customModSchedule(post_cal)
if cal is None:
cal = post_cal
else:
cal = JoinTwoXcals(cal, post_cal)
# Write to cache
calCache[cal_name] = cal
if pre_range is not None or post_range is not None:
yfcm.StoreCacheDatum(cache_key, "cal", cal, metadata={"version": xcal.__version__, 'np version': np.__version__})
return cal
[docs]
def ExchangeOpenOnDay(exchange, d):
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckDateStrict(d, "d")
cal = GetCalendarViaCache(exchange, d)
return d.isoformat() in cal.schedule.index
# @lru_cache(maxsize=1000) # changes index of returned dataframe from datetimeindex to index
[docs]
def GetExchangeSchedule(exchange, start_d, end_d):
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckDateStrict(start_d, "start_d")
yfcu.TypeCheckDateStrict(end_d, "end_d")
if start_d >= end_d:
raise Exception("start_d={} must < end_d={}".format(start_d, end_d))
debug = False
# debug = True
if debug:
print("GetExchangeSchedule(exchange={}, start_d={}, end_d={}".format(exchange, start_d, end_d))
end_d_sub1 = end_d - timedelta(days=1)
# num_years = end_d.year - start_d.year + 1
num_years = end_d_sub1.year - start_d.year + 1
if num_years <= 2:
# Cache
cache_key = (exchange, start_d.year, num_years)
if cache_key in schedCache:
s = schedCache[cache_key]
else:
cal = GetCalendarViaCache(exchange, start_d, end_d)
s = cal.schedule.loc[str(start_d.year):str(end_d_sub1.year)].copy()
schedCache[cache_key] = s
else:
cal = GetCalendarViaCache(exchange, start_d, end_d)
s = cal.schedule
if s is not None:
start_ts = pd.Timestamp(start_d)
end_ts = pd.Timestamp(end_d_sub1)
slice_start = s["idx_nanos"].values.searchsorted(start_ts.value, side="left")
slice_end = s["idx_nanos"].values.searchsorted(end_ts.value, side="right")
sched = s[slice_start:slice_end].copy()
else:
sched = None
if sched is None or sched.empty:
df = None
else:
cols = ["open", "close"]
if "auction" in sched.columns:
cols.append("auction")
df = sched[cols]
if debug:
print("GetExchangeSchedule() returning")
return df
# Experiment with sqlite3. But pd.read_sql() -> _parse_date_columns() kills performance
global calCache
global schedDbMetadata
global db_mem
if exchange not in schedDbMetadata:
cal = GetCalendarViaCache(exchange, start_d, end_d)
cal.schedule["open"] = cal.schedule["open"].dt.tz_convert("UTC")
cal.schedule["close"] = cal.schedule["close"].dt.tz_convert("UTC")
cal.schedule.to_sql(exchange, db_mem, index_label="indexpd")
schedDbMetadata[exchange] = {}
schedDbMetadata[exchange]["columns"] = cal.schedule.columns.values
#
# query = "PRAGMA table_info('{}')".format(exchange)
# print("query:")
# print(query)
# c = db_mem.execute(query)
# table_schema = c.fetchall()
# print(table_schema)
# print(schedDbMetadata[exchange])
# quit()
#
tz_name = GetExchangeTzName(exchange)
tz_exchange = ZoneInfo(tz_name)
start_dt = datetime.combine(start_d, time(0), tzinfo=tz_exchange)
end_dt = datetime.combine(end_d, time(0), tzinfo=tz_exchange)
# index in sql is tz-naive, so discard tz from date range:
start_dt = pd.Timestamp(start_dt).tz_localize(None)
end_dt = pd.Timestamp(end_dt).tz_localize(None)
cols = [c for c in ["open", "close", "auction"] if c in schedDbMetadata[exchange]["columns"]]
query = "SELECT indexpd, {} FROM {} WHERE indexpd >= '{}' AND indexpd < '{}' ;".format(", ".join(cols), exchange, start_dt, end_dt)
sched = pd.read_sql(query, db_mem, parse_dates=["indexpd", "open", "close"])
if sched is None or sched.empty:
return None
sched.index = pd.DatetimeIndex(sched["indexpd"]) # .tz_localize(tz_name)
sched = sched.drop("indexpd", axis=1)
sched["open"] = sched["open"].dt.tz_convert(tz_exchange)
sched["close"] = sched["close"].dt.tz_convert(tz_exchange)
cols = ["open", "close"]
if "auction" in sched.columns:
cols.append("auction")
df = sched[cols].copy()
rename_cols = {"open": "market_open", "close": "market_close"}
df.columns = [rename_cols[col] if col in rename_cols else col for col in df.columns]
return df
[docs]
def GetExchangeWeekSchedule(exchange, start, end, ignoreHolidays, ignoreWeekends, forceStartMonday=True):
yfcu.TypeCheckStr(exchange, "exchange")
if start >= end:
raise Exception("start={} must be < end={}".format(start, end))
yfcu.TypeCheckDateEasy(start, "start")
yfcu.TypeCheckDateEasy(end, "end")
yfcu.TypeCheckBool(ignoreHolidays, "ignoreHolidays")
yfcu.TypeCheckBool(ignoreWeekends, "ignoreWeekends")
yfcu.TypeCheckBool(forceStartMonday, "forceStartMonday")
debug = False
# debug = True
if debug:
print("GetExchangeWeekSchedule()", locals())
tz = ZoneInfo(GetExchangeTzName(exchange))
td_1d = timedelta(days=1)
if not isinstance(start, datetime):
start_d = start
else:
start_d = start.astimezone(tz).date()
if not isinstance(end, datetime):
end_d = end
else:
end_d = end.astimezone(tz).date() + td_1d
dt_now = pd.Timestamp.utcnow().tz_convert(ZoneInfo("UTC"))
# td7d = timedelta(days=7)
td7d = pd.DateOffset(days=7)
sunday_has_trading = exchange in ["TLV"]
week_starts_sunday = (exchange in ["TLV"]) and (not forceStartMonday)
if debug:
print("- week_starts_sunday =", week_starts_sunday)
if exchange not in yfcd.exchangeToXcalExchange:
raise Exception("Need to add mapping of exchange {} to xcal".format(exchange))
cal = GetCalendarViaCache(exchange, start, end)
sched = GetExchangeSchedule(exchange, start_d, end_d)
if sched is None or sched.empty:
return None
if "auction" in sched.columns:
sched["close"] = sched["auction"] + yfcd.exchangeAuctionDuration[exchange]
sched = sched.drop("auction", axis=1)
if week_starts_sunday:
periods = sched.index.to_period("W-SAT")
else:
periods = sched.index.to_period("W")
if debug:
print("- periods:", type(periods), type(periods[0]))
for p in periods:
print(p.start_time, p.end_time)
period_starts = periods.start_time.tz_localize(tz)
period_intervals = pd.IntervalIndex.from_arrays(period_starts, period_starts+td7d, closed="left")
if debug:
print("- period_intervals:", type(period_intervals), type(period_intervals[0]))
for x in period_intervals:
print(x.left, x.right)
g = sched.groupby(period_intervals)
weeks = g.first().drop("close", axis=1)
weeks["close"] = g.last()["close"]
if debug:
print("- weeks:")
print(weeks)
print("- - index")
for x in weeks.index:
print(x.left, x.right)
drop_first = False
drop_last = False
if not ignoreHolidays:
drop_first = start_d > weeks.index[0].left.date()
if not week_starts_sunday and sunday_has_trading:
# Week is Monday open -> Sunday close
last_working_week_end = weeks.index[-1].right
elif week_starts_sunday:
# Week is Sunday open -> Thursday close
last_working_week_end = weeks.index[-1].right -2*td_1d
else:
# Week is Monday open -> Friday close
last_working_week_end = weeks.index[-1].right -2*td_1d
if debug:
print("- last_working_week_end =", last_working_week_end)
if last_working_week_end <= dt_now:
drop_last = end_d < last_working_week_end.date()
else:
prev_sesh = cal.previous_session(weeks["open"].iloc[0].date()).tz_localize(tz)
if debug:
print("- prev_sesh =", prev_sesh)
drop_first = prev_sesh >= weeks.index[0].left
next_sesh = cal.next_session(weeks["close"].iloc[-1].date()).tz_localize(tz)
if debug:
print("- next_sesh =", next_sesh)
if next_sesh >= dt_now:
# Week ends in future so inevitably cut off but allow it
drop_last = False
else:
drop_last = next_sesh < weeks.index[-1].right
if not ignoreWeekends:
# last_working_week_end = weeks.index[-1].end_time.tz_localize(tz) + timedelta(microseconds=1)
last_working_week_end = weeks.index[-1].right
if debug:
print("- last_working_week_end =", last_working_week_end)
if last_working_week_end <= dt_now:
drop_last = drop_last or end_d < last_working_week_end.date()
if debug:
print(f"- drop_first={drop_first} drop_last={drop_last}")
if drop_first and drop_last:
weeks = weeks.iloc[1:-1]
elif drop_first:
weeks = weeks.iloc[1:]
elif drop_last:
weeks = weeks.iloc[:-1]
if weeks.empty:
weeks = None
if debug:
print("- weeks:")
pprint(weeks)
if debug:
print("GetExchangeWeekSchedule() returning")
return weeks
[docs]
def MapPeriodToDates(exchange, period, interval):
yfcu.TypeCheckPeriod(period, "period")
debug = False
# debug = True
if debug:
print(f"MapPeriodToDates(exchange={exchange}, period={period}, interval={interval})")
tz_name = GetExchangeTzName(exchange)
tz_exchange = ZoneInfo(tz_name)
# Map period to start->end range so logic can intelligently fetch missing data
td_1d = timedelta(days=1)
dt_now = pd.Timestamp.utcnow().replace(tzinfo=ZoneInfo("UTC"))
d_now = dt_now.astimezone(tz_exchange).date()
sched = GetExchangeSchedule(exchange, d_now-(7*td_1d), d_now+td_1d)
yf_lag = yfcd.exchangeToYfLag[exchange]
dt_now_sub_lag = dt_now - yf_lag
if sched["open"].iloc[-1] > dt_now_sub_lag:
# Discard days that haven't opened yet
opens = pd.DatetimeIndex(sched["open"])
x = opens.get_indexer([dt_now_sub_lag], method="bfill")[0] # If not exact match, search forward
sched = sched.iloc[:x]
last_open_day = sched["open"].iloc[-1].date()
if debug:
print(f"- last_open_day={last_open_day} d_now={d_now}")
end_d = last_open_day + td_1d
if period == yfcd.Period.Max:
start_d = date(yfcd.yf_min_year, 1, 1)
elif period == yfcd.Period.Ytd:
start_d = date(d_now.year, 1, 1)
else:
if isinstance(period, yfcd.Period):
period = yfcd.periodToTimedelta[period]
if yfcd.intervalToTimedelta[interval] <= timedelta(days=1):
start_d = end_d - period
while not ExchangeOpenOnDay(exchange, start_d):
start_d += td_1d
end_d = last_open_day+td_1d
elif interval == yfcd.Interval.Week:
if last_open_day.weekday() < 4:
# last week in-progress so ignore from counting, but include in date range
last_full_week_start = d_now - timedelta(days=7+d_now.weekday())
else:
last_full_week_start = d_now - timedelta(days=d_now.weekday())
if last_full_week_start > last_open_day:
last_full_week_start -= timedelta(days=7)
if debug:
print("- last_full_week_start =", last_full_week_start)
start_d = last_full_week_start + timedelta(days=7) - period
else:
raise Exception("codepath not implemented")
if debug:
print(f"MapPeriodToDates() returning {start_d}->{end_d}")
return start_d, end_d
[docs]
def GetExchangeScheduleIntervals(exchange, interval, start, end, discardTimes=None, week7days=True, weekForceStartMonday=True, ignore_breaks=False, exclude_future=True):
yfcu.TypeCheckStr(exchange, "exchange")
if start >= end:
raise Exception("start={} must be < end={}".format(start, end))
yfcu.TypeCheckIntervalDt(start, interval, "start", strict=False)
yfcu.TypeCheckIntervalDt(end, interval, "end", strict=False)
if discardTimes is not None:
yfcu.TypeCheckBool(discardTimes, "discardTimes")
yfcu.TypeCheckBool(week7days, "week7days")
yfcu.TypeCheckBool(ignore_breaks, "ignore_breaks")
itd = yfcd.intervalToTimedelta[interval]
intraday = itd < timedelta(days=1)
if discardTimes is None:
discardTimes = not intraday
if discardTimes and intraday:
raise Exception("discardTimes with intraday is nonsense")
if interval == yfcd.Interval.Week and week7days and not discardTimes:
raise Exception("week7days without discardTimes is nonsense")
debug = False
# debug = True
if debug:
yfc_logger = yfcl.GetLogger("exchange-"+exchange)
yfc_logger.debug("GetExchangeScheduleIntervals()")
yfc_logger.debug("- " + str(locals()))
dt_now = pd.Timestamp.utcnow()
tz = ZoneInfo(GetExchangeTzName(exchange))
td_1d = timedelta(days=1)
if not isinstance(start, datetime):
start_dt = datetime.combine(start, time(0), tz)
start_d = start
else:
start_dt = start
start_d = start.astimezone(tz).date()
if not isinstance(end, datetime):
end_dt = datetime.combine(end, time(0), tz)
end_d = end
else:
end_dt = end
end_d = end.astimezone(tz).date() + td_1d
# First look in cache:
cache_key = (exchange, interval, start_d, end_d, discardTimes, week7days) # todo: frozenset?
if cache_key in schedIntervalsCache:
s = schedIntervalsCache[cache_key]
if s is not None and len(s) > 0:
if isinstance(s.left[0], datetime):
s = s[s.left >= start_dt]
if len(s) > 0 and isinstance(s.right[0], datetime):
s = s[s.right <= end_dt]
if exclude_future and intraday:
s = s[s.left <= dt_now]
if debug:
yfc_logger.debug("- returning cached intervals ({}->{} filtered by {}->{})".format(start_d, end_d, start, min(end, dt_now)))
elif debug:
yfc_logger.debug("- returning cached intervals ({}->{} filtered by {}->{})".format(start_d, end_d, start, end))
return s
if debug:
yfc_logger.debug("- start_d={}, end_d={}".format(start_d, end_d))
week_starts_sunday = (exchange in ["TLV"]) and (not week7days)
if debug:
yfc_logger.debug(f"- week_starts_sunday = {week_starts_sunday}")
cal = GetCalendarViaCache(exchange, start_d, end_d)
# When calculating intervals use dates not datetimes. Cache the result, and then
# apply datetime limits.
intervals = None
istr = yfcd.intervalToString[interval]
if intraday:
if itd > timedelta(minutes=30):
align = "-30m"
else:
align = "-" + istr
ti = cal.trading_index(start_d.isoformat(), (end_d-td_1d).isoformat(), period=istr, intervals=True, force_close=True, ignore_breaks=ignore_breaks, align=align)
if len(ti) == 0:
return None
# Transfer IntervalIndex to DataFrame so can modify
intervals_df = pd.DataFrame(data={"interval_open": ti.left.tz_convert(tz), "interval_close": ti.right.tz_convert(tz)})
if "auction" in cal.schedule.columns:
sched = GetExchangeSchedule(exchange, start_d, end_d)
sched.index = sched.index.date
# Will map auction time to an interval by flooring relative to market open.
# Implemented by flooring then applying offset calculated from floored market open.
intervals_grp = intervals_df.groupby(intervals_df["interval_open"].dt.date)
# 1 - calculate offset
res = 'h' if istr.endswith('h') else istr.replace('m', 'T')
market_opens = intervals_grp.min()["interval_open"]
if len(market_opens.dt.time.unique()) == 1:
open0 = market_opens.iloc[0]
offset = open0 - open0.floor(res)
auctions_df = sched[["auction"]].copy()
else:
market_opens.name = "day_open"
market_opens.index.name = "day"
auctions_df = sched[["auction"]].join(market_opens)
offset = auctions_df["day_open"] - auctions_df["day_open"].dt.floor(res)
# 2 perform relative flooring:
if isinstance(offset, pd.Timedelta) and len(auctions_df["auction"].dt.time.unique()) == 1:
auction0 = auctions_df["auction"].iloc[0]
auction0_floor = (auction0-offset).floor(res) + offset
open_offset = auction0_floor - auction0
auctions_df["auction_open"] = auctions_df["auction"] + open_offset
else:
auctions_df["auction_open"] = (auctions_df["auction"]-offset).dt.floor(res) + offset
auctions_df["auction_close"] = auctions_df["auction"] + yfcd.exchangeAuctionDuration[exchange]
auctions_df = auctions_df.drop(["day_open", "auction"], axis=1, errors="ignore")
# Compare auction intervals against last trading interval
intervals_df_last = intervals_grp.max()
intervals_df_ex_last = intervals_df[~intervals_df["interval_open"].isin(intervals_df_last["interval_open"])]
intervals_df_last.index = intervals_df_last["interval_open"].dt.date
auctions_df = auctions_df.join(intervals_df_last)
# - if auction surrounded by trading, discard auction
f_surround = (auctions_df["auction_open"] >= auctions_df["interval_open"]) & \
(auctions_df["auction_close"] <= auctions_df["interval_close"])
if f_surround.any():
auctions_df.loc[f_surround, ["auction_open", "auction_close"]] = pd.NaT
# - if last trading interval surrounded by auction, then replace by auction
f_surround = (auctions_df["interval_open"] >= auctions_df["auction_open"]) & \
(auctions_df["interval_close"] <= auctions_df["auction_close"])
if f_surround.any():
auctions_df.loc[f_surround, ["interval_open", "interval_close"]] = pd.NaT
# - no duplicates, no overlaps
f_duplicate = (auctions_df["auction_open"] == auctions_df["interval_open"]) & \
(auctions_df["auction_close"] == auctions_df["interval_close"])
if f_duplicate.any():
print("")
print(auctions_df[f_duplicate])
raise Exception("Auction intervals are duplicates of normal trading intervals")
f_overlap = (auctions_df["auction_open"] >= auctions_df["interval_open"]) & \
(auctions_df["auction_open"] < auctions_df["interval_close"])
if f_overlap.any():
# First, if total duration is <= interval length, then combine
d = auctions_df["auction_close"] - auctions_df["interval_open"]
f = d <= itd
if f.any():
# Combine
auctions_df.loc[f, "auction_open"] = auctions_df.loc[f, "interval_open"]
auctions_df.loc[f, ["interval_open", "interval_close"]] = pd.NaT
f_overlap = f_overlap & (~f)
if f_overlap.any():
print("")
print(auctions_df[f_overlap])
raise Exception("Auction intervals are overlapping normal trading intervals")
# - combine
auctions_df = auctions_df.reset_index(drop=True)
intervals_df_last = auctions_df.loc[~auctions_df["interval_open"].isna(), ["interval_open", "interval_close"]]
auctions_df = auctions_df.loc[~auctions_df["auction_open"].isna(), ["auction_open", "auction_close"]]
rename_cols = {"auction_open": "interval_open", "auction_close": "interval_close"}
auctions_df.columns = [rename_cols[col] if col in rename_cols else col for col in auctions_df.columns]
intervals_df = pd.concat([intervals_df_ex_last, intervals_df_last, auctions_df], sort=True).sort_values(by="interval_open").reset_index(drop=True)
intervals = pd.IntervalIndex.from_arrays(intervals_df["interval_open"], intervals_df["interval_close"], closed="left")
elif interval == yfcd.Interval.Days1:
s = GetExchangeSchedule(exchange, start_d, end_d)
if s is None or s.empty:
return None
if exclude_future:
s = s[s["open"] <= dt_now]
if s.empty:
return None
s = s.copy()
if debug:
yfc_logger.debug("- sched:")
yfc_logger.debug(s)
if discardTimes:
open_days = np.array([dt.to_pydatetime().astimezone(tz).date() for dt in s["open"]])
intervals = yfcd.DateIntervalIndex.from_arrays(open_days, open_days+td_1d, closed="left")
else:
if "auction" in s.columns:
s["close"] = s["auction"] + yfcd.exchangeAuctionDuration[exchange]
intervals = pd.IntervalIndex.from_arrays(s["open"], s["close"], closed="left")
elif interval == yfcd.Interval.Week:
# There is reason for this madness of conditionally igoring holidays & weekends, don't mess with it.
if week7days:
week_sched = GetExchangeWeekSchedule(exchange, start, end, ignoreHolidays=False, ignoreWeekends=False, forceStartMonday=True)
if week_sched is not None:
if exclude_future:
week_sched = week_sched[week_sched["open"] <= dt_now]
intervals = yfcd.DateIntervalIndex.from_arrays(week_sched.index.left.date, week_sched.index.right.date, closed="left")
else:
week_sched = GetExchangeWeekSchedule(exchange, start, end, ignoreHolidays=True, ignoreWeekends=True, forceStartMonday=weekForceStartMonday)
if week_sched is not None:
if exclude_future:
week_sched = week_sched[week_sched["open"] <= dt_now]
if discardTimes:
intervals = yfcd.DateIntervalIndex.from_arrays(week_sched["open"].dt.date, week_sched["close"].dt.date+td_1d, closed="left")
else:
intervals = pd.IntervalIndex.from_arrays(week_sched["open"], week_sched["close"], closed="left")
else:
raise Exception("Need to implement for interval={}".format(interval))
if cache_key is not None:
schedIntervalsCache[cache_key] = intervals
# Only after caching can we prune future intervals
if exclude_future and intraday:
intervals = intervals[intervals.left <= dt_now]
if intervals is not None:
if isinstance(intervals.left[0], datetime):
intervals = intervals[(intervals.left >= start_dt) & (intervals.right <= end_dt)]
if len(intervals) == 0:
intervals = None
if debug:
if intervals is not None:
yfc_logger.debug(f"GetExchangeScheduleIntervals({istr}) returning interval starts {intervals.left[0]} -> {intervals.left[-1]}")
else:
yfc_logger.debug(f"GetExchangeScheduleIntervals({istr}) returning None")
return intervals
[docs]
def IsTimestampInActiveSession(exchange, ts):
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckDatetime(ts, "ts")
cal = GetCalendarViaCache(exchange, ts)
try:
s = cal.schedule.loc[ts.date().isoformat()]
except Exception:
return False
o = s["open"] ; c = s["close"]
if "auction" in cal.schedule.columns:
a = s["auction"]
if a != pd.NaT:
c = max(c, s["auction"]+yfcd.exchangeAuctionDuration[exchange])
return o <= ts and ts < c
[docs]
def GetTimestampCurrentSession(exchange, ts):
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckDatetime(ts, "ts")
cal = GetCalendarViaCache(exchange, ts)
try:
s = cal.schedule.loc[ts.date().isoformat()]
except Exception:
return None
o = s["open"] ; c = s["close"]
if "auction" in cal.schedule.columns:
a = s["auction"]
if a != pd.NaT:
c = max(c, s["auction"]+yfcd.exchangeAuctionDuration[exchange])
if o <= ts and ts < c:
return {"market_open": o, "market_close": c}
else:
return None
[docs]
def GetTimestampMostRecentSession(exchange, ts):
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckDatetime(ts, "ts")
# If 'ts' is currently in an active session then that is most recent
s = GetTimestampCurrentSession(exchange, ts)
if s is not None:
return s
sched = GetExchangeSchedule(exchange, ts.date()-timedelta(days=6), ts.date()+timedelta(days=1))
if "auction" in sched.columns:
sched = sched.copy()
f = ~(sched["auction"].isna())
if f.any():
if f.all():
sched["close"] = np.maximum(sched["close"], sched["auction"]+yfcd.exchangeAuctionDuration[exchange])
else:
sched.loc[f, "close"] = np.maximum(sched.loc[f, "close"], sched.loc[f, "auction"]+yfcd.exchangeAuctionDuration[exchange])
for i in range(sched.shape[0]-1, -1, -1):
if sched["open"].iloc[i] <= ts:
tz = ZoneInfo(GetExchangeTzName(exchange))
return {"market_open": sched["open"].iloc[i].to_pydatetime().astimezone(tz), "market_close": sched["close"].iloc[i].to_pydatetime().astimezone(tz)}
raise Exception("Failed to find most recent '{0}' session for ts = {1}".format(exchange, ts))
[docs]
def GetTimestampNextSession(exchange, ts):
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckDatetime(ts, "ts")
sched = GetExchangeSchedule(exchange, ts.date(), ts.date()+timedelta(days=7))
if "auction" in sched.columns:
sched = sched.copy()
f = ~(sched["auction"].isna())
if f.any():
if f.all():
sched["close"] = np.maximum(sched["close"], sched["auction"]+yfcd.exchangeAuctionDuration[exchange])
else:
sched.loc[f, "close"] = np.maximum(sched.loc[f, "close"], sched.loc[f, "auction"]+yfcd.exchangeAuctionDuration[exchange])
for i in range(sched.shape[0]):
if ts < sched["open"].iloc[i]:
tz = ZoneInfo(GetExchangeTzName(exchange))
return {"market_open": sched["open"].iloc[i].to_pydatetime().astimezone(tz), "market_close": sched["close"].iloc[i].to_pydatetime().astimezone(tz)}
raise Exception("Failed to find next '{0}' session for ts = {1}".format(exchange, ts))
[docs]
def GetTimestampCurrentInterval(exchange, ts, interval, discardTimes=None, week7days=True, ignore_breaks=False):
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckIntervalDt(ts, interval, "ts", strict=False)
yfcu.TypeCheckInterval(interval, "interval")
if discardTimes is not None:
yfcu.TypeCheckBool(discardTimes, "discardTimes")
yfcu.TypeCheckBool(week7days, "week7days")
yfcu.TypeCheckBool(ignore_breaks, "ignore_breaks")
itd = yfcd.intervalToTimedelta[interval]
intraday = itd < timedelta(days=1)
if discardTimes is None:
discardTimes = not intraday
if discardTimes and intraday:
raise Exception("discardTimes with intraday is nonsense")
if interval == yfcd.Interval.Week and week7days and not discardTimes:
raise Exception("week7days without discardTimes is nonsense")
if not intraday and not isinstance(ts, datetime) and not discardTimes:
raise Exception("Requesting daily/multiday interval for date 'ts' without discardTimes is nonsense")
debug = False
# debug = True
if debug:
print("GetTimestampCurrentInterval()", locals())
week_starts_sunday = (exchange in ["TLV"]) and (not week7days)
i = None
td_1d = timedelta(days=1)
tz = ZoneInfo(GetExchangeTzName(exchange))
if interval == yfcd.Interval.Week:
# Treat week intervals as special case, contiguous from first weekday open to last weekday open.
# Not necessarily Monday->Friday because of public holidays.
# Unless 'week7days' is true, which means range from Monday to next Monday.
if isinstance(ts, datetime):
ts_day = ts.astimezone(tz).date()
else:
ts_day = ts
if week_starts_sunday:
if ts_day.weekday() == 6:
# Already at start of week
weekStart = ts_day
else:
weekStart = ts_day - timedelta(days=ts_day.weekday()+1)
else:
weekStart = ts_day - timedelta(days=ts_day.weekday())
if week7days:
weekEnd = weekStart + timedelta(days=7)
else:
weekEnd = weekStart + timedelta(days=5)
if debug:
print("- weekStart = {}".format(weekStart))
print("- weekEnd = {}".format(weekEnd))
if not week7days:
weekSched = GetExchangeSchedule(exchange, weekStart, weekEnd)
if "auction" in weekSched.columns:
weekSched["close"] = weekSched["auction"] + yfcd.exchangeAuctionDuration[exchange]
if debug:
print("- weekSched:")
print(weekSched)
weekStart = weekSched["open"].iloc[0]
weekEnd = weekSched["close"].iloc[-1]
if discardTimes:
weekStart = weekStart.date()
weekEnd = weekEnd.date() + td_1d
intervalStart = weekStart
intervalEnd = weekEnd
if debug:
print("- intervalStart = {}".format(intervalStart))
print("- intervalEnd = {}".format(intervalEnd))
if isinstance(intervalStart, datetime):
ts_in_interval = ts >= intervalStart and ts < intervalEnd
else:
ts_in_interval = ts_day >= intervalStart and ts_day < intervalEnd
if ts_in_interval:
i = {"interval_open": intervalStart, "interval_close": intervalEnd}
elif interval == yfcd.Interval.Days1:
if isinstance(ts, datetime):
ts_day = ts.astimezone(tz).date()
else:
ts_day = ts
if debug:
print("- ts_day: {}".format(ts_day))
if ExchangeOpenOnDay(exchange, ts_day):
if debug:
print("- exchange open")
if discardTimes:
i = {"interval_open": ts_day, "interval_close": ts_day+td_1d}
else:
sched = GetExchangeSchedule(exchange, ts_day, ts_day+td_1d)
if sched is not None and not sched.empty:
if "auction" in sched.columns:
sched["close"] = sched["auction"] + yfcd.exchangeAuctionDuration[exchange]
i = {"interval_open": sched["open"].iloc[0], "interval_close": sched["close"].iloc[0]}
if ts < i["interval_open"] or ts >= i["interval_close"]:
i = None
else:
if debug:
print("- exchange closed")
else:
if IsTimestampInActiveSession(exchange, ts) or IsTimestampInActiveSession(exchange, ts+timedelta(minutes=30)):
ts = ts.astimezone(tz)
if exchange in yfcd.exchangesWithAuction:
itd = max(itd, timedelta(minutes=15))
intervals = GetExchangeScheduleIntervals(exchange, interval, ts-itd, ts+timedelta(minutes=30)+itd, ignore_breaks=ignore_breaks)
idx = intervals.get_indexer([ts])
f = idx != -1
if f.any():
i0 = intervals[idx[f]][0]
i = {"interval_open": i0.left.to_pydatetime(), "interval_close": i0.right.to_pydatetime()}
if debug:
print("GetTimestampCurrentInterval() returning: {}".format(i))
return i
[docs]
def GetTimestampCurrentInterval_batch(exchange, ts, interval, discardTimes=None, week7days=True, ignore_breaks=False):
yfcu.TypeCheckStr(exchange, "exchange")
if isinstance(ts, list):
ts = np.array(ts)
yfcu.TypeCheckNpArray(ts, "ts")
if len(ts) > 0:
yfcu.TypeCheckIntervalDt(ts[0], interval, "ts", strict=False)
yfcu.TypeCheckInterval(interval, "interval")
if discardTimes is not None:
yfcu.TypeCheckBool(discardTimes, "discardTimes")
yfcu.TypeCheckBool(week7days, "week7days")
yfcu.TypeCheckBool(ignore_breaks, "ignore_breaks")
debug = False
# debug = True
if len(ts) == 0:
if debug:
print("- ts[] is empty")
print("GetTimestampCurrentInterval_batch() returning")
return None
itd = yfcd.intervalToTimedelta[interval]
intraday = itd < timedelta(days=1)
if discardTimes is None:
discardTimes = not intraday
if discardTimes and intraday:
raise Exception("discardTimes with intraday is nonsense")
if interval == yfcd.Interval.Week and week7days and not discardTimes:
raise Exception("week7days without discardTimes is nonsense")
if not intraday and not isinstance(ts[0], datetime) and not discardTimes:
raise Exception("Requesting daily/multiday interval for date 'ts' without discardTimes is nonsense")
# For day and week intervals, the time component is ignored (set to 0).
if debug:
if len(ts) == 0:
msg = "GetTimestampCurrentInterval_batch(ts=[]"
else:
msg = f"GetTimestampCurrentInterval_batch(ts={ts[0]}->{ts[-1]}"
msg += f", interval={interval}, discardTimes={discardTimes} week7days={week7days})"
print(msg)
n = len(ts)
tz = ZoneInfo(GetExchangeTzName(exchange))
intervals = [None]*n
td_1d = timedelta(days=1)
tz = ZoneInfo(GetExchangeTzName(exchange))
ts_is_datetimes = isinstance(ts[0], datetime)
if ts_is_datetimes:
ts = pd.to_datetime(ts)
if ts[0].tzinfo != tz:
ts = ts.tz_convert(tz)
ts_day = ts.date
else:
ts_day = np.array(ts)
ts = pd.to_datetime(ts).tz_localize(tz)
if interval == yfcd.Interval.Week:
# Treat week intervals as special case, contiguous from first weekday open to last weekday open.
# Not necessarily Monday->Friday because of public holidays.
# Unless 'ignoreClosedDays' is true, which means range from Monday to next Monday.
#
t0 = ts_day[0] ; t0 -= timedelta(days=t0.weekday())+timedelta(days=7)
tl = ts_day[-1] ; tl += timedelta(days=6-tl.weekday())+timedelta(days=7)
if debug:
print("t0={} ; tl={}".format(t0, tl))
if week7days:
# Monday -> next Monday regardless of exchange schedule
wd = pd.to_timedelta(ts.weekday, unit='D')
weekSchedStart = ts - wd
weekSchedEnd = weekSchedStart + timedelta(days=7)
weekSchedStart = weekSchedStart.date
weekSchedEnd = weekSchedEnd.date
else:
week_sched = GetExchangeScheduleIntervals(exchange, interval, t0, tl, discardTimes, week7days=False, weekForceStartMonday=False)
if debug:
print("- week_sched:")
print(week_sched)
weekSchedStart = np.full(n, None)
weekSchedEnd = np.full(n, None)
if discardTimes:
idx = week_sched.get_indexer(ts_day)
else:
idx = week_sched.get_indexer(ts)
f = idx != -1
if f.any():
weekSchedStart[f] = week_sched.left[idx[f]]
weekSchedEnd[f] = week_sched.right[idx[f]]
intervals = pd.DataFrame(data={"interval_open": weekSchedStart, "interval_close": weekSchedEnd}, index=ts)
elif interval == yfcd.Interval.Days1:
t0 = ts_day[0]
tl = ts_day[len(ts_day)-1]
sched = GetExchangeSchedule(exchange, t0, tl+timedelta(days=1))
if "auction" in sched.columns:
sched["close"] = sched["auction"] + yfcd.exchangeAuctionDuration[exchange]
ts_df = pd.DataFrame(data={"day": ts_day}, index=ts)
sched["day"] = sched.index.date
ts_df = pd.merge(ts_df, sched, how="left", on="day")
ts_df.index = ts
if debug:
print("- ts_df:")
print(ts_df)
intervals = ts_df
if discardTimes:
intervals["interval_open"] = intervals["open"].dt.date
intervals["interval_close"] = intervals["interval_open"] + td_1d
intervals = intervals.drop(["day", "open", "close"], axis=1)
else:
intervals["interval_open"] = intervals["open"]
intervals["interval_close"] = intervals["close"]
f_out = (intervals.index < intervals["interval_open"]) | (intervals.index >= intervals["interval_close"])
if f_out.any():
intervals.loc[f_out, "interval_open"] = pd.NaT
intervals.loc[f_out, "interval_close"] = pd.NaT
intervals = intervals.drop(["day", "open", "close"], axis=1)
if debug:
print("- intervals:")
print(intervals)
else:
if exchange in yfcd.exchangesWithAuction:
itd = max(itd, timedelta(minutes=15))
t0 = ts[0]
tl = ts[len(ts)-1]
tis = GetExchangeScheduleIntervals(exchange, interval, t0-itd, tl+itd, ignore_breaks=ignore_breaks)
tz_tis = tis[0].left.tzinfo
if ts[0].tzinfo != tz_tis:
ts = [t.astimezone(tz_tis) for t in ts]
idx = tis.get_indexer(ts)
f = idx != -1
intervals = pd.DataFrame(index=ts)
intervals["interval_open"] = pd.NaT
intervals["interval_close"] = pd.NaT
intervals["interval_open"] = pd.to_datetime(intervals["interval_open"], utc=True)
intervals["interval_close"] = pd.to_datetime(intervals["interval_close"], utc=True)
if f.any():
intervals.loc[intervals.index[f], "interval_open"] = tis.left[idx[f]].tz_convert(tz)
intervals.loc[intervals.index[f], "interval_close"] = tis.right[idx[f]].tz_convert(tz)
intervals["interval_open"] = pd.to_datetime(intervals["interval_open"])
intervals["interval_close"] = pd.to_datetime(intervals["interval_close"])
intervals["interval_open"] = intervals["interval_open"].dt.tz_convert(tz)
intervals["interval_close"] = intervals["interval_close"].dt.tz_convert(tz)
if debug:
print("intervals:") ; print(intervals)
print("GetTimestampCurrentInterval_batch() returning")
return intervals
[docs]
def GetTimestampNextInterval(exchange, ts, interval, discardTimes=None, week7days=True, ignore_breaks=False):
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckIntervalDt(ts, interval, "ts", strict=False)
yfcu.TypeCheckInterval(interval, "interval")
if discardTimes is not None:
yfcu.TypeCheckBool(discardTimes, "discardTimes")
yfcu.TypeCheckBool(week7days, "week7days")
yfcu.TypeCheckBool(ignore_breaks, "ignore_breaks")
itd = yfcd.intervalToTimedelta[interval]
intraday = itd < timedelta(days=1)
if discardTimes is None:
discardTimes = not intraday
if discardTimes and intraday:
raise Exception("discardTimes with intraday is nonsense")
if interval == yfcd.Interval.Week and week7days and not discardTimes:
raise Exception("week7days without discardTimes is nonsense")
debug = False
# debug = True
if debug:
print("GetTimestampNextInterval()", locals())
td_1d = timedelta(days=1)
tz = ZoneInfo(GetExchangeTzName(exchange))
ts_is_datetime = isinstance(ts, datetime)
if debug:
print("- ts_is_datetime =", ts_is_datetime)
if not ts_is_datetime:
ts_day = ts
ts = datetime.combine(ts_day, time(0), tz)
else:
ts_day = ts.astimezone(tz).date()
if interval == yfcd.Interval.Days1:
if discardTimes or not isinstance(ts, datetime):
next_day = ts_day + td_1d
s = GetTimestampNextSession(exchange, datetime.combine(next_day, time(0), tz))
if debug:
print("- s:")
print(s)
interval_open = s["market_open"].date()
interval_close = interval_open+td_1d
else:
if ts_is_datetime:
s = GetTimestampNextSession(exchange, ts)
else:
s = GetTimestampNextSession(exchange, ts+td_1d)
if debug:
print("- s:")
print(s)
interval_open = s["market_open"]
interval_close = s["market_close"]
if debug:
print("GetTimestampNextInterval() returning")
return {"interval_open": interval_open, "interval_close": interval_close}
elif interval == yfcd.Interval.Week:
week_sched = GetExchangeScheduleIntervals(exchange, interval, ts_day, ts_day+14*td_1d,
discardTimes=discardTimes, week7days=week7days, weekForceStartMonday=week7days, ignore_breaks=ignore_breaks)
if debug:
print("- week_sched:")
print(week_sched)
if not ts_is_datetime:
if discardTimes:
skip_first = ts_day >= week_sched.left[0]
else:
skip_first = ts_day >= week_sched.left[0].date()
else:
if discardTimes:
skip_first = ts_day >= week_sched.left[0]
else:
skip_first = ts >= week_sched.left[0]
i = 1 if skip_first else 0
interval_open = week_sched.left[i]
interval_close = week_sched.right[i]
if debug:
print("GetTimestampNextInterval() returning")
return {"interval_open": interval_open, "interval_close": interval_close}
interval_td = yfcd.intervalToTimedelta[interval]
c = GetTimestampCurrentInterval(exchange, ts, interval, discardTimes, ignore_breaks=ignore_breaks)
if debug:
if c is None:
print("- currentInterval = None")
else:
print("- currentInterval = {} -> {}".format(c["interval_open"], c["interval_close"]))
next_interval_close = None
itd = yfcd.intervalToTimedelta[interval]
if c is None or not IsTimestampInActiveSession(exchange, c["interval_close"]):
next_sesh = GetTimestampNextSession(exchange, ts)
if debug:
print("- next_sesh = {}".format(next_sesh))
if exchange == "TLV":
istr = yfcd.intervalToString[interval]
if itd > timedelta(minutes=30):
align = "-30m"
else:
align = "-"+istr
d = next_sesh["market_open"].date()
cal = GetCalendarViaCache(exchange, ts)
ti = cal.trading_index(d.isoformat(), (d+td_1d).isoformat(), period=istr, intervals=True, force_close=True, align=align)
next_interval_start = ti.left[0]
else:
next_interval_start = next_sesh["market_open"]
next_interval_close = min(next_interval_start + interval_td, next_sesh["market_close"])
else:
day_sched = GetExchangeSchedule(exchange, ts_day, ts_day+td_1d).iloc[0]
if exchange in yfcd.exchangesWithAuction:
if c["interval_close"] < day_sched["close"]:
# Next is normal trading
next_interval_start = c["interval_close"]
else:
# Next is auction
if itd <= timedelta(minutes=10):
next_interval_start = day_sched["auction"]
else:
next_interval_start = day_sched["close"]
next_interval_close = min(next_interval_start + interval_td,
day_sched["auction"] + yfcd.exchangeAuctionDuration[exchange])
else:
next_interval_start = c["interval_close"]
next_interval_close = min(next_interval_start + interval_td, day_sched["close"])
if debug:
print("GetTimestampNextInterval() returning")
return {"interval_open": next_interval_start, "interval_close": next_interval_close}
[docs]
def GetTimestampNextInterval_batch(exchange, ts, interval, discardTimes=None, week7days=True, ignore_breaks=False):
yfcu.TypeCheckStr(exchange, "exchange")
if isinstance(ts, list):
ts = np.array(ts)
yfcu.TypeCheckNpArray(ts, "ts")
if len(ts) > 0:
yfcu.TypeCheckIntervalDt(ts[0], interval, "ts", strict=False)
yfcu.TypeCheckInterval(interval, "interval")
if discardTimes is not None:
yfcu.TypeCheckBool(discardTimes, "discardTimes")
yfcu.TypeCheckBool(week7days, "week7days")
yfcu.TypeCheckBool(ignore_breaks, "ignore_breaks")
itd = yfcd.intervalToTimedelta[interval]
intraday = itd < timedelta(days=1)
if discardTimes is None:
discardTimes = not intraday
if discardTimes and intraday:
raise Exception("discardTimes with intraday is nonsense")
if interval == yfcd.Interval.Week and week7days and not discardTimes:
raise Exception("week7days without discardTimes is nonsense")
# For day and week intervals, the time component is ignored (set to 0).
debug = False
# debug = True
if debug:
if len(ts) == 0:
msg = "GetTimestampNextInterval_batch(ts=[]"
else:
msg = f"GetTimestampNextInterval_batch(ts={ts[0]}->{ts[-1]}"
msg += f", interval={interval}, discardTimes={discardTimes} week7days={week7days})"
print(msg)
if len(ts) == 0:
if debug:
print("- ts[] is empty")
print("GetTimestampNextInterval_batch() returning")
return None
n = len(ts)
tz = ZoneInfo(GetExchangeTzName(exchange))
intervals = [None]*n
if debug:
print("- ts:") ; print(ts)
td_1d = timedelta(days=1)
tz = ZoneInfo(GetExchangeTzName(exchange))
ts_is_datetimes = isinstance(ts[0], datetime)
if debug:
print("- ts_is_datetimes =", ts_is_datetimes)
if ts_is_datetimes:
ts = pd.to_datetime(ts)
if ts[0].tzinfo != tz:
ts = ts.tz_convert(tz)
ts_day = ts.date
else:
ts_day = np.array(ts)
ts = pd.to_datetime(ts).tz_localize(tz)
if debug:
print("- ts_day:") ; print(ts_day)
if interval == yfcd.Interval.Week:
# Treat week intervals as special case, contiguous from first weekday open to last weekday open.
# Not necessarily Monday->Friday because of public holidays.
# Unless 'ignoreClosedDays' is true, which means range from Monday to next Monday.
#
t0 = ts_day[0] ; t0 -= timedelta(days=t0.weekday())+timedelta(days=7)
tl = ts_day[-1] ; tl += timedelta(days=6-tl.weekday())+timedelta(days=14)
if debug:
print("t0={} ; tl={}".format(t0, tl))
if week7days:
# Monday -> next Monday regardless of exchange schedule
wd = pd.to_timedelta(ts.weekday, unit='D')
weekSchedStart = (ts - wd + timedelta(days=7)).date
weekSchedEnd = weekSchedStart + timedelta(days=7)
else:
week_sched = GetExchangeScheduleIntervals(exchange, interval, t0, tl, discardTimes=discardTimes, week7days=week7days, weekForceStartMonday=week7days, exclude_future=False)
if debug:
print("- week_sched:", type(week_sched))
for x in week_sched:
print(x)
if ts_is_datetimes and not discardTimes:
idx = week_sched.get_indexer(ts)
else:
if isinstance(week_sched, yfcd.DateIntervalIndex):
week_sched_daily = week_sched
else:
week_sched_daily = yfcd.DateIntervalIndex.from_arrays(week_sched.left.date, week_sched.right.date+td_1d)
if debug:
print("- week_sched_daily:")
for x in week_sched_daily:
print(x)
idx = week_sched_daily.get_indexer(ts_day)
if debug:
print("- idx:")
print(idx)
f = idx != -1
if f.any():
# Point to next interval
idx[f] += 1
if (~f).any():
if ts_is_datetimes and not discardTimes:
idx_next = week_sched.left.get_indexer(ts, method="bfill")
else:
idx_next = np.searchsorted(week_sched_daily.left, ts_day)
idx[~f] = idx_next[~f]
if debug:
print("- idx:")
print(idx)
weekSchedStart = week_sched.left[idx]
weekSchedEnd = week_sched.right[idx]
intervals = pd.DataFrame(data={"interval_open": weekSchedStart, "interval_close": weekSchedEnd}, index=ts)
elif interval == yfcd.Interval.Days1:
f_nna = ~pd.isna(ts_day)
t0 = ts_day[f_nna][0]
tl = ts_day[f_nna][-1]
sched = GetExchangeSchedule(exchange, t0, tl+timedelta(days=14))
if "auction" in sched.columns:
sched["close"] = sched["auction"] + yfcd.exchangeAuctionDuration[exchange]
if debug:
print("- sched:")
print(sched)
intervals = pd.DataFrame(index=ts)
if ts_is_datetimes and not discardTimes:
open_as_index = pd.to_datetime(sched["open"].to_numpy())
idx = open_as_index.get_indexer(ts)
f = idx != -1
if f.any():
# Point to next interval
idx[f] += 1
if (~f).any():
idx_next = open_as_index.get_indexer(ts, method="bfill")
idx[~f] = idx_next[~f]
intervals["interval_open"] = sched["open"].iloc[idx].to_numpy()
intervals["interval_close"] = sched["close"].iloc[idx].to_numpy()
else:
idx = sched.index.get_indexer(ts_day)
f = idx != -1
if f.any():
# Point to next interval
idx[f] += 1
if (~f).any():
idx_next = sched.index.get_indexer(ts_day, method="bfill")
idx[~f] = idx_next[~f]
if discardTimes:
intervals["interval_open"] = sched.index[idx].date
intervals["interval_close"] = intervals["interval_open"] + timedelta(days=1)
else:
intervals["interval_open"] = sched["open"].iloc[idx].to_numpy()
intervals["interval_close"] = sched["close"].iloc[idx].to_numpy()
else:
itd = yfcd.intervalToTimedelta[interval]
if exchange in yfcd.exchangesWithAuction:
itd = max(itd, timedelta(minutes=15))
t0 = ts[0]
tl = ts[len(ts)-1]
# tis = GetExchangeScheduleIntervals(exchange, interval, t0-itd, tl+timedelta(days=14), discardTimes=False, week7days, ignore_breaks=ignore_breaks)
tis = GetExchangeScheduleIntervals(exchange, interval, t0-itd, tl+timedelta(days=14), ignore_breaks=ignore_breaks, exclude_future=False)
tz_tis = tis[0].left.tzinfo
if ts[0].tzinfo != tz_tis:
ts = [t.astimezone(tz_tis) for t in ts]
idx = tis.get_indexer(ts)
f = idx != -1
if f.any():
# Point to next interval
idx[f] += 1
if (~f).any():
idx_next = tis.left.get_indexer(ts, method="bfill")
idx[~f] = idx_next[~f]
# Now everything mapped
intervals = pd.DataFrame(index=ts)
intervals["interval_open"] = tis.left[idx].tz_convert(tz)
intervals["interval_close"] = tis.right[idx].tz_convert(tz)
if debug:
print("intervals (next):") ; print(intervals)
print("GetTimestampNextInterval_batch() returning")
return intervals
[docs]
def TimestampInBreak_batch(exchange, ts, interval):
yfcu.TypeCheckStr(exchange, "exchange")
if isinstance(ts, list):
ts = np.array(ts)
yfcu.TypeCheckNpArray(ts, "ts")
if len(ts) > 0:
yfcu.TypeCheckIntervalDt(ts[0], interval, "ts", strict=False)
yfcu.TypeCheckInterval(interval, "interval")
n = len(ts)
interday = interval in [yfcd.Interval.Days1, yfcd.Interval.Week]#, yfcd.Interval.Months1, yfcd.Interval.Months3]
if interday:
return np.full(n, False)
itd = yfcd.intervalToTimedelta[interval]
tss = pd.to_datetime(ts)
df = pd.DataFrame(data={"_date": tss.date}, index=tss)
cal = GetCalendarViaCache(exchange, df["_date"].iloc[0], df["_date"].iloc[-1])
s = cal.schedule.copy()
s["_date"] = s.index.date
df["_indexBackup"] = df.index
df = df.merge(s[["break_start", "break_end", "_date"]], how="left", on="_date")
df.index = df["_indexBackup"]
return (df.index >= df["break_start"].to_numpy()) & (df.index+itd <= df["break_end"].to_numpy())
[docs]
def CalcIntervalLastDataDt(exchange, intervalStart, interval, ignore_breaks=False, yf_lag=None):
# When does Yahoo stop receiving data for this interval?
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckIntervalDt(intervalStart, interval, "intervalStart", strict=False)
yfcu.TypeCheckInterval(interval, "interval")
debug = False
# debug = True
if debug:
print(f"CalcIntervalLastDataDt(exchange={exchange}, intervalStart={intervalStart}, interval={interval}, yf_lag={yf_lag})")
i_td = yfcd.intervalToTimedelta[interval]
intraday = i_td < timedelta(days=1)
irange = GetTimestampCurrentInterval(exchange, intervalStart, interval, ignore_breaks=ignore_breaks)
if irange is None:
return None
if debug:
print("- irange:")
pprint(irange)
tz = ZoneInfo(GetExchangeTzName(exchange))
if isinstance(irange["interval_open"], datetime):
intervalSched = GetExchangeSchedule(exchange, irange["interval_open"].astimezone(tz).date(), irange["interval_close"].astimezone(tz).date()+timedelta(days=1))
else:
intervalSched = GetExchangeSchedule(exchange, irange["interval_open"], irange["interval_close"])
if debug:
print("- intervalSched:")
pprint(intervalSched)
if yf_lag is not None:
yfcu.TypeCheckTimedelta(yf_lag, "yf_lag")
else:
yf_lag = GetExchangeDataDelay(exchange)
if intervalSched is None or intervalSched.empty:
# Exchange closed so data cannot update/expire
lastDataDt = irange["interval_close"]
if not isinstance(lastDataDt, pd.Timestamp):
lastDataDt = pd.Timestamp(lastDataDt).tz_localize(tz)
if debug:
print("CalcIntervalLastDataDt() returning {} because exchange closed".format(lastDataDt))
return lastDataDt + yf_lag
# For daily intervals, Yahoo data is updating until midnight. I guess aftermarket.
if not intraday:
# lastDataDt = start of next trading session, even if next day (or later)
next_sesh = GetTimestampNextSession(exchange, intervalSched["close"].iloc[-1]-timedelta(minutes=1))
if debug:
print("- next_sesh:")
print(next_sesh)
if i_td > timedelta(days=1):
# start of second trading day in next interval
next_sesh = GetTimestampNextSession(exchange, next_sesh['market_close']-timedelta(minutes=1))
if debug:
print("- next_sesh:")
print(next_sesh)
lastDataDt = next_sesh["market_open"] + yf_lag
# Attempt to handle Yahoo changing weekly interval data after the last market update.
# Yes this also happens with daily but less often.
# Also, I never see this happen on USA exchanges, but everywhere else.
# if not intraday and interval != yfcd.Interval.Days1 and '.' in exchange:
# Update: I've seen this happen to 1d interval on USA
lastDataDt += timedelta(hours=4)
if debug:
print("CalcIntervalLastDataDt() returning {}".format(lastDataDt))
return lastDataDt
# lastDataDt = start of next interval, even if next day (or later)
nextInterval = GetTimestampNextInterval(exchange, irange["interval_close"]-timedelta(minutes=1), interval, ignore_breaks=ignore_breaks)
if debug:
print("- nextInterval:")
print(nextInterval)
lastDataDt = nextInterval["interval_open"] + yf_lag
if debug:
print("CalcIntervalLastDataDt() returning {}".format(lastDataDt))
return lastDataDt
[docs]
def CalcIntervalLastDataDt_batch(exchange, intervalStart, interval, ignore_breaks=False, yf_lag=None):
# When does Yahoo stop receiving data for this interval?
yfcu.TypeCheckStr(exchange, "exchange")
if isinstance(intervalStart, list):
intervalStart = np.array(intervalStart)
yfcu.TypeCheckNpArray(intervalStart, "intervalStart")
yfcu.TypeCheckIntervalDt(intervalStart[0], interval, "intervalStart", strict=False)
yfcu.TypeCheckInterval(interval, "interval")
yfcu.TypeCheckBool(ignore_breaks, "ignore_breaks")
debug = False
# debug = True
if debug:
print("CalcIntervalLastDataDt_batch(len(intervalStart)={}, interval={}, yf_lag={})".format(len(intervalStart), interval, yf_lag))
print("- intervalStart:")
print(intervalStart)
if yf_lag is not None:
yfcu.TypeCheckTimedelta(yf_lag, "yf_lag")
else:
yf_lag = GetExchangeDataDelay(exchange)
tz = ZoneInfo(GetExchangeTzName(exchange))
i_td = yfcd.intervalToTimedelta[interval]
intraday = i_td < timedelta(days=1)
interday = not intraday
if interday and isinstance(intervalStart[0], (datetime, pd.Timestamp)):
intervalStart = [dt.date() for dt in intervalStart]
intervals = GetTimestampCurrentInterval_batch(exchange, intervalStart, interval, ignore_breaks=ignore_breaks)
iopens = intervals["interval_open"]
icloses = intervals["interval_close"]
if isinstance(intervals["interval_open"].iloc[0], datetime):
intervals["interval_open_day"] = intervals["interval_open"].dt.date
else:
intervals["interval_open_day"] = intervals["interval_open"]
if debug:
print("- intervals:")
print(intervals)
f_na = intervals["interval_open"].isna().values
f_nna = ~f_na
iopen0 = iopens[f_nna].iloc[0]
iclosel = icloses[f_nna].iloc[-1]
if debug:
print(f"- intervals date range: {iopen0} -> {iclosel}")
is_dt = isinstance(iopen0, datetime)
if is_dt:
if isinstance(iopen0, pd.Timestamp):
iopen0 = iopen0.to_pydatetime()
iclosel = iclosel.to_pydatetime()
sched = GetExchangeSchedule(exchange, iopen0.astimezone(tz).date(), iclosel.astimezone(tz).date()+timedelta(days=1))
else:
sched = GetExchangeSchedule(exchange, iopen0, iclosel+timedelta(days=1))
sched = sched.copy()
sched["day"] = sched["open"].dt.date
if debug:
print("- sched:")
print(sched)
if interval == yfcd.Interval.Week:
week_starts = sched["day"].to_period("W").keys().start_time
if debug:
print("- week_starts:")
print(week_starts)
sched["interval_open_day"] = week_starts
sched_grp = sched[["interval_open_day", "open", "close"]].groupby(week_starts)
sched = sched_grp.agg(Open=("open", "first"), Close=("close", "last"), interval_open_day=("interval_open_day", "first")).rename(columns={"Open": "open", "Close": "close"})
sched["interval_open_day"] = sched["interval_open_day"].dt.date
else:
sched["interval_open_day"] = sched["day"]
sched = sched[["interval_open_day", "open", "close"]]
if debug:
print("- sched:")
print(sched)
intervals = intervals.merge(sched, on="interval_open_day", how="left")
intervals = intervals.rename(columns={"close": "market_close", "open": "market_open"})
if debug:
print("- intervals:")
print(intervals)
mcloses = intervals["market_close"]
if debug:
print("- mcloses:", type(mcloses[0]))
print(mcloses)
print("- icloses:")
print(icloses)
# For daily intervals, Yahoo data is updating until midnight. I guess aftermarket.
i_td = yfcd.intervalToTimedelta[interval]
if i_td >= timedelta(days=1):
# lastDataDt = start of next interval, even if next day (or later)
next_intervals = GetTimestampNextInterval_batch(exchange, mcloses.to_numpy(), yfcd.Interval.Days1, discardTimes=False, ignore_breaks=ignore_breaks)
if debug:
print("- next_intervals:")
print(next_intervals)
# Handle Yahoo changing weekly interval data after the last market update.
# Also happens with daily but less often.
if i_td > timedelta(days=1):
# Get next trading day after next
next_intervals = GetTimestampNextInterval_batch(exchange, (next_intervals['interval_close']-pd.Timedelta('1m')).to_numpy(), yfcd.Interval.Days1, discardTimes=False, ignore_breaks=ignore_breaks)
lastDataDt = next_intervals["interval_open"].to_numpy() + yf_lag
if f_na.any():
lastDataDt[f_na] = pd.NaT
lastDataDt += timedelta(hours=4)
if debug:
print("CalcIntervalLastDataDt_batch() returning")
return lastDataDt
# lastDataDt = start of next interval, even if next day (or later)
next_intervals = GetTimestampNextInterval_batch(exchange, intervalStart, interval, ignore_breaks=ignore_breaks)
if debug:
print("- next_intervals:")
print(next_intervals)
lastDataDt = next_intervals["interval_open"].to_numpy() + yf_lag
if debug:
print("CalcIntervalLastDataDt_batch() returning")
return lastDataDt
[docs]
def IsPriceDatapointExpired(intervalStart, fetch_dt, repaired, max_age, exchange, interval, ignore_breaks=False, triggerExpiryOnClose=True, yf_lag=None, dt_now=None):
yfcu.TypeCheckIntervalDt(intervalStart, interval, "intervalStart", strict=False)
yfcu.TypeCheckDatetime(fetch_dt, "fetch_dt")
yfcu.TypeCheckBool(repaired, "repaired")
yfcu.TypeCheckTimedelta(max_age, "max_age")
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckInterval(interval, "interval")
yfcu.TypeCheckBool(triggerExpiryOnClose, "triggerExpiryOnClose")
# TODO: unit tests for 'repaired' arg
debug = False
# debug = True
if debug:
print("") ; print("")
print(f"IsPriceDatapointExpired(exchange={exchange} intervalStart={intervalStart}, fetch_dt={fetch_dt}, max_age={max_age}, interval={interval}, triggerExpiryOnClose={triggerExpiryOnClose}, dt_now={dt_now})")
if dt_now is not None:
yfcu.TypeCheckDatetime(dt_now, "dt_now")
else:
dt_now = pd.Timestamp.utcnow().tz_convert(ZoneInfo(GetExchangeTzName(exchange)))
if yf_lag is not None:
yfcu.TypeCheckTimedelta(yf_lag, "yf_lag")
else:
yf_lag = GetExchangeDataDelay(exchange)
if debug:
print("yf_lag = {}".format(yf_lag))
irange = GetTimestampCurrentInterval(exchange, intervalStart, interval, ignore_breaks=ignore_breaks)
if debug:
print("- irange = {}".format(irange))
if irange is None:
if not isinstance(intervalStart, datetime):
intervalStart_ts = pd.Timestamp(intervalStart).tz_localize(ZoneInfo(GetExchangeTzName(exchange)))
else:
intervalStart_ts = intervalStart
if debug:
print("market open? = {}".format(IsTimestampInActiveSession(exchange, intervalStart_ts)))
raise yfcd.TimestampOutsideIntervalException(exchange, interval, intervalStart)
intervalEnd = irange["interval_close"]
if isinstance(intervalEnd, datetime):
intervalEnd_d = intervalEnd.date()
else:
intervalEnd_d = intervalEnd
if debug:
print("- intervalEnd_d = {0}".format(intervalEnd_d))
lastDataDt = CalcIntervalLastDataDt(exchange, intervalStart, interval, ignore_breaks=ignore_breaks, yf_lag=yf_lag)
if debug:
print("- lastDataDt = {}".format(lastDataDt))
if repaired:
# Give Yahoo 1 week to fix their data, so yfinance doesn't have to repair
if debug:
print("- adding 1wk to lastDataDt because data was repaired")
lastDataDt += timedelta(days=7)
# Decide if was fetched after last Yahoo update
if interval in [yfcd.Interval.Days1, yfcd.Interval.Week]:
if fetch_dt >= lastDataDt:
# interval already closed before fetch, nothing to do.
if debug:
print("- fetch_dt > lastDataDt so return FALSE")
return False
else:
interval_already_closed = fetch_dt > lastDataDt
if interval_already_closed:
# interval already closed before fetch, nothing to do.
if debug:
print("- fetch_dt > lastDataDt so return FALSE")
return False
expire_dt = fetch_dt + max_age
if debug:
print("- expire_dt = {0}".format(expire_dt))
if expire_dt <= dt_now:
if debug:
print("- expire_dt <= dt_now")
if IsTimestampInActiveSession(exchange, expire_dt - yf_lag):
if debug:
print("- ... and expire_dt in active session so return TRUE")
return True
elif IsTimestampInActiveSession(exchange, dt_now - yf_lag):
if debug:
print("- ... and dt_now in active session so return TRUE")
return True
elif expire_dt < dt_now:
intervals = GetExchangeScheduleIntervals(exchange, yfcd.Interval.Days1, expire_dt, dt_now, discardTimes=False, week7days=False, ignore_breaks=False)
if intervals is not None and not intervals.empty:
if debug:
print("- ... and trading occurred since so return TRUE")
return True
if triggerExpiryOnClose:
if debug:
print("- checking if triggerExpiryOnClose ...")
print("- - fetch_dt = {}".format(fetch_dt))
print("- - lastDataDt = {}".format(lastDataDt))
print("- - dt_now = {}".format(dt_now))
if fetch_dt < lastDataDt:
if lastDataDt <= dt_now:
# Even though fetched data hasn't fully aged, the candle has since closed so treat as expired
if debug:
print("- triggerExpiryOnClose and interval closed so return TRUE")
return True
if interval in [yfcd.Interval.Days1, yfcd.Interval.Week]:
# If last fetch was anytime within interval, even post-market,
# and dt_now is next day (or later) then trigger
if fetch_dt.date() <= intervalEnd_d and dt_now.date() > intervalEnd_d:
if debug:
print("- triggerExpiryOnClose and interval midnight passed since fetch so return TRUE")
return True
if debug:
print("- reached end of function, returning FALSE")
return False
[docs]
def IdentifyMissingIntervals(exchange, start, end, interval, knownIntervalStarts, week7days=True, ignore_breaks=False):
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckDateEasy(start, "start")
yfcu.TypeCheckDateEasy(end, "end")
if start >= end:
raise Exception("start={} must be < end={}".format(start, end))
if knownIntervalStarts is not None:
if not isinstance(knownIntervalStarts, list) and not isinstance(knownIntervalStarts, np.ndarray):
raise Exception("'knownIntervalStarts' must be list or numpy array not {0}".format(type(knownIntervalStarts)))
if len(knownIntervalStarts) > 0:
if interval in [yfcd.Interval.Days1, yfcd.Interval.Week]:
# Must be date
yfcu.TypeCheckDateStrict(knownIntervalStarts[0], "knownIntervalStarts")
else:
# Must be datetime
yfcu.TypeCheckDatetime(knownIntervalStarts[0], "knownIntervalStarts")
if knownIntervalStarts[0].tzinfo is None:
raise Exception("'knownIntervalStarts' dates must be timezone-aware")
yfcu.TypeCheckBool(ignore_breaks, "ignore_breaks")
debug = False
# debug = True
if debug:
print(f"IdentifyMissingIntervals-{yfcd.intervalToString[interval]}(start={start} end={end})")
print("- knownIntervalStarts:")
pprint(knownIntervalStarts)
intervals = GetExchangeScheduleIntervals(exchange, interval, start, end, week7days=week7days, ignore_breaks=ignore_breaks, exclude_future=True)
if intervals is None or intervals.empty:
raise yfcd.NoIntervalsInRangeException(interval, start, end)
if debug:
print("- intervals:")
pprint(intervals)
if knownIntervalStarts is not None:
intervalStarts = intervals.left
if isinstance(intervalStarts[0], (datetime, pd.Timestamp)):
intervalStarts = [i.timestamp() for i in intervalStarts.to_pydatetime()]
knownIntervalStarts = [x.timestamp() for x in knownIntervalStarts]
if debug:
print("- intervalStarts:")
print(intervalStarts)
f_missing = yfcu.np_isin_optimised(intervalStarts, knownIntervalStarts, invert=True)
else:
f_missing = np.full(intervals.shape[0], True)
intervals_missing_df = pd.DataFrame(data={"open": intervals[f_missing].left, "close": intervals[f_missing].right}, index=np.where(f_missing)[0])
if debug:
print("- intervals_missing_df:")
print(intervals_missing_df)
if debug:
print("IdentifyMissingIntervals() returning")
return intervals_missing_df
[docs]
def IdentifyMissingIntervalRanges(exchange, start, end, interval, knownIntervalStarts, ignore_breaks=False, minDistanceThreshold=5):
yfcu.TypeCheckStr(exchange, "exchange")
yfcu.TypeCheckIntervalDt(start, interval, "start", strict=True)
yfcu.TypeCheckIntervalDt(end, interval, "end", strict=True)
if start >= end:
raise Exception("start={} must be < end={}".format(start, end))
if knownIntervalStarts is not None:
if not isinstance(knownIntervalStarts, list) and not isinstance(knownIntervalStarts, np.ndarray):
raise Exception("'knownIntervalStarts' must be list or numpy array not {0}".format(type(knownIntervalStarts)))
if len(knownIntervalStarts) > 0:
if interval in [yfcd.Interval.Days1, yfcd.Interval.Week]:
# Must be date or datetime
yfcu.TypeCheckDateEasy(knownIntervalStarts[0], "knownIntervalStarts")
if isinstance(knownIntervalStarts[0], datetime) and knownIntervalStarts[0].tzinfo is None:
raise Exception("'knownIntervalStarts' datetimes must be timezone-aware")
else:
# Must be datetime
yfcu.TypeCheckDatetime(knownIntervalStarts[0], "knownIntervalStarts")
if knownIntervalStarts[0].tzinfo is None:
raise Exception("'knownIntervalStarts' dates must be timezone-aware")
yfcu.TypeCheckBool(ignore_breaks, "ignore_breaks")
debug = False
# debug = True
if debug:
print("IdentifyMissingIntervalRanges()")
print(f"- start={start}, end={end}, interval={interval}")
print("- knownIntervalStarts:")
pprint(knownIntervalStarts)
intervals = GetExchangeScheduleIntervals(exchange, interval, start, end, ignore_breaks=ignore_breaks)
if intervals is None or intervals.empty:
raise yfcd.NoIntervalsInRangeException(interval, start, end)
if debug:
print("- intervals:", type(intervals))
for i in intervals:
print(i)
intervals_missing_df = IdentifyMissingIntervals(exchange, start, end, interval, knownIntervalStarts)
if debug:
print("- intervals_missing_df:")
pprint(intervals_missing_df)
f_missing = np.full(intervals.shape[0], False) ; f_missing[intervals_missing_df.index] = True
# Merge together near ranges if the distance between is below threshold.
# This is to reduce web requests
i_true = np.where(f_missing)[0]
for i in range(len(i_true)-1):
i0 = i_true[i]
i1 = i_true[i+1]
if i1-i0 <= minDistanceThreshold+1:
# Mark all intervals between as missing, thus merging together
# the pair of missing ranges
f_missing[i0+1:i1] = True
if debug:
print("- f_missing:")
pprint(f_missing)
# Scan for contiguous sets of missing intervals:
ranges = []
i_true = np.where(f_missing)[0]
if len(i_true) > 0:
start = None ; end = None
for i in range(len(f_missing)):
v = f_missing[i]
if v:
if start is None:
start = i ; end = i
else:
if i == (end+1):
end = i
else:
r = (intervals[start].left, intervals[end].right)
ranges.append(r)
start = i ; end = i
if i == (len(f_missing) - 1):
r = (intervals[start].left, intervals[end].right)
ranges.append(r)
if debug:
print("- ranges:")
pprint(ranges)
if debug:
print("IdentifyMissingIntervalRanges() returning")
if len(ranges) == 0:
return None
return ranges
[docs]
def ConvertToDatetime(dt, tz=None):
# Convert numpy.datetime64 -> pandas.Timestamp -> python datetime
if isinstance(dt, np.datetime64):
dt = pd.Timestamp(dt)
if isinstance(dt, pd.Timestamp):
dt = dt.to_pydatetime()
if tz is not None:
if dt.tzinfo is None:
dt = dt.replace(tzinfo=tz)
else:
dt = dt.astimezone(tz)
return dt
[docs]
def DtSubtractPeriod(dt, period):
yfcu.TypeCheckDateEasy(dt, "dt")
yfcu.TypeCheckPeriod(period, "period")
if period == yfcd.Period.Ytd:
if isinstance(dt, datetime):
return datetime(dt.year, 1, 1, tzinfo=dt.tzinfo)
else:
return date(dt.year, 1, 1)
elif period == yfcd.Period.Max:
raise Exception("Codepath not implemented")
else:
# 'period' should be type Timedelta
return dt - period